diff --git a/.dep-finished b/.dep-finished deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/CHANGELOG.md b/CHANGELOG.md index cab5a36852..e2a7706202 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts. - [#1881](https://github.com/thanos-io/thanos/pull/1881) Store Gateway: memcached support for index cache. See [documentation](docs/components/store.md/#index-cache) for further information. +- [#1904](https://github.com/thanos-io/thanos/pull/1904) Add a skip-chunks option in Store Series API to improve the response time of `/api/v1/series` endpoint. ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 9dd9a9ec83..17ebc07321 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -293,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { span, ctx := tracing.StartSpan(ctx, "promql_instant_query") defer span.Finish() - qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts) + qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts) if err != nil { return nil, nil, &ApiError{errorBadData, err} } @@ -386,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { defer span.Finish() qry, err := api.queryEngine.NewRangeQuery( - api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), + api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), start, end, @@ -426,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } @@ -503,7 +503,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { } // TODO(bwplotka): Support downsampling? - q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse). + q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, true). 
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &ApiError{errorExec, err} @@ -607,7 +607,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, nil, &ApiError{errorExec, err} } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 4263d81a36..a8294b00fa 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -19,11 +19,11 @@ import ( // replicaLabels at query time. // maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy. -type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable +type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable // NewQueryableCreator creates QueryableCreator. func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator { - return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable { + return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable { return &queryable{ logger: logger, replicaLabels: replicaLabels, @@ -31,6 +31,7 @@ func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) Queryable deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, partialResponse: partialResponse, + skipChunks: skipChunks, } } } @@ -42,11 +43,12 @@ type queryable struct { deduplicate bool maxResolutionMillis int64 partialResponse bool + skipChunks bool } // Querier returns a new storage querier against the underlying proxy store API. 
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil } type querier struct { @@ -59,6 +61,7 @@ type querier struct { deduplicate bool maxResolutionMillis int64 partialResponse bool + skipChunks bool } // newQuerier creates implementation of storage.Querier that fetches data from the proxy @@ -72,6 +75,7 @@ func newQuerier( deduplicate bool, maxResolutionMillis int64, partialResponse bool, + skipChunks bool, ) *querier { if logger == nil { logger = log.NewNopLogger() @@ -93,6 +97,7 @@ func newQuerier( deduplicate: deduplicate, maxResolutionMillis: maxResolutionMillis, partialResponse: partialResponse, + skipChunks: skipChunks, } } @@ -185,6 +190,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s MaxResolutionWindow: q.maxResolutionMillis, Aggregates: queryAggrs, PartialResponseDisabled: !q.partialResponse, + SkipChunks: q.skipChunks, }, resp); err != nil { return nil, nil, errors.Wrap(err, "proxy Series()") } diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index bf84218c3e..57ea33bea9 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -28,7 +28,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { queryableCreator := NewQueryableCreator(nil, testProxy) oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) - queryable := queryableCreator(false, nil, oneHourMillis, false) + queryable := queryableCreator(false, nil, oneHourMillis, false, false) q, err := queryable.Querier(context.Background(), 0, 42) testutil.Ok(t, err) @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) { }, } - q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false) + q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false, false) engine := promql.NewEngine( promql.EngineOpts{ @@ -176,7 +176,7 @@ func TestQuerier_Series(t *testing.T) { // Querier clamps the range to [1,300], which should drop some samples of the result above. // The store API allows endpoints to send more data then initially requested. 
- q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true) + q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true, false) defer func() { testutil.Ok(t, q.Close()) }() res, _, err := q.Select(&storage.SelectParams{}) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ddb9edb114..6926da769e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -946,11 +946,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie for set.Next() { var series storepb.Series - series.Labels, series.Chunks = set.At() - stats.mergedSeriesCount++ - stats.mergedChunksCount += len(series.Chunks) - s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + + if req.SkipChunks { + series.Labels, _ = set.At() + } else { + series.Labels, series.Chunks = set.At() + + stats.mergedChunksCount += len(series.Chunks) + s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) + } if err := srv.Send(storepb.NewSeriesResponse(&series)); err != nil { return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index d860887b3a..d7623e0f84 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -173,8 +173,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { // TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching. for i, tcase := range []struct { - req *storepb.SeriesRequest - expected [][]storepb.Label + req *storepb.SeriesRequest + expected [][]storepb.Label + expectedChunkLen int }{ { req: &storepb.SeriesRequest{ @@ -184,6 +185,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -203,6 +205,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -218,6 +221,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -233,6 +237,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -252,6 +257,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -271,6 +277,7 @@ func testBucketStore_e2e(t 
testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -286,6 +293,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, @@ -309,6 +317,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -324,6 +333,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MinTime: mint, MaxTime: maxt, }, + expectedChunkLen: 3, expected: [][]storepb.Label{ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, @@ -347,6 +357,24 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MaxTime: maxt, }, }, + // Test no-chunk option. + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + MinTime: mint, + MaxTime: maxt, + SkipChunks: true, + }, + expectedChunkLen: 0, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, } { t.Log("Run ", i) @@ -357,7 +385,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { for i, s := range srv.SeriesSet { testutil.Equals(t, tcase.expected[i], s.Labels) - testutil.Equals(t, 3, len(s.Chunks)) + testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks)) } } } diff --git a/pkg/store/matchers.go b/pkg/store/matchers.go index d9e4c4065b..66902f5116 100644 --- a/pkg/store/matchers.go +++ b/pkg/store/matchers.go @@ -33,3 +33,21 @@ func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err er } return res, nil } + +// matchersToString converts label matchers to string format. 
+func matchersToString(ms []storepb.LabelMatcher) (string, error) { + var res string + matchers, err := translateMatchers(ms) + if err != nil { + return "", err + } + + for i, m := range matchers { + res += m.String() + if i < len(matchers)-1 { + res += ", " + } + } + + return "{" + res + "}", nil +} diff --git a/pkg/store/matchers_test.go b/pkg/store/matchers_test.go new file mode 100644 index 0000000000..75caf1791b --- /dev/null +++ b/pkg/store/matchers_test.go @@ -0,0 +1,86 @@ +package store + +import ( + "testing" + + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMatchersToString(t *testing.T) { + cases := []struct { + ms []storepb.LabelMatcher + expected string + }{ + { + ms: []storepb.LabelMatcher{ + { + Name: "__name__", + Type: storepb.LabelMatcher_EQ, + Value: "up", + }}, + expected: `{__name__="up"}`, + }, + { + ms: []storepb.LabelMatcher{ + { + Name: "__name__", + Type: storepb.LabelMatcher_NEQ, + Value: "up", + }, + { + Name: "job", + Type: storepb.LabelMatcher_EQ, + Value: "test", + }, + }, + expected: `{__name__!="up", job="test"}`, + }, + { + ms: []storepb.LabelMatcher{ + { + Name: "__name__", + Type: storepb.LabelMatcher_EQ, + Value: "up", + }, + { + Name: "job", + Type: storepb.LabelMatcher_RE, + Value: "test", + }, + }, + expected: `{__name__="up", job=~"test"}`, + }, + { + ms: []storepb.LabelMatcher{ + { + Name: "job", + Type: storepb.LabelMatcher_NRE, + Value: "test", + }}, + expected: `{job!~"test"}`, + }, + { + ms: []storepb.LabelMatcher{ + { + Name: "__name__", + Type: storepb.LabelMatcher_EQ, + Value: "up", + }, + { + Name: "__name__", + Type: storepb.LabelMatcher_NEQ, + Value: "up", + }, + }, + // We cannot use up{__name__!="up"} in this case. + expected: `{__name__="up", __name__!="up"}`, + }, + } + + for i, c := range cases { + actual, err := matchersToString(c.ms) + testutil.Ok(t, err) + testutil.Assert(t, actual == c.expected, "test case %d failed, expected %s, actual %s", i, c.expected, actual) + } +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 442dc7b770..4012eceb9c 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -19,7 +19,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/common/version" "github.com/prometheus/prometheus/pkg/labels" @@ -59,6 +59,8 @@ type PrometheusStore struct { const ( initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size. + + SUCCESS = "success" ) // NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client @@ -150,7 +152,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie } if len(newMatchers) == 0 { - return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) + return status.Error(codes.InvalidArgument, "no matchers specified (excluding external labels)") } // Don't ask for more than available time. This includes potential `minTime` flag limit. 
@@ -159,6 +161,32 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie r.MinTime = availableMinTime } + if r.SkipChunks { + labelMaps, err := p.seriesLabels(s.Context(), newMatchers, r.MinTime, r.MaxTime) + if err != nil { + return err + } + for _, lbm := range labelMaps { + lset := make([]storepb.Label, 0, len(lbm)+len(externalLabels)) + for k, v := range lbm { + lset = append(lset, storepb.Label{Name: k, Value: v}) + } + for _, l := range externalLabels { + lset = append(lset, storepb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Slice(lset, func(i, j int) bool { + return lset[i].Name < lset[j].Name + }) + if err = s.Send(storepb.NewSeriesResponse(&storepb.Series{Labels: lset})); err != nil { + return err + } + } + return nil + } + q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} for _, m := range newMatchers { @@ -427,7 +455,7 @@ func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) return presp, nil } -// matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them. +// matchesExternalLabels filters out external labels matching from matcher if exists as the local storage does not have them. // It also returns false if given matchers are not matching external labels. func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []storepb.LabelMatcher, error) { if len(externalLabels) == 0 { @@ -474,7 +502,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin } // translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally -// attaches the given labels to it, overwriting existing ones on colllision. +// attaches the given labels to it, overwriting existing ones on collision. 
func (p *PrometheusStore) translateAndExtendLabels(m []prompb.Label, extend labels.Labels) []storepb.Label { lset := make([]storepb.Label, 0, len(m)+len(extend)) @@ -527,7 +555,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label names request body") if resp.StatusCode/100 != 2 { - return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) + return nil, status.Errorf(codes.Internal, "request Prometheus server failed, code %s", resp.Status) } if resp.StatusCode == http.StatusNoContent { @@ -549,7 +577,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR return nil, status.Error(codes.Internal, err.Error()) } - if m.Status != "success" { + if m.Status != SUCCESS { code, exists := statusToCode[resp.StatusCode] if !exists { return nil, status.Error(codes.Internal, m.Error) @@ -588,7 +616,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label values request body") if resp.StatusCode/100 != 2 { - return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) + return nil, status.Errorf(codes.Internal, "request Prometheus server failed, code %s", resp.Status) } if resp.StatusCode == http.StatusNoContent { @@ -611,7 +639,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue sort.Strings(m.Data) - if m.Status != "success" { + if m.Status != SUCCESS { code, exists := statusToCode[resp.StatusCode] if !exists { return nil, status.Error(codes.Internal, m.Error) @@ -621,3 +649,68 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue return &storepb.LabelValuesResponse{Values: m.Data}, nil } + +// seriesLabels returns the labels from Prometheus series API. 
+func (p *PrometheusStore) seriesLabels(ctx context.Context, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]map[string]string, error) { + u := *p.base + u.Path = path.Join(u.Path, "/api/v1/series") + q := u.Query() + + metric, err := matchersToString(matchers) + if err != nil { + return nil, errors.Wrap(err, "invalid matchers") + } + + q.Add("match[]", metric) + q.Add("start", fmt.Sprintf("%.3f", float64(startTime)/1000)) + q.Add("end", fmt.Sprintf("%.3f", float64(endTime)/1000)) + u.RawQuery = q.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + req.Header.Set("User-Agent", userAgent) + + span, ctx := tracing.StartSpan(ctx, "/prom_series HTTP[client]") + defer span.Finish() + + resp, err := p.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "series request body") + + if resp.StatusCode/100 != 2 { + return nil, status.Errorf(codes.Internal, "request Prometheus server failed, code %s", resp.Status) + } + + if resp.StatusCode == http.StatusNoContent { + return nil, nil + } + + var m struct { + Data []map[string]string `json:"data"` + Status string `json:"status"` + Error string `json:"error"` + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if err = json.Unmarshal(body, &m); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + if m.Status != SUCCESS { + code, exists := statusToCode[resp.StatusCode] + if !exists { + return nil, status.Error(codes.Internal, m.Error) + } + return nil, status.Error(code, m.Error) + } + + return m.Data, nil +} diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index bb44ab12d7..5db3ee75cc 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -139,6 +139,26 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.NotOk(t, err) testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error()) } + // Set no-chunk option to only get the series labels.
+ { + srv := newStoreSeriesServer(ctx) + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ + MinTime: 0, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + SkipChunks: true, + }, srv)) + testutil.Equals(t, 1, len(srv.SeriesSet)) + + testutil.Equals(t, []storepb.Label{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + }, srv.SeriesSet[0].Labels) + + testutil.Equals(t, 0, len(srv.SeriesSet[0].Chunks)) + } } type sample struct { @@ -160,6 +180,86 @@ func getExternalLabels() labels.Labels { {Name: "ext_b", Value: "a"}} } +func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { + t.Helper() + + defer leaktest.CheckTimeout(t, 10*time.Second)() + + p, err := testutil.NewPrometheus() + testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() + + baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 + + a := p.Appender() + _, err = a.Add(labels.FromStrings("a", "b"), baseT+100, 1) + testutil.Ok(t, err) + _, err = a.Add(labels.FromStrings("a", "c", "job", "test"), baseT+200, 2) + testutil.Ok(t, err) + _, err = a.Add(labels.FromStrings("a", "d", "job", "test"), baseT+300, 3) + testutil.Ok(t, err) + testutil.Ok(t, a.Commit()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testutil.Ok(t, p.Start()) + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + limitMinT := int64(0) + proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar, + func() labels.Labels { return labels.FromStrings("region", "eu-west") }, + func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter. + testutil.Ok(t, err) + + { + res, err := proxy.seriesLabels(ctx, []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, baseT, baseT+300) + testutil.Ok(t, err) + + testutil.Equals(t, len(res), 1) + testutil.Equals(t, labels.FromMap(res[0]), labels.Labels{{Name: "a", Value: "b"}}) + } + { + res, err := proxy.seriesLabels(ctx, []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "job", Value: "foo"}, + }, baseT, baseT+300) + testutil.Ok(t, err) + + testutil.Equals(t, len(res), 0) + } + { + res, err := proxy.seriesLabels(ctx, []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "b"}, + {Type: storepb.LabelMatcher_EQ, Name: "job", Value: "test"}, + }, baseT, baseT+300) + testutil.Ok(t, err) + + // Get two metrics {a="c", job="test"} and {a="d", job="test"}. + testutil.Equals(t, len(res), 2) + for _, r := range res { + testutil.Equals(t, labels.FromMap(r).Has("a"), true) + testutil.Equals(t, labels.FromMap(r).Get("job"), "test") + } + } + { + res, err := proxy.seriesLabels(ctx, []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "job", Value: "test"}, + }, baseT, baseT+300) + testutil.Ok(t, err) + + // Get two metrics. 
+ testutil.Equals(t, len(res), 2) + for _, r := range res { + testutil.Equals(t, labels.FromMap(r).Has("a"), true) + testutil.Equals(t, labels.FromMap(r).Get("job"), "test") + } + } +} + func TestPrometheusStore_LabelValues_e2e(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 1817007da7..574c5e0e4b 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -211,6 +211,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe Matchers: newMatchers, Aggregates: r.Aggregates, MaxResolutionWindow: r.MaxResolutionWindow, + SkipChunks: r.SkipChunks, PartialResponseDisabled: r.PartialResponseDisabled, } wg = &sync.WaitGroup{} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index cc006a4a8e..d19bf58f9a 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -408,6 +408,32 @@ func TestProxyStore_Series(t *testing.T) { }, expectedErr: errors.New("fetch series for [name:\"ext\" value:\"1\" ] test: error!"), }, + { + title: "use no chunk to only get labels", + storeAPIs: []Client{ + &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a")), + }, + }, + minTime: 1, + maxTime: 300, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext", Value: "1"}}}}, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + SkipChunks: true, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "a"}}, + }, + }, + }, } { if ok := t.Run(tc.title, func(t *testing.T) { diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index a4a34bd29a..77b5b5fa03 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -26,7 +26,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type StoreType int32 @@ -262,6 +262,8 @@ type SeriesRequest struct { PartialResponseDisabled bool `protobuf:"varint,6,opt,name=partial_response_disabled,json=partialResponseDisabled,proto3" json:"partial_response_disabled,omitempty"` // TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI. PartialResponseStrategy PartialResponseStrategy `protobuf:"varint,7,opt,name=partial_response_strategy,json=partialResponseStrategy,proto3,enum=thanos.PartialResponseStrategy" json:"partial_response_strategy,omitempty"` + // skip_chunks controls whether sending chunks or not in series responses. 
+ SkipChunks bool `protobuf:"varint,8,opt,name=skip_chunks,json=skipChunks,proto3" json:"skip_chunks,omitempty"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } @@ -344,10 +346,10 @@ type isSeriesResponse_Result interface { } type SeriesResponse_Series struct { - Series *Series `protobuf:"bytes,1,opt,name=series,proto3,oneof"` + Series *Series `protobuf:"bytes,1,opt,name=series,proto3,oneof" json:"series,omitempty"` } type SeriesResponse_Warning struct { - Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof"` + Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof" json:"warning,omitempty"` } func (*SeriesResponse_Series) isSeriesResponse_Result() {} @@ -374,76 +376,14 @@ func (m *SeriesResponse) GetWarning() string { return "" } -// XXX_OneofFuncs is for the internal use of the proto package. -func (*SeriesResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _SeriesResponse_OneofMarshaler, _SeriesResponse_OneofUnmarshaler, _SeriesResponse_OneofSizer, []interface{}{ +// XXX_OneofWrappers is for the internal use of the proto package. +func (*SeriesResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ (*SeriesResponse_Series)(nil), (*SeriesResponse_Warning)(nil), } } -func _SeriesResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*SeriesResponse) - // result - switch x := m.Result.(type) { - case *SeriesResponse_Series: - _ = b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.Series); err != nil { - return err - } - case *SeriesResponse_Warning: - _ = b.EncodeVarint(2<<3 | proto.WireBytes) - _ = b.EncodeStringBytes(x.Warning) - case nil: - default: - return fmt.Errorf("SeriesResponse.Result has unexpected type %T", x) - } - return nil -} - -func _SeriesResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*SeriesResponse) - switch tag { - case 1: // result.series - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(Series) - err := b.DecodeMessage(msg) - m.Result = &SeriesResponse_Series{msg} - return true, err - case 2: // result.warning - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - x, err := b.DecodeStringBytes() - m.Result = &SeriesResponse_Warning{x} - return true, err - default: - return false, nil - } -} - -func _SeriesResponse_OneofSizer(msg proto.Message) (n int) { - m := msg.(*SeriesResponse) - // result - switch x := m.Result.(type) { - case *SeriesResponse_Series: - s := proto.Size(x.Series) - n += 1 // tag and wire - n += proto.SizeVarint(uint64(s)) - n += s - case *SeriesResponse_Warning: - n += 1 // tag and wire - n += proto.SizeVarint(uint64(len(x.Warning))) - n += len(x.Warning) - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} - type LabelNamesRequest struct { PartialResponseDisabled bool `protobuf:"varint,1,opt,name=partial_response_disabled,json=partialResponseDisabled,proto3" json:"partial_response_disabled,omitempty"` // TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI. 
@@ -617,57 +557,58 @@ func init() { func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) } var fileDescriptor_77a6da22d6a3feb1 = []byte{ - // 791 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0xcd, 0x6e, 0xeb, 0x44, - 0x14, 0xf6, 0xd8, 0x89, 0x13, 0x9f, 0xdc, 0x46, 0xbe, 0xd3, 0xdc, 0x7b, 0x5d, 0x23, 0xe5, 0x46, - 0x91, 0x90, 0xa2, 0x82, 0x72, 0x21, 0x08, 0x10, 0xec, 0x92, 0xd4, 0x55, 0x23, 0xda, 0x04, 0x26, - 0x49, 0xc3, 0xcf, 0x22, 0x38, 0xed, 0xe0, 0x5a, 0x72, 0xec, 0xe0, 0x71, 0x68, 0xbb, 0xe5, 0x09, - 0x78, 0x10, 0xde, 0x82, 0x4d, 0x97, 0x5d, 0xc2, 0x06, 0x41, 0xfb, 0x12, 0x2c, 0x91, 0xc7, 0xe3, - 0x24, 0x86, 0xb6, 0xd2, 0x55, 0x76, 0x73, 0xbe, 0xef, 0xf8, 0x9c, 0xf9, 0xbe, 0x39, 0x33, 0x06, - 0x2d, 0x5c, 0x9c, 0x35, 0x17, 0x61, 0x10, 0x05, 0x58, 0x8d, 0x2e, 0x6c, 0x3f, 0x60, 0x66, 0x29, - 0xba, 0x5e, 0x50, 0x96, 0x80, 0x66, 0xc5, 0x09, 0x9c, 0x80, 0x2f, 0xdf, 0xc4, 0xab, 0x04, 0xad, - 0xef, 0x40, 0xa9, 0xe7, 0xff, 0x10, 0x10, 0xfa, 0xe3, 0x92, 0xb2, 0xa8, 0xfe, 0x07, 0x82, 0x67, - 0x49, 0xcc, 0x16, 0x81, 0xcf, 0x28, 0x7e, 0x0f, 0x54, 0xcf, 0x9e, 0x51, 0x8f, 0x19, 0xa8, 0xa6, - 0x34, 0x4a, 0xad, 0x9d, 0x66, 0x52, 0xbb, 0x79, 0x1c, 0xa3, 0x9d, 0xdc, 0xcd, 0x9f, 0xaf, 0x25, - 0x22, 0x52, 0xf0, 0x1e, 0x14, 0xe7, 0xae, 0x3f, 0x8d, 0xdc, 0x39, 0x35, 0xe4, 0x1a, 0x6a, 0x28, - 0xa4, 0x30, 0x77, 0xfd, 0x91, 0x3b, 0xa7, 0x9c, 0xb2, 0xaf, 0x12, 0x4a, 0x11, 0x94, 0x7d, 0xc5, - 0xa9, 0x37, 0xa0, 0xb1, 0x28, 0x08, 0xe9, 0xe8, 0x7a, 0x41, 0x8d, 0x5c, 0x0d, 0x35, 0xca, 0xad, - 0xe7, 0x69, 0x97, 0x61, 0x4a, 0x90, 0x75, 0x0e, 0xfe, 0x18, 0x80, 0x37, 0x9c, 0x32, 0x1a, 0x31, - 0x23, 0xcf, 0xf7, 0xa5, 0x67, 0xf6, 0x35, 0xa4, 0x91, 0xd8, 0x9a, 0xe6, 0x89, 0x98, 0xd5, 0x3f, - 0x85, 0x62, 0x4a, 0xbe, 0x95, 0xac, 0xfa, 0x3f, 0x32, 0xec, 0x0c, 0x69, 0xe8, 0x52, 0x26, 0x6c, - 0xca, 0x08, 0x45, 0x8f, 0x0b, 0x95, 0xb3, 0x42, 0x3f, 0x89, 0xa9, 0xe8, 0xec, 0x82, 0x86, 0xcc, - 0x50, 0x78, 0xdb, 0x4a, 0xa6, 0xed, 0x49, 0x42, 0x8a, 0xee, 0xab, 0x5c, 0xdc, 0x82, 0x17, 0x71, - 0xc9, 0x90, 0xb2, 0xc0, 0x5b, 0x46, 0x6e, 0xe0, 0x4f, 0x2f, 0x5d, 0xff, 0x3c, 0xb8, 0xe4, 0x66, - 0x29, 0x64, 0x77, 0x6e, 0x5f, 0x91, 0x15, 0x37, 0xe1, 0x14, 0x7e, 0x1f, 0xc0, 0x76, 0x9c, 0x90, - 0x3a, 0x76, 0x44, 0x13, 0x8f, 0xca, 0xad, 0x67, 0x69, 0xb7, 0xb6, 0xe3, 0x84, 0x64, 0x83, 0xc7, - 0x9f, 0xc3, 0xde, 0xc2, 0x0e, 0x23, 0xd7, 0xf6, 0xe2, 0x2e, 0xfc, 0xe4, 0xa7, 0xe7, 0x2e, 0xb3, - 0x67, 0x1e, 0x3d, 0x37, 0xd4, 0x1a, 0x6a, 0x14, 0xc9, 0x2b, 0x91, 0x90, 0x4e, 0xc6, 0x81, 0xa0, - 0xf1, 0x77, 0x0f, 0x7c, 0xcb, 0xa2, 0xd0, 0x8e, 0xa8, 0x73, 0x6d, 0x14, 0xf8, 0x71, 0xbe, 0x4e, - 0x1b, 0x7f, 0x99, 0xad, 0x31, 0x14, 0x69, 0xff, 0x2b, 0x9e, 0x12, 0xf5, 0xef, 0xa1, 0x9c, 0x3a, - 0x2f, 0x06, 0xb2, 0x01, 0x2a, 0xe3, 0x08, 0x37, 0xbe, 0xd4, 0x2a, 0xaf, 0x46, 0x85, 0xa3, 0x47, - 0x12, 0x11, 0x3c, 0x36, 0xa1, 0x70, 0x69, 0x87, 0xbe, 0xeb, 0x3b, 0xfc, 0x20, 0xb4, 0x23, 0x89, - 0xa4, 0x40, 0xa7, 0x08, 0x6a, 0x48, 0xd9, 0xd2, 0x8b, 0xea, 0xbf, 0x22, 0x78, 0xce, 0xdd, 0xef, - 0xdb, 0xf3, 0xf5, 0x01, 0x3f, 0x69, 0x08, 0xda, 0xc2, 0x10, 0x79, 0x4b, 0x43, 0x0e, 0x01, 0x6f, - 0xee, 0x56, 0x98, 0x52, 0x81, 0xbc, 0x1f, 0x03, 0x7c, 0x9a, 0x35, 0x92, 0x04, 0xd8, 0x84, 0xa2, - 0xd0, 0xcb, 0x0c, 0x99, 0x13, 0xab, 0xb8, 0xfe, 0x1b, 0x12, 0x85, 0x4e, 0x6d, 0x6f, 0xb9, 0xd6, - 0x5d, 0x81, 0x3c, 0x1f, 0x7a, 0xae, 0x51, 0x23, 0x49, 0xf0, 0xb4, 0x1b, 0xf2, 0x16, 0x6e, 0x28, - 0x5b, 0xba, 0xd1, 0x83, 0xdd, 0x8c, 0x08, 0x61, 0xc7, 0x4b, 0x50, 0x7f, 0xe2, 0x88, 0xf0, 0x43, - 0x44, 
0x4f, 0x19, 0xb2, 0x4f, 0x40, 0x5b, 0x3d, 0x36, 0xb8, 0x04, 0x85, 0x71, 0xff, 0x8b, 0xfe, - 0x60, 0xd2, 0xd7, 0x25, 0xac, 0x41, 0xfe, 0xab, 0xb1, 0x45, 0xbe, 0xd1, 0x11, 0x2e, 0x42, 0x8e, - 0x8c, 0x8f, 0x2d, 0x5d, 0x8e, 0x33, 0x86, 0xbd, 0x03, 0xab, 0xdb, 0x26, 0xba, 0x12, 0x67, 0x0c, - 0x47, 0x03, 0x62, 0xe9, 0xb9, 0x18, 0x27, 0x56, 0xd7, 0xea, 0x9d, 0x5a, 0x7a, 0x7e, 0xbf, 0x09, - 0xaf, 0x1e, 0x91, 0x14, 0x57, 0x9a, 0xb4, 0x89, 0x28, 0xdf, 0xee, 0x0c, 0xc8, 0x48, 0x47, 0xfb, - 0x1d, 0xc8, 0xc5, 0x57, 0x13, 0x17, 0x40, 0x21, 0xed, 0x49, 0xc2, 0x75, 0x07, 0xe3, 0xfe, 0x48, - 0x47, 0x31, 0x36, 0x1c, 0x9f, 0xe8, 0x72, 0xbc, 0x38, 0xe9, 0xf5, 0x75, 0x85, 0x2f, 0xda, 0x5f, - 0x27, 0x3d, 0x79, 0x96, 0x45, 0xf4, 0x7c, 0xeb, 0x67, 0x19, 0xf2, 0x5c, 0x08, 0xfe, 0x10, 0x72, - 0xf1, 0x53, 0x8e, 0x77, 0x53, 0x7b, 0x37, 0x1e, 0x7a, 0xb3, 0x92, 0x05, 0x85, 0x71, 0x9f, 0x81, - 0x9a, 0x5c, 0x23, 0xfc, 0x22, 0x7b, 0xad, 0xd2, 0xcf, 0x5e, 0xfe, 0x17, 0x4e, 0x3e, 0xfc, 0x00, - 0xe1, 0x2e, 0xc0, 0x7a, 0x30, 0xf1, 0x5e, 0xe6, 0x61, 0xdb, 0xbc, 0x5a, 0xa6, 0xf9, 0x10, 0x25, - 0xfa, 0x1f, 0x42, 0x69, 0xe3, 0x3c, 0x71, 0x36, 0x35, 0x33, 0xa9, 0xe6, 0x3b, 0x0f, 0x72, 0x49, - 0x9d, 0xce, 0xbb, 0x37, 0x7f, 0x57, 0xa5, 0x9b, 0xbb, 0x2a, 0xba, 0xbd, 0xab, 0xa2, 0xbf, 0xee, - 0xaa, 0xe8, 0x97, 0xfb, 0xaa, 0x74, 0x7b, 0x5f, 0x95, 0x7e, 0xbf, 0xaf, 0x4a, 0xdf, 0x16, 0xf8, - 0xaf, 0x64, 0x31, 0x9b, 0xa9, 0xfc, 0x1f, 0xf8, 0xd1, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5c, - 0x24, 0xf3, 0x5b, 0x3b, 0x07, 0x00, 0x00, + // 814 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x55, 0x4f, 0x6f, 0xe3, 0x44, + 0x14, 0xf7, 0xd8, 0x89, 0x13, 0xbf, 0x6c, 0x2b, 0xef, 0x34, 0xbb, 0xeb, 0x1a, 0x29, 0xad, 0x2c, + 0x21, 0x45, 0x05, 0x65, 0x21, 0x08, 0x10, 0xdc, 0x92, 0xac, 0x57, 0x1b, 0xb1, 0x4d, 0x60, 0x92, + 0x6c, 0xf8, 0x73, 0x08, 0x4e, 0x3b, 0xb8, 0xd6, 0x3a, 0xb6, 0xf1, 0x38, 0xb4, 0xbd, 0xf2, 0x09, + 0xb8, 0xf2, 0x1d, 0xf8, 0x16, 0x5c, 0x7a, 0xdc, 0x23, 0x5c, 0x10, 0xb4, 0x5f, 0x04, 0x79, 0x3c, + 0x4e, 0x62, 0x68, 0x2b, 0xad, 0x72, 0x9b, 0xf7, 0xfb, 0xbd, 0x79, 0x6f, 0xde, 0xef, 0xbd, 0x99, + 0x01, 0x2d, 0x8e, 0x4e, 0x5a, 0x51, 0x1c, 0x26, 0x21, 0x56, 0x93, 0x33, 0x27, 0x08, 0x99, 0x59, + 0x4b, 0x2e, 0x23, 0xca, 0x32, 0xd0, 0xac, 0xbb, 0xa1, 0x1b, 0xf2, 0xe5, 0xd3, 0x74, 0x95, 0xa1, + 0xd6, 0x0e, 0xd4, 0xfa, 0xc1, 0x0f, 0x21, 0xa1, 0x3f, 0x2e, 0x29, 0x4b, 0xac, 0x3f, 0x11, 0x3c, + 0xc8, 0x6c, 0x16, 0x85, 0x01, 0xa3, 0xf8, 0x3d, 0x50, 0x7d, 0x67, 0x4e, 0x7d, 0x66, 0xa0, 0x43, + 0xa5, 0x59, 0x6b, 0xef, 0xb4, 0xb2, 0xd8, 0xad, 0x97, 0x29, 0xda, 0x2d, 0x5d, 0xfd, 0x75, 0x20, + 0x11, 0xe1, 0x82, 0xf7, 0xa1, 0xba, 0xf0, 0x82, 0x59, 0xe2, 0x2d, 0xa8, 0x21, 0x1f, 0xa2, 0xa6, + 0x42, 0x2a, 0x0b, 0x2f, 0x18, 0x7b, 0x0b, 0xca, 0x29, 0xe7, 0x22, 0xa3, 0x14, 0x41, 0x39, 0x17, + 0x9c, 0x7a, 0x0a, 0x1a, 0x4b, 0xc2, 0x98, 0x8e, 0x2f, 0x23, 0x6a, 0x94, 0x0e, 0x51, 0x73, 0xb7, + 0xfd, 0x30, 0xcf, 0x32, 0xca, 0x09, 0xb2, 0xf6, 0xc1, 0x1f, 0x03, 0xf0, 0x84, 0x33, 0x46, 0x13, + 0x66, 0x94, 0xf9, 0xb9, 0xf4, 0xc2, 0xb9, 0x46, 0x34, 0x11, 0x47, 0xd3, 0x7c, 0x61, 0x33, 0xeb, + 0x53, 0xa8, 0xe6, 0xe4, 0x5b, 0x95, 0x65, 0xfd, 0xaa, 0xc0, 0xce, 0x88, 0xc6, 0x1e, 0x65, 0x42, + 0xa6, 0x42, 0xa1, 0xe8, 0xee, 0x42, 0xe5, 0x62, 0xa1, 0x9f, 0xa4, 0x54, 0x72, 0x72, 0x46, 0x63, + 0x66, 0x28, 0x3c, 0x6d, 0xbd, 0x90, 0xf6, 0x38, 0x23, 0x45, 0xf6, 0x95, 0x2f, 0x6e, 0xc3, 0xa3, + 0x34, 0x64, 0x4c, 0x59, 0xe8, 0x2f, 0x13, 0x2f, 0x0c, 0x66, 0xe7, 0x5e, 0x70, 0x1a, 0x9e, 0x73, + 0xb1, 0x14, 0xb2, 0xb7, 0x70, 0x2e, 
0xc8, 0x8a, 0x9b, 0x72, 0x0a, 0xbf, 0x0f, 0xe0, 0xb8, 0x6e, + 0x4c, 0x5d, 0x27, 0xa1, 0x99, 0x46, 0xbb, 0xed, 0x07, 0x79, 0xb6, 0x8e, 0xeb, 0xc6, 0x64, 0x83, + 0xc7, 0x9f, 0xc3, 0x7e, 0xe4, 0xc4, 0x89, 0xe7, 0xf8, 0x69, 0x16, 0xde, 0xf9, 0xd9, 0xa9, 0xc7, + 0x9c, 0xb9, 0x4f, 0x4f, 0x0d, 0xf5, 0x10, 0x35, 0xab, 0xe4, 0x89, 0x70, 0xc8, 0x27, 0xe3, 0x99, + 0xa0, 0xf1, 0x77, 0xb7, 0xec, 0x65, 0x49, 0xec, 0x24, 0xd4, 0xbd, 0x34, 0x2a, 0xbc, 0x9d, 0x07, + 0x79, 0xe2, 0x2f, 0x8b, 0x31, 0x46, 0xc2, 0xed, 0x7f, 0xc1, 0x73, 0x02, 0x1f, 0x40, 0x8d, 0xbd, + 0xf6, 0xa2, 0xd9, 0xc9, 0xd9, 0x32, 0x78, 0xcd, 0x8c, 0x2a, 0x3f, 0x0a, 0xa4, 0x50, 0x8f, 0x23, + 0xd6, 0xf7, 0xb0, 0x9b, 0xb7, 0x46, 0x4c, 0x6c, 0x13, 0x54, 0xc6, 0x11, 0xde, 0x99, 0x5a, 0x7b, + 0x77, 0x35, 0x4b, 0x1c, 0x7d, 0x21, 0x11, 0xc1, 0x63, 0x13, 0x2a, 0xe7, 0x4e, 0x1c, 0x78, 0x81, + 0xcb, 0x3b, 0xa5, 0xbd, 0x90, 0x48, 0x0e, 0x74, 0xab, 0xa0, 0xc6, 0x94, 0x2d, 0xfd, 0xc4, 0xfa, + 0x0d, 0xc1, 0x43, 0xde, 0x9e, 0x81, 0xb3, 0x58, 0x4f, 0xc0, 0xbd, 0x8a, 0xa1, 0x2d, 0x14, 0x93, + 0xb7, 0x53, 0xcc, 0x7a, 0x0e, 0x78, 0xf3, 0xb4, 0x42, 0x94, 0x3a, 0x94, 0x83, 0x14, 0xe0, 0xe3, + 0xae, 0x91, 0xcc, 0xc0, 0x26, 0x54, 0x45, 0xbd, 0xcc, 0x90, 0x39, 0xb1, 0xb2, 0xad, 0xdf, 0x91, + 0x08, 0xf4, 0xca, 0xf1, 0x97, 0xeb, 0xba, 0xeb, 0x50, 0xe6, 0xb7, 0x82, 0xd7, 0xa8, 0x91, 0xcc, + 0xb8, 0x5f, 0x0d, 0x79, 0x0b, 0x35, 0x94, 0x2d, 0xd5, 0xe8, 0xc3, 0x5e, 0xa1, 0x08, 0x21, 0xc7, + 0x63, 0x50, 0x7f, 0xe2, 0x88, 0xd0, 0x43, 0x58, 0xf7, 0x09, 0x72, 0x44, 0x40, 0x5b, 0xbd, 0x46, + 0xb8, 0x06, 0x95, 0xc9, 0xe0, 0x8b, 0xc1, 0x70, 0x3a, 0xd0, 0x25, 0xac, 0x41, 0xf9, 0xab, 0x89, + 0x4d, 0xbe, 0xd1, 0x11, 0xae, 0x42, 0x89, 0x4c, 0x5e, 0xda, 0xba, 0x9c, 0x7a, 0x8c, 0xfa, 0xcf, + 0xec, 0x5e, 0x87, 0xe8, 0x4a, 0xea, 0x31, 0x1a, 0x0f, 0x89, 0xad, 0x97, 0x52, 0x9c, 0xd8, 0x3d, + 0xbb, 0xff, 0xca, 0xd6, 0xcb, 0x47, 0x2d, 0x78, 0x72, 0x47, 0x49, 0x69, 0xa4, 0x69, 0x87, 0x88, + 0xf0, 0x9d, 0xee, 0x90, 0x8c, 0x75, 0x74, 0xd4, 0x85, 0x52, 0x7a, 0x77, 0x71, 0x05, 0x14, 0xd2, + 0x99, 0x66, 0x5c, 0x6f, 0x38, 0x19, 0x8c, 0x75, 0x94, 0x62, 0xa3, 0xc9, 0xb1, 0x2e, 0xa7, 0x8b, + 0xe3, 0xfe, 0x40, 0x57, 0xf8, 0xa2, 0xf3, 0x75, 0x96, 0x93, 0x7b, 0xd9, 0x44, 0x2f, 0xb7, 0x7f, + 0x96, 0xa1, 0xcc, 0x0b, 0xc1, 0x1f, 0x42, 0x29, 0x7d, 0xeb, 0xf1, 0x5e, 0x2e, 0xef, 0xc6, 0x4f, + 0x60, 0xd6, 0x8b, 0xa0, 0x10, 0xee, 0x33, 0x50, 0xb3, 0x6b, 0x84, 0x1f, 0x15, 0xaf, 0x55, 0xbe, + 0xed, 0xf1, 0x7f, 0xe1, 0x6c, 0xe3, 0x07, 0x08, 0xf7, 0x00, 0xd6, 0x83, 0x89, 0xf7, 0x0b, 0x2f, + 0xdf, 0xe6, 0xd5, 0x32, 0xcd, 0xdb, 0x28, 0x91, 0xff, 0x39, 0xd4, 0x36, 0xfa, 0x89, 0x8b, 0xae, + 0x85, 0x49, 0x35, 0xdf, 0xb9, 0x95, 0xcb, 0xe2, 0x74, 0xdf, 0xbd, 0xfa, 0xa7, 0x21, 0x5d, 0x5d, + 0x37, 0xd0, 0x9b, 0xeb, 0x06, 0xfa, 0xfb, 0xba, 0x81, 0x7e, 0xb9, 0x69, 0x48, 0x6f, 0x6e, 0x1a, + 0xd2, 0x1f, 0x37, 0x0d, 0xe9, 0xdb, 0x0a, 0xff, 0x6b, 0xa2, 0xf9, 0x5c, 0xe5, 0x9f, 0xe4, 0x47, + 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xa5, 0x34, 0x17, 0xba, 0x5c, 0x07, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. 
@@ -1054,6 +995,16 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.SkipChunks { + i-- + if m.SkipChunks { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x40 + } if m.PartialResponseStrategy != 0 { i = encodeVarintRpc(dAtA, i, uint64(m.PartialResponseStrategy)) i-- @@ -1152,7 +1103,8 @@ func (m *SeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { } func (m *SeriesResponse_Series) MarshalTo(dAtA []byte) (int, error) { - return m.MarshalToSizedBuffer(dAtA[:m.Size()]) + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) } func (m *SeriesResponse_Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { @@ -1172,7 +1124,8 @@ func (m *SeriesResponse_Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } func (m *SeriesResponse_Warning) MarshalTo(dAtA []byte) (int, error) { - return m.MarshalToSizedBuffer(dAtA[:m.Size()]) + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) } func (m *SeriesResponse_Warning) MarshalToSizedBuffer(dAtA []byte) (int, error) { @@ -1448,6 +1401,9 @@ func (m *SeriesRequest) Size() (n int) { if m.PartialResponseStrategy != 0 { n += 1 + sovRpc(uint64(m.PartialResponseStrategy)) } + if m.SkipChunks { + n += 2 + } return n } @@ -2113,6 +2069,26 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { break } } + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SkipChunks", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SkipChunks = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2710,6 +2686,7 @@ func (m *LabelValuesResponse) Unmarshal(dAtA []byte) error { func skipRpc(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 + depth := 0 for iNdEx < l { var wire uint64 for shift := uint(0); ; shift += 7 { @@ -2741,10 +2718,8 @@ func skipRpc(dAtA []byte) (n int, err error) { break } } - return iNdEx, nil case 1: iNdEx += 8 - return iNdEx, nil case 2: var length int for shift := uint(0); ; shift += 7 { @@ -2765,55 +2740,30 @@ func skipRpc(dAtA []byte) (n int, err error) { return 0, ErrInvalidLengthRpc } iNdEx += length - if iNdEx < 0 { - return 0, ErrInvalidLengthRpc - } - return iNdEx, nil case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowRpc - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipRpc(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - if iNdEx < 0 { - return 0, ErrInvalidLengthRpc - } - } - return iNdEx, nil + depth++ case 4: - return iNdEx, nil + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupRpc + } + depth-- case 5: iNdEx += 4 - return iNdEx, nil default: return 0, fmt.Errorf("proto: illegal wireType %d", wireType) } + if iNdEx < 0 { + return 0, ErrInvalidLengthRpc + } + if depth == 0 { + return iNdEx, nil + } } - panic("unreachable") + return 0, io.ErrUnexpectedEOF } var ( - ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowRpc = fmt.Errorf("proto: integer 
overflow") + ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupRpc = fmt.Errorf("proto: unexpected end of group") ) diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 278d41f350..219262f207 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -92,6 +92,9 @@ message SeriesRequest { // TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI. PartialResponseStrategy partial_response_strategy = 7; + + // skip_chunks controls whether sending chunks or not in series responses. + bool skip_chunks = 8; } enum Aggr { diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 5a8ae93fce..89e3787d24 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -22,7 +22,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type Chunk_Encoding int32 @@ -1509,6 +1509,7 @@ func (m *LabelMatcher) Unmarshal(dAtA []byte) error { func skipTypes(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 + depth := 0 for iNdEx < l { var wire uint64 for shift := uint(0); ; shift += 7 { @@ -1540,10 +1541,8 @@ func skipTypes(dAtA []byte) (n int, err error) { break } } - return iNdEx, nil case 1: iNdEx += 8 - return iNdEx, nil case 2: var length int for shift := uint(0); ; shift += 7 { @@ -1564,55 +1563,30 @@ func skipTypes(dAtA []byte) (n int, err error) { return 0, ErrInvalidLengthTypes } iNdEx += length - if iNdEx < 0 { - return 0, ErrInvalidLengthTypes - } - return iNdEx, nil case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowTypes - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipTypes(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - if iNdEx < 0 { - return 0, ErrInvalidLengthTypes - } - } - return iNdEx, nil + depth++ case 4: - return iNdEx, nil + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupTypes + } + depth-- case 5: iNdEx += 4 - return iNdEx, nil default: return 0, fmt.Errorf("proto: illegal wireType %d", wireType) } + if iNdEx < 0 { + return 0, ErrInvalidLengthTypes + } + if depth == 0 { + return iNdEx, nil + } } - panic("unreachable") + return 0, io.ErrUnexpectedEOF } var ( - ErrInvalidLengthTypes = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowTypes = fmt.Errorf("proto: integer overflow") + ErrInvalidLengthTypes = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowTypes = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupTypes = fmt.Errorf("proto: unexpected end of group") ) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index db818d67cd..40de141811 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -107,20 +107,23 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer for set.Next() { series := set.At() - // TODO(fabxc): An 
improvement over this trivial approach would be to directly - use the chunks provided by TSDB in the response. - But since the sidecar has a similar approach, optimizing here has only - limited benefit for now. - NOTE: XOR encoding supports a max size of 2^16 - 1 samples, so we need - to chunk all samples into groups of no more than 2^16 - 1 - See: https://github.com/thanos-io/thanos/pull/1038. - c, err := s.encodeChunks(series.Iterator(), math.MaxUint16) - if err != nil { - return status.Errorf(codes.Internal, "encode chunk: %s", err) - } - respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) - respSeries.Chunks = append(respSeries.Chunks[:0], c...) + + respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) + + if !r.SkipChunks { + // TODO(fabxc): An improvement over this trivial approach would be to directly + // use the chunks provided by TSDB in the response. + // But since the sidecar has a similar approach, optimizing here has only + // limited benefit for now. + // NOTE: XOR encoding supports a max size of 2^16 - 1 samples, so we need + // to chunk all samples into groups of no more than 2^16 - 1 + // See: https://github.com/thanos-io/thanos/pull/1038. + c, err := s.encodeChunks(series.Iterator(), math.MaxUint16) + if err != nil { + return status.Errorf(codes.Internal, "encode chunk: %s", err) + } + + respSeries.Chunks = append(respSeries.Chunks[:0], c...) + } if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { return status.Error(codes.Aborted, err.Error()) diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 3b1483f194..6e0354df36 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -34,6 +34,131 @@ func TestTSDBStore_Info(t *testing.T) { testutil.Equals(t, int64(math.MaxInt64), resp.MaxTime) } +func TestTSDBStore_Series(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := testutil.NewTSDB() + defer func() { testutil.Ok(t, db.Close()) }() + testutil.Ok(t, err) + + tsdbStore := NewTSDBStore(nil, nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + + appender := db.Appender() + + for i := 1; i <= 3; i++ { + _, err = appender.Add(labels.FromStrings("a", "1"), int64(i), float64(i)) + testutil.Ok(t, err) + } + err = appender.Commit() + testutil.Ok(t, err) + + for _, tc := range []struct { + title string + req *storepb.SeriesRequest + expectedSeries []rawSeries + expectedError string + }{ + { + title: "total match series", + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "region", Value: "eu-west"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + }, + { + title: "partially match time range series", + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 2, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "region", Value: "eu-west"}}, + chunks: [][]sample{{{1, 1}, {2, 2}}}, + }, + }, + }, + { + title: "don't match time range series", + req: &storepb.SeriesRequest{ + MinTime: 4, + MaxTime: 6, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + }, + expectedSeries: []rawSeries{}, + }, + { + title: "only match external label",
req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, + }, + }, + expectedError: "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", + }, + { + title: "don't match labels", + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "b", Value: "1"}, + }, + }, + expectedSeries: []rawSeries{}, + }, + { + title: "no chunk", + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + SkipChunks: true, + }, + expectedSeries: []rawSeries{ + { + lset: []storepb.Label{{Name: "a", Value: "1"}, {Name: "region", Value: "eu-west"}}, + }, + }, + }, + } { + if ok := t.Run(tc.title, func(t *testing.T) { + srv := newStoreSeriesServer(ctx) + err := tsdbStore.Series(tc.req, srv) + if len(tc.expectedError) > 0 { + testutil.NotOk(t, err) + testutil.Equals(t, tc.expectedError, err.Error()) + } else { + testutil.Ok(t, err) + seriesEquals(t, tc.expectedSeries, srv.SeriesSet) + } + }); !ok { + return + } + } +} + func TestTSDBStore_LabelNames(t *testing.T) { var err error defer leaktest.CheckTimeout(t, 10*time.Second)()