Implement query pushdown for a subset of aggregations
Certain aggregations can be executed safely on leaf nodes without
worrying about data duplication or overlap. One such example is the max
function, which can be computed on local data by the leaves before it is
computed globally by the querier.

This commit implements local aggregation in the Prometheus sidecar for
all functions that are safe to execute locally.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
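
For intuition, here is a minimal Go sketch, not part of this change and using made-up sample values, of the property the commit relies on: for a distributive function such as max, taking the max of per-leaf maxima over non-overlapping data equals the max over all raw samples, so a leaf can pre-aggregate before the querier combines the partial results.

```go
package main

import "fmt"

// localMax stands in for a leaf node (e.g. a Prometheus sidecar) aggregating
// only the non-overlapping samples it owns.
func localMax(samples []float64) float64 {
	m := samples[0]
	for _, v := range samples[1:] {
		if v > m {
			m = v
		}
	}
	return m
}

func main() {
	leafA := []float64{1.2, 3.4, 2.1} // samples held by one leaf
	leafB := []float64{0.9, 5.6, 4.3} // samples held by another leaf

	// The querier only aggregates the pre-aggregated partial results.
	global := localMax([]float64{localMax(leafA), localMax(leafB)})

	// Identical to max over the union of all samples.
	fmt.Println(global) // 5.6
}
```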
fpetkovski committed Dec 1, 2021
1 parent afd23cf commit 99e0d67
Showing 7 changed files with 642 additions and 87 deletions.
11 changes: 11 additions & 0 deletions pkg/query/querier.go
@@ -193,6 +193,16 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

// storeHintsFromPromHints converts Prometheus select hints into Store API query hints.
func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints {
return storepb.QueryHints{
StepMillis: hints.Step,
Func: hints.Func,
Grouping: hints.Grouping,
By: hints.By,
Range: hints.Range,
}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
@@ -274,6 +284,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
QueryHints: storeHintsFromPromHints(hints),
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
Step: hints.Step,
55 changes: 51 additions & 4 deletions pkg/store/prometheus.go
@@ -179,6 +179,11 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return nil
}

if r.QueryHints.IsFuncDistributive() {
// The aggregation is safe to compute locally, so evaluate the query
// directly on this Prometheus instance instead of streaming raw series.
return p.queryPrometheus(s, r)
}

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
for _, m := range matchers {
pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value}
@@ -220,18 +225,60 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset)
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error {
ctx := s.Context()
// queryPrometheus evaluates the pushed-down query against the local
// Prometheus query_range API and streams the pre-aggregated result back
// as chunked series.
func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error {
// Use the stream's context so the query is cancelled when the caller goes away.
ctx := s.Context()
opts := promclient.QueryOptions{}
step := r.QueryHints.StepMillis / 1000 // StepMillis is in milliseconds; QueryRange expects seconds.
matrix, _, err := p.client.QueryRange(ctx, p.base, r.ToPromQL(), r.MinTime, r.MaxTime, step, opts)
if err != nil {
return err
}

for _, vector := range matrix {
var lbls []labels.Label
for k, v := range vector.Metric {
lbls = append(lbls, labels.Label{
Name: string(k),
Value: string(v),
})
}
// Label names come from ranging over a map, so sort them before building the series.
sort.Slice(lbls, func(i, j int) bool { return lbls[i].Name < lbls[j].Name })
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(lbls),
Samples: make([]prompb.Sample, len(vector.Values)),
}

for i, sample := range vector.Values {
series.Samples[i] = prompb.Sample{
Value: float64(sample.Value),
Timestamp: int64(sample.Timestamp),
}
}

chks, err := p.chunkSamples(series, MaxSamplesPerChunk)
if err != nil {
return err
}

if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
Labels: series.Labels,
Chunks: chks,
})); err != nil {
return err
}
}

return nil
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error {
level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.")

resp, err := p.fetchSampledResponse(ctx, httpResp)
resp, err := p.fetchSampledResponse(s.Context(), httpResp)
querySpan.Finish()
if err != nil {
return err
}

span, _ := tracing.StartSpan(ctx, "transform_and_respond")
span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
defer span.Finish()
span.SetTag("series_count", len(resp.Results[0].Timeseries))

1 change: 1 addition & 0 deletions pkg/store/proxy.go
@@ -275,6 +275,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
Aggregates: r.Aggregates,
MaxResolutionWindow: r.MaxResolutionWindow,
SkipChunks: r.SkipChunks,
QueryHints: r.QueryHints,
PartialResponseDisabled: r.PartialResponseDisabled,
}
wg = &sync.WaitGroup{}
31 changes: 31 additions & 0 deletions pkg/store/storepb/custom.go
@@ -517,3 +517,34 @@ func (c *SeriesStatsCounter) Count(series *Series) {
}
}
}

// ToPromQL renders the request's query hints and matchers as the PromQL
// aggregation expression that the leaf node evaluates locally when the
// aggregation is pushed down.
func (m *SeriesRequest) ToPromQL() string {
grouping := strings.Join(m.QueryHints.Grouping, ",")
matchers := MatchersToString(m.Matchers...)
return fmt.Sprintf("%s %s (%s) (%s)", m.QueryHints.Func, m.QueryHints.ByOrWithout(), grouping, matchers)
}

// IsFuncDistributive reports whether the aggregation named in the hints can be
// computed on leaf nodes first and then aggregated again by the querier
// without changing the result.
func (m *QueryHints) IsFuncDistributive() bool {
distributiveFuncs := []string{
"max",
"min",
"group",
"topk",
"bottomk",
"count_values",
}
for _, op := range distributiveFuncs {
if m.Func == op {
return true
}
}

return false
}

// ByOrWithout returns the PromQL grouping modifier ("by" or "without") encoded in the hints.
func (m *QueryHints) ByOrWithout() string {
if m.By {
return "by"
}
return "without"
}
80 changes: 80 additions & 0 deletions pkg/store/storepb/custom_test.go
@@ -526,3 +526,83 @@ func TestMatchersToString_Translate(t *testing.T) {

}
}

func TestSeriesRequestToPromQL(t *testing.T) {
ts := []struct {
name string
r *SeriesRequest
expected string
}{
{
name: "Single matcher regular expression",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Type: LabelMatcher_RE,
Name: "namespace",
Value: "kube-.+",
},
},
QueryHints: QueryHints{
Func: "max",
Grouping: []string{"container", "pod"},
By: false,
Range: 0,
},
},
expected: `max without (container,pod) ({namespace=~"kube-.+"})`,
},
{
name: "Single matcher with __name__ label",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Type: LabelMatcher_EQ,
Name: "__name__",
Value: "kube_pod_info",
},
},
QueryHints: QueryHints{
Func: "max",
Grouping: []string{"container", "pod"},
By: false,
Range: 0,
},
},
expected: `max without (container,pod) ({__name__="kube_pod_info"})`,
},
{
name: "Multiple matchers",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Type: LabelMatcher_EQ,
Name: "__name__",
Value: "kube_pod_info",
},
{
Type: LabelMatcher_RE,
Name: "namespace",
Value: "kube-.+",
},
},
QueryHints: QueryHints{
Func: "max",
Grouping: []string{"container", "pod"},
By: false,
Range: 0,
},
},
expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"})`,
},
}

for _, tc := range ts {
t.Run(tc.name, func(t *testing.T) {
actual := tc.r.ToPromQL()
if tc.expected != actual {
t.Fatalf("invalid promql result, got %s, want %s", actual, tc.expected)
}
})
}
}
