Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query frontend, query UI: Native histogram support #6071

Merged
merged 6 commits into from Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -37,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.
- [#6231](https://github.com/thanos-io/thanos/pull/6231) mixins: Add code/grpc-code dimension to error widgets.
- [#6244](https://github.com/thanos-io/thanos/pull/6244) mixin(Rule): Add rule evaluation failures to the Rule dashboard.
- [#6071](https://github.com/thanos-io/thanos/pull/6071) Query Frontend: *breaking :warning:* Add experimental native histogram support for which we updated and aligned with the [Prometheus common](https://github.com/prometheus/common) model, which is used for caching so a cache reset required.

### Removed

Expand Down
64 changes: 64 additions & 0 deletions internal/cortex/querier/queryrange/compat.go
@@ -0,0 +1,64 @@
// Copyright (c) The Cortex Authors.
// Licensed under the Apache License 2.0.

package queryrange

import (
"github.com/prometheus/common/model"
)

// The following code will allow us to use JSON marshal and unmarshal functions from the Prometheus common package in
// query_range.go: https://github.com/prometheus/common/blob/846591a166358c7048ef197e84501ca688dda920/model/value.go
// Please see the link above for more details on Sample, SampleStream, HistogramPair and SampleHistogramPair.

func toModelSampleHistogramPair(s SampleHistogramPair) model.SampleHistogramPair {
return model.SampleHistogramPair{
Timestamp: model.Time(s.Timestamp),
Histogram: toModelSampleHistogram(s.Histogram),
}
}

func fromModelSampleHistogramPair(modelSampleHistogram model.SampleHistogramPair) (s SampleHistogramPair) {
return SampleHistogramPair{
Timestamp: int64(modelSampleHistogram.Timestamp),
Histogram: fromModelSampleHistogram(modelSampleHistogram.Histogram),
}
}

func fromModelSampleHistogram(modelSampleHistogram *model.SampleHistogram) (s SampleHistogram) {
buckets := make([]*HistogramBucket, len(modelSampleHistogram.Buckets))

for i, b := range modelSampleHistogram.Buckets {
buckets[i] = &HistogramBucket{
Lower: float64(b.Lower),
Upper: float64(b.Upper),
Count: float64(b.Count),
Boundaries: b.Boundaries,
}
}

return SampleHistogram{
Count: float64(modelSampleHistogram.Count),
Sum: float64(modelSampleHistogram.Sum),
Buckets: buckets,
}
}

func toModelSampleHistogram(s SampleHistogram) *model.SampleHistogram {
modelBuckets := make([]*model.HistogramBucket, len(s.Buckets))

for i, b := range s.Buckets {
modelBuckets[i] = &model.HistogramBucket{
Lower: model.FloatString(b.Lower),
Upper: model.FloatString(b.Upper),
Count: model.FloatString(b.Count),
Boundaries: b.Boundaries,
}
}

return &model.SampleHistogram{
Count: model.FloatString(s.Count),
Sum: model.FloatString(s.Sum),
Buckets: modelBuckets,
}
}
139 changes: 110 additions & 29 deletions internal/cortex/querier/queryrange/query_range.go
Expand Up @@ -154,10 +154,26 @@ func (resp *PrometheusResponse) minTime() int64 {
if len(result) == 0 {
return -1
}
if len(result[0].Samples) == 0 {
if len(result[0].Samples) == 0 && len(result[0].Histograms) == 0 {
return -1
}
return result[0].Samples[0].TimestampMs

if len(result[0].Samples) == 0 {
return result[0].Histograms[0].Timestamp
}

if len(result[0].Histograms) == 0 {
return result[0].Samples[0].TimestampMs
}

return minInt64(result[0].Samples[0].TimestampMs, result[0].Histograms[0].Timestamp)
}

func minInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}

func (resp *PrometheusResponse) GetStats() *PrometheusResponseStats {
Expand Down Expand Up @@ -397,54 +413,86 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
return &resp, nil
}

// UnmarshalJSON implements json.Unmarshaler.
// UnmarshalJSON implements json.Unmarshaler and is used for unmarshalling
// a Prometheus range query response (matrix).
func (s *SampleStream) UnmarshalJSON(data []byte) error {
var stream struct {
Metric model.Metric `json:"metric"`
Values []cortexpb.Sample `json:"values"`
}
if err := json.Unmarshal(data, &stream); err != nil {
var sampleStream model.SampleStream
if err := json.Unmarshal(data, &sampleStream); err != nil {
return err
}
s.Labels = cortexpb.FromMetricsToLabelAdapters(stream.Metric)
s.Samples = stream.Values

s.Labels = cortexpb.FromMetricsToLabelAdapters(sampleStream.Metric)

if len(sampleStream.Values) > 0 {
s.Samples = make([]cortexpb.Sample, 0, len(sampleStream.Values))
for _, sample := range sampleStream.Values {
s.Samples = append(s.Samples, cortexpb.Sample{
Value: float64(sample.Value),
TimestampMs: int64(sample.Timestamp),
})
}
}

if len(sampleStream.Histograms) > 0 {
s.Histograms = make([]SampleHistogramPair, 0, len(sampleStream.Histograms))
for _, h := range sampleStream.Histograms {
s.Histograms = append(s.Histograms, fromModelSampleHistogramPair(h))
}
}

return nil
}

// MarshalJSON implements json.Marshaler.
func (s *SampleStream) MarshalJSON() ([]byte, error) {
stream := struct {
Metric model.Metric `json:"metric"`
Values []cortexpb.Sample `json:"values"`
}{
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
Values: s.Samples,
var sampleStream model.SampleStream
sampleStream.Metric = cortexpb.FromLabelAdaptersToMetric(s.Labels)

sampleStream.Values = make([]model.SamplePair, 0, len(s.Samples))
for _, sample := range s.Samples {
sampleStream.Values = append(sampleStream.Values, model.SamplePair{
Value: model.SampleValue(sample.Value),
Timestamp: model.Time(sample.TimestampMs),
})
}

sampleStream.Histograms = make([]model.SampleHistogramPair, 0, len(s.Histograms))
for _, h := range s.Histograms {
sampleStream.Histograms = append(sampleStream.Histograms, toModelSampleHistogramPair(h))
}
return json.Marshal(stream)

return json.Marshal(sampleStream)
}

// UnmarshalJSON implements json.Unmarshaler.
// UnmarshalJSON implements json.Unmarshaler and is used for unmarshalling
// a Prometheus instant query response (vector).
func (s *Sample) UnmarshalJSON(data []byte) error {
var sample struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}
var sample model.Sample
if err := json.Unmarshal(data, &sample); err != nil {
return err
}
s.Labels = cortexpb.FromMetricsToLabelAdapters(sample.Metric)
s.Sample = sample.Value
s.SampleValue = float64(sample.Value)
s.Timestamp = int64(sample.Timestamp)

if sample.Histogram != nil {
sh := fromModelSampleHistogram(sample.Histogram)
s.Histogram = &sh
} else {
s.Histogram = nil
}

return nil
}

// MarshalJSON implements json.Marshaler.
func (s *Sample) MarshalJSON() ([]byte, error) {
sample := struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}{
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
Value: s.Sample,
var sample model.Sample
sample.Metric = cortexpb.FromLabelAdaptersToMetric(s.Labels)
sample.Value = model.SampleValue(s.SampleValue)
sample.Timestamp = model.Time(s.Timestamp)
if s.Histogram != nil {
sample.Histogram = toModelSampleHistogram(*s.Histogram)
}
return json.Marshal(sample)
}
Expand Down Expand Up @@ -657,7 +705,20 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream {
stream.Samples = SliceSamples(stream.Samples, existingEndTs)
} // else there is no overlap, yay!
}
// Same for histograms as for samples above.
if len(existing.Histograms) > 0 && len(stream.Histograms) > 0 {
existingEndTs := existing.Histograms[len(existing.Histograms)-1].GetTimestamp()
if existingEndTs == stream.Histograms[0].GetTimestamp() {
stream.Histograms = stream.Histograms[1:]
} else if existingEndTs > stream.Histograms[0].GetTimestamp() {
stream.Histograms = SliceHistogram(stream.Histograms, existingEndTs)
}
}

existing.Samples = append(existing.Samples, stream.Samples...)

existing.Histograms = append(existing.Histograms, stream.Histograms...)

output[metric] = existing
}
}
Expand Down Expand Up @@ -696,6 +757,26 @@ func SliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample {
return samples[searchResult:]
}

// SliceHistogram assumes given histogram are sorted by timestamp in ascending order and
// return a sub slice whose first element's is the smallest timestamp that is strictly
// bigger than the given minTs. Empty slice is returned if minTs is bigger than all the
// timestamps in histogram.
func SliceHistogram(histograms []SampleHistogramPair, minTs int64) []SampleHistogramPair {
if len(histograms) <= 0 || minTs < histograms[0].GetTimestamp() {
return histograms
}

if len(histograms) > 0 && minTs > histograms[len(histograms)-1].GetTimestamp() {
return histograms[len(histograms):]
}

searchResult := sort.Search(len(histograms), func(i int) bool {
return histograms[i].GetTimestamp() > minTs
})

return histograms[searchResult:]
}

func parseDurationMs(s string) (int64, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second/time.Millisecond)
Expand Down
14 changes: 11 additions & 3 deletions internal/cortex/querier/queryrange/query_range_test.go
Expand Up @@ -86,15 +86,17 @@ func TestRequest(t *testing.T) {
}

func TestResponse(t *testing.T) {
r := *parsedResponse
r.Headers = respHeaders
for i, tc := range []struct {
body string
expected *PrometheusResponse
}{
{
body: responseBody,
expected: &r,
expected: withHeaders(parsedResponse, respHeaders),
},
{
body: histogramResponseBody,
expected: withHeaders(parsedHistogramResponse, respHeaders),
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
Expand Down Expand Up @@ -667,3 +669,9 @@ func mustParse(t *testing.T, response string) Response {
require.NoError(t, json.Unmarshal([]byte(response), &resp))
return &resp
}

func withHeaders(response *PrometheusResponse, headers []*PrometheusResponseHeader) *PrometheusResponse {
r := *response
r.Headers = headers
return &r
}