Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --query-range.request-downsampled flag to Query Frontend (#2641) #3723

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
### Added

- [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks
- [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request.

### Changed

Expand Down
3 changes: 3 additions & 0 deletions cmd/thanos/query_frontend.go
Expand Up @@ -65,6 +65,9 @@ func registerQueryFrontend(app *extkingpin.App) {
cmd.Flag("query-range.align-range-with-step", "Mutate incoming queries to align their start and end with their step for better cache-ability. Note: Grafana dashboards do that by default.").
Default("true").BoolVar(&cfg.QueryRangeConfig.AlignRangeWithStep)

cmd.Flag("query-range.request-downsampled", "Make additional query for downsampled data in case of empty or incomplete response to range request.").
Default("true").BoolVar(&cfg.QueryRangeConfig.RequestDownsampled)

cmd.Flag("query-range.split-interval", "Split query range requests by an interval and execute in parallel, it should be greater than 0 when query-range.response-cache-config is configured.").
Default("24h").DurationVar(&cfg.QueryRangeConfig.SplitQueriesByInterval)

Expand Down
4 changes: 4 additions & 0 deletions docs/components/query-frontend.md
Expand Up @@ -147,6 +147,10 @@ Flags:
and end with their step for better
cache-ability. Note: Grafana dashboards do that
by default.
--query-range.request-downsampled
Make additional query for downsampled data in
case of empty or incomplete response to range
request.
--query-range.split-interval=24h
Split query range requests by an interval and
execute in parallel, it should be greater than
Expand Down
1 change: 1 addition & 0 deletions pkg/queryfrontend/config.go
Expand Up @@ -162,6 +162,7 @@ type QueryRangeConfig struct {
CachePathOrContent extflag.PathOrContent

AlignRangeWithStep bool
RequestDownsampled bool
SplitQueriesByInterval time.Duration
MaxRetries int
Limits *cortexvalidation.Limits
Expand Down
103 changes: 103 additions & 0 deletions pkg/queryfrontend/downsampled.go
@@ -0,0 +1,103 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package queryfrontend

import (
"context"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

// DownsampledMiddleware creates a new Middleware that requests downsampled data
// should response to original request with auto max_source_resolution not contain data points.
func DownsampledMiddleware(merger queryrange.Merger, registerer prometheus.Registerer) queryrange.Middleware {
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return downsampled{
next: next,
merger: merger,
extraCounter: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: "thanos",
Name: "frontend_downsampled_extra_queries_total",
Help: "Total number of extra queries for downsampled data",
}),
}
})
}

type downsampled struct {
next queryrange.Handler
merger queryrange.Merger

// Metrics.
extraCounter prometheus.Counter
krya-kryak marked this conversation as resolved.
Show resolved Hide resolved
}

var resolutions = []int64{downsample.ResLevel1, downsample.ResLevel2}

func (d downsampled) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
tqrr, ok := req.(*ThanosQueryRangeRequest)
if !ok || !tqrr.AutoDownsampling {
return d.next.Do(ctx, req)
}

var (
resps = make([]queryrange.Response, 0)
resp queryrange.Response
err error
i int
)

forLoop:
for i < len(resolutions) {
if i > 0 {
d.extraCounter.Inc()
}
r := *tqrr
resp, err = d.next.Do(ctx, &r)
if err != nil {
return nil, err
}
resps = append(resps, resp)
// Set MaxSourceResolution for next request, if any.
for i < len(resolutions) {
if tqrr.MaxSourceResolution < resolutions[i] {
tqrr.AutoDownsampling = false
tqrr.MaxSourceResolution = resolutions[i]
break
}
i++
}
m := minResponseTime(resp)
switch m {
case tqrr.Start: // Response not impacted by retention policy.
break forLoop
case -1: // Empty response, retry with higher MaxSourceResolution.
continue
default: // Data partially present, query for empty part with higher MaxSourceResolution.
tqrr.End = m - tqrr.Step
}
if tqrr.Start > tqrr.End {
break forLoop
}
}
response, err := d.merger.MergeResponse(resps...)
if err != nil {
return nil, err
}
return response, nil
}

func minResponseTime(r queryrange.Response) int64 {
var res = r.(*queryrange.PrometheusResponse).Data.Result
if len(res) == 0 {
return -1
}
if len(res[0].Samples) == 0 {
return -1
}
return res[0].Samples[0].TimestampMs
}
10 changes: 9 additions & 1 deletion pkg/queryfrontend/roundtrip.go
Expand Up @@ -131,7 +131,7 @@ func getOperation(r *http.Request) string {
}

// newQueryRangeTripperware returns a Tripperware for range queries configured with middlewares of
// limit, step align, split by interval, cache requests and retry.
// limit, step align, downsampled, split by interval, cache requests and retry.
func newQueryRangeTripperware(
config QueryRangeConfig,
limits queryrange.Limits,
Expand All @@ -151,6 +151,14 @@ func newQueryRangeTripperware(
)
}

if config.RequestDownsampled {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("downsampled", m),
DownsampledMiddleware(codec, reg),
)
}

queryIntervalFn := func(_ queryrange.Request) time.Duration {
return config.SplitQueriesByInterval
}
Expand Down