Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Frontend: add query instant tripperware for query sharding #5561

Merged
merged 9 commits into from Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cortex/cortexpb/cortex.proto
Expand Up @@ -7,7 +7,7 @@ package cortexpb;

option go_package = "cortexpb";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "gogoproto/gogo.proto";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can do this rewrite in go.mod or with some sed magic in the Makefile.

My concern is that the change can be lost the next time we update Cortex.


option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
Expand Down
138 changes: 130 additions & 8 deletions internal/cortex/querier/queryrange/query_range.go
Expand Up @@ -22,6 +22,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -103,6 +104,8 @@ type Response interface {
proto.Message
// GetHeaders returns the HTTP headers in the response.
GetHeaders() []*PrometheusResponseHeader
// GetStats returns the Prometheus query stats in the response.
GetStats() *PrometheusResponseStats
}

type prometheusCodec struct{}
Expand Down Expand Up @@ -156,6 +159,14 @@ func (resp *PrometheusResponse) minTime() int64 {
return result[0].Samples[0].TimestampMs
}

func (resp *PrometheusResponse) GetStats() *PrometheusResponseStats {
return resp.Data.Stats
}

func (resp *PrometheusInstantQueryResponse) GetStats() *PrometheusResponseStats {
return resp.Data.Stats
}

// NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusResponse() *PrometheusResponse {
return &PrometheusResponse{
Expand All @@ -167,6 +178,19 @@ func NewEmptyPrometheusResponse() *PrometheusResponse {
}
}

// NewEmptyPrometheusInstantQueryResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
return &PrometheusInstantQueryResponse{
Status: StatusSuccess,
Data: PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Samples{},
},
},
}
}

func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
Expand All @@ -189,7 +213,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
Result: matrixMerge(promResponses),
Stats: statsMerge(promResponses),
Stats: StatsMerge(responses),
},
}

Expand Down Expand Up @@ -302,7 +326,7 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

buf, err := bodyBuffer(r)
buf, err := BodyBuffer(r)
if err != nil {
log.Error(err)
return nil, err
Expand All @@ -326,7 +350,7 @@ type Buffer interface {
Bytes() []byte
}

func bodyBuffer(res *http.Response) ([]byte, error) {
func BodyBuffer(res *http.Response) ([]byte, error) {
// Attempt to cast the response body to a Buffer and use it if possible.
// This is because the frontend may have already read the body and buffered it.
if buffer, ok := res.Body.(Buffer); ok {
Expand Down Expand Up @@ -398,22 +422,120 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) {
return json.Marshal(stream)
}

// statsMerge merge the stats from 2 responses
// UnmarshalJSON implements json.Unmarshaler.
func (s *Sample) UnmarshalJSON(data []byte) error {
var sample struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}
if err := json.Unmarshal(data, &sample); err != nil {
return err
}
s.Labels = cortexpb.FromMetricsToLabelAdapters(sample.Metric)
s.Sample = sample.Value
return nil
}

// MarshalJSON implements json.Marshaler.
func (s *Sample) MarshalJSON() ([]byte, error) {
sample := struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}{
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
Value: s.Sample,
}
return json.Marshal(sample)
}

// UnmarshalJSON implements json.Unmarshaler.
func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
var queryData struct {
ResultType string `json:"resultType"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}

if err := json.Unmarshal(data, &queryData); err != nil {
return err
}
s.ResultType = queryData.ResultType
s.Stats = queryData.Stats
switch s.ResultType {
case model.ValVector.String():
var result struct {
Vector []*Sample `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Samples{Samples: &Vector{
Result: result.Vector,
}},
}
case model.ValMatrix.String():
return errors.New("matrix result type not supported for PrometheusInstantQueryData")
default:
var result struct {
Sample cortexpb.Sample `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Sample{Sample: &result.Sample},
}
}
return nil
}

// MarshalJSON implements json.Marshaler.
func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
if s.ResultType == model.ValVector.String() {
res := struct {
ResultType string `json:"resultType"`
Data []*Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetSamples().Result,
Stats: s.Stats,
}
return json.Marshal(res)
} else if s.ResultType == model.ValMatrix.String() {
return nil, errors.New("matrix result type not supported for PrometheusInstantQueryData")
}

// Other cases only include scalar and string.
res := struct {
ResultType string `json:"resultType"`
Data *cortexpb.Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetSample(),
Stats: s.Stats,
}
return json.Marshal(res)
}

// StatsMerge merge the stats from 2 responses
// this function is similar to matrixMerge
func statsMerge(resps []*PrometheusResponse) *PrometheusResponseStats {
func StatsMerge(resps []Response) *PrometheusResponseStats {
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
hasStats := false
for _, resp := range resps {
if resp.Data.Stats == nil {
stats := resp.GetStats()
if stats == nil {
continue
}

hasStats = true
if resp.Data.Stats.Samples == nil {
if stats.Samples == nil {
continue
}

for _, s := range resp.Data.Stats.Samples.TotalQueryableSamplesPerStep {
for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
output[s.GetTimestampMs()] = s
}
}
Expand Down