diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e933c180f..3b421564bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase - [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and querier Store API Series, which were leaking on errors. - [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks. +- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block. ### Added diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 3e3007daf4..65aa469227 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -63,7 +63,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { Default("2GB").Bytes() maxSampleCount := cmd.Flag("store.grpc.series-sample-limit", - "Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit."). + "Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit."). Default("0").Uint() maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() @@ -296,7 +296,7 @@ func runStore( indexCache, queriesGate, chunkPoolSizeBytes, - maxSampleCount, + store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. verbose, blockSyncConcurrency, filterConf, diff --git a/docs/components/store.md b/docs/components/store.md index 7bb7700356..132da27f1c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -93,11 +93,14 @@ Flags: memory. --store.grpc.series-sample-limit=0 Maximum amount of samples returned via a single - Series call. 0 means no limit. NOTE: For - efficiency we take 120 as the number of samples - in chunk (it cannot be bigger than that), so - the actual number of samples might be lower, - even though the maximum could be hit. + Series call. The Series call fails if this + limit is exceeded. 0 means no limit. NOTE: For + efficiency the limit is internally implemented + as 'chunks limit' considering each chunk + contains 120 samples (it's the max number of + samples each chunk can contain), so the actual + number of samples might be lower, even though + the maximum could be hit. --store.grpc.series-max-concurrency=20 Maximum number of concurrent Series calls. --objstore.config-file= diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 01bafe582d..d49dc51a3d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -54,13 +54,13 @@ import ( ) const ( - // maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed + // MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed // for precalculating the number of samples that we may have to retrieve and decode for any given query // without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know // where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way // because you barely get any improvements in compression when the number of samples is beyond this. // Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. - maxSamplesPerChunk = 120 + MaxSamplesPerChunk = 120 maxChunkSize = 16000 maxSeriesSize = 64 * 1024 @@ -240,9 +240,9 @@ type BucketStore struct { // Query gate which limits the maximum amount of concurrent queries. queryGate gate.Gate - // samplesLimiter limits the number of samples per each Series() call. - samplesLimiter SampleLimiter - partitioner partitioner + // chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call. + chunksLimiterFactory ChunksLimiterFactory + partitioner partitioner filterConfig *FilterConfig advLabelSets []storepb.LabelSet @@ -269,7 +269,7 @@ func NewBucketStore( indexCache storecache.IndexCache, queryGate gate.Gate, maxChunkPoolBytes uint64, - maxSampleCount uint64, + chunksLimiterFactory ChunksLimiterFactory, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -287,7 +287,6 @@ func NewBucketStore( return nil, errors.Wrap(err, "create chunk pool") } - metrics := newBucketStoreMetrics(reg) s := &BucketStore{ logger: logger, bkt: bkt, @@ -301,14 +300,14 @@ func NewBucketStore( blockSyncConcurrency: blockSyncConcurrency, filterConfig: filterConfig, queryGate: queryGate, - samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), + chunksLimiterFactory: chunksLimiterFactory, partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, enablePostingsCompression: enablePostingsCompression, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, + metrics: newBucketStoreMetrics(reg), } - s.metrics = metrics if err := os.MkdirAll(dir, 0777); err != nil { return nil, errors.Wrap(err, "create dir") @@ -649,7 +648,7 @@ func blockSeries( chunkr *bucketChunkReader, matchers []*labels.Matcher, req *storepb.SeriesRequest, - samplesLimiter SampleLimiter, + chunksLimiter ChunksLimiter, ) (storepb.SeriesSet, *queryStats, error) { ps, err := indexr.ExpandedPostings(matchers) if err != nil { @@ -722,12 +721,16 @@ func blockSeries( s.refs = append(s.refs, meta.Ref) } if len(s.chks) > 0 { + if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil { + return nil, nil, errors.Wrap(err, "exceeded chunks limit") + } + res = append(res, s) } } // Preload all chunks that were marked in the previous stage. - if err := chunkr.preload(samplesLimiter); err != nil { + if err := chunkr.preload(); err != nil { return nil, nil, errors.Wrap(err, "preload chunks") } @@ -858,6 +861,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped) ) if req.Hints != nil { @@ -909,7 +913,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie chunkr, blockMatchers, req, - s.samplesLimiter, + chunksLimiter, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -1983,19 +1987,9 @@ func (r *bucketChunkReader) addPreload(id uint64) error { } // preload all added chunk IDs. Must be called before the first call to Chunk is made. -func (r *bucketChunkReader) preload(samplesLimiter SampleLimiter) error { +func (r *bucketChunkReader) preload() error { g, ctx := errgroup.WithContext(r.ctx) - numChunks := uint64(0) - for _, offsets := range r.preloads { - for range offsets { - numChunks++ - } - } - if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil { - return errors.Wrap(err, "exceeded samples limit") - } - for seq, offsets := range r.preloads { sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 1b25d6b9d8..f7847619fa 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" @@ -123,7 +124,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -161,7 +162,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m s.cache, nil, 0, - maxSampleCount, + NewChunksLimiterFactory(maxChunksLimit), false, 20, filterConf, @@ -504,7 +505,10 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { hourAfter := time.Now().Add(1 * time.Hour) filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter} - s := prepareStoreWithTestBlocks(t, dir, bkt, false, 241, emptyRelabelConfig, &FilterConfig{ + // The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks. + expectedChunks := uint64(2 * 2) + + s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{ MinTime: minTimeDuration, MaxTime: filterMaxTime, }) @@ -543,3 +547,55 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) { testutil.Equals(t, 1, len(s.Chunks)) } } + +func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { + // The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks. + expectedChunks := uint64(2 * 6) + + cases := map[string]struct { + maxChunksLimit uint64 + expectedErr string + }{ + "should succeed if the max chunks limit is not exceeded": { + maxChunksLimit: expectedChunks, + }, + "should fail if the max chunks limit is exceeded": { + maxChunksLimit: expectedChunks - 1, + expectedErr: "exceeded chunks limit", + }, + } + + for testName, testData := range cases { + t.Run(testName, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + bkt := objstore.NewInMemBucket() + + dir, err := ioutil.TempDir("", "test_bucket_chunks_limiter_e2e") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf) + testutil.Ok(t, s.store.SyncBlocks(ctx)) + + req := &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + MinTime: minTimeDuration.PrometheusTimestamp(), + MaxTime: maxTimeDuration.PrometheusTimestamp(), + } + + s.cache.SwapWith(noopCache{}) + srv := newStoreSeriesServer(ctx) + err = s.store.Series(req, srv) + + if testData.expectedErr == "" { + testutil.Ok(t, err) + } else { + testutil.NotOk(t, err) + testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr)) + } + }) + } +} diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c8ad24b375..951b83f732 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -574,7 +574,7 @@ func TestBucketStore_Info(t *testing.T) { noopCache{}, nil, 2e5, - 0, + NewChunksLimiterFactory(0), false, 20, allowAllFilterConf, @@ -823,7 +823,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul noopCache{}, nil, 0, - 0, + NewChunksLimiterFactory(0), false, 20, allowAllFilterConf, @@ -1248,8 +1248,8 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}}, }, - queryGate: noopGate{}, - samplesLimiter: noopLimiter{}, + queryGate: noopGate{}, + chunksLimiterFactory: NewChunksLimiterFactory(0), } for _, block := range blocks { @@ -1330,10 +1330,6 @@ type noopGate struct{} func (noopGate) Start(context.Context) error { return nil } func (noopGate) Done() {} -type noopLimiter struct{} - -func (noopLimiter) Check(uint64) error { return nil } - // Regression test against: https://github.com/thanos-io/thanos/issues/2147. func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { tmpDir, err := ioutil.TempDir("", "segfault-series") @@ -1456,8 +1452,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b1.meta.ULID: b1, b2.meta.ULID: b2, }, - queryGate: noopGate{}, - samplesLimiter: noopLimiter{}, + queryGate: noopGate{}, + chunksLimiterFactory: NewChunksLimiterFactory(0), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -1571,7 +1567,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { indexCache, nil, 1000000, - 10000, + NewChunksLimiterFactory(10000/MaxSamplesPerChunk), false, 10, nil, @@ -1680,7 +1676,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { indexCache, nil, 1000000, - 10000, + NewChunksLimiterFactory(10000/MaxSamplesPerChunk), false, 10, nil, diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index 5c23752d73..1e354721c2 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -4,20 +4,32 @@ package store import ( + "sync" + "sync/atomic" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) -type SampleLimiter interface { - Check(num uint64) error +type ChunksLimiter interface { + // Reserve num chunks out of the total number of chunks enforced by the limiter. + // Returns an error if the limit has been exceeded. This function must be + // goroutine safe. + Reserve(num uint64) error } +// ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for +// projects depending on Thanos (eg. Cortex) which have dynamic limits. +type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter + // Limiter is a simple mechanism for checking if something has passed a certain threshold. type Limiter struct { - limit uint64 + limit uint64 + reserved uint64 - // Counter metric which we will increase if Check() fails. + // Counter metric which we will increase if limit is exceeded. failedCounter prometheus.Counter + failedOnce sync.Once } // NewLimiter returns a new limiter with a specified limit. 0 disables the limit. @@ -25,14 +37,23 @@ func NewLimiter(limit uint64, ctr prometheus.Counter) *Limiter { return &Limiter{limit: limit, failedCounter: ctr} } -// Check checks if the passed number exceeds the limits or not. -func (l *Limiter) Check(num uint64) error { +// Reserve implements ChunksLimiter. +func (l *Limiter) Reserve(num uint64) error { if l.limit == 0 { return nil } - if num > l.limit { - l.failedCounter.Inc() - return errors.Errorf("limit %v violated (got %v)", l.limit, num) + if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit { + // We need to protect from the counter being incremented twice due to concurrency + // while calling Reserve(). + l.failedOnce.Do(l.failedCounter.Inc) + return errors.Errorf("limit %v violated (got %v)", l.limit, reserved) } return nil } + +// NewChunksLimiterFactory makes a new ChunksLimiterFactory with a static limit. +func NewChunksLimiterFactory(limit uint64) ChunksLimiterFactory { + return func(failedCounter prometheus.Counter) ChunksLimiter { + return NewLimiter(limit, failedCounter) + } +} diff --git a/pkg/store/limiter_test.go b/pkg/store/limiter_test.go new file mode 100644 index 0000000000..3e3fc677d4 --- /dev/null +++ b/pkg/store/limiter_test.go @@ -0,0 +1,30 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestLimiter(t *testing.T) { + c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + l := NewLimiter(10, c) + + testutil.Ok(t, l.Reserve(5)) + testutil.Equals(t, float64(0), prom_testutil.ToFloat64(c)) + + testutil.Ok(t, l.Reserve(5)) + testutil.Equals(t, float64(0), prom_testutil.ToFloat64(c)) + + testutil.NotOk(t, l.Reserve(1)) + testutil.Equals(t, float64(1), prom_testutil.ToFloat64(c)) + + testutil.NotOk(t, l.Reserve(2)) + testutil.Equals(t, float64(1), prom_testutil.ToFloat64(c)) +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d6fa6710dc..6d5a737a90 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -239,7 +239,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series continue } - aggregatedChunks, err := p.chunkSamples(e, maxSamplesPerChunk) + aggregatedChunks, err := p.chunkSamples(e, MaxSamplesPerChunk) if err != nil { return err } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index ff229fb323..70454a39f5 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -127,7 +127,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer if !r.SkipChunks { // TODO(fabxc): An improvement over this trivial approach would be to directly // use the chunks provided by TSDB in the response. - c, err := s.encodeChunks(series.Iterator(), maxSamplesPerChunk) + c, err := s.encodeChunks(series.Iterator(), MaxSamplesPerChunk) if err != nil { return status.Errorf(codes.Internal, "encode chunk: %s", err) }