From c20fdaf25adba0f8ac83a7a863d43c4cbc4ec3d3 Mon Sep 17 00:00:00 2001
From: Alibi Shalgymbay
Date: Mon, 12 Feb 2024 19:24:31 +0600
Subject: [PATCH] add index header idle delete timeout

Signed-off-by: Alibi Shalgymbay
---
 CHANGELOG.md                                |   2 +
 cmd/thanos/store.go                         |  71 +++---
 pkg/block/indexheader/lazy_binary_reader.go |  70 +++++++
 .../indexheader/lazy_binary_reader_test.go  | 174 ++++++++++++++++++
 pkg/block/indexheader/reader_pool.go        |  65 +++++--
 pkg/block/indexheader/reader_pool_test.go   |  72 +++++++-
 pkg/store/acceptance_test.go                |   1 +
 pkg/store/bucket.go                         |   3 +-
 pkg/store/bucket_e2e_test.go                |   1 +
 pkg/store/bucket_test.go                    |  12 +-
 10 files changed, 420 insertions(+), 51 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ede881b983f..ed6a4e5c23f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ### Added
 
+- [#7118](https://github.com/thanos-io/thanos/pull/7118) Store Gateway: Added `--store.index-header-lazy-reader-idle-delete-timeout` to periodically delete idle index-header files when the lazy reader is enabled and the unload idle timeout is set > 0.
+
 ### Changed
 
 - [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2.
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 9ad3960ff5b..b8986a269d7 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -57,39 +57,40 @@ const (
 )
 
 type storeConfig struct {
-	indexCacheConfigs               extflag.PathOrContent
-	objStoreConfig                  extflag.PathOrContent
-	dataDir                         string
-	cacheIndexHeader                bool
-	grpcConfig                      grpcConfig
-	httpConfig                      httpConfig
-	indexCacheSizeBytes             units.Base2Bytes
-	chunkPoolSize                   units.Base2Bytes
-	estimatedMaxSeriesSize          uint64
-	estimatedMaxChunkSize           uint64
-	seriesBatchSize                 int
-	storeRateLimits                 store.SeriesSelectLimits
-	maxDownloadedBytes              units.Base2Bytes
-	maxConcurrency                  int
-	component                       component.StoreAPI
-	debugLogging                    bool
-	syncInterval                    time.Duration
-	blockSyncConcurrency            int
-	blockMetaFetchConcurrency       int
-	filterConf                      *store.FilterConfig
-	selectorRelabelConf             extflag.PathOrContent
-	advertiseCompatibilityLabel     bool
-	consistencyDelay                commonmodel.Duration
-	ignoreDeletionMarksDelay        commonmodel.Duration
-	disableWeb                      bool
-	webConfig                       webConfig
-	label                           string
-	postingOffsetsInMemSampling     int
-	cachingBucketConfig             extflag.PathOrContent
-	reqLogConfig                    *extflag.PathOrContent
-	lazyIndexReaderEnabled          bool
-	lazyIndexReaderIdleTimeout      time.Duration
-	lazyExpandedPostingsEnabled     bool
+	indexCacheConfigs                extflag.PathOrContent
+	objStoreConfig                   extflag.PathOrContent
+	dataDir                          string
+	cacheIndexHeader                 bool
+	grpcConfig                       grpcConfig
+	httpConfig                       httpConfig
+	indexCacheSizeBytes              units.Base2Bytes
+	chunkPoolSize                    units.Base2Bytes
+	estimatedMaxSeriesSize           uint64
+	estimatedMaxChunkSize            uint64
+	seriesBatchSize                  int
+	storeRateLimits                  store.SeriesSelectLimits
+	maxDownloadedBytes               units.Base2Bytes
+	maxConcurrency                   int
+	component                        component.StoreAPI
+	debugLogging                     bool
+	syncInterval                     time.Duration
+	blockSyncConcurrency             int
+	blockMetaFetchConcurrency        int
+	filterConf                       *store.FilterConfig
+	selectorRelabelConf              extflag.PathOrContent
+	advertiseCompatibilityLabel      bool
+	consistencyDelay                 commonmodel.Duration
+	ignoreDeletionMarksDelay         commonmodel.Duration
+	disableWeb                       bool
+	webConfig                        webConfig
+	label                            string
+	postingOffsetsInMemSampling      int
+	cachingBucketConfig              extflag.PathOrContent
+	reqLogConfig                     *extflag.PathOrContent
+	lazyIndexReaderEnabled           bool
+	lazyIndexReaderIdleTimeout       time.Duration
+	lazyExpandedPostingsEnabled      bool
+	lazyIndexReaderIdleDeleteTimeout time.Duration
 	indexHeaderLazyDownloadStrategy  string
 }
@@ -186,6 +187,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
 	cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
 		Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)
 
+	cmd.Flag("store.index-header-lazy-reader-idle-delete-timeout", "If index-header lazy reader is enabled, store.index-header-lazy-reader-idle-timeout is > 0, and this idle timeout is > 0, index-header files will be automatically deleted after 'idle timeout' inactivity.").
+		Hidden().Default("24h").DurationVar(&sc.lazyIndexReaderIdleDeleteTimeout)
+
 	cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.").
 		Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled)
 
@@ -418,6 +422,7 @@ func runStore(
 		false,
 		conf.lazyIndexReaderEnabled,
 		conf.lazyIndexReaderIdleTimeout,
+		conf.lazyIndexReaderIdleDeleteTimeout,
 		options...,
 	)
 	if err != nil {
diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go
index 2b36bf80259..87cb8fa5623 100644
--- a/pkg/block/indexheader/lazy_binary_reader.go
+++ b/pkg/block/indexheader/lazy_binary_reader.go
@@ -26,6 +26,8 @@ import (
 var (
 	errNotIdle              = errors.New("the reader is not idle")
 	errUnloadedWhileLoading = errors.New("the index-header has been concurrently unloaded")
+	errNotIdleForDelete     = errors.New("the reader is not idle for delete")
+	errLoadedForDelete      = errors.New("the reader is loaded for delete")
 )
 
 // LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader.
@@ -34,6 +36,8 @@ type LazyBinaryReaderMetrics struct {
 	loadFailedCount   prometheus.Counter
 	unloadCount       prometheus.Counter
 	unloadFailedCount prometheus.Counter
+	deleteCount       prometheus.Counter
+	deleteFailedCount prometheus.Counter
 	loadDuration      prometheus.Histogram
 }
 
@@ -56,6 +60,14 @@ func NewLazyBinaryReaderMetrics(reg prometheus.Registerer) *LazyBinaryReaderMetr
 			Name: "indexheader_lazy_unload_failed_total",
 			Help: "Total number of failed index-header lazy unload operations.",
 		}),
+		deleteCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "indexheader_lazy_delete_total",
+			Help: "Total number of index-header lazy delete operations.",
+		}),
+		deleteFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "indexheader_lazy_delete_failed_total",
+			Help: "Total number of failed index-header lazy delete operations.",
+		}),
 		loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name: "indexheader_lazy_load_duration_seconds",
 			Help: "Duration of the index-header lazy loading in seconds.",
@@ -319,3 +331,61 @@ func (r *LazyBinaryReader) isIdleSince(ts int64) bool {
 
 	return loaded
 }
+
+// deleteIfIdleSince deletes the index-header file if the reader has been idle since the given time (as unix nano).
+// If ts is 0, the check on the last usage is skipped. The file is removed only if the reader is unloaded.
+// Calling this function on an already deleted index-header file is a no-op.
+func (r *LazyBinaryReader) deleteIfIdleSince(ts int64) error {
+	// Nothing to do if the reader is loaded.
+	r.readerMx.RLock()
+	loaded := r.reader != nil
+	r.readerMx.RUnlock()
+
+	if loaded {
+		return errLoadedForDelete
+	}
+
+	if ts > 0 && r.usedAt.Load() > ts {
+		return errNotIdleForDelete
+	}
+
+	indexHeaderFile := filepath.Join(r.dir, r.id.String(), block.IndexHeaderFilename)
+
+	// Nothing to do if already deleted.
+	if _, err := os.Stat(indexHeaderFile); os.IsNotExist(err) {
+		return nil
+	}
+
+	r.metrics.deleteCount.Inc()
+	if err := os.Remove(indexHeaderFile); err != nil {
+		r.metrics.deleteFailedCount.Inc()
+		return errors.Wrap(err, "remove index header")
+	}
+
+	return nil
+}
+
+// isIdleForDeleteSince returns true if the reader has been idle since the given time (as unix nano), is unloaded,
+// and its index-header file is still present on disk.
+func (r *LazyBinaryReader) isIdleForDeleteSince(ts int64) bool {
+	if r.usedAt.Load() > ts {
+		return false
+	}
+
+	// A reader can be considered idle for delete only if it's unloaded.
+	r.readerMx.RLock()
+	loaded := r.reader != nil
+	r.readerMx.RUnlock()
+
+	if loaded {
+		return false
+	}
+
+	// A reader can be considered idle for delete only if its index-header file is still present.
+	indexHeaderFile := filepath.Join(r.dir, r.id.String(), block.IndexHeaderFilename)
+	if _, err := os.Stat(indexHeaderFile); os.IsNotExist(err) {
+		return false
+	}
+
+	return true
+}
diff --git a/pkg/block/indexheader/lazy_binary_reader_test.go b/pkg/block/indexheader/lazy_binary_reader_test.go
index d740da99abd..223ce8b4449 100644
--- a/pkg/block/indexheader/lazy_binary_reader_test.go
+++ b/pkg/block/indexheader/lazy_binary_reader_test.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
 	promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/thanos-io/objstore/providers/filesystem"
@@ -320,3 +321,176 @@ func TestLazyBinaryReader_LoadUnloadRaceCondition(t *testing.T) {
 		})
 	}
 }
+
+func TestLazyBinaryReader_delete_ShouldReturnErrorIfNotIdle(t *testing.T) {
+	ctx := context.Background()
+
+	tmpDir := t.TempDir()
+
+	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, bkt.Close()) }()
+
+	// Create block.
+	blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
+		{{Name: "a", Value: "1"}},
+		{{Name: "a", Value: "2"}},
+	}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
+
+	for _, lazyDownload := range []bool{false, true} {
+		t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) {
+			m := NewLazyBinaryReaderMetrics(nil)
+			bm := NewBinaryReaderMetrics(nil)
+			r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload)
+			testutil.Ok(t, err)
+			testutil.Assert(t, r.reader == nil)
+
+			indexHeaderFile := filepath.Join(r.dir, r.id.String(), block.IndexHeaderFilename)
+
+			// Should lazy load the index upon first usage.
+			labelNames, err := r.LabelNames()
+			testutil.Ok(t, err)
+			testutil.Equals(t, []string{"a"}, labelNames)
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
+			// Index-header file is present.
+			f, err := os.Stat(indexHeaderFile)
+			testutil.Ok(t, err)
+			testutil.Assert(t, f != nil)
+
+			// Try to unload (not idle long enough) and delete (reader still loaded).
+			testutil.Equals(t, errNotIdle, r.unloadIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
+			testutil.Equals(t, errLoadedForDelete, r.deleteIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
+			// Index-header file is still present.
+			f, err = os.Stat(indexHeaderFile)
+			testutil.Ok(t, err)
+			testutil.Assert(t, f != nil)
+
+			// Unload (idle long enough), then try to delete (not idle long enough).
+			testutil.Ok(t, r.unloadIfIdleSince(time.Now().UnixNano()))
+			testutil.Equals(t, errNotIdleForDelete, r.deleteIfIdleSince(time.Now().Add(-time.Minute).UnixNano()))
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
+			// Index-header file is still present.
+			f, err = os.Stat(indexHeaderFile)
+			testutil.Ok(t, err)
+			testutil.Assert(t, f != nil)
+
+			// Try to delete (idle long enough).
+			testutil.Ok(t, r.deleteIfIdleSince(time.Now().UnixNano()))
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.loadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.loadFailedCount))
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.unloadCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.unloadFailedCount))
+			testutil.Equals(t, float64(1), promtestutil.ToFloat64(m.deleteCount))
+			testutil.Equals(t, float64(0), promtestutil.ToFloat64(m.deleteFailedCount))
+			// Index-header file has been deleted.
+			f, err = os.Stat(indexHeaderFile)
+			testutil.NotOk(t, err)
+			testutil.Assert(t, f == nil)
+		})
+	}
+}
+
+func TestLazyBinaryReader_LoadUnloadDeleteRaceCondition(t *testing.T) {
+	// Run the test for a fixed amount of time.
+	const runDuration = 5 * time.Second
+
+	ctx := context.Background()
+
+	tmpDir := t.TempDir()
+
+	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, bkt.Close()) }()
+
+	// Create block.
+	blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
+		{{Name: "a", Value: "1"}},
+		{{Name: "a", Value: "2"}},
+	}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
+
+	for _, lazyDownload := range []bool{false, true} {
+		t.Run(fmt.Sprintf("lazyDownload=%v", lazyDownload), func(t *testing.T) {
+			m := NewLazyBinaryReaderMetrics(nil)
+			bm := NewBinaryReaderMetrics(nil)
+			r, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, m, bm, nil, lazyDownload)
+			testutil.Ok(t, err)
+			testutil.Assert(t, r.reader == nil)
+			t.Cleanup(func() {
+				testutil.Ok(t, r.Close())
+			})
+
+			done := make(chan struct{})
+			time.AfterFunc(runDuration, func() { close(done) })
+			wg := sync.WaitGroup{}
+			wg.Add(3)
+
+			// Start a goroutine which continuously tries to unload the reader.
+			go func() {
+				defer wg.Done()
+
+				for {
+					select {
+					case <-done:
+						return
+					default:
+						testutil.Ok(t, r.unloadIfIdleSince(0))
+					}
+				}
+			}()
+
+			// Start a goroutine which continuously tries to delete the index-header file.
+			go func() {
+				defer wg.Done()
+
+				for {
+					select {
+					case <-done:
+						return
+					default:
+						err := r.deleteIfIdleSince(0)
+						testutil.Assert(t, err == nil || errors.Is(err, errLoadedForDelete) || errors.Is(err, os.ErrNotExist))
+					}
+				}
+			}()
+
+			// Try to read multiple times, while the other goroutines continuously try to unload and delete it.
+			go func() {
+				defer wg.Done()
+
+				for {
+					select {
+					case <-done:
+						return
+					default:
+						_, err := r.PostingsOffset("a", "1")
+						testutil.Assert(t, err == nil || errors.Is(err, errUnloadedWhileLoading))
+
+					}
+				}
+			}()
+
+			// Wait until all goroutines are done.
+			wg.Wait()
+		})
+	}
+}
diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go
index e9fe5eb7dca..2970a0210c2 100644
--- a/pkg/block/indexheader/reader_pool.go
+++ b/pkg/block/indexheader/reader_pool.go
@@ -37,10 +37,11 @@ func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics {
 // and automatically close them once the idle timeout is reached. A closed lazy reader
 // will be automatically re-opened upon next usage.
 type ReaderPool struct {
-	lazyReaderEnabled     bool
-	lazyReaderIdleTimeout time.Duration
-	logger                log.Logger
-	metrics               *ReaderPoolMetrics
+	lazyReaderEnabled           bool
+	lazyReaderIdleTimeout       time.Duration
+	lazyReaderIdleDeleteTimeout time.Duration
+	logger                      log.Logger
+	metrics                     *ReaderPoolMetrics
 
 	// Channel used to signal once the pool is closing.
 	close chan struct{}
@@ -88,15 +89,16 @@ func AlwaysLazyDownloadIndexHeader(meta *metadata.Meta) bool {
 }
 
 // NewReaderPool makes a new ReaderPool.
-func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout, lazyReaderIdleDeleteTimeout time.Duration, metrics *ReaderPoolMetrics, lazyDownloadFunc LazyDownloadIndexHeaderFunc) *ReaderPool { p := &ReaderPool{ - logger: logger, - metrics: metrics, - lazyReaderEnabled: lazyReaderEnabled, - lazyReaderIdleTimeout: lazyReaderIdleTimeout, - lazyReaders: make(map[*LazyBinaryReader]struct{}), - close: make(chan struct{}), - lazyDownloadFunc: lazyDownloadFunc, + logger: logger, + metrics: metrics, + lazyReaderEnabled: lazyReaderEnabled, + lazyReaderIdleTimeout: lazyReaderIdleTimeout, + lazyReaderIdleDeleteTimeout: lazyReaderIdleDeleteTimeout, + lazyReaders: make(map[*LazyBinaryReader]struct{}), + close: make(chan struct{}), + lazyDownloadFunc: lazyDownloadFunc, } // Start a goroutine to close idle readers (only if required). @@ -113,6 +115,21 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime } } }() + + if p.lazyReaderIdleDeleteTimeout > 0 { + deleteCheckFreq := p.lazyReaderIdleDeleteTimeout / 10 + + go func() { + for { + select { + case <-p.close: + return + case <-time.After(deleteCheckFreq): + p.deleteIdleReaders() + } + } + }() + } } return p @@ -192,3 +209,27 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) { // be used anymore, so we can automatically remove it from the pool. delete(p.lazyReaders, r) } + +func (p *ReaderPool) deleteIdleReaders() { + idleTimeoutAgo := time.Now().Add(-p.lazyReaderIdleDeleteTimeout).UnixNano() + + for _, r := range p.getIdleReadersForDeleteSince(idleTimeoutAgo) { + if err := r.deleteIfIdleSince(idleTimeoutAgo); err != nil && !errors.Is(err, errNotIdleForDelete) { + level.Warn(p.logger).Log("msg", "failed to delete index-header file", "err", err) + } + } +} + +func (p *ReaderPool) getIdleReadersForDeleteSince(ts int64) []*LazyBinaryReader { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + var idle []*LazyBinaryReader + for r := range p.lazyReaders { + if r.isIdleForDeleteSince(ts) { + idle = append(idle, r) + } + } + + return idle +} diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index a7445f0fed2..10e49249c78 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -22,8 +22,9 @@ import ( func TestReaderPool_NewBinaryReader(t *testing.T) { tests := map[string]struct { - lazyReaderEnabled bool - lazyReaderIdleTimeout time.Duration + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + lazyReaderIdleDeleteTimeout time.Duration }{ "lazy reader is disabled": { lazyReaderEnabled: false, @@ -59,7 +60,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil), AlwaysEagerDownloadIndexHeader) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, testData.lazyReaderIdleDeleteTimeout, NewReaderPoolMetrics(nil), AlwaysEagerDownloadIndexHeader) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta) @@ -96,7 +97,7 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { testutil.Ok(t, 
err)
 
 	metrics := NewReaderPoolMetrics(nil)
-	pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics, AlwaysEagerDownloadIndexHeader)
+	pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, 0, metrics, AlwaysEagerDownloadIndexHeader)
 	defer pool.Close()
 
 	r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta)
@@ -132,3 +133,66 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
 	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
 	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
 }
+
+func TestReaderPool_ShouldDeleteIdleLazyReaders(t *testing.T) {
+	const idleTimeout = time.Second
+
+	ctx := context.Background()
+
+	tmpDir := t.TempDir()
+
+	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, bkt.Close()) }()
+
+	// Create block.
+	blockID, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
+		{{Name: "a", Value: "1"}},
+		{{Name: "a", Value: "2"}},
+	}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
+	meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
+	testutil.Ok(t, err)
+
+	metrics := NewReaderPoolMetrics(nil)
+	pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, idleTimeout, metrics, AlwaysEagerDownloadIndexHeader)
+	defer pool.Close()
+
+	r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta)
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, r.Close()) }()
+
+	// Ensure it can read data.
+	labelNames, err := r.LabelNames()
+	testutil.Ok(t, err)
+	testutil.Equals(t, []string{"a"}, labelNames)
+	testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
+	testutil.Equals(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
+	testutil.Equals(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.deleteCount))
+
+	// Wait enough time before checking it.
+	time.Sleep(idleTimeout * 2)
+
+	// We expect the reader to have been closed and its index-header file deleted, but not released from the pool.
+	testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader)))
+	testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
+	testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
+	testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.deleteCount))
+
+	// Ensure it can still read data (the reader is re-opened and the index-header rebuilt).
+	labelNames, err = r.LabelNames()
+	testutil.Ok(t, err)
+	testutil.Equals(t, []string{"a"}, labelNames)
+	testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader)))
+	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
+	testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
+	testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.deleteCount))
+
+	// We expect an explicit call to Close() to close the reader and release it from the pool too.
+ testutil.Ok(t, r.Close()) + testutil.Assert(t, !pool.isTracking(r.(*LazyBinaryReader))) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.deleteCount)) +} diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index be1a1179f1f..ba952e72327 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -921,6 +921,7 @@ func TestBucketStore_Acceptance(t *testing.T) { false, false, 1*time.Minute, + 1*time.Minute, WithChunkPool(chunkPool), WithFilterConfig(allowAllFilterConf), WithLazyExpandedPostings(lazyExpandedPostings), diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 20cf10e657d..fd04802d798 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -556,6 +556,7 @@ func NewBucketStore( enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. lazyIndexReaderEnabled bool, lazyIndexReaderIdleTimeout time.Duration, + lazyIndexReaderIdleDeleteTimeout time.Duration, options ...BucketStoreOption, ) (*BucketStore, error) { s := &BucketStore{ @@ -592,7 +593,7 @@ func NewBucketStore( // Depend on the options indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics, s.indexHeaderLazyDownloadStrategy) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, lazyIndexReaderIdleDeleteTimeout, indexReaderPoolMetrics, s.indexHeaderLazyDownloadStrategy) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := s.validate(); err != nil { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 02c182cd0b5..4a0efd7a4cc 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -176,6 +176,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, true, time.Minute, + time.Minute, WithLogger(s.logger), WithIndexCache(s.cache), WithFilterConfig(filterConf), diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index d38e14587d7..ca0d76a746d 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -660,6 +660,7 @@ func TestBucketStore_Info(t *testing.T) { false, false, 0, + 0, WithChunkPool(chunkPool), WithFilterConfig(allowAllFilterConf), ) @@ -904,6 +905,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul false, false, 0, + 0, WithLogger(logger), WithFilterConfig(allowAllFilterConf), ) @@ -1462,6 +1464,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, false, false, 0, + 0, WithLogger(logger), WithChunkPool(chunkPool), WithLazyExpandedPostings(lazyExpandedPostings), @@ -1693,7 +1696,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), 
false, 0, 0, indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.FromStrings("ext1", "1").Hash(): {blocks: [][]*bucketBlock{{b1, b2}}}, @@ -1912,6 +1915,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { true, false, 0, + 0, WithLogger(logger), WithIndexCache(indexCache), ) @@ -2004,6 +2008,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { true, false, 0, + 0, WithLogger(logger), WithIndexCache(indexCache), ) @@ -2163,6 +2168,7 @@ func TestSeries_SeriesSortedWithoutReplicaLabels(t *testing.T) { true, false, 0, + 0, WithLogger(logger), WithIndexCache(indexCache), ) @@ -2350,6 +2356,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb true, false, 0, + 0, WithLogger(logger), WithIndexCache(indexCache), ) @@ -2567,6 +2574,7 @@ func TestSeries_ChunksHaveHashRepresentation(t *testing.T) { true, false, 0, + 0, WithLogger(logger), WithIndexCache(indexCache), ) @@ -3542,6 +3550,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { false, false, 1*time.Minute, + 1*time.Minute, WithChunkPool(chunkPool), WithFilterConfig(allowAllFilterConf), ) @@ -3760,6 +3769,7 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { false, false, 1*time.Minute, + 1*time.Minute, WithChunkPool(chunkPool), WithFilterConfig(allowAllFilterConf), WithLazyExpandedPostings(true),
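
How the new timeout composes with the existing one, end to end: the sketch below is not part of the patch; it wires a ReaderPool with both the unload and the delete timeout and watches the on-disk index-header disappear while idle. The data directory and the block ULID are illustrative assumptions, and the short timeouts are chosen only to make the effect visible (the CLI defaults are 5m and 24h); every function and signature used is introduced or exercised by the diff above.

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/thanos-io/objstore/providers/filesystem"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/indexheader"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func main() {
	ctx := context.Background()
	logger := log.NewNopLogger()

	// Assumption: dir already holds a synced TSDB block at <dir>/<ulid>/; the ULID is made up.
	dir := "/var/thanos/store"
	id := ulid.MustParse("01DVZX4CHY2EGZ6JQVS80AB9CF")

	// A filesystem bucket keeps the sketch local; any objstore bucket works.
	bkt, err := filesystem.NewBucket(dir)
	if err != nil {
		panic(err)
	}
	defer bkt.Close()

	meta, err := metadata.ReadFromDir(filepath.Join(dir, id.String()))
	if err != nil {
		panic(err)
	}

	// Unload the memory-mapped header after 10s of inactivity and delete the
	// on-disk file after 30s of inactivity.
	pool := indexheader.NewReaderPool(logger, true, 10*time.Second, 30*time.Second,
		indexheader.NewReaderPoolMetrics(nil), indexheader.AlwaysEagerDownloadIndexHeader)
	defer pool.Close()

	r, err := pool.NewBinaryReader(ctx, logger, bkt, dir, id, 32, meta)
	if err != nil {
		panic(err)
	}
	defer r.Close()

	// First use loads (and, if needed, builds) <dir>/<ulid>/index-header.
	if _, err := r.LabelNames(); err != nil {
		panic(err)
	}

	// Stay idle past the delete timeout; the background loop checks roughly
	// every lazyReaderIdleDeleteTimeout/10.
	time.Sleep(35 * time.Second)

	headerFile := filepath.Join(dir, id.String(), block.IndexHeaderFilename)
	if _, err := os.Stat(headerFile); os.IsNotExist(err) {
		fmt.Println("index-header deleted while idle; the next read rebuilds it")
	}
}

Two design points worth noting: the delete loop only considers readers that are already unloaded and whose index-header file still exists (see isIdleForDeleteSince), so deletion can never race ahead of unloading; and as TestReaderPool_ShouldDeleteIdleLazyReaders demonstrates, a deleted header is rebuilt transparently on the next access, at the cost of one extra lazy load.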