Skip to content

Commit

Permalink
Implement query pushdown for a subset of aggregations
Browse files Browse the repository at this point in the history
Certain aggregations can be executed safely on leaf nodes without
worrying about data duplication or overlap. One such example is the max
function which can be computed on local data by the leaves before it is
computed globally by the querier.

This commit implements local aggregation in the Prometheus sidecar for
all functions which are safe to execute locally. The feature can be
enabled by passing the `--enable-feature evaluate-queries` flag to the sidecar.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Dec 16, 2021
1 parent f527d26 commit d8c5c71
Show file tree
Hide file tree
Showing 16 changed files with 1,610 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased
- [#4917](https://github.com/thanos-io/thanos/pull/4917) Sidecar: Add an `evaluate-queries` feature to Thanos sidecar to enable local execution of certain queries.

### Added

Expand Down
11 changes: 9 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
// Names of the experimental features that can be passed to the querier's
// --enable-feature flag.
const (
promqlNegativeOffset = "promql-negative-offset"
promqlAtModifier = "promql-at-modifier"
// queryPushdown is the feature name that switches on enableQueryPushdown
// when it appears in the --enable-feature list.
queryPushdown = "query-pushdown"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -160,7 +161,7 @@ func registerQuery(app *extkingpin.App) {
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings()
featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+promqlAtModifier+" and "+queryPushdown+".").Default("").Strings()

enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling.").
Hidden().Default("true").Bool()
Expand All @@ -181,14 +182,17 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "parse federation labels")
}

var enableNegativeOffset, enableAtModifier bool
var enableNegativeOffset, enableAtModifier, enableQueryPushdown bool
for _, feature := range *featureList {
if feature == promqlNegativeOffset {
enableNegativeOffset = true
}
if feature == promqlAtModifier {
enableAtModifier = true
}
if feature == queryPushdown {
enableQueryPushdown = true
}
}

httpLogOpts, err := logging.ParseHTTPOptions(*reqLogDecision, reqLogConfig)
Expand Down Expand Up @@ -278,6 +282,7 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
enableAtModifier,
enableNegativeOffset,
enableQueryPushdown,
*alertQueryURL,
component.Query,
)
Expand Down Expand Up @@ -346,6 +351,7 @@ func runQuery(
disableCORS bool,
enableAtModifier bool,
enableNegativeOffset bool,
enableQueryPushdown bool,
alertQueryURL string,
comp component.Component,
) error {
Expand Down Expand Up @@ -614,6 +620,7 @@ func runQuery(
enableTargetPartialResponse,
enableMetricMetadataPartialResponse,
enableExemplarPartialResponse,
enableQueryPushdown,
queryReplicaLabels,
flagsMap,
defaultRangeQueryStep,
Expand Down
3 changes: 2 additions & 1 deletion docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ Flags:
in all alerts 'Source' field.
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
promql-negative-offset, promql-at-modifier and
query-pushdown.
--endpoint=<endpoint> ... Addresses of statically configured Thanos API
servers (repeatable). The scheme may be
prefixed with 'dns+' or 'dnssrv+' to detect
Expand Down
13 changes: 8 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type QueryAPI struct {
enableTargetPartialResponse bool
enableMetricMetadataPartialResponse bool
enableExemplarPartialResponse bool
enableQueryPushdown bool
disableCORS bool

replicaLabels []string
Expand Down Expand Up @@ -120,6 +121,7 @@ func NewQueryAPI(
enableTargetPartialResponse bool,
enableMetricMetadataPartialResponse bool,
enableExemplarPartialResponse bool,
enableQueryPushdown bool,
replicaLabels []string,
flagsMap map[string]string,
defaultRangeQueryStep time.Duration,
Expand All @@ -146,6 +148,7 @@ func NewQueryAPI(
enableTargetPartialResponse: enableTargetPartialResponse,
enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
enableExemplarPartialResponse: enableExemplarPartialResponse,
enableQueryPushdown: enableQueryPushdown,
replicaLabels: replicaLabels,
endpointStatus: endpointStatus,
defaultRangeQueryStep: defaultRangeQueryStep,
Expand Down Expand Up @@ -342,7 +345,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down Expand Up @@ -459,7 +462,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -532,7 +535,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -619,7 +622,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down Expand Up @@ -669,7 +672,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
Expand Down
31 changes: 27 additions & 4 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
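// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy.
// enableQueryPushdown controls whether the PromQL select hints (function, grouping, range and step) are sent to the StoreAPI as query hints.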
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
duration := promauto.With(
extprom.WrapRegistererWithPrefix("concurrent_selects_", reg),
).NewHistogram(gate.DurationHistogramOpts)

return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, enableQueryPushdown, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
Expand All @@ -56,6 +56,7 @@ func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy sto
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
enableQueryPushdown: enableQueryPushdown,
}
}
}
Expand All @@ -72,11 +73,12 @@ type queryable struct {
gateProviderFn func() gate.Gate
maxConcurrentSelects int
selectTimeout time.Duration
enableQueryPushdown bool
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout), nil
}

type querier struct {
Expand All @@ -90,6 +92,7 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
enableQueryPushdown bool
skipChunks bool
selectGate gate.Gate
selectTimeout time.Duration
Expand All @@ -106,7 +109,7 @@ func newQuerier(
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
partialResponse, skipChunks bool,
partialResponse, enableQueryPushdown bool, skipChunks bool,
selectGate gate.Gate,
selectTimeout time.Duration,
) *querier {
Expand Down Expand Up @@ -135,6 +138,7 @@ func newQuerier(
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
enableQueryPushdown: enableQueryPushdown,
}
}

Expand Down Expand Up @@ -193,6 +197,20 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

// storeHintsFromPromHints converts the PromQL select hints into the QueryHints
// message attached to a Store API series request. It carries over the step,
// the function name, the grouping (labels and by/without flag) and the range.
//
// hints must not be nil; it is dereferenced unconditionally.
func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints {
	fn := &storepb.Func{Name: hints.Func}
	grouping := &storepb.Grouping{
		By:     hints.By,
		Labels: hints.Grouping,
	}
	rng := &storepb.Range{Millis: hints.Range}

	return storepb.QueryHints{
		StepMillis: hints.Step,
		Func:       fn,
		Grouping:   grouping,
		Range:      rng,
	}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Expand Down Expand Up @@ -268,12 +286,17 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// TODO(bwplotka): Use inprocess gRPC.
resp := &seriesServer{ctx: ctx}
var queryHints storepb.QueryHints
if q.enableQueryPushdown {
queryHints = storeHintsFromPromHints(hints)
}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: &queryHints,
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Step: hints.Step,
Expand Down
12 changes: 6 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -71,7 +71,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
}

timeout := 10 * time.Second
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false)
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false, false)
engine := promql.NewEngine(
promql.EngineOpts{
MaxSamples: math.MaxInt32,
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
g := gate.New(2)
mq := &mockedQueryable{
Creator: func(mint, maxt int64) storage.Querier {
return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout)
},
}
t.Cleanup(func() {
Expand Down Expand Up @@ -606,7 +606,7 @@ func TestQuerier_Select(t *testing.T) {
{dedup: true, expected: []series{tcase.expectedAfterDedup}},
} {
g := gate.New(2)
q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
q := newQuerier(context.Background(), nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) {
Expand Down Expand Up @@ -835,7 +835,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestQuerier_Proxy(t *testing.T) {
name: fmt.Sprintf("store number %v", i),
})
}
return q(true, nil, nil, 0, false, false)
return q(true, nil, nil, 0, false, false, false)
}

for _, fn := range files {
Expand Down

0 comments on commit d8c5c71

Please sign in to comment.