Skip to content

Commit

Permalink
Add test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
fpetkovski committed Dec 6, 2021
1 parent 99e0d67 commit b5a8700
Show file tree
Hide file tree
Showing 11 changed files with 1,074 additions and 200 deletions.
26 changes: 15 additions & 11 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func registerSidecar(app *extkingpin.App) {
RetryInterval: conf.reloader.retryInterval,
})

return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts, conf.evaluateQueries)
})
}

Expand All @@ -84,6 +84,7 @@ func runSidecar(
conf sidecarConfig,
grpcLogOpts []grpc_logging.Option,
tagOpts []tags.Option,
evaluateQueries bool,
) error {
httpConfContentYaml, err := conf.prometheus.httpClient.Content()
if err != nil {
Expand Down Expand Up @@ -244,7 +245,7 @@ func runSidecar(
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version, evaluateQueries)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -467,15 +468,16 @@ func (s *promMetadata) Version() string {
}

type sidecarConfig struct {
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
http httpConfig
grpc grpcConfig
prometheus prometheusConfig
tsdb tsdbConfig
reloader reloaderConfig
reqLogConfig *extflag.PathOrContent
objStore extflag.PathOrContent
shipper shipperConfig
limitMinTime thanosmodel.TimeOrDurationValue
evaluateQueries bool
}

func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand All @@ -489,4 +491,6 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.shipper.registerFlag(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
cmd.Flag("evaluate-queries", "When set, Thanos sidecar will evaluate expressions that are safe to execute locally before returning series to the Querier.").
BoolVar(&sc.evaluateQueries)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ replace (
github.com/minio/minio-go/v7 => github.com/bwplotka/minio-go/v7 v7.0.11-0.20210324165441-f9927e5255a6
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20211119115433-692a54649ed7
//github.com/prometheus/prometheus => /home/fpetkovski/Projects/prometheus
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.40.0

Expand Down
12 changes: 8 additions & 4 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,14 @@ func aggrsFromFunc(f string) []storepb.Aggr {
func storeHintsFromPromHints(hints *storage.SelectHints) storepb.QueryHints {
return storepb.QueryHints{
StepMillis: hints.Step,
Func: hints.Func,
Grouping: hints.Grouping,
By: hints.By,
Range: hints.Range,
Func: &storepb.Func{
Name: hints.Func,
},
Grouping: &storepb.Grouping{
By: hints.By,
Labels: hints.Grouping,
},
Range: &storepb.Range{Millis: hints.Range},
}
}

Expand Down
19 changes: 15 additions & 4 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"path"
"sort"
"strings"
"sync"
"time"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
Expand Down Expand Up @@ -54,6 +56,8 @@ type PrometheusStore struct {
remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType

framesRead prometheus.Histogram

evaluateQueries bool
}

// Label{Values,Names} call with matchers is supported for Prometheus versions >= 2.24.0.
Expand All @@ -74,6 +78,7 @@ func NewPrometheusStore(
externalLabelsFn func() labels.Labels,
timestamps func() (mint int64, maxt int64),
promVersion func() string,
evaluateQueries bool,
) (*PrometheusStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -98,6 +103,7 @@ func NewPrometheusStore(
Buckets: prometheus.ExponentialBuckets(10, 10, 5),
},
),
evaluateQueries: evaluateQueries,
}
return p, nil
}
Expand Down Expand Up @@ -179,8 +185,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return nil
}

if r.QueryHints.IsFuncDistributive() {
// YOLO(fpetkovski)
if p.evaluateQueries && r.QueryHints.IsSafeToExecute() {
return p.queryPrometheus(s, r)
}

Expand Down Expand Up @@ -226,9 +231,10 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
}

func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error {
ctx := context.Background()
step := math.Max(float64(r.QueryHints.StepMillis/1000), 30)
minTime := math.Max(float64(r.MinTime), float64(r.MaxTime-5*time.Minute.Milliseconds()))
opts := promclient.QueryOptions{}
matrix, _, err := p.client.QueryRange(ctx, p.base, r.ToPromQL(), r.MinTime, r.MaxTime, r.QueryHints.StepMillis/1000, opts)
matrix, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), int64(minTime), r.MaxTime, int64(step), opts)
if err != nil {
return err
}
Expand All @@ -241,6 +247,11 @@ func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *store
Value: string(v),
})
}
// Attach external labels for compatibility with remote read
for _, lbl := range p.externalLabelsFn() {
lbls = append(lbls, lbl)
}

series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(lbls),
Samples: make([]prompb.Sample, len(vector.Values)),
Expand Down
27 changes: 10 additions & 17 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,32 +519,25 @@ func (c *SeriesStatsCounter) Count(series *Series) {
}

func (m *SeriesRequest) ToPromQL() string {
grouping := strings.Join(m.QueryHints.Grouping, ",")
matchers := MatchersToString(m.Matchers...)
return fmt.Sprintf("%s %s (%s) (%s)", m.QueryHints.Func, m.QueryHints.ByOrWithout(), grouping, matchers)
return m.QueryHints.toPromQL(m.Matchers)
}

func (m *QueryHints) IsFuncDistributive() bool {
distributiveFuncs := []string{
// IsSafeToExecute returns true if the function or aggregation from the query hint
// can be safely executed by the underlying Prometheus instance without affecting the
// result of the query.
func (m *QueryHints) IsSafeToExecute() bool {
distributiveOperations := []string{
"max",
"max_over_time",
"min",
"min_over_time",
"group",
"topk",
"bottomk",
"count_values",
}
for _, op := range distributiveFuncs {
if m.Func == op {
for _, op := range distributiveOperations {
if m.Func.Name == op {
return true
}
}

return false
}

func (m *QueryHints) ByOrWithout() string {
if m.By {
return "by"
}
return "without"
}
96 changes: 79 additions & 17 deletions pkg/store/storepb/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"sort"
"testing"
"time"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -544,35 +545,90 @@ func TestSeriesRequestToPromQL(t *testing.T) {
},
},
QueryHints: QueryHints{
Func: "max",
Grouping: []string{"container", "pod"},
By: false,
Range: 0,
Func: &Func{
Name: "max",
},
},
},
expected: `max ({namespace=~"kube-.+"})`,
},
{
name: "Single matcher regular expression with grouping",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Type: LabelMatcher_RE,
Name: "namespace",
Value: "kube-.+",
},
},
QueryHints: QueryHints{
Func: &Func{
Name: "max",
},
Grouping: &Grouping{
By: false,
Labels: []string{"container", "pod"},
},
},
},
expected: `max without (container,pod) ({namespace=~"kube-.+"})`,
},
{
name: "Single matcher with __name__ label",
name: "Multiple matchers with grouping",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Type: LabelMatcher_EQ,
Name: "__name__",
Value: "kube_pod_info",
},
{
Type: LabelMatcher_RE,
Name: "namespace",
Value: "kube-.+",
},
},
QueryHints: QueryHints{
Func: &Func{
Name: "max",
},
Grouping: &Grouping{
By: false,
Labels: []string{"container", "pod"},
},
},
},
expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"})`,
},
{
name: "Query with vector range selector",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Type: LabelMatcher_EQ,
Name: "__name__",
Value: "kube_pod_info",
},
{
Type: LabelMatcher_RE,
Name: "namespace",
Value: "kube-.+",
},
},
QueryHints: QueryHints{
Func: "max",
Grouping: []string{"container", "pod"},
By: false,
Range: 0,
Func: &Func{
Name: "max_over_time",
},
Range: &Range{
Millis: 10 * time.Minute.Milliseconds(),
},
},
},
expected: `max without (container,pod) ({__name__="kube_pod_info"})`,
expected: `max_over_time ({__name__="kube_pod_info", namespace=~"kube-.+"}[600000ms])`,
},
{
name: "Multiple matchers",
name: "Query with grouping and vector range selector",
r: &SeriesRequest{
Matchers: []LabelMatcher{
{
Expand All @@ -587,18 +643,24 @@ func TestSeriesRequestToPromQL(t *testing.T) {
},
},
QueryHints: QueryHints{
Func: "max",
Grouping: []string{"container", "pod"},
By: false,
Range: 0,
Func: &Func{
Name: "max",
},
Grouping: &Grouping{
By: false,
Labels: []string{"container", "pod"},
},
Range: &Range{
Millis: 10 * time.Minute.Milliseconds(),
},
},
},
expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"})`,
expected: `max without (container,pod) ({__name__="kube_pod_info", namespace=~"kube-.+"}[600000ms])`,
},
}

for _, tc := range ts {
t.Run(t.Name(), func(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
actual := tc.r.ToPromQL()
if tc.expected != actual {
t.Fatalf("invalid promql result, got %s, want %s", actual, tc.expected)
Expand Down
45 changes: 45 additions & 0 deletions pkg/store/storepb/query_hints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package storepb

import (
"fmt"
"strings"
)

func (m *QueryHints) toPromQL(labelMatchers []LabelMatcher) string {
grouping := m.Grouping.toPromQL()
matchers := MatchersToString(labelMatchers...)
queryRange := m.Range.toPromQL()

query := fmt.Sprintf("%s %s (%s%s)", m.Func.Name, grouping, matchers, queryRange)
// Remove double spaces if some expressions are missing
return strings.Join(strings.Fields(query), " ")
}

func (m *Grouping) toPromQL() string {
if m == nil {
return ""
}

if len(m.Labels) == 0 {
return ""
}
var op string
if m.By {
op = "by"
} else {
op = "without"
}

return fmt.Sprintf("%s (%s)", op, strings.Join(m.Labels, ","))
}

func (m *Range) toPromQL() string {
if m == nil {
return ""
}

if m.Millis == 0 {
return ""
}
return fmt.Sprintf("[%dms]", m.Millis)
}

0 comments on commit b5a8700

Please sign in to comment.