Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic on concurrent index-header lazy reader usage and unload #3760

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -36,6 +36,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#3527](https://github.com/thanos-io/thanos/pull/3527) Query Frontend: Fix query_range behavior when start/end times are the same
- [#3560](https://github.com/thanos-io/thanos/pull/3560) query-frontend: Allow separate label cache
- [#3672](https://github.com/thanos-io/thanos/pull/3672) rule: prevent rule crash from no such host error when using `dnssrv+` or `dnssrvnoa+`.
- [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled.

### Changed

Expand Down
36 changes: 27 additions & 9 deletions pkg/block/indexheader/lazy_binary_reader.go
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

var (
// errNotIdle is returned by unloadIfIdleSince when a non-zero timestamp is given
// and the reader has been used after that timestamp.
errNotIdle = errors.New("the reader is not idle")
)

// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader.
type LazyBinaryReaderMetrics struct {
loadCount prometheus.Counter
Expand Down Expand Up @@ -133,7 +137,8 @@ func (r *LazyBinaryReader) Close() error {
defer r.onClosed(r)
}

return r.unload()
// Unload without checking if idle.
return r.unloadIfIdleSince(0)
}

// IndexVersion implements Reader.
Expand Down Expand Up @@ -245,19 +250,22 @@ func (r *LazyBinaryReader) load() error {
return nil
}

// unload closes underlying BinaryReader. Calling this function on an already unloaded reader is a no-op.
func (r *LazyBinaryReader) unload() error {
// Always update the used timestamp so that the pool will not call unload() again until the next
// idle timeout is hit.
r.usedAt.Store(time.Now().UnixNano())

// unloadIfIdleSince closes the underlying BinaryReader if the reader has been idle since the given time (as unix nano). If ts is 0,
// the check on the last usage is skipped. Calling this function on an already unloaded reader is a no-op.
func (r *LazyBinaryReader) unloadIfIdleSince(ts int64) error {
r.readerMx.Lock()
defer r.readerMx.Unlock()

// Nothing to do if already unloaded.
if r.reader == nil {
return nil
}

// Do not unload if not idle.
if ts > 0 && r.usedAt.Load() > ts {
return errNotIdle
}

r.metrics.unloadCount.Inc()
if err := r.reader.Close(); err != nil {
r.metrics.unloadFailedCount.Inc()
Expand All @@ -268,6 +276,16 @@ func (r *LazyBinaryReader) unload() error {
return nil
}

func (r *LazyBinaryReader) lastUsedAt() int64 {
return r.usedAt.Load()
// isIdleSince reports whether the reader is currently loaded and has not been
// used after ts (a unix timestamp in nanoseconds).
func (r *LazyBinaryReader) isIdleSince(ts int64) bool {
	// Used after the cutoff: not idle, regardless of whether it is loaded.
	if lastUsed := r.usedAt.Load(); lastUsed > ts {
		return false
	}

	// Only a loaded reader counts as idle.
	r.readerMx.RLock()
	defer r.readerMx.RUnlock()

	return r.reader != nil
}
49 changes: 49 additions & 0 deletions pkg/block/indexheader/lazy_binary_reader_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -168,3 +169,51 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) {
testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
}
}

// TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle verifies that unloadIfIdleSince
// returns errNotIdle (and leaves the reader loaded) when the reader was used after the
// given timestamp, and unloads the reader otherwise.
func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) {
	ctx := context.Background()

	tmpDir, err := ioutil.TempDir("", "test-indexheader")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, bkt.Close()) }()

	// Create a block from two label sets and upload it to the bucket.
	seriesLabels := []labels.Labels{
		{{Name: "a", Value: "1"}},
		{{Name: "a", Value: "2"}},
	}
	extLabels := labels.Labels{{Name: "ext1", Value: "1"}}
	blockID, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, extLabels, 124)
	testutil.Ok(t, err)
	blockDir := filepath.Join(tmpDir, blockID.String())
	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, blockDir))

	m := NewLazyBinaryReaderMetrics(nil)
	r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil)
	testutil.Ok(t, err)
	testutil.Assert(t, r.reader == nil)

	// requireCounters asserts the load/unload counters and that no load or
	// unload failure has been recorded.
	requireCounters := func(loads, unloads float64) {
		testutil.Equals(t, loads, promtestutil.ToFloat64(m.loadCount))
		testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
		testutil.Equals(t, unloads, promtestutil.ToFloat64(m.unloadCount))
		testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
	}

	// The first usage should lazily load the index-header.
	labelNames, err := r.LabelNames()
	testutil.Ok(t, err)
	testutil.Equals(t, []string{"a"}, labelNames)
	requireCounters(1, 0)

	// The reader was used just now, so it is not idle since one minute ago:
	// the unload must be refused and the reader must stay loaded.
	oneMinuteAgo := time.Now().Add(-time.Minute).UnixNano()
	testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(oneMinuteAgo))
	requireCounters(1, 0)

	// The reader has not been used after "now", so the unload must succeed.
	testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano()))
	requireCounters(1, 1)
}
20 changes: 7 additions & 13 deletions pkg/block/indexheader/reader_pool.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -98,29 +99,22 @@ func (p *ReaderPool) Close() {
}

func (p *ReaderPool) closeIdleReaders() {
for _, r := range p.getIdleReaders() {
// Closing an already closed reader is a no-op, so we close it and just update
// the last timestamp on success. If it is still idle the next time this
// function is called, we'll try to close it again and it will just be a no-op.
//
// Due to concurrency, the current implementation may close a reader which was
// in use between when the list of idle readers has been computed and now. This is
// an edge case we're willing to accept, to not further complicate the logic.
if err := r.unload(); err != nil {
idleTimeoutAgo := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano()

for _, r := range p.getIdleReadersSince(idleTimeoutAgo) {
if err := r.unloadIfIdleSince(idleTimeoutAgo); err != nil && !errors.Is(err, errNotIdle) {
level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err)
}
}
}

func (p *ReaderPool) getIdleReaders() []*LazyBinaryReader {
func (p *ReaderPool) getIdleReadersSince(ts int64) []*LazyBinaryReader {
p.lazyReadersMx.Lock()
defer p.lazyReadersMx.Unlock()

var idle []*LazyBinaryReader
threshold := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano()

for r := range p.lazyReaders {
if r.lastUsedAt() < threshold {
if r.isIdleSince(ts) {
idle = append(idle, r)
}
}
Expand Down