Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sorted queries do not produce sorted results for shardable queries #6125

Merged
merged 9 commits into from Feb 23, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -34,6 +34,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6103](https://github.com/thanos-io/thanos/pull/6103) Mixins(Rule): Fix query for long rule evaluations.
- [#6121](https://github.com/thanos-io/thanos/pull/6121) Receive: Deduplicate metamonitoring queries.
- [#6137](https://github.com/thanos-io/thanos/pull/6137) Downsample: Repair of non-empty XOR chunks during 1h downsampling.
- [#6125](https://github.com/thanos-io/thanos/pull/6125) Query Frontend: Fix vertically shardable instant queries not producing sorted results for `sort`, `sort_desc`, `topk` and `bottomk` functions.

### Changed

Expand Down
4 changes: 2 additions & 2 deletions internal/cortex/querier/queryrange/query_range.go
Expand Up @@ -72,7 +72,7 @@ type Codec interface {
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
type Merger interface {
// MergeResponse merges responses from multiple requests into a single Response
MergeResponse(...Response) (Response, error)
MergeResponse(Request, ...Response) (Response, error)
}

// Request represents a query range request that can be processed by middlewares.
Expand Down Expand Up @@ -192,7 +192,7 @@ func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
}
}

func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/cortex/querier/queryrange/query_range_test.go
Expand Up @@ -653,7 +653,7 @@ func TestMergeAPIResponses(t *testing.T) {
},
}} {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(tc.input...)
output, err := PrometheusCodec.MergeResponse(nil, tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
})
Expand Down
6 changes: 3 additions & 3 deletions internal/cortex/querier/queryrange/results_cache.go
Expand Up @@ -404,7 +404,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return nil, nil, err
}
if len(requests) == 0 {
response, err := s.merger.MergeResponse(responses...)
response, err := s.merger.MergeResponse(r, responses...)
// No downstream requests so no need to write back to the cache.
return response, nil, err
}
Expand Down Expand Up @@ -466,7 +466,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
if err != nil {
return nil, nil, err
}
merged, err := s.merger.MergeResponse(accumulator.Response, currentRes)
merged, err := s.merger.MergeResponse(r, accumulator.Response, currentRes)
if err != nil {
return nil, nil, err
}
Expand All @@ -478,7 +478,7 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return nil, nil, err
}

response, err := s.merger.MergeResponse(responses...)
response, err := s.merger.MergeResponse(r, responses...)
return response, mergedExtents, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/cortex/querier/queryrange/split_by_interval.go
Expand Up @@ -62,7 +62,7 @@ func (s splitByInterval) Do(ctx context.Context, r Request) (Response, error) {
resps = append(resps, reqResp.Response)
}

response, err := s.merger.MergeResponse(resps...)
response, err := s.merger.MergeResponse(r, resps...)
if err != nil {
return nil, err
}
Expand Down
Expand Up @@ -267,7 +267,7 @@ func TestSplitQuery(t *testing.T) {
}

func TestSplitByDay(t *testing.T) {
mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse)
mergedResponse, err := PrometheusCodec.MergeResponse(nil, parsedResponse, parsedResponse)
require.NoError(t, err)

mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse)
Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/downsampled.go
Expand Up @@ -85,7 +85,7 @@ forLoop:
break forLoop
}
}
response, err := d.merger.MergeResponse(resps...)
response, err := d.merger.MergeResponse(req, resps...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/queryfrontend/labels_codec.go
Expand Up @@ -48,7 +48,7 @@ func NewThanosLabelsCodec(partialResponse bool, defaultMetadataTimeRange time.Du
}

// MergeResponse merges multiple responses into a single Response. It needs to dedup the responses and ensure the order.
func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
func (c labelsCodec) MergeResponse(_ queryrange.Request, responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
// Empty response for label_names, label_values and series API.
return &ThanosLabelsResponse{
Expand Down
6 changes: 3 additions & 3 deletions pkg/queryfrontend/labels_codec_test.go
Expand Up @@ -513,7 +513,7 @@ func TestLabelsCodec_MergeResponse(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Default partial response value doesn't matter when encoding requests.
codec := NewThanosLabelsCodec(false, time.Hour*2)
r, err := codec.MergeResponse(tc.responses...)
r, err := codec.MergeResponse(nil, tc.responses...)
if tc.expectedError != nil {
testutil.Equals(t, err, tc.expectedError)
} else {
Expand Down Expand Up @@ -677,7 +677,7 @@ func benchmarkMergeResponses(b *testing.B, size int) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _ = codec.MergeResponse(queryResSeries...)
_, _ = codec.MergeResponse(nil, queryResSeries...)
}
})

Expand All @@ -686,7 +686,7 @@ func benchmarkMergeResponses(b *testing.B, size int) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, _ = codec.MergeResponse(queryResLabel...)
_, _ = codec.MergeResponse(nil, queryResLabel...)
}
})

Expand Down
125 changes: 110 additions & 15 deletions pkg/queryfrontend/queryinstant_codec.go
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"

"github.com/prometheus/prometheus/promql/parser"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/internal/cortex/cortexpb"
"github.com/thanos-io/thanos/internal/cortex/querier/queryrange"
cortexutil "github.com/thanos-io/thanos/internal/cortex/util"
Expand All @@ -41,7 +43,7 @@ func NewThanosQueryInstantCodec(partialResponse bool) *queryInstantCodec {
// MergeResponse merges multiple responses into a single response. For instant query
// only vector and matrix responses will be merged because other types of queries
// are not shardable like number literal, string literal, scalar, etc.
func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
func (c queryInstantCodec) MergeResponse(req queryrange.Request, responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
return queryrange.NewEmptyPrometheusInstantQueryResponse(), nil
} else if len(responses) == 1 {
Expand All @@ -68,13 +70,17 @@ func (c queryInstantCodec) MergeResponse(responses ...queryrange.Response) (quer
},
}
default:
v, err := vectorMerge(req, promResponses)
if err != nil {
return nil, err
}
res = &queryrange.PrometheusInstantQueryResponse{
Status: queryrange.StatusSuccess,
Data: queryrange.PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: queryrange.PrometheusInstantQueryResult{
Result: &queryrange.PrometheusInstantQueryResult_Vector{
Vector: vectorMerge(promResponses),
Vector: v,
},
},
Stats: queryrange.StatsMerge(responses),
Expand Down Expand Up @@ -228,7 +234,7 @@ func (c queryInstantCodec) EncodeResponse(ctx context.Context, res queryrange.Re
return &resp, nil
}

func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, _ queryrange.Request) (queryrange.Response, error) {
func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := io.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
Expand All @@ -254,8 +260,13 @@ func (c queryInstantCodec) DecodeResponse(ctx context.Context, r *http.Response,
return &resp, nil
}

func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Vector {
func vectorMerge(req queryrange.Request, resps []*queryrange.PrometheusInstantQueryResponse) (*queryrange.Vector, error) {
output := map[string]*queryrange.Sample{}
metrics := []string{} // Used to preserve the order for topk and bottomk.
sortPlan, err := sortPlanForQuery(req.GetQuery())
if err != nil {
return nil, err
}
for _, resp := range resps {
if resp == nil {
continue
Expand All @@ -273,32 +284,116 @@ func vectorMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange
metric := cortexpb.FromLabelAdaptersToLabels(sample.Labels).String()
if existingSample, ok := output[metric]; !ok {
output[metric] = s
metrics = append(metrics, metric) // Preserve the order of metric.
} else if existingSample.GetSample().TimestampMs < s.GetSample().TimestampMs {
// Choose the latest sample if we see overlap.
output[metric] = s
}
}
}

result := &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0, len(output)),
}

if len(output) == 0 {
return &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0),
return result, nil
}

if sortPlan == mergeOnly {
for _, k := range metrics {
result.Samples = append(result.Samples, output[k])
}
return result, nil
}

keys := make([]string, 0, len(output))
for key := range output {
keys = append(keys, key)
type pair struct {
metric string
s *queryrange.Sample
}
sort.Strings(keys)

result := &queryrange.Vector{
Samples: make([]*queryrange.Sample, 0, len(output)),
samples := make([]*pair, 0, len(output))
for k, v := range output {
samples = append(samples, &pair{
metric: k,
s: v,
})
}
for _, key := range keys {
result.Samples = append(result.Samples, output[key])

sort.Slice(samples, func(i, j int) bool {
// Order is determined by vector
switch sortPlan {
case sortByValuesAsc:
return samples[i].s.Sample.Value < samples[j].s.Sample.Value
case sortByValuesDesc:
return samples[i].s.Sample.Value > samples[j].s.Sample.Value
}
return samples[i].metric < samples[j].metric
})

for _, p := range samples {
result.Samples = append(result.Samples, p.s)
}
return result
return result, nil
}

// sortPlan enumerates the ordering strategies that sortPlanForQuery can
// select for the samples of a merged instant-query vector.
type sortPlan int

const (
	// mergeOnly is returned when the root of the query is a topk or bottomk
	// aggregation.
	mergeOnly sortPlan = iota
	// sortByValuesAsc is returned when the query calls sort.
	sortByValuesAsc
	// sortByValuesDesc is returned when the query calls sort_desc.
	sortByValuesDesc
	// sortByLabels is the fallback returned for every other query.
	sortByLabels
)

// sortPlanForQuery parses the PromQL expression q and picks the sortPlan to
// use for it.
//
// The decision is made from the root of the parsed expression only:
//   - a topk or bottomk aggregation yields mergeOnly;
//   - a call to sort yields sortByValuesAsc and a call to sort_desc yields
//     sortByValuesDesc;
//   - a binary expression whose direct left-hand or right-hand operand is
//     such a call yields the plan of that operand, the left side being
//     examined first;
//   - anything else yields sortByLabels.
//
// If q cannot be parsed, the parser error is returned together with the zero
// sortPlan.
func sortPlanForQuery(q string) (sortPlan, error) {
	root, err := promqlparser.ParseExpr(q)
	if err != nil {
		return 0, err
	}

	// topk/bottomk at the root: no further inspection of the expression.
	if agg, isAgg := root.(*parser.AggregateExpr); isAgg &&
		(agg.Op == promqlparser.TOPK || agg.Op == promqlparser.BOTTOMK) {
		return mergeOnly, nil
	}

	// planFromCall reports the value-ordering plan when node itself is a call
	// to sort or sort_desc; found is false for every other kind of node.
	planFromCall := func(node promqlparser.Expr) (plan sortPlan, found bool) {
		call, isCall := node.(*promqlparser.Call)
		if !isCall || call.Func == nil {
			return 0, false
		}
		switch call.Func.Name {
		case "sort":
			return sortByValuesAsc, true
		case "sort_desc":
			return sortByValuesDesc, true
		}
		return 0, false
	}

	// Inspect the root first, then (for a binary root) its LHS followed by
	// its RHS. The first sort/sort_desc call found decides the plan.
	candidates := []promqlparser.Expr{root}
	if bin, isBin := root.(*parser.BinaryExpr); isBin {
		candidates = append(candidates, bin.LHS, bin.RHS)
	}
	for _, node := range candidates {
		if plan, found := planFromCall(node); found {
			return plan, nil
		}
	}
	return sortByLabels, nil
}

func matrixMerge(resps []*queryrange.PrometheusInstantQueryResponse) *queryrange.Matrix {
Expand Down