Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skipChunk option in series API #1904

Merged
merged 1 commit into from Jan 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Empty file removed .dep-finished
Empty file.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -23,6 +23,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts.
- [#1881](https://github.com/thanos-io/thanos/pull/1881) Store Gateway: memcached support for index cache. See [documentation](docs/components/store.md/#index-cache) for further information.
- [#1904](https://github.com/thanos-io/thanos/pull/1904) Add a skip-chunks option in Store Series API to improve the response time of `/api/v1/series` endpoint.

## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03

Expand Down
10 changes: 5 additions & 5 deletions pkg/query/api/v1.go
Expand Up @@ -293,7 +293,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -386,7 +386,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -426,7 +426,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -503,7 +503,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
Expand Down Expand Up @@ -607,7 +607,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/query/querier.go
Expand Up @@ -19,18 +19,19 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
}
}
}
Expand All @@ -42,11 +43,12 @@ type queryable struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.skipChunks), nil
}

type querier struct {
Expand All @@ -59,6 +61,7 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
Expand All @@ -72,6 +75,7 @@ func newQuerier(
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
skipChunks bool,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -93,6 +97,7 @@ func newQuerier(
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
}
}

Expand Down Expand Up @@ -185,6 +190,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Series()")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/query/querier_test.go
Expand Up @@ -28,7 +28,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, testProxy)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, oneHourMillis, false)
queryable := queryableCreator(false, nil, oneHourMillis, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
},
}

q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false)
q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false, false)

engine := promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestQuerier_Series(t *testing.T) {

// Querier clamps the range to [1,300], which should drop some samples of the result above.
// The store API allows endpoints to send more data then initially requested.
q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true)
q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true, false)
defer func() { testutil.Ok(t, q.Close()) }()

res, _, err := q.Select(&storage.SelectParams{})
Expand Down
13 changes: 9 additions & 4 deletions pkg/store/bucket.go
Expand Up @@ -946,11 +946,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
for set.Next() {
var series storepb.Series

series.Labels, series.Chunks = set.At()

stats.mergedSeriesCount++
stats.mergedChunksCount += len(series.Chunks)
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))

if req.SkipChunks {
series.Labels, _ = set.At()
} else {
series.Labels, series.Chunks = set.At()

stats.mergedChunksCount += len(series.Chunks)
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}

if err := srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
Expand Down
34 changes: 31 additions & 3 deletions pkg/store/bucket_e2e_test.go
Expand Up @@ -173,8 +173,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {

// TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching.
for i, tcase := range []struct {
req *storepb.SeriesRequest
expected [][]storepb.Label
req *storepb.SeriesRequest
expected [][]storepb.Label
expectedChunkLen int
}{
{
req: &storepb.SeriesRequest{
Expand All @@ -184,6 +185,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -203,6 +205,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -218,6 +221,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -233,6 +237,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -252,6 +257,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -271,6 +277,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -286,6 +293,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
Expand All @@ -309,6 +317,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -324,6 +333,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MinTime: mint,
MaxTime: maxt,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -347,6 +357,24 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
// Test no-chunk option.
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
SkipChunks: true,
},
expectedChunkLen: 0,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
} {
t.Log("Run ", i)

Expand All @@ -357,7 +385,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {

for i, s := range srv.SeriesSet {
testutil.Equals(t, tcase.expected[i], s.Labels)
testutil.Equals(t, 3, len(s.Chunks))
testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/store/matchers.go
Expand Up @@ -33,3 +33,21 @@ func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err er
}
return res, nil
}

// matchersToString converts label matchers to string format.
func matchersToString(ms []storepb.LabelMatcher) (string, error) {
var res string
matchers, err := translateMatchers(ms)
if err != nil {
return "", err
}

for i, m := range matchers {
res += m.String()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, we do strings.Join but this is fine for now (:

if i < len(matchers)-1 {
res += ", "
}
}

return "{" + res + "}", nil
}
86 changes: 86 additions & 0 deletions pkg/store/matchers_test.go
@@ -0,0 +1,86 @@
package store

import (
"testing"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMatchersToString(t *testing.T) {
cases := []struct {
ms []storepb.LabelMatcher
expected string
}{
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_EQ,
Value: "up",
}},
expected: `{__name__="up"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_NEQ,
Value: "up",
},
{
Name: "job",
Type: storepb.LabelMatcher_EQ,
Value: "test",
},
},
expected: `{__name__!="up", job="test"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_EQ,
Value: "up",
},
{
Name: "job",
Type: storepb.LabelMatcher_RE,
Value: "test",
},
},
expected: `{__name__="up", job=~"test"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "job",
Type: storepb.LabelMatcher_NRE,
Value: "test",
}},
expected: `{job!~"test"}`,
},
{
ms: []storepb.LabelMatcher{
{
Name: "__name__",
Type: storepb.LabelMatcher_EQ,
Value: "up",
},
{
Name: "__name__",
Type: storepb.LabelMatcher_NEQ,
Value: "up",
},
},
// We cannot use up{__name__!="up"} in this case.
expected: `{__name__="up", __name__!="up"}`,
},
}

for i, c := range cases {
actual, err := matchersToString(c.ms)
testutil.Ok(t, err)
testutil.Assert(t, actual == c.expected, "test case %d failed, expected %s, actual %s", i, c.expected, actual)
}
}