Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support lookback_delta on query frontend #5854

Merged
merged 2 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.
- [#5854](https://github.com/thanos-io/thanos/pull/5854) Query Frontend: Handles `lookback_delta` param in query frontend.

### Added

Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (t thanosCacheKeyGenerator) GenerateCacheKey(userID string, r queryrange.Re
for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ {
}
shardInfoKey := generateShardInfoKey(tr)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey)
return fmt.Sprintf("fe:%s:%s:%d:%d:%d:%s:%d", userID, tr.Query, tr.Step, currentInterval, i, shardInfoKey, tr.LookbackDelta)
case *ThanosLabelsRequest:
return fmt.Sprintf("fe:%s:%s:%s:%d", userID, tr.Label, tr.Matchers, currentInterval)
case *ThanosSeriesRequest:
Expand Down
21 changes: 16 additions & 5 deletions pkg/queryfrontend/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 60 * seconds,
},
expected: "fe::up:60000:0:2:-",
expected: "fe::up:60000:0:2:-:0",
},
{
name: "10s step",
Expand All @@ -48,7 +48,7 @@ func TestGenerateCacheKey(t *testing.T) {
Start: 0,
Step: 10 * seconds,
},
expected: "fe::up:10000:0:2:-",
expected: "fe::up:10000:0:2:-:0",
},
{
name: "1m downsampling resolution",
Expand All @@ -58,7 +58,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 60 * seconds,
},
expected: "fe::up:10000:0:2:-",
expected: "fe::up:10000:0:2:-:0",
},
{
name: "5m downsampling resolution, different cache key",
Expand All @@ -68,7 +68,7 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: 300 * seconds,
},
expected: "fe::up:10000:0:1:-",
expected: "fe::up:10000:0:1:-:0",
},
{
name: "1h downsampling resolution, different cache key",
Expand All @@ -78,7 +78,18 @@ func TestGenerateCacheKey(t *testing.T) {
Step: 10 * seconds,
MaxSourceResolution: hour,
},
expected: "fe::up:10000:0:0:-",
expected: "fe::up:10000:0:0:-:0",
},
{
name: "1h downsampling resolution with lookback delta",
req: &ThanosQueryRangeRequest{
Query: "up",
Start: 0,
Step: 10 * seconds,
MaxSourceResolution: hour,
LookbackDelta: 1000,
},
expected: "fe::up:10000:0:0:-:1000",
},
{
name: "label names, no matcher",
Expand Down
9 changes: 9 additions & 0 deletions pkg/queryfrontend/queryinstant_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (c queryInstantCodec) DecodeRequest(_ context.Context, r *http.Request, for
return nil, err
}

result.LookbackDelta, err = parseLookbackDelta(r.Form, queryv1.LookbackDeltaParam)
if err != nil {
return nil, err
}

result.Query = r.FormValue("query")
result.Path = r.URL.Path

Expand Down Expand Up @@ -161,6 +166,10 @@ func (c queryInstantCodec) EncodeRequest(ctx context.Context, r queryrange.Reque
params[queryv1.ShardInfoParam] = []string{data}
}

if thanosReq.LookbackDelta > 0 {
params[queryv1.LookbackDeltaParam] = []string{encodeDurationMillis(thanosReq.LookbackDelta)}
}

req, err := http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
Expand Down
11 changes: 11 additions & 0 deletions pkg/queryfrontend/queryinstant_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func TestQueryInstantCodec_DecodeRequest(t *testing.T) {
},
},
},
{
name: "lookback_delta",
url: "/api/v1/query?lookback_delta=1000",
partialResponse: false,
expectedRequest: &ThanosQueryInstantRequest{
Path: "/api/v1/query",
Dedup: true,
LookbackDelta: 1000000,
StoreMatchers: [][]*labels.Matcher{},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, tc.url, nil)
Expand Down
22 changes: 18 additions & 4 deletions pkg/queryfrontend/queryrange_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, forwa
return nil, err
}

result.LookbackDelta, err = parseLookbackDelta(r.Form, queryv1.LookbackDeltaParam)
if err != nil {
return nil, err
}

result.Query = r.FormValue("query")
result.Path = r.URL.Path

Expand Down Expand Up @@ -179,6 +184,10 @@ func (c queryRangeCodec) EncodeRequest(ctx context.Context, r queryrange.Request
params[queryv1.ShardInfoParam] = []string{data}
}

if thanosReq.LookbackDelta > 0 {
params[queryv1.LookbackDeltaParam] = []string{encodeDurationMillis(thanosReq.LookbackDelta)}
}

req, err := http.NewRequest(http.MethodPost, thanosReq.Path, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
Expand Down Expand Up @@ -260,13 +269,18 @@ func parseMatchersParam(ss url.Values, matcherParam string) ([][]*labels.Matcher
return matchers, nil
}

func parseShardInfo(ss url.Values, key string) (*storepb.ShardInfo, error) {
func parseLookbackDelta(ss url.Values, key string) (int64, error) {
data, ok := ss[key]
if !ok {
return nil, nil
if !ok || len(data) == 0 {
return 0, nil
}

if len(data) == 0 {
return parseDurationMillis(data[0])
}

func parseShardInfo(ss url.Values, key string) (*storepb.ShardInfo, error) {
data, ok := ss[key]
if !ok || len(data) == 0 {
return nil, nil
}

Expand Down
29 changes: 29 additions & 0 deletions pkg/queryfrontend/queryrange_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ func TestQueryRangeCodec_DecodeRequest(t *testing.T) {
},
},
},
{
name: "lookback_delta",
url: `/api/v1/query_range?start=123&end=456&step=1&lookback_delta=1000`,
partialResponse: false,
expectedRequest: &ThanosQueryRangeRequest{
Path: "/api/v1/query_range",
Start: 123000,
End: 456000,
Step: 1000,
Dedup: true,
LookbackDelta: 1000000,
StoreMatchers: [][]*labels.Matcher{},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, tc.url, nil)
Expand Down Expand Up @@ -269,6 +283,21 @@ func TestQueryRangeCodec_EncodeRequest(t *testing.T) {
r.FormValue(queryv1.MaxSourceResolutionParam) == "3600"
},
},
{
name: "Lookback delta",
req: &ThanosQueryRangeRequest{
Start: 123000,
End: 456000,
Step: 1000,
LookbackDelta: 1000,
},
checkFunc: func(r *http.Request) bool {
return r.FormValue("start") == "123" &&
r.FormValue("end") == "456" &&
r.FormValue("step") == "1" &&
r.FormValue(queryv1.LookbackDeltaParam) == "1"
},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
Expand Down
2 changes: 2 additions & 0 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ThanosQueryRangeRequest struct {
Headers []*RequestHeader
Stats string
ShardInfo *storepb.ShardInfo
LookbackDelta int64
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand Down Expand Up @@ -152,6 +153,7 @@ type ThanosQueryInstantRequest struct {
Headers []*RequestHeader
Stats string
ShardInfo *storepb.ShardInfo
LookbackDelta int64 // in milliseconds.
}

// IsDedupEnabled returns true if deduplication is enabled.
Expand Down