diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b1d6411be..cf3013e82d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3527](https://github.com/thanos-io/thanos/pull/3527) Query Frontend: Fix query_range behavior when start/end times are the same - [#3560](https://github.com/thanos-io/thanos/pull/3560) query-frontend: Allow separate label cache - [#3672](https://github.com/thanos-io/thanos/pull/3672) rule: prevent rule crash from no such host error when using `dnssrv+` or `dnssrvnoa+`. +- [#3760](https://github.com/thanos-io/thanos/pull/3760) Store: Fix panic caused by a race condition happening on concurrent index-header reader usage and unload, when `--store.enable-index-header-lazy-reader` is enabled. ### Changed diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index e9b9dc20bd..0598e46f49 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -23,6 +23,10 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" ) +var ( + errNotIdle = errors.New("the reader is not idle") +) + // LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader. type LazyBinaryReaderMetrics struct { loadCount prometheus.Counter @@ -133,7 +137,8 @@ func (r *LazyBinaryReader) Close() error { defer r.onClosed(r) } - return r.unload() + // Unload without checking if idle. + return r.unloadIfIdleSince(0) } // IndexVersion implements Reader. @@ -245,19 +250,22 @@ func (r *LazyBinaryReader) load() error { return nil } -// unload closes underlying BinaryReader. Calling this function on a already unloaded reader is a no-op. -func (r *LazyBinaryReader) unload() error { - // Always update the used timestamp so that the pool will not call unload() again until the next - // idle timeout is hit. 
- r.usedAt.Store(time.Now().UnixNano()) - +// unloadIfIdleSince closes the underlying BinaryReader if the reader has not been used after the given time ts (as unix nano). If ts is not positive, +// the check on the last usage is skipped. Calling this function on an already unloaded reader is a no-op. It returns errNotIdle if the reader was used after ts. +func (r *LazyBinaryReader) unloadIfIdleSince(ts int64) error { r.readerMx.Lock() defer r.readerMx.Unlock() + // Nothing to do if already unloaded. if r.reader == nil { return nil } + // Do not unload if the reader has been used after ts, i.e. it is not idle. + if ts > 0 && r.usedAt.Load() > ts { + return errNotIdle + } + r.metrics.unloadCount.Inc() if err := r.reader.Close(); err != nil { r.metrics.unloadFailedCount.Inc() @@ -268,6 +276,16 @@ func (r *LazyBinaryReader) unload() error { return nil } -func (r *LazyBinaryReader) lastUsedAt() int64 { - return r.usedAt.Load() +// isIdleSince returns true if the reader is loaded and has not been used after the given time (as unix nano). +func (r *LazyBinaryReader) isIdleSince(ts int64) bool { + if r.usedAt.Load() > ts { + return false + } + + // A reader can be considered idle only if it's loaded. 
+ r.readerMx.RLock() + loaded := r.reader != nil + r.readerMx.RUnlock() + + return loaded } diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go index 07b8883cfd..0b130199b6 100644 --- a/pkg/block/indexheader/lazy_binary_reader_test.go +++ b/pkg/block/indexheader/lazy_binary_reader_test.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/go-kit/kit/log" "github.com/oklog/ulid" @@ -168,3 +169,51 @@ func TestLazyBinaryReader_ShouldReopenOnUsageAfterClose(t *testing.T) { testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) } } + +func TestLazyBinaryReader_unload_ShouldReturnErrorIfNotIdle(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + // Create block. + blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()))) + + m := NewLazyBinaryReaderMetrics(nil) + r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, nil) + testutil.Ok(t, err) + testutil.Assert(t, r.reader == nil) + + // Should lazy load the index upon first usage. 
+ labelNames, err := r.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, []string{"a"}, labelNames) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload, but the reader has not been idle for long enough: expect errNotIdle and no unload. + testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) + + // Try to unload with the current time as threshold: the reader is now idle and must be unloaded. + testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano())) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount)) +} diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 660ae4853a..93f1fd88b3 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/objstore" @@ -98,29 +99,22 @@ func (p *ReaderPool) Close() { } func (p *ReaderPool) closeIdleReaders() { - for _, r := range p.getIdleReaders() { - // Closing an already closed reader is a no-op, so we close it and just update - // the last timestamp on success. 
If it will be still be idle the next time this - // function is called, we'll try to close it again and will just be a no-op. - // - // Due to concurrency, the current implementation may close a reader which was - // use between when the list of idle readers has been computed and now. This is - // an edge case we're willing to accept, to not further complicate the logic. - if err := r.unload(); err != nil { + idleTimeoutAgo := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano() + + for _, r := range p.getIdleReadersSince(idleTimeoutAgo) { + if err := r.unloadIfIdleSince(idleTimeoutAgo); err != nil && !errors.Is(err, errNotIdle) { level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err) } } } -func (p *ReaderPool) getIdleReaders() []*LazyBinaryReader { +func (p *ReaderPool) getIdleReadersSince(ts int64) []*LazyBinaryReader { p.lazyReadersMx.Lock() defer p.lazyReadersMx.Unlock() var idle []*LazyBinaryReader - threshold := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano() - for r := range p.lazyReaders { - if r.lastUsedAt() < threshold { + if r.isIdleSince(ts) { idle = append(idle, r) } }