Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Promql changes to add support to extended functions throught Thanos #7338

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/cl
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
github.com/NYTimes/gziphandler.{GzipHandler}=github.com/klauspost/compress/gzhttp.{GzipHandler},\
sync/atomic=go.uber.org/atomic,github.com/cortexproject/cortex=github.com/thanos-io/thanos/internal/cortex,\
github.com/prometheus/prometheus/promql/parser.{ParseExpr,ParseMetricSelector}=github.com/thanos-io/thanos/pkg/extpromql.{ParseExpr,ParseMetricSelector},\
io/ioutil.{Discard,NopCloser,ReadAll,ReadDir,ReadFile,TempDir,TempFile,Writefile}" $(shell go list ./... | grep -v "internal/cortex")
@$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/info"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
Expand Down Expand Up @@ -951,7 +952,7 @@ func queryFuncCreator(
queryAPIClients := grpcEndpointSet.GetQueryAPIClients()
for _, i := range rand.Perm(len(queryAPIClients)) {
e := query.NewRemoteEngine(logger, queryAPIClients[i], query.Opts{})
expr, err := parser.ParseExpr(qs)
expr, err := extpromql.ParseExpr(qs)
if err != nil {
level.Error(logger).Log("err", err, "query", qs)
continue
Expand Down
5 changes: 3 additions & 2 deletions internal/cortex/querier/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"flag"
"fmt"
"github.com/thanos-io/thanos/pkg/extpromql"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -325,7 +326,7 @@ func (s resultsCache) isAtModifierCachable(r Request, maxCacheTime int64) bool {
if !strings.Contains(query, "@") {
return true
}
expr, err := parser.ParseExpr(query)
expr, err := extpromql.ParseExpr(query)
if err != nil {
// We are being pessimistic in such cases.
level.Warn(s.logger).Log("msg", "failed to parse query, considering @ modifier as not cachable", "query", query, "err", err)
Expand Down Expand Up @@ -370,7 +371,7 @@ func (s resultsCache) isOffsetCachable(r Request) bool {
if !strings.Contains(query, "offset") {
return true
}
expr, err := parser.ParseExpr(query)
expr, err := extpromql.ParseExpr(query)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to parse query, considering offset as not cachable", "query", query, "err", err)
return false
Expand Down
3 changes: 2 additions & 1 deletion internal/cortex/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queryrange

import (
"context"
"github.com/thanos-io/thanos/pkg/extpromql"
"net/http"
"time"

Expand Down Expand Up @@ -97,7 +98,7 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func EvaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
expr, err := extpromql.ParseExpr(query)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, `{"status": "error", "error": "%s"}`, err)
}
Expand Down
9 changes: 7 additions & 2 deletions internal/cortex/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queryrange

import (
"context"
"github.com/thanos-io/thanos/pkg/extpromql"
io "io"
"net/http"
"net/http/httptest"
Expand All @@ -13,7 +14,6 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
Expand Down Expand Up @@ -332,6 +332,11 @@ func Test_evaluateAtModifier(t *testing.T) {
in: "topk(5, rate(http_requests_total[1h] @ start()))",
expected: "topk(5, rate(http_requests_total[1h] @ 1546300.800))",
},
{
// extended functions
in: "topk(5, xrate(http_requests_total[1h] @ start()))",
expected: "topk(5, xrate(http_requests_total[1h] @ 1546300.800))",
},
{
in: "topk(5, rate(http_requests_total[1h] @ 0))",
expected: "topk(5, rate(http_requests_total[1h] @ 0.000))",
Expand Down Expand Up @@ -390,7 +395,7 @@ func Test_evaluateAtModifier(t *testing.T) {
require.Equal(t, tt.expectedErrorCode, int(httpResp.Code))
} else {
require.NoError(t, err)
expectedExpr, err := parser.ParseExpr(tt.expected)
expectedExpr, err := extpromql.ParseExpr(tt.expected)
require.NoError(t, err)
require.Equal(t, expectedExpr.String(), out)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/query/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/logicalplan"
equery "github.com/thanos-io/promql-engine/query"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store"
)
Expand All @@ -36,7 +36,7 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
}
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)

expr, err := parser.ParseExpr("metric")
expr, err := extpromql.ParseExpr("metric")
testutil.Ok(t, err)
lplan := logicalplan.NewFromAST(expr, &equery.Options{}, logicalplan.PlanOptions{})
testutil.Ok(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
Expand Down Expand Up @@ -374,7 +375,7 @@ func (qapi *QueryAPI) parseStoreDebugMatchersParam(r *http.Request) (storeMatche
}

for _, s := range r.Form[StoreMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
matchers, err := extpromql.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil, false)
}, nil, true)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -749,6 +749,30 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
QueryAnalysis: queryTelemetry{},
},
},
{
endpoint: api.queryRange,
query: url.Values{
"query": []string{"xrate(up[2m])"},
"start": []string{"0"},
"end": []string{"500"},
},
response: &queryData{
ResultType: parser.ValueTypeMatrix,
Result: promql.Matrix{
promql.Series{
Floats: func(end, step float64) []promql.FPoint {
var res []promql.FPoint
for v := float64(0); v <= end; v += step {
res = append(res, promql.FPoint{F: v, T: timestamp.FromTime(start.Add(time.Duration(v) * time.Second))})
}
return res
}(500, 1),
Metric: nil,
},
},
QueryAnalysis: queryTelemetry{},
},
},
{
endpoint: api.queryRange,
query: url.Values{
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tombstones"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/runutil"
)

Expand Down Expand Up @@ -136,7 +136,7 @@ type Rewrite struct {
type Matchers []*labels.Matcher

func (m *Matchers) UnmarshalYAML(value *yaml.Node) (err error) {
*m, err = parser.ParseMetricSelector(value.Value)
*m, err = extpromql.ParseMetricSelector(value.Value)
if err != nil {
return errors.Wrapf(err, "parse metric selector %v", value.Value)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/exemplars/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exemplars
import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/extpromql"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -26,7 +27,7 @@ func NewMultiTSDB(tsdbExemplarsServers func() map[string]*TSDB) *MultiTSDB {

// Exemplars returns all specified exemplars from a MultiTSDB instance.
func (m *MultiTSDB) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Exemplars_ExemplarsServer) error {
expr, err := parser.ParseExpr(r.Query)
expr, err := extpromql.ParseExpr(r.Query)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/exemplars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
span, ctx := tracing.StartSpan(srv.Context(), "proxy_exemplars")
defer span.Finish()

expr, err := parser.ParseExpr(req.Query)
expr, err := extpromql.ParseExpr(req.Query)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/exemplars/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -21,9 +22,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ func (t *testExemplarClient) Recv() (*exemplarspb.ExemplarsResponse, error) {
}

func (t *testExemplarClient) Exemplars(ctx context.Context, in *exemplarspb.ExemplarsRequest, opts ...grpc.CallOption) (exemplarspb.Exemplars_ExemplarsClient, error) {
expr, err := parser.ParseExpr(in.Query)
expr, err := extpromql.ParseExpr(in.Query)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
35 changes: 34 additions & 1 deletion pkg/extpromql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,45 @@
package extpromql

import (
"fmt"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/execution/function"
)

func ParserExpr(input string) (parser.Expr, error) {
// ParseExpr parses the input PromQL expression and returns the parsed representation.
func ParseExpr(input string) (parser.Expr, error) {
p := parser.NewParser(input, parser.WithFunctions(function.XFunctions))
defer p.Close()
return p.ParseExpr()
}

// ParseMetricSelector parses the provided textual metric selector into a list of
// label matchers.
func ParseMetricSelector(input string) ([]*labels.Matcher, error) {
// Parse the input string as a PromQL expression.
expr, err := ParseExpr(input)
if err != nil {
return nil, err
}

// The type of the expression should be *parser.VectorSelector.
vs, ok := expr.(*parser.VectorSelector)
if !ok {
return nil, fmt.Errorf("expected type *parser.VectorSelector, got %T", expr)
}

// Convert the label matchers from the vector selector to the desired type.
matchers := make([]*labels.Matcher, len(vs.LabelMatchers))
for i, lm := range vs.LabelMatchers {
matchers[i] = &labels.Matcher{
Type: labels.MatchType(lm.Type),
Name: lm.Name,
Value: lm.Value,
}
}

return matchers, nil
}
64 changes: 64 additions & 0 deletions pkg/extpromql/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extpromql_test

import (
"fmt"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/thanos/pkg/extpromql"
)

func TestParseMetricSelector(t *testing.T) {
testCases := []struct {
name string
input string
}{
{
name: "single selector",
input: `http_requests_total{method="GET"}`,
},
{
name: "empty selectors",
input: `process_cpu_seconds_total`,
},
{
name: "multiple selectors",
input: `http_requests_total{method="GET",code="200"}`,
},
{
name: "multiple selectors with different matchers",
input: `http_requests_total{method="GET",code!="200"}`,
},
{
name: "multiple selectors with regex",
input: `http_requests_total{method="GET",code=~"2.*"}`,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
//lint:ignore faillint Testing against prometheus parser.
want, err := parser.ParseMetricSelector(tc.input)
if err != nil {
t.Fatalf("Prometheus ParseMetricSelector failed: %v", err)
}

got, err := extpromql.ParseMetricSelector(tc.input)
if err != nil {
t.Fatalf("ParseMetricSelector failed: %v", err)
}

testutil.Equals(t, stringFmt(want), stringFmt(got))
})
}
}

func stringFmt(got []*labels.Matcher) string {
return fmt.Sprintf("%v", got)
}