forked from grafana/loki
/
value.go
125 lines (108 loc) · 3.42 KB
/
value.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package queryrangebase
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/pao214/loki/v2/pkg/logproto"
"github.com/pao214/loki/v2/pkg/querier/series"
)
// FromResult transforms a promql query result into a samplestream
func FromResult(res *promql.Result) ([]SampleStream, error) {
if res.Err != nil {
// The error could be wrapped by the PromQL engine. We get the error's cause in order to
// correctly parse the error in parent callers (eg. gRPC response status code extraction).
return nil, errors.Cause(res.Err)
}
switch v := res.Value.(type) {
case promql.Scalar:
return []SampleStream{
{
Samples: []logproto.LegacySample{
{
Value: v.V,
TimestampMs: v.T,
},
},
},
}, nil
case promql.Vector:
res := make([]SampleStream, 0, len(v))
for _, sample := range v {
res = append(res, SampleStream{
Labels: mapLabels(sample.Metric),
Samples: mapPoints(sample.Point),
})
}
return res, nil
case promql.Matrix:
res := make([]SampleStream, 0, len(v))
for _, series := range v {
res = append(res, SampleStream{
Labels: mapLabels(series.Metric),
Samples: mapPoints(series.Points...),
})
}
return res, nil
}
return nil, errors.Errorf("Unexpected value type: [%s]", res.Value.Type())
}
func mapLabels(ls labels.Labels) []logproto.LabelAdapter {
result := make([]logproto.LabelAdapter, 0, len(ls))
for _, l := range ls {
result = append(result, logproto.LabelAdapter(l))
}
return result
}
func mapPoints(pts ...promql.Point) []logproto.LegacySample {
result := make([]logproto.LegacySample, 0, len(pts))
for _, pt := range pts {
result = append(result, logproto.LegacySample{
Value: pt.V,
TimestampMs: pt.T,
})
}
return result
}
// ResponseToSamples is needed to map back from api response to the underlying series data
func ResponseToSamples(resp Response) ([]SampleStream, error) {
promRes, ok := resp.(*PrometheusResponse)
if !ok {
return nil, errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
}
if promRes.Error != "" {
return nil, errors.New(promRes.Error)
}
switch promRes.Data.ResultType {
case string(parser.ValueTypeVector), string(parser.ValueTypeMatrix):
return promRes.Data.Result, nil
}
return nil, errors.Errorf(
"Invalid promql.Value type: [%s]. Only %s and %s supported",
promRes.Data.ResultType,
parser.ValueTypeVector,
parser.ValueTypeMatrix,
)
}
// NewSeriesSet returns an in memory storage.SeriesSet from a []SampleStream
// As NewSeriesSet uses NewConcreteSeriesSet to implement SeriesSet, result will be sorted by label names.
func NewSeriesSet(results []SampleStream) storage.SeriesSet {
set := make([]storage.Series, 0, len(results))
for _, stream := range results {
samples := make([]model.SamplePair, 0, len(stream.Samples))
for _, sample := range stream.Samples {
samples = append(samples, model.SamplePair{
Timestamp: model.Time(sample.TimestampMs),
Value: model.SampleValue(sample.Value),
})
}
ls := make([]labels.Label, 0, len(stream.Labels))
for _, l := range stream.Labels {
ls = append(ls, labels.Label(l))
}
set = append(set, series.NewConcreteSeries(ls, samples))
}
return series.NewConcreteSeriesSet(set)
}