Skip to content

Commit

Permalink
Returning custom grpc code when series/chunk limits are reached
Browse files Browse the repository at this point in the history
  • Loading branch information
alanprot committed Mar 10, 2021
1 parent 84c73dc commit 9d32ef7
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 11 deletions.
6 changes: 5 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
err = g.Wait()
})
if err != nil {
return status.Error(codes.Aborted, err.Error())
code := codes.Aborted
if s, ok := status.FromError(errors.Cause(err)); ok {
code = s.Code()
}
return status.Error(code, err.Error())
}
stats.blocksQueried = len(res)
stats.getAllDuration = time.Since(begin)
Expand Down
74 changes: 64 additions & 10 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/gogo/status"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -45,6 +50,20 @@ type swappableCache struct {
ptr storecache.IndexCache
}

type customLimiter struct {
limiter *Limiter
code codes.Code
}

func (c *customLimiter) Reserve(num uint64) error {
err := c.limiter.Reserve(num)
if err != nil {
return httpgrpc.Errorf(int(c.code), err.Error())
}

return nil
}

func (c *swappableCache) SwapWith(ptr2 storecache.IndexCache) {
c.ptr = ptr2
}
Expand Down Expand Up @@ -113,7 +132,25 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
return
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func newCustomChunksLimiterFactory(limit uint64, code codes.Code) ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) ChunksLimiter {
return &customLimiter{
limiter: NewLimiter(limit, failedCounter),
code: code,
}
}
}

func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) SeriesLimiter {
return &customLimiter{
limiter: NewLimiter(limit, failedCounter),
code: code,
}
}
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
Expand Down Expand Up @@ -151,8 +188,8 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
s.cache,
nil,
nil,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
chunksLimiterFactory,
seriesLimiterFactory,
NewGapBasedPartitioner(PartitionerMaxGapSize),
false,
20,
Expand Down Expand Up @@ -425,7 +462,7 @@ func TestBucketStore_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
Expand Down Expand Up @@ -480,7 +517,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
Expand Down Expand Up @@ -508,7 +545,7 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
Expand Down Expand Up @@ -554,14 +591,28 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {

cases := map[string]struct {
maxChunksLimit uint64
maxSeriesLimit uint64
expectedErr string
code codes.Code
}{
"should succeed if the max chunks limit is not exceeded": {
maxChunksLimit: expectedChunks,
},
"should fail if the max chunks limit is exceeded": {
"should fail if the max chunks limit is exceeded - ResourceExhausted": {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
code: codes.ResourceExhausted,
},
"should fail if the max chunks limit is exceeded - 422": {
maxChunksLimit: expectedChunks - 1,
expectedErr: "exceeded chunks limit",
code: 422,
},
"should fail if the max series limit is exceeded - 422": {
maxChunksLimit: expectedChunks,
expectedErr: "exceeded series limit",
maxSeriesLimit: 1,
code: 422,
},
}

Expand All @@ -575,7 +626,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf)
testutil.Ok(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
Expand All @@ -595,6 +646,9 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
} else {
testutil.NotOk(t, err)
testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr))
status, ok := status.FromError(err)
testutil.Equals(t, true, ok)
testutil.Equals(t, testData.code, status.Code())
}
})
}
Expand All @@ -609,7 +663,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
Expand Down Expand Up @@ -642,7 +696,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
Expand Down

0 comments on commit 9d32ef7

Please sign in to comment.