From dd6781add2390eb2f2251e108f2ef835eba44b05 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Mon, 4 Jun 2018 14:47:45 +0100 Subject: [PATCH] Optimise PromQL (#3966) * Move range logic to 'eval' Signed-off-by: Brian Brazil * Make aggregegate range aware Signed-off-by: Brian Brazil * PromQL is statically typed, so don't eval to find the type. Signed-off-by: Brian Brazil * Extend rangewrapper to multiple exprs Signed-off-by: Brian Brazil * Start making function evaluation ranged Signed-off-by: Brian Brazil * Make instant queries a special case of range queries Signed-off-by: Brian Brazil * Eliminate evalString Signed-off-by: Brian Brazil * Evaluate range vector functions one series at a time Signed-off-by: Brian Brazil * Make unary operators range aware Signed-off-by: Brian Brazil * Make binops range aware Signed-off-by: Brian Brazil * Pass time to range-aware functions. Signed-off-by: Brian Brazil * Make simple _over_time functions range aware Signed-off-by: Brian Brazil * Reduce allocs when working with matrix selectors Signed-off-by: Brian Brazil * Add basic benchmark for range evaluation Signed-off-by: Brian Brazil * Reuse objects for function arguments Signed-off-by: Brian Brazil * Do dropmetricname and allocating output vector only once. Signed-off-by: Brian Brazil * Add range-aware support for range vector functions with params Signed-off-by: Brian Brazil * Optimise holt_winters, cut cpu and allocs by ~25% Signed-off-by: Brian Brazil * Make rate&friends range aware Signed-off-by: Brian Brazil * Make more functions range aware. Document calling convention. Signed-off-by: Brian Brazil * Make date functions range aware Signed-off-by: Brian Brazil * Make simple math functions range aware Signed-off-by: Brian Brazil * Convert more functions to be range aware Signed-off-by: Brian Brazil * Make more functions range aware Signed-off-by: Brian Brazil * Specialcase timestamp() with vector selector arg for range awareness Signed-off-by: Brian Brazil * Remove transition code for functions Signed-off-by: Brian Brazil * Remove the rest of the engine transition code Signed-off-by: Brian Brazil * Remove more obselete code Signed-off-by: Brian Brazil * Remove the last uses of the eval* functions Signed-off-by: Brian Brazil * Remove engine finalizers to prevent corruption The finalizers set by matrixSelector were being called just before the value they were retruning to the pool was then being provided to the caller. Thus a concurrent query could corrupt the data that the user has just been returned. Signed-off-by: Brian Brazil * Add new benchmark suite for range functinos Signed-off-by: Brian Brazil * Migrate existing benchmarks to new system Signed-off-by: Brian Brazil * Expand promql benchmarks Signed-off-by: Brian Brazil * Simply test by removing unused range code Signed-off-by: Brian Brazil * When testing instant queries, check range queries too. To protect against subsequent steps in a range query being affected by the previous steps, add a test that evaluates an instant query that we know works again as a range query with the tiimestamp we care about not being the first step. Signed-off-by: Brian Brazil * Reuse ring for matrix iters. Put query results back in pool. Signed-off-by: Brian Brazil * Reuse buffer when iterating over matrix selectors Signed-off-by: Brian Brazil * Unary minus should remove metric name Cut down benchmarks for faster runs. Signed-off-by: Brian Brazil * Reduce repetition in benchmark test cases Signed-off-by: Brian Brazil * Work series by series when doing normal vectorSelectors Signed-off-by: Brian Brazil * Optimise benchmark setup, cuts time by 60% Signed-off-by: Brian Brazil * Have rangeWrapper use an evalNodeHelper to cache across steps Signed-off-by: Brian Brazil * Use evalNodeHelper with functions Signed-off-by: Brian Brazil * Cache dropMetricName within a node evaluation. This saves both the calculations and allocs done by dropMetricName across steps. Signed-off-by: Brian Brazil * Reuse input vectors in rangewrapper Signed-off-by: Brian Brazil * Reuse the point slices in the matrixes input/output by rangeWrapper Signed-off-by: Brian Brazil * Make benchmark setup faster using AddFast Signed-off-by: Brian Brazil * Simplify benchmark code. Signed-off-by: Brian Brazil * Add caching in VectorBinop Signed-off-by: Brian Brazil * Use xor to have one-level resultMetric hash key Signed-off-by: Brian Brazil * Add more benchmarks Signed-off-by: Brian Brazil * Call Query.Close in apiv1 This allows point slices allocated for the response data to be reused by later queries, saving allocations. Signed-off-by: Brian Brazil * Optimise histogram_quantile It's now 5-10% faster with 97% less garbage generated for 1k steps Signed-off-by: Brian Brazil * Make the input collection in rangeVector linear rather than quadratic Signed-off-by: Brian Brazil * Optimise label_replace, for 1k steps 15x fewer allocs and 3x faster Signed-off-by: Brian Brazil * Optimise label_join, 1.8x faster and 11x less memory for 1k steps Signed-off-by: Brian Brazil * Expand benchmarks, cleanup comments, simplify numSteps logic. Signed-off-by: Brian Brazil * Address Fabian's comments Signed-off-by: Brian Brazil * Comments from Alin. Signed-off-by: Brian Brazil * Address jrv's comments Signed-off-by: Brian Brazil * Remove dead code Signed-off-by: Brian Brazil * Address Simon's comments. Signed-off-by: Brian Brazil * Rename populateIterators, pre-init some sizes Signed-off-by: Brian Brazil * Handle case where function has non-matrix args first Signed-off-by: Brian Brazil * Split rangeWrapper out to rangeEval function, improve comments Signed-off-by: Brian Brazil * Cleanup and make things more consistent Signed-off-by: Brian Brazil * Make EvalNodeHelper public Signed-off-by: Brian Brazil * Fabian's comments. Signed-off-by: Brian Brazil --- promql/ast.go | 10 +- promql/bench_test.go | 197 +++++++- promql/engine.go | 883 ++++++++++++++++++++------------- promql/functions.go | 712 +++++++++++++------------- promql/functions_test.go | 57 --- promql/test.go | 82 ++- promql/test_test.go | 40 -- promql/testdata/operators.test | 13 + storage/buffer.go | 26 +- util/stats/query_stats.go | 6 - web/api/v1/api.go | 144 +++--- web/api/v1/api_test.go | 2 +- 12 files changed, 1204 insertions(+), 968 deletions(-) delete mode 100644 promql/test_test.go diff --git a/promql/ast.go b/promql/ast.go index ccfd9cac98d..af3ed4189f2 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -132,9 +132,8 @@ type MatrixSelector struct { Offset time.Duration LabelMatchers []*labels.Matcher - // The series iterators are populated at query preparation time. - series []storage.Series - iterators []*storage.BufferedSeriesIterator + // The series are populated at query preparation time. + series []storage.Series } // NumberLiteral represents a number. @@ -166,9 +165,8 @@ type VectorSelector struct { Offset time.Duration LabelMatchers []*labels.Matcher - // The series iterators are populated at query preparation time. - series []storage.Series - iterators []*storage.BufferedSeriesIterator + // The series are populated at query preparation time. + series []storage.Series } func (e *AggregateExpr) Type() ValueType { return ValueTypeVector } diff --git a/promql/bench_test.go b/promql/bench_test.go index cd91854513a..c109500217d 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -13,36 +13,183 @@ package promql -import "testing" +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" -// A Benchmark holds context for running a unit test as a benchmark. -type Benchmark struct { - b *testing.B - t *Test - iterCount int -} + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +func BenchmarkRangeQuery(b *testing.B) { + storage := testutil.NewStorage(b) + defer storage.Close() + engine := NewEngine(nil, nil, 10, 100*time.Second) -// NewBenchmark returns an initialized empty Benchmark. -func NewBenchmark(b *testing.B, input string) *Benchmark { - t, err := NewTest(b, input) - if err != nil { - b.Fatalf("Unable to run benchmark: %s", err) + metrics := []labels.Labels{} + metrics = append(metrics, labels.FromStrings("__name__", "a_one")) + metrics = append(metrics, labels.FromStrings("__name__", "b_one")) + for j := 0; j < 10; j++ { + metrics = append(metrics, labels.FromStrings("__name__", "h_one", "le", strconv.Itoa(j))) } - return &Benchmark{ - b: b, - t: t, + metrics = append(metrics, labels.FromStrings("__name__", "h_one", "le", "+Inf")) + + for i := 0; i < 10; i++ { + metrics = append(metrics, labels.FromStrings("__name__", "a_ten", "l", strconv.Itoa(i))) + metrics = append(metrics, labels.FromStrings("__name__", "b_ten", "l", strconv.Itoa(i))) + for j := 0; j < 10; j++ { + metrics = append(metrics, labels.FromStrings("__name__", "h_ten", "l", strconv.Itoa(i), "le", strconv.Itoa(j))) + } + metrics = append(metrics, labels.FromStrings("__name__", "h_ten", "l", strconv.Itoa(i), "le", "+Inf")) } -} -// Run runs the benchmark. -func (b *Benchmark) Run() { - defer b.t.Close() - b.b.ReportAllocs() - b.b.ResetTimer() - for i := 0; i < b.b.N; i++ { - if err := b.t.RunAsBenchmark(b); err != nil { - b.b.Error(err) + for i := 0; i < 100; i++ { + metrics = append(metrics, labels.FromStrings("__name__", "a_hundred", "l", strconv.Itoa(i))) + metrics = append(metrics, labels.FromStrings("__name__", "b_hundred", "l", strconv.Itoa(i))) + for j := 0; j < 10; j++ { + metrics = append(metrics, labels.FromStrings("__name__", "h_hundred", "l", strconv.Itoa(i), "le", strconv.Itoa(j))) + } + metrics = append(metrics, labels.FromStrings("__name__", "h_hundred", "l", strconv.Itoa(i), "le", "+Inf")) + } + refs := make([]uint64, len(metrics)) + + // A day of data plus 10k steps. + numIntervals := 8640 + 10000 + + for s := 0; s < numIntervals; s += 1 { + a, err := storage.Appender() + if err != nil { + b.Fatal(err) + } + ts := int64(s * 10000) // 10s interval. + for i, metric := range metrics { + err := a.AddFast(metric, refs[i], ts, float64(s)) + if err != nil { + refs[i], _ = a.Add(metric, ts, float64(s)) + } + } + if err := a.Commit(); err != nil { + b.Fatal(err) } - b.iterCount++ + } + + type benchCase struct { + expr string + steps int + } + cases := []benchCase{ + // Simple rate. + { + expr: "rate(a_X[1m])", + }, + { + expr: "rate(a_X[1m])", + steps: 10000, + }, + // Holt-Winters and long ranges. + { + expr: "holt_winters(a_X[1d], 0.3, 0.3)", + }, + { + expr: "changes(a_X[1d])", + }, + { + expr: "rate(a_X[1d])", + }, + // Unary operators. + { + expr: "-a_X", + }, + // Binary operators. + { + expr: "a_X - b_X", + }, + { + expr: "a_X - b_X", + steps: 10000, + }, + { + expr: "a_X and b_X{l=~'.*[0-4]$'}", + }, + { + expr: "a_X or b_X{l=~'.*[0-4]$'}", + }, + { + expr: "a_X unless b_X{l=~'.*[0-4]$'}", + }, + // Simple functions. + { + expr: "abs(a_X)", + }, + { + expr: "label_replace(a_X, 'l2', '$1', 'l', '(.*)')", + }, + { + expr: "label_join(a_X, 'l2', '-', 'l', 'l')", + }, + // Combinations. + { + expr: "rate(a_X[1m]) + rate(b_X[1m])", + }, + { + expr: "sum without (l)(rate(a_X[1m]))", + }, + { + expr: "sum without (l)(rate(a_X[1m])) / sum without (l)(rate(b_X[1m]))", + }, + { + expr: "histogram_quantile(0.9, rate(h_X[5m]))", + }, + } + + // X in an expr will be replaced by different metric sizes. + tmp := []benchCase{} + for _, c := range cases { + if !strings.Contains(c.expr, "X") { + tmp = append(tmp, c) + } else { + tmp = append(tmp, benchCase{expr: strings.Replace(c.expr, "X", "one", -1), steps: c.steps}) + tmp = append(tmp, benchCase{expr: strings.Replace(c.expr, "X", "ten", -1), steps: c.steps}) + tmp = append(tmp, benchCase{expr: strings.Replace(c.expr, "X", "hundred", -1), steps: c.steps}) + } + } + cases = tmp + + // No step will be replaced by cases with the standard step. + tmp = []benchCase{} + for _, c := range cases { + if c.steps != 0 { + tmp = append(tmp, c) + } else { + tmp = append(tmp, benchCase{expr: c.expr, steps: 1}) + tmp = append(tmp, benchCase{expr: c.expr, steps: 10}) + tmp = append(tmp, benchCase{expr: c.expr, steps: 100}) + tmp = append(tmp, benchCase{expr: c.expr, steps: 1000}) + } + } + cases = tmp + for _, c := range cases { + name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps) + b.Run(name, func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + qry, err := engine.NewRangeQuery( + storage, c.expr, + time.Unix(int64((numIntervals-c.steps)*10), 0), + time.Unix(int64(numIntervals*10), 0), time.Second*10) + if err != nil { + b.Fatal(err) + } + res := qry.Exec(context.Background()) + if res.Err != nil { + b.Fatal(res.Err) + } + qry.Close() + } + }) } } diff --git a/promql/engine.go b/promql/engine.go index 38389f99cc1..aeea2b5d344 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "regexp" "runtime" "sort" "strconv" @@ -53,7 +54,6 @@ type engineMetrics struct { queryQueueTime prometheus.Summary queryPrepareTime prometheus.Summary queryInnerEval prometheus.Summary - queryResultAppend prometheus.Summary queryResultSort prometheus.Summary } @@ -78,8 +78,10 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele // A Query is derived from an a raw query string and can be run against an engine // it is associated with. type Query interface { - // Exec processes the query and + // Exec processes the query. Can only be called once. Exec(ctx context.Context) *Result + // Close recovers memory used by the query result. + Close() // Statement returns the parsed statement of the query. Statement() Statement // Stats returns statistics about the lifetime of the query. @@ -98,6 +100,8 @@ type query struct { stmt Statement // Timer stats for the query execution. stats *stats.TimerGroup + // Result matrix for reuse. + matrix Matrix // Cancellation function for the query. cancel func() @@ -122,6 +126,13 @@ func (q *query) Cancel() { } } +// Close implements the Query interface. +func (q *query) Close() { + for _, s := range q.matrix { + putPointSlice(s.Points) + } +} + // Exec implements the Query interface. func (q *query) Exec(ctx context.Context) *Result { if span := opentracing.SpanFromContext(ctx); span != nil { @@ -199,13 +210,6 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, Help: "Query timings", ConstLabels: prometheus.Labels{"slice": "inner_eval"}, }), - queryResultAppend: prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "query_duration_seconds", - Help: "Query timings", - ConstLabels: prometheus.Labels{"slice": "result_append"}, - }), queryResultSort: prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, @@ -222,7 +226,6 @@ func NewEngine(logger log.Logger, reg prometheus.Registerer, maxConcurrent int, metrics.maxConcurrentQueries, metrics.queryInnerEval, metrics.queryPrepareTime, - metrics.queryResultAppend, metrics.queryResultSort, ) } @@ -352,11 +355,11 @@ func durationMilliseconds(d time.Duration) int64 { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (Value, error) { prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - querier, err := ng.populateIterators(ctx, query.queryable, s) + querier, err := ng.populateSeries(ctx, query.queryable, s) prepareTimer.Stop() ng.metrics.queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) - // XXX(fabxc): the querier returned by populateIterators might be instantiated + // XXX(fabxc): the querier returned by populateSeries might be instantiated // we must not return without closing irrespective of the error. // TODO: make this semantically saner. if querier != nil { @@ -368,13 +371,15 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() - // Instant evaluation. + // Instant evaluation. This is executed as a range evaluation with one step. if s.Start == s.End && s.Interval == 0 { start := timeMilliseconds(s.Start) evaluator := &evaluator{ - Timestamp: start, - ctx: ctx, - logger: ng.logger, + startTimestamp: start, + endTimestamp: start, + interval: 1, + ctx: ctx, + logger: ng.logger, } val, err := evaluator.Eval(s.Expr) if err != nil { @@ -383,84 +388,52 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( evalTimer.Stop() ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) - // Point might have a different timestamp, force it to the evaluation - // timestamp as that is when we ran the evaluation. - switch v := val.(type) { - case Scalar: - v.T = start - case Vector: - for i := range v { - v[i].Point.T = start - } - } - - return val, nil - } - numSteps := int(s.End.Sub(s.Start) / s.Interval) - - // Range evaluation. - Seriess := map[uint64]Series{} - for ts := s.Start; !ts.After(s.End); ts = ts.Add(s.Interval) { - if err := contextDone(ctx, "range evaluation"); err != nil { - return nil, err - } - - t := timeMilliseconds(ts) - evaluator := &evaluator{ - Timestamp: t, - ctx: ctx, - logger: ng.logger, - } - val, err := evaluator.Eval(s.Expr) - if err != nil { - return nil, err + mat, ok := val.(Matrix) + if !ok { + panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } - - switch v := val.(type) { - case Scalar: - // As the expression type does not change we can safely default to 0 - // as the fingerprint for Scalar expressions. - ss, ok := Seriess[0] - if !ok { - ss = Series{Points: make([]Point, 0, numSteps)} - Seriess[0] = ss - } - ss.Points = append(ss.Points, Point{V: v.V, T: t}) - Seriess[0] = ss - case Vector: - for _, sample := range v { - h := sample.Metric.Hash() - ss, ok := Seriess[h] - if !ok { - ss = Series{ - Metric: sample.Metric, - Points: make([]Point, 0, numSteps), - } - Seriess[h] = ss - } - sample.Point.T = t - ss.Points = append(ss.Points, sample.Point) - Seriess[h] = ss + query.matrix = mat + switch s.Expr.Type() { + case ValueTypeVector: + // Convert matrix with one value per series into vector. + vector := make(Vector, len(mat)) + for i, s := range mat { + // Point might have a different timestamp, force it to the evaluation + // timestamp as that is when we ran the evaluation. + vector[i] = Sample{Metric: s.Metric, Point: Point{V: s.Points[0].V, T: start}} } + return vector, nil + case ValueTypeScalar: + return Scalar{V: mat[0].Points[0].V, T: start}, nil + case ValueTypeMatrix: + return mat, nil default: - panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) + panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) } + } - evalTimer.Stop() - ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) - if err := contextDone(ctx, "expression evaluation"); err != nil { + // Range evaluation. + evaluator := &evaluator{ + startTimestamp: timeMilliseconds(s.Start), + endTimestamp: timeMilliseconds(s.End), + interval: durationMilliseconds(s.Interval), + ctx: ctx, + logger: ng.logger, + } + val, err := evaluator.Eval(s.Expr) + if err != nil { return nil, err } + evalTimer.Stop() + ng.metrics.queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) - appendTimer := query.stats.GetTimer(stats.ResultAppendTime).Start() - mat := Matrix{} - for _, ss := range Seriess { - mat = append(mat, ss) + mat, ok := val.(Matrix) + if !ok { + panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } - appendTimer.Stop() - ng.metrics.queryResultAppend.Observe(appendTimer.ElapsedTime().Seconds()) + query.matrix = mat if err := contextDone(ctx, "expression evaluation"); err != nil { return nil, err @@ -476,7 +449,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return mat, nil } -func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { +func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { var maxOffset time.Duration Inspect(s.Expr, func(node Node, _ []Node) bool { switch n := node.(type) { @@ -526,10 +499,6 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return false } - for _, s := range n.series { - it := storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) - n.iterators = append(n.iterators, it) - } case *MatrixSelector: params.Func = extractFuncFromPath(path) @@ -544,10 +513,6 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return false } - for _, s := range n.series { - it := storage.NewBuffer(s.Iterator(), durationMilliseconds(n.Range)) - n.iterators = append(n.iterators, it) - } } return true }) @@ -580,31 +545,26 @@ func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { return res, it.Err() } -// An evaluator evaluates given expressions at a fixed timestamp. It is attached to an -// engine through which it connects to a querier and reports errors. On timeout or -// cancellation of its context it terminates. +// An evaluator evaluates given expressions over given fixed timestamps. It +// is attached to an engine through which it connects to a querier and reports +// errors. On timeout or cancellation of its context it terminates. type evaluator struct { ctx context.Context - Timestamp int64 // time in milliseconds + startTimestamp int64 // Start time in milliseconds. - finalizers []func() + endTimestamp int64 // End time in milliseconds. + interval int64 // Interval in milliseconds. logger log.Logger } -func (ev *evaluator) close() { - for _, f := range ev.finalizers { - f() - } -} - -// fatalf causes a panic with the input formatted into an error. +// errorf causes a panic with the input formatted into an error. func (ev *evaluator) errorf(format string, args ...interface{}) { ev.error(fmt.Errorf(format, args...)) } -// fatal causes a panic with the given error. +// error causes a panic with the given error. func (ev *evaluator) error(err error) { panic(err) } @@ -615,88 +575,169 @@ func (ev *evaluator) recover(errp *error) { if e == nil { return } - if _, ok := e.(runtime.Error); ok { + if err, ok := e.(runtime.Error); ok { // Print the stack trace but do not inhibit the running application. buf := make([]byte, 64<<10) buf = buf[:runtime.Stack(buf, false)] level.Error(ev.logger).Log("msg", "runtime panic in parser", "err", e, "stacktrace", string(buf)) - *errp = fmt.Errorf("unexpected error") + *errp = fmt.Errorf("unexpected error: %s", err) } else { *errp = e.(error) } } -// evalScalar attempts to evaluate e to a Scalar value and errors otherwise. -func (ev *evaluator) evalScalar(e Expr) Scalar { - val := ev.eval(e) - sv, ok := val.(Scalar) - if !ok { - ev.errorf("expected Scalar but got %s", documentedType(val.Type())) - } - return sv +func (ev *evaluator) Eval(expr Expr) (v Value, err error) { + defer ev.recover(&err) + return ev.eval(expr), nil } -// evalVector attempts to evaluate e to a Vector value and errors otherwise. -func (ev *evaluator) evalVector(e Expr) Vector { - val := ev.eval(e) - vec, ok := val.(Vector) - if !ok { - ev.errorf("expected instant Vector but got %s", documentedType(val.Type())) +// Extra information and caches for evaluating a single node across steps. +type EvalNodeHelper struct { + // Evaluation timestamp. + ts int64 + // Vector that can be used for output. + out Vector + + // Caches. + // dropMetricName and label_*. + dmn map[uint64]labels.Labels + // signatureFunc. + sigf map[uint64]uint64 + // funcHistogramQuantile. + signatureToMetricWithBuckets map[uint64]*metricWithBuckets + // label_replace. + regex *regexp.Regexp + + // For binary vector matching. + rightSigs map[uint64]Sample + matchedSigs map[uint64]map[uint64]struct{} + resultMetric map[uint64]labels.Labels +} + +// dropMetricName is a cached version of dropMetricName. +func (enh *EvalNodeHelper) dropMetricName(l labels.Labels) labels.Labels { + if enh.dmn == nil { + enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) + } + h := l.Hash() + ret, ok := enh.dmn[h] + if ok { + return ret + } + ret = dropMetricName(l) + enh.dmn[h] = ret + return ret +} + +// signatureFunc is a cached version of signatureFunc. +func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.Labels) uint64 { + if enh.sigf == nil { + enh.sigf = make(map[uint64]uint64, len(enh.out)) + } + f := signatureFunc(on, names...) + return func(l labels.Labels) uint64 { + h := l.Hash() + ret, ok := enh.sigf[h] + if ok { + return ret + } + ret = f(l) + enh.sigf[h] = ret + return ret + } +} + +// rangeEval evaluates the given expressions, and then for each step calls +// the given function with the values computed for each expression at that +// step. The return value is the combination into time series of of all the +// function call results. +func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ...Expr) Matrix { + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + matrixes := make([]Matrix, len(exprs)) + origMatrixes := make([]Matrix, len(exprs)) + for i, e := range exprs { + // Functions will take string arguments from the expressions, not the values. + if e != nil && e.Type() != ValueTypeString { + matrixes[i] = ev.eval(e).(Matrix) + + // Keep a copy of the original point slices so that they + // can be returned to the pool. + origMatrixes[i] = make(Matrix, len(matrixes[i])) + copy(origMatrixes[i], matrixes[i]) + } } - return vec -} -// evalInt attempts to evaluate e into an integer and errors otherwise. -func (ev *evaluator) evalInt(e Expr) int64 { - sc := ev.evalScalar(e) - if !convertibleToInt64(sc.V) { - ev.errorf("Scalar value %v overflows int64", sc.V) + vectors := make([]Vector, len(exprs)) // Input vectors for the function. + args := make([]Value, len(exprs)) // Argument to function. + // Create an output vector that is as big as the input matrix with + // the most time series. + biggestLen := 1 + for i := range exprs { + vectors[i] = make(Vector, 0, len(matrixes[i])) + if len(matrixes[i]) > biggestLen { + biggestLen = len(matrixes[i]) + } } - return int64(sc.V) -} - -// evalFloat attempts to evaluate e into a float and errors otherwise. -func (ev *evaluator) evalFloat(e Expr) float64 { - sc := ev.evalScalar(e) - return float64(sc.V) -} - -// evalMatrix attempts to evaluate e into a Matrix and errors otherwise. -// The error message uses the term "range Vector" to match the user facing -// documentation. -func (ev *evaluator) evalMatrix(e Expr) Matrix { - val := ev.eval(e) - mat, ok := val.(Matrix) - if !ok { - ev.errorf("expected range Vector but got %s", documentedType(val.Type())) + enh := &EvalNodeHelper{out: make(Vector, 0, biggestLen)} + seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + // Gather input vectors for this timestamp. + for i := range exprs { + vectors[i] = vectors[i][:0] + for si, series := range matrixes[i] { + for _, point := range series.Points { + if point.T == ts { + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + matrixes[i][si].Points = series.Points[1:] + } + break + } + } + args[i] = vectors[i] + } + // Make the function call. + enh.ts = ts + result := f(args, enh) + enh.out = result[:0] // Reuse result vector. + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + mat := make(Matrix, len(result)) + for i, s := range result { + s.Point.T = ts + mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}} + } + return mat + } + // Add samples in output vector to output series. + for _, sample := range result { + h := sample.Metric.Hash() + ss, ok := seriess[h] + if !ok { + ss = Series{ + Metric: sample.Metric, + Points: getPointSlice(numSteps), + } + } + sample.Point.T = ts + ss.Points = append(ss.Points, sample.Point) + seriess[h] = ss + } } - return mat -} - -// evalString attempts to evaluate e to a string value and errors otherwise. -func (ev *evaluator) evalString(e Expr) String { - val := ev.eval(e) - sv, ok := val.(String) - if !ok { - ev.errorf("expected string but got %s", documentedType(val.Type())) + // Reuse the original point slices. + for _, m := range origMatrixes { + for _, s := range m { + putPointSlice(s.Points) + } } - return sv -} - -// evalOneOf evaluates e and errors unless the result is of one of the given types. -func (ev *evaluator) evalOneOf(e Expr, t1, t2 ValueType) Value { - val := ev.eval(e) - if val.Type() != t1 && val.Type() != t2 { - ev.errorf("expected %s or %s but got %s", documentedType(t1), documentedType(t2), documentedType(val.Type())) + // Assemble the output matrix. + mat := make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + mat = append(mat, ss) } - return val -} - -func (ev *evaluator) Eval(expr Expr) (v Value, err error) { - defer ev.recover(&err) - defer ev.close() - return ev.eval(expr), nil + return mat } // eval evaluates the given expression as the given AST expression node requires. @@ -706,117 +747,271 @@ func (ev *evaluator) eval(expr Expr) Value { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 switch e := expr.(type) { case *AggregateExpr: - Vector := ev.evalVector(e.Expr) - return ev.aggregation(e.Op, e.Grouping, e.Without, e.Param, Vector) + if s, ok := e.Param.(*StringLiteral); ok { + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh) + }, e.Expr) + } + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + var param float64 + if e.Param != nil { + param = v[0].(Vector)[0].V + } + return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh) + }, e.Param, e.Expr) - case *BinaryExpr: - lhs := ev.evalOneOf(e.LHS, ValueTypeScalar, ValueTypeVector) - rhs := ev.evalOneOf(e.RHS, ValueTypeScalar, ValueTypeVector) + case *Call: + if e.Func.Name == "timestamp" { + // Matrix evaluation always returns the evaluation time, + // so this function needs special handling when given + // a vector selector. + vs, ok := e.Args[0].(*VectorSelector) + if ok { + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return e.Func.Call([]Value{ev.vectorSelector(vs, enh.ts)}, e.Args, enh) + }) + } + } + // Check if the function has a matrix argument. + var matrixArgIndex int + var matrixArg bool + for i, a := range e.Args { + _, ok := a.(*MatrixSelector) + if ok { + matrixArgIndex = i + matrixArg = true + break + } + } + if !matrixArg { + // Does not have a matrix argument. + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return e.Func.Call(v, e.Args, enh) + }, e.Args...) + } - switch lt, rt := lhs.Type(), rhs.Type(); { - case lt == ValueTypeScalar && rt == ValueTypeScalar: - return Scalar{ - V: scalarBinop(e.Op, lhs.(Scalar).V, rhs.(Scalar).V), - T: ev.Timestamp, + inArgs := make([]Value, len(e.Args)) + // Evaluate any non-matrix arguments. + otherArgs := make([]Matrix, len(e.Args)) + otherInArgs := make([]Vector, len(e.Args)) + for i, e := range e.Args { + if i != matrixArgIndex { + otherArgs[i] = ev.eval(e).(Matrix) + otherInArgs[i] = Vector{Sample{}} + inArgs[i] = otherInArgs[i] + } + } + + sel := e.Args[matrixArgIndex].(*MatrixSelector) + mat := make(Matrix, 0, len(sel.series)) // Output matrix. + offset := durationMilliseconds(sel.Offset) + selRange := durationMilliseconds(sel.Range) + // Reuse objects across steps to save memory allocations. + points := getPointSlice(16) + inMatrix := make(Matrix, 1) + inArgs[matrixArgIndex] = inMatrix + enh := &EvalNodeHelper{out: make(Vector, 0, 1)} + // Process all the calls for one time series at a time. + var it *storage.BufferedSeriesIterator + for i, s := range sel.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), selRange) + } else { + it.Reset(s.Iterator()) + } + ss := Series{ + // For all range vector functions, the only change to the + // output labels is dropping the metric name so just do + // it once here. + Metric: dropMetricName(sel.series[i].Labels()), + Points: getPointSlice(numSteps), + } + inMatrix[0].Metric = sel.series[i].Labels() + for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { + step++ + // Set the non-matrix arguments. + // They are scalar, so it is safe to use the step number + // when looking up the argument, as there will be no gaps. + for j := range e.Args { + if j != matrixArgIndex { + otherInArgs[j][0].V = otherArgs[j][0].Points[step].V + } + } + maxt := ts - offset + mint := maxt - selRange + // Evaluate the matrix selector for this series for this step. + points = ev.matrixIterSlice(it, mint, maxt, points[:0]) + if len(points) == 0 { + continue + } + inMatrix[0].Points = points + enh.ts = ts + // Make the function call. + outVec := e.Func.Call(inArgs, e.Args, enh) + enh.out = outVec[:0] + if len(outVec) > 0 { + ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts}) + } } + if len(ss.Points) > 0 { + mat = append(mat, ss) + } + } + putPointSlice(points) + return mat + + case *ParenExpr: + return ev.eval(e.Expr) + + case *UnaryExpr: + mat := ev.eval(e.Expr).(Matrix) + if e.Op == itemSUB { + for i := range mat { + mat[i].Metric = dropMetricName(mat[i].Metric) + for j := range mat[i].Points { + mat[i].Points[j].V = -mat[i].Points[j].V + } + } + } + return mat + case *BinaryExpr: + switch lt, rt := e.LHS.Type(), e.RHS.Type(); { + case lt == ValueTypeScalar && rt == ValueTypeScalar: + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V) + return append(enh.out, Sample{Point: Point{V: val}}) + }, e.LHS, e.RHS) case lt == ValueTypeVector && rt == ValueTypeVector: switch e.Op { case itemLAND: - return ev.VectorAnd(lhs.(Vector), rhs.(Vector), e.VectorMatching) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh) + }, e.LHS, e.RHS) case itemLOR: - return ev.VectorOr(lhs.(Vector), rhs.(Vector), e.VectorMatching) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh) + }, e.LHS, e.RHS) case itemLUnless: - return ev.VectorUnless(lhs.(Vector), rhs.(Vector), e.VectorMatching) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh) + }, e.LHS, e.RHS) default: - return ev.VectorBinop(e.Op, lhs.(Vector), rhs.(Vector), e.VectorMatching, e.ReturnBool) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh) + }, e.LHS, e.RHS) } + case lt == ValueTypeVector && rt == ValueTypeScalar: - return ev.VectorscalarBinop(e.Op, lhs.(Vector), rhs.(Scalar), false, e.ReturnBool) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh) + }, e.LHS, e.RHS) case lt == ValueTypeScalar && rt == ValueTypeVector: - return ev.VectorscalarBinop(e.Op, rhs.(Vector), lhs.(Scalar), true, e.ReturnBool) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh) + }, e.LHS, e.RHS) } - case *Call: - return e.Func.Call(ev, e.Args) - - case *MatrixSelector: - return ev.matrixSelector(e) - case *NumberLiteral: - return Scalar{V: e.Val, T: ev.Timestamp} - - case *ParenExpr: - return ev.eval(e.Expr) + return ev.rangeEval(func(v []Value, enh *EvalNodeHelper) Vector { + return append(enh.out, Sample{Point: Point{V: e.Val}}) + }) - case *StringLiteral: - return String{V: e.Val, T: ev.Timestamp} + case *VectorSelector: + mat := make(Matrix, 0, len(e.series)) + var it *storage.BufferedSeriesIterator + for i, s := range e.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) + } else { + it.Reset(s.Iterator()) + } + ss := Series{ + Metric: e.series[i].Labels(), + Points: getPointSlice(numSteps), + } - case *UnaryExpr: - se := ev.evalOneOf(e.Expr, ValueTypeScalar, ValueTypeVector) - // Only + and - are possible operators. - if e.Op == itemSUB { - switch v := se.(type) { - case Scalar: - v.V = -v.V - case Vector: - for i, sv := range v { - v[i].V = -sv.V + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + _, v, ok := ev.vectorSelectorSingle(it, e, ts) + if ok { + ss.Points = append(ss.Points, Point{V: v, T: ts}) } } + + if len(ss.Points) > 0 { + mat = append(mat, ss) + } } - return se + return mat - case *VectorSelector: - return ev.vectorSelector(e) + case *MatrixSelector: + if ev.startTimestamp != ev.endTimestamp { + panic(fmt.Errorf("cannot do range evaluation of matrix selector")) + } + return ev.matrixSelector(e) } + panic(fmt.Errorf("unhandled expression of type: %T", expr)) } // vectorSelector evaluates a *VectorSelector expression. -func (ev *evaluator) vectorSelector(node *VectorSelector) Vector { +func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { var ( - vec = make(Vector, 0, len(node.series)) - refTime = ev.Timestamp - durationMilliseconds(node.Offset) + vec = make(Vector, 0, len(node.series)) ) - for i, it := range node.iterators { - var t int64 - var v float64 - - ok := it.Seek(refTime) - if !ok { - if it.Err() != nil { - ev.error(it.Err()) - } + var it *storage.BufferedSeriesIterator + for i, s := range node.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) + } else { + it.Reset(s.Iterator()) } + t, v, ok := ev.vectorSelectorSingle(it, node, ts) if ok { - t, v = it.Values() + vec = append(vec, Sample{ + Metric: node.series[i].Labels(), + Point: Point{V: v, T: t}, + }) } - peek := 1 - if !ok || t > refTime { - t, v, ok = it.PeekBack(peek) - peek++ - if !ok || t < refTime-durationMilliseconds(LookbackDelta) { - continue - } - } - if value.IsStaleNaN(v) { - continue + } + return vec +} + +// vectorSelectorSingle evaluates a instant vector for the iterator of one time series. +func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *VectorSelector, ts int64) (int64, float64, bool) { + refTime := ts - durationMilliseconds(node.Offset) + var t int64 + var v float64 + + ok := it.Seek(refTime) + if !ok { + if it.Err() != nil { + ev.error(it.Err()) } + } - vec = append(vec, Sample{ - Metric: node.series[i].Labels(), - Point: Point{V: v, T: t}, - }) + if ok { + t, v = it.Values() } - return vec + + if !ok || t > refTime { + t, v, ok = it.PeekBack(1) + if !ok || t < refTime-durationMilliseconds(LookbackDelta) { + return 0, 0, false + } + } + if value.IsStaleNaN(v) { + return 0, 0, false + } + return t, v, true } var pointPool = sync.Pool{} @@ -833,70 +1028,27 @@ func putPointSlice(p []Point) { pointPool.Put(p[:0]) } -var matrixPool = sync.Pool{} - -func getMatrix(sz int) Matrix { - m := matrixPool.Get() - if m != nil { - return m.(Matrix) - } - return make(Matrix, 0, sz) -} - -func putMatrix(m Matrix) { - matrixPool.Put(m[:0]) -} - // matrixSelector evaluates a *MatrixSelector expression. func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var ( offset = durationMilliseconds(node.Offset) - maxt = ev.Timestamp - offset + maxt = ev.startTimestamp - offset mint = maxt - durationMilliseconds(node.Range) - matrix = getMatrix(len(node.series)) - // Write all points into a single slice to avoid lots of tiny allocations. - allPoints = getPointSlice(5 * len(matrix)) + matrix = make(Matrix, 0, len(node.series)) ) - ev.finalizers = append(ev.finalizers, - func() { putPointSlice(allPoints) }, - func() { putMatrix(matrix) }, - ) - - for i, it := range node.iterators { - start := len(allPoints) - + var it *storage.BufferedSeriesIterator + for i, s := range node.series { + if it == nil { + it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range)) + } else { + it.Reset(s.Iterator()) + } ss := Series{ Metric: node.series[i].Labels(), } - ok := it.Seek(maxt) - if !ok { - if it.Err() != nil { - ev.error(it.Err()) - } - } - - buf := it.Buffer() - for buf.Next() { - t, v := buf.At() - if value.IsStaleNaN(v) { - continue - } - // Values in the buffer are guaranteed to be smaller than maxt. - if t >= mint { - allPoints = append(allPoints, Point{T: t, V: v}) - } - } - // The seeked sample might also be in the range. - if ok { - t, v := it.Values() - if t == maxt && !value.IsStaleNaN(v) { - allPoints = append(allPoints, Point{T: t, V: v}) - } - } - - ss.Points = allPoints[start:] + ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) if len(ss.Points) > 0 { matrix = append(matrix, ss) @@ -905,13 +1057,42 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { return matrix } -func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector { +// matrixIterSlice evaluates a matrix vector for the iterator of one time series. +func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point { + ok := it.Seek(maxt) + if !ok { + if it.Err() != nil { + ev.error(it.Err()) + } + } + + buf := it.Buffer() + for buf.Next() { + t, v := buf.At() + if value.IsStaleNaN(v) { + continue + } + // Values in the buffer are guaranteed to be smaller than maxt. + if t >= mint { + out = append(out, Point{T: t, V: v}) + } + } + // The seeked sample might also be in the range. + if ok { + t, v := it.Values() + if t == maxt && !value.IsStaleNaN(v) { + out = append(out, Point{T: t, V: v}) + } + } + return out +} + +func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector { if matching.Card != CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := signatureFunc(matching.On, matching.MatchingLabels...) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) - var result Vector // The set of signatures for the right-hand side Vector. rightSigs := map[uint64]struct{}{} // Add all rhs samples to a map so we can easily find matches later. @@ -922,63 +1103,58 @@ func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *VectorMatching) Vector for _, ls := range lhs { // If there's a matching entry in the right-hand side Vector, add the sample. if _, ok := rightSigs[sigf(ls.Metric)]; ok { - result = append(result, ls) + enh.out = append(enh.out, ls) } } - return result + return enh.out } -func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *VectorMatching) Vector { +func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector { if matching.Card != CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := signatureFunc(matching.On, matching.MatchingLabels...) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) - var result Vector leftSigs := map[uint64]struct{}{} // Add everything from the left-hand-side Vector. for _, ls := range lhs { leftSigs[sigf(ls.Metric)] = struct{}{} - result = append(result, ls) + enh.out = append(enh.out, ls) } // Add all right-hand side elements which have not been added from the left-hand side. for _, rs := range rhs { if _, ok := leftSigs[sigf(rs.Metric)]; !ok { - result = append(result, rs) + enh.out = append(enh.out, rs) } } - return result + return enh.out } -func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *VectorMatching) Vector { +func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *VectorMatching, enh *EvalNodeHelper) Vector { if matching.Card != CardManyToMany { panic("set operations must only use many-to-many matching") } - sigf := signatureFunc(matching.On, matching.MatchingLabels...) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) rightSigs := map[uint64]struct{}{} for _, rs := range rhs { rightSigs[sigf(rs.Metric)] = struct{}{} } - var result Vector for _, ls := range lhs { if _, ok := rightSigs[sigf(ls.Metric)]; !ok { - result = append(result, ls) + enh.out = append(enh.out, ls) } } - return result + return enh.out } // VectorBinop evaluates a binary operation between two Vectors, excluding set operators. -func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorMatching, returnBool bool) Vector { +func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorMatching, returnBool bool, enh *EvalNodeHelper) Vector { if matching.Card == CardManyToMany { panic("many-to-many only allowed for set operators") } - var ( - result = Vector{} - sigf = signatureFunc(matching.On, matching.MatchingLabels...) - ) + sigf := enh.signatureFunc(matching.On, matching.MatchingLabels...) // The control flow below handles one-to-one or many-to-one matching. // For one-to-many, swap sidedness and account for the swap when calculating @@ -988,7 +1164,14 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM } // All samples from the rhs hashed by the matching label/values. - rightSigs := map[uint64]Sample{} + if enh.rightSigs == nil { + enh.rightSigs = make(map[uint64]Sample, len(enh.out)) + } else { + for k := range enh.rightSigs { + delete(enh.rightSigs, k) + } + } + rightSigs := enh.rightSigs // Add all rhs samples to a map so we can easily find matches later. for _, rs := range rhs { @@ -1004,7 +1187,14 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM // Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one // the value is a set of signatures to detect duplicated result elements. - matchedSigs := map[uint64]map[uint64]struct{}{} + if enh.matchedSigs == nil { + enh.matchedSigs = make(map[uint64]map[uint64]struct{}, len(rightSigs)) + } else { + for k := range enh.matchedSigs { + delete(enh.matchedSigs, k) + } + } + matchedSigs := enh.matchedSigs // For all lhs samples find a respective rhs sample and perform // the binary operation. @@ -1031,7 +1221,7 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM } else if !keep { continue } - metric := resultMetric(ls.Metric, rs.Metric, op, matching) + metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh) insertedSigs, exists := matchedSigs[sig] if matching.Card == CardOneToOne { @@ -1054,12 +1244,12 @@ func (ev *evaluator) VectorBinop(op ItemType, lhs, rhs Vector, matching *VectorM insertedSigs[insertSig] = struct{}{} } - result = append(result, Sample{ + enh.out = append(enh.out, Sample{ Metric: metric, - Point: Point{V: value, T: ev.Timestamp}, + Point: Point{V: value}, }) } - return result + return enh.out } func hashWithoutLabels(lset labels.Labels, names ...string) uint64 { @@ -1109,7 +1299,20 @@ func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 { // resultMetric returns the metric for the given sample(s) based on the Vector // binary operation and the matching options. -func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching) labels.Labels { +func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching, enh *EvalNodeHelper) labels.Labels { + if enh.resultMetric == nil { + enh.resultMetric = make(map[uint64]labels.Labels, len(enh.out)) + } + // op and matching are always the same for a given node, so + // there's no need to include them in the hash key. + // If the lhs and rhs are the same then the xor would be 0, + // so add in one side to protect against that. + lh := lhs.Hash() + h := (lh ^ rhs.Hash()) + lh + if ret, ok := enh.resultMetric[h]; ok { + return ret + } + lb := labels.NewBuilder(lhs) if shouldDropMetricName(op) { @@ -1140,13 +1343,13 @@ func resultMetric(lhs, rhs labels.Labels, op ItemType, matching *VectorMatching) } } - return lb.Labels() + ret := lb.Labels() + enh.resultMetric[h] = ret + return ret } // VectorscalarBinop evaluates a binary operation between a Vector and a Scalar. -func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap, returnBool bool) Vector { - vec := make(Vector, 0, len(lhs)) - +func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper) Vector { for _, lhsSample := range lhs { lv, rv := lhsSample.V, rhs.V // lhs always contains the Vector. If the original position was different @@ -1166,12 +1369,12 @@ func (ev *evaluator) VectorscalarBinop(op ItemType, lhs Vector, rhs Scalar, swap if keep { lhsSample.V = value if shouldDropMetricName(op) || returnBool { - lhsSample.Metric = dropMetricName(lhsSample.Metric) + lhsSample.Metric = enh.dropMetricName(lhsSample.Metric) } - vec = append(vec, lhsSample) + enh.out = append(enh.out, lhsSample) } } - return vec + return enh.out } func dropMetricName(l labels.Labels) labels.Labels { @@ -1265,23 +1468,27 @@ type groupedAggregation struct { } // aggregation evaluates an aggregation operation on a Vector. -func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param Expr, vec Vector) Vector { +func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector { result := map[uint64]*groupedAggregation{} var k int64 if op == itemTopK || op == itemBottomK { - k = ev.evalInt(param) + f := param.(float64) + if !convertibleToInt64(f) { + ev.errorf("Scalar value %v overflows int64", f) + } + k = int64(f) if k < 1 { return Vector{} } } var q float64 if op == itemQuantile { - q = ev.evalFloat(param) + q = param.(float64) } var valueLabel string if op == itemCountValues { - valueLabel = ev.evalString(param).V + valueLabel = param.(string) if !without { grouping = append(grouping, valueLabel) } @@ -1411,8 +1618,6 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p } // Construct the result Vector from the aggregated groups. - resultVector := make(Vector, 0, len(result)) - for _, aggr := range result { switch op { case itemAvg: @@ -1433,9 +1638,9 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p // The heap keeps the lowest value on top, so reverse it. sort.Sort(sort.Reverse(aggr.heap)) for _, v := range aggr.heap { - resultVector = append(resultVector, Sample{ + enh.out = append(enh.out, Sample{ Metric: v.Metric, - Point: Point{V: v.V, T: ev.Timestamp}, + Point: Point{V: v.V}, }) } continue // Bypass default append. @@ -1444,9 +1649,9 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p // The heap keeps the lowest value on top, so reverse it. sort.Sort(sort.Reverse(aggr.reverseHeap)) for _, v := range aggr.reverseHeap { - resultVector = append(resultVector, Sample{ + enh.out = append(enh.out, Sample{ Metric: v.Metric, - Point: Point{V: v.V, T: ev.Timestamp}, + Point: Point{V: v.V}, }) } continue // Bypass default append. @@ -1458,12 +1663,12 @@ func (ev *evaluator) aggregation(op ItemType, grouping []string, without bool, p // For other aggregations, we already have the right value. } - resultVector = append(resultVector, Sample{ + enh.out = append(enh.out, Sample{ Metric: aggr.labels, - Point: Point{V: aggr.value, T: ev.Timestamp}, + Point: Point{V: aggr.value}, }) } - return resultVector + return enh.out } // btos returns 1 if b is true, 0 otherwise. diff --git a/promql/functions.go b/promql/functions.go index 91160a039a5..7e0ca4b16a0 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,6 +14,7 @@ package promql import ( + "fmt" "math" "regexp" "sort" @@ -32,29 +33,41 @@ type Function struct { ArgTypes []ValueType Variadic int ReturnType ValueType - Call func(ev *evaluator, args Expressions) Value + + // vals is a list of the evaluated arguments for the function call. + // For range vectors it will be a Matrix with one series, instant vectors a + // Vector, scalars a Vector with one series whose value is the scalar + // value,and nil for strings. + // args are the original arguments to the function, where you can access + // matrixSelectors, vectorSelectors, and StringLiterals. + // enh.out is a pre-allocated empty vector that you may use to accumulate + // output before returning it. The vectors in vals should not be returned.a + // Range vector functions need only return a vector with the right value, + // the metric and timestamp are not neded. + // Instant vector functions need only return a vector with the right values and + // metrics, the timestamp are not needed. + // Scalar results should be returned as the value of a sample in a Vector. + Call func(vals []Value, args Expressions, enh *EvalNodeHelper) Vector } // === time() float64 === -func funcTime(ev *evaluator, args Expressions) Value { - return Scalar{ - V: float64(ev.Timestamp) / 1000, - T: ev.Timestamp, - } +func funcTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return Vector{Sample{Point: Point{ + V: float64(enh.ts) / 1000, + }}} } // extrapolatedRate is a utility function for rate/increase/delta. // It calculates the rate (allowing for counter resets if isCounter is true), // extrapolates if the first/last sample is close to the boundary, and returns // the result as either per-second (if isRate is true) or overall. -func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Value { - ms := arg.(*MatrixSelector) +func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector { + ms := args[0].(*MatrixSelector) var ( - matrix = ev.evalMatrix(ms) - rangeStart = ev.Timestamp - durationMilliseconds(ms.Range+ms.Offset) - rangeEnd = ev.Timestamp - durationMilliseconds(ms.Offset) - resultVector = make(Vector, 0, len(matrix)) + matrix = vals[0].(Matrix) + rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset) + rangeEnd = enh.ts - durationMilliseconds(ms.Offset) ) for _, samples := range matrix { @@ -118,42 +131,40 @@ func extrapolatedRate(ev *evaluator, arg Expr, isCounter bool, isRate bool) Valu resultValue = resultValue / ms.Range.Seconds() } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: resultValue, T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: resultValue}, }) } - return resultVector + return enh.out } // === delta(Matrix ValueTypeMatrix) Vector === -func funcDelta(ev *evaluator, args Expressions) Value { - return extrapolatedRate(ev, args[0], false, false) +func funcDelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extrapolatedRate(vals, args, enh, false, false) } // === rate(node ValueTypeMatrix) Vector === -func funcRate(ev *evaluator, args Expressions) Value { - return extrapolatedRate(ev, args[0], true, true) +func funcRate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extrapolatedRate(vals, args, enh, true, true) } // === increase(node ValueTypeMatrix) Vector === -func funcIncrease(ev *evaluator, args Expressions) Value { - return extrapolatedRate(ev, args[0], true, false) +func funcIncrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extrapolatedRate(vals, args, enh, true, false) } // === irate(node ValueTypeMatrix) Vector === -func funcIrate(ev *evaluator, args Expressions) Value { - return instantValue(ev, args[0], true) +func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return instantValue(vals, enh.out, true) } // === idelta(node model.ValMatric) Vector === -func funcIdelta(ev *evaluator, args Expressions) Value { - return instantValue(ev, args[0], false) +func funcIdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return instantValue(vals, enh.out, false) } -func instantValue(ev *evaluator, arg Expr, isRate bool) Value { - resultVector := Vector{} - for _, samples := range ev.evalMatrix(arg) { +func instantValue(vals []Value, out Vector, isRate bool) Vector { + for _, samples := range vals[0].(Matrix) { // No sense in trying to compute a rate without at least two points. Drop // this Vector element. if len(samples.Points) < 2 { @@ -182,12 +193,11 @@ func instantValue(ev *evaluator, arg Expr, isRate bool) Value { resultValue /= float64(sampledInterval) / 1000 } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: resultValue, T: ev.Timestamp}, + out = append(out, Sample{ + Point: Point{V: resultValue}, }) } - return resultVector + return out } // Calculate the trend value at the given index i in raw data d. @@ -195,18 +205,15 @@ func instantValue(ev *evaluator, arg Expr, isRate bool) Value { // The argument "s" is the set of computed smoothed values. // The argument "b" is the set of computed trend factors. // The argument "d" is the set of raw input values. -func calcTrendValue(i int, sf, tf float64, s, b, d []float64) float64 { +func calcTrendValue(i int, sf, tf, s0, s1, b float64) float64 { if i == 0 { - return b[0] + return b } - x := tf * (s[i] - s[i-1]) - y := (1 - tf) * b[i-1] - - // Cache the computed value. - b[i] = x + y + x := tf * (s1 - s0) + y := (1 - tf) * b - return b[i] + return x + y } // Holt-Winters is similar to a weighted moving average, where historical data has exponentially less influence on the current data. @@ -214,29 +221,23 @@ func calcTrendValue(i int, sf, tf float64, s, b, d []float64) float64 { // data. A lower smoothing factor increases the influence of historical data. The trend factor (0 < tf < 1) affects // how trends in historical data will affect the current data. A higher trend factor increases the influence. // of trends. Algorithm taken from https://en.wikipedia.org/wiki/Exponential_smoothing titled: "Double exponential smoothing". -func funcHoltWinters(ev *evaluator, args Expressions) Value { - mat := ev.evalMatrix(args[0]) +func funcHoltWinters(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + mat := vals[0].(Matrix) // The smoothing factor argument. - sf := ev.evalFloat(args[1]) + sf := vals[1].(Vector)[0].V // The trend factor argument. - tf := ev.evalFloat(args[2]) + tf := vals[2].(Vector)[0].V // Sanity check the input. if sf <= 0 || sf >= 1 { - ev.errorf("invalid smoothing factor. Expected: 0 < sf < 1 goT: %f", sf) + panic(fmt.Errorf("invalid smoothing factor. Expected: 0 < sf < 1 goT: %f", sf)) } if tf <= 0 || tf >= 1 { - ev.errorf("invalid trend factor. Expected: 0 < tf < 1 goT: %f", sf) + panic(fmt.Errorf("invalid trend factor. Expected: 0 < tf < 1 goT: %f", sf)) } - // Make an output Vector large enough to hold the entire result. - resultVector := make(Vector, 0, len(mat)) - - // Create scratch values. - var s, b, d []float64 - var l int for _, samples := range mat { l = len(samples.Points) @@ -246,144 +247,130 @@ func funcHoltWinters(ev *evaluator, args Expressions) Value { continue } - // Resize scratch values. - if l != len(s) { - s = make([]float64, l) - b = make([]float64, l) - d = make([]float64, l) - } - - // Fill in the d values with the raw values from the input. - for i, v := range samples.Points { - d[i] = v.V - } - + var s0, s1, b float64 // Set initial values. - s[0] = d[0] - b[0] = d[1] - d[0] + s1 = samples.Points[0].V + b = samples.Points[1].V - samples.Points[0].V // Run the smoothing operation. var x, y float64 - for i := 1; i < len(d); i++ { + for i := 1; i < l; i++ { // Scale the raw value against the smoothing factor. - x = sf * d[i] + x = sf * samples.Points[i].V // Scale the last smoothed value with the trend at this point. - y = (1 - sf) * (s[i-1] + calcTrendValue(i-1, sf, tf, s, b, d)) + b = calcTrendValue(i-1, sf, tf, s0, s1, b) + y = (1 - sf) * (s1 + b) - s[i] = x + y + s0, s1 = s1, x+y } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: s[len(s)-1], T: ev.Timestamp}, // The last value in the Vector is the smoothed result. + enh.out = append(enh.out, Sample{ + Point: Point{V: s1}, }) } - return resultVector + return enh.out } // === sort(node ValueTypeVector) Vector === -func funcSort(ev *evaluator, args Expressions) Value { +func funcSort(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { // NaN should sort to the bottom, so take descending sort with NaN first and // reverse it. - byValueSorter := vectorByReverseValueHeap(ev.evalVector(args[0])) + byValueSorter := vectorByReverseValueHeap(vals[0].(Vector)) sort.Sort(sort.Reverse(byValueSorter)) return Vector(byValueSorter) } // === sortDesc(node ValueTypeVector) Vector === -func funcSortDesc(ev *evaluator, args Expressions) Value { +func funcSortDesc(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { // NaN should sort to the bottom, so take ascending sort with NaN first and // reverse it. - byValueSorter := vectorByValueHeap(ev.evalVector(args[0])) + byValueSorter := vectorByValueHeap(vals[0].(Vector)) sort.Sort(sort.Reverse(byValueSorter)) return Vector(byValueSorter) } // === clamp_max(Vector ValueTypeVector, max Scalar) Vector === -func funcClampMax(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - max := ev.evalFloat(args[1]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Min(max, float64(el.V)) +func funcClampMax(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) + max := vals[1].(Vector)[0].Point.V + for _, el := range vec { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: math.Min(max, float64(el.V))}, + }) } - return vec + return enh.out } // === clamp_min(Vector ValueTypeVector, min Scalar) Vector === -func funcClampMin(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - min := ev.evalFloat(args[1]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Max(min, float64(el.V)) +func funcClampMin(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) + min := vals[1].(Vector)[0].Point.V + for _, el := range vec { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: math.Max(min, float64(el.V))}, + }) } - return vec + return enh.out } // === round(Vector ValueTypeVector, toNearest=1 Scalar) Vector === -func funcRound(ev *evaluator, args Expressions) Value { +func funcRound(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) // round returns a number rounded to toNearest. // Ties are solved by rounding up. toNearest := float64(1) if len(args) >= 2 { - toNearest = ev.evalFloat(args[1]) + toNearest = vals[1].(Vector)[0].Point.V } // Invert as it seems to cause fewer floating point accuracy issues. toNearestInverse := 1.0 / toNearest - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Floor(float64(el.V)*toNearestInverse+0.5) / toNearestInverse + for _, el := range vec { + v := math.Floor(float64(el.V)*toNearestInverse+0.5) / toNearestInverse + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: v}, + }) } - return vec + return enh.out } // === Scalar(node ValueTypeVector) Scalar === -func funcScalar(ev *evaluator, args Expressions) Value { - v := ev.evalVector(args[0]) +func funcScalar(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + v := vals[0].(Vector) if len(v) != 1 { - return Scalar{ - V: math.NaN(), - T: ev.Timestamp, - } - } - return Scalar{ - V: v[0].V, - T: ev.Timestamp, + return append(enh.out, Sample{ + Point: Point{V: math.NaN()}, + }) } + return append(enh.out, Sample{ + Point: Point{V: v[0].V}, + }) } -func aggrOverTime(ev *evaluator, args Expressions, aggrFn func([]Point) float64) Value { - mat := ev.evalMatrix(args[0]) - resultVector := Vector{} +func aggrOverTime(vals []Value, enh *EvalNodeHelper, aggrFn func([]Point) float64) Vector { + mat := vals[0].(Matrix) for _, el := range mat { if len(el.Points) == 0 { continue } - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(el.Metric), - Point: Point{V: aggrFn(el.Points), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: aggrFn(el.Points)}, }) } - return resultVector + return enh.out } // === avg_over_time(Matrix ValueTypeMatrix) Vector === -func funcAvgOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcAvgOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum float64 for _, v := range values { sum += v.V @@ -393,27 +380,16 @@ func funcAvgOverTime(ev *evaluator, args Expressions) Value { } // === count_over_time(Matrix ValueTypeMatrix) Vector === -func funcCountOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcCountOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { return float64(len(values)) }) } // === floor(Vector ValueTypeVector) Vector === -func funcFloor(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Floor(float64(el.V)) - } - return vec -} - // === max_over_time(Matrix ValueTypeMatrix) Vector === -func funcMaxOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcMaxOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { max := math.Inf(-1) for _, v := range values { max = math.Max(max, float64(v.V)) @@ -423,8 +399,8 @@ func funcMaxOverTime(ev *evaluator, args Expressions) Value { } // === min_over_time(Matrix ValueTypeMatrix) Vector === -func funcMinOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcMinOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { min := math.Inf(1) for _, v := range values { min = math.Min(min, float64(v.V)) @@ -434,8 +410,8 @@ func funcMinOverTime(ev *evaluator, args Expressions) Value { } // === sum_over_time(Matrix ValueTypeMatrix) Vector === -func funcSumOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcSumOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum float64 for _, v := range values { sum += v.V @@ -445,32 +421,29 @@ func funcSumOverTime(ev *evaluator, args Expressions) Value { } // === quantile_over_time(Matrix ValueTypeMatrix) Vector === -func funcQuantileOverTime(ev *evaluator, args Expressions) Value { - q := ev.evalFloat(args[0]) - mat := ev.evalMatrix(args[1]) - resultVector := Vector{} +func funcQuantileOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + q := vals[0].(Vector)[0].V + mat := vals[1].(Matrix) for _, el := range mat { if len(el.Points) == 0 { continue } - el.Metric = dropMetricName(el.Metric) values := make(vectorByValueHeap, 0, len(el.Points)) for _, v := range el.Points { values = append(values, Sample{Point: Point{V: v.V}}) } - resultVector = append(resultVector, Sample{ - Metric: el.Metric, - Point: Point{V: quantile(q, values), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: quantile(q, values)}, }) } - return resultVector + return enh.out } // === stddev_over_time(Matrix ValueTypeMatrix) Vector === -func funcStddevOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcStddevOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum, squaredSum, count float64 for _, v := range values { sum += v.V @@ -483,8 +456,8 @@ func funcStddevOverTime(ev *evaluator, args Expressions) Value { } // === stdvar_over_time(Matrix ValueTypeMatrix) Vector === -func funcStdvarOverTime(ev *evaluator, args Expressions) Value { - return aggrOverTime(ev, args, func(values []Point) float64 { +func funcStdvarOverTime(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return aggrOverTime(vals, enh, func(values []Point) float64 { var sum, squaredSum, count float64 for _, v := range values { sum += v.V @@ -496,22 +469,10 @@ func funcStdvarOverTime(ev *evaluator, args Expressions) Value { }) } -// === abs(Vector ValueTypeVector) Vector === -func funcAbs(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Abs(float64(el.V)) - } - return vec -} - // === absent(Vector ValueTypeVector) Vector === -func funcAbsent(ev *evaluator, args Expressions) Value { - if len(ev.evalVector(args[0])) > 0 { - return Vector{} +func funcAbsent(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + if len(vals[0].(Vector)) > 0 { + return enh.out } m := []labels.Label{} @@ -522,96 +483,73 @@ func funcAbsent(ev *evaluator, args Expressions) Value { } } } - return Vector{ + return append(enh.out, Sample{ Metric: labels.New(m...), - Point: Point{V: 1, T: ev.Timestamp}, - }, + Point: Point{V: 1}, + }) +} + +func simpleFunc(vals []Value, enh *EvalNodeHelper, f func(float64) float64) Vector { + for _, el := range vals[0].(Vector) { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: f(el.V)}, + }) } + return enh.out +} + +// === abs(Vector ValueTypeVector) Vector === +func funcAbs(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Abs) } // === ceil(Vector ValueTypeVector) Vector === -func funcCeil(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] +func funcCeil(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Ceil) +} - el.Metric = dropMetricName(el.Metric) - el.V = math.Ceil(float64(el.V)) - } - return vec +// === floor(Vector ValueTypeVector) Vector === +func funcFloor(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Floor) } // === exp(Vector ValueTypeVector) Vector === -func funcExp(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Exp(float64(el.V)) - } - return vec +func funcExp(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Exp) } // === sqrt(Vector VectorNode) Vector === -func funcSqrt(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Sqrt(float64(el.V)) - } - return vec +func funcSqrt(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Sqrt) } // === ln(Vector ValueTypeVector) Vector === -func funcLn(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Log(float64(el.V)) - } - return vec +func funcLn(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Log) } // === log2(Vector ValueTypeVector) Vector === -func funcLog2(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Log2(float64(el.V)) - } - return vec +func funcLog2(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Log2) } // === log10(Vector ValueTypeVector) Vector === -func funcLog10(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = math.Log10(float64(el.V)) - } - return vec +func funcLog10(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return simpleFunc(vals, enh, math.Log10) } // === timestamp(Vector ValueTypeVector) Vector === -func funcTimestamp(ev *evaluator, args Expressions) Value { - vec := ev.evalVector(args[0]) - for i := range vec { - el := &vec[i] - - el.Metric = dropMetricName(el.Metric) - el.V = float64(el.T) / 1000.0 +func funcTimestamp(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + vec := vals[0].(Vector) + for _, el := range vec { + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: float64(el.T) / 1000}, + }) } - return vec + return enh.out } // linearRegression performs a least-square linear regression analysis on the @@ -640,9 +578,8 @@ func linearRegression(samples []Point, interceptTime int64) (slope, intercept fl } // === deriv(node ValueTypeMatrix) Vector === -func funcDeriv(ev *evaluator, args Expressions) Value { - mat := ev.evalMatrix(args[0]) - resultVector := make(Vector, 0, len(mat)) +func funcDeriv(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + mat := vals[0].(Matrix) for _, samples := range mat { // No sense in trying to compute a derivative without at least two points. @@ -655,21 +592,17 @@ func funcDeriv(ev *evaluator, args Expressions) Value { // to avoid floating point accuracy issues, see // https://github.com/prometheus/prometheus/issues/2674 slope, _ := linearRegression(samples.Points, samples.Points[0].T) - resultSample := Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: slope, T: ev.Timestamp}, - } - - resultVector = append(resultVector, resultSample) + enh.out = append(enh.out, Sample{ + Point: Point{V: slope}, + }) } - return resultVector + return enh.out } // === predict_linear(node ValueTypeMatrix, k ValueTypeScalar) Vector === -func funcPredictLinear(ev *evaluator, args Expressions) Value { - mat := ev.evalMatrix(args[0]) - resultVector := make(Vector, 0, len(mat)) - duration := ev.evalFloat(args[1]) +func funcPredictLinear(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + mat := vals[0].(Matrix) + duration := vals[1].(Vector)[0].V for _, samples := range mat { // No sense in trying to predict anything without at least two points. @@ -677,23 +610,28 @@ func funcPredictLinear(ev *evaluator, args Expressions) Value { if len(samples.Points) < 2 { continue } - slope, intercept := linearRegression(samples.Points, ev.Timestamp) + slope, intercept := linearRegression(samples.Points, enh.ts) - resultVector = append(resultVector, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: slope*duration + intercept, T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: slope*duration + intercept}, }) } - return resultVector + return enh.out } // === histogram_quantile(k ValueTypeScalar, Vector ValueTypeVector) Vector === -func funcHistogramQuantile(ev *evaluator, args Expressions) Value { - q := ev.evalFloat(args[0]) - inVec := ev.evalVector(args[1]) +func funcHistogramQuantile(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + q := vals[0].(Vector)[0].V + inVec := vals[1].(Vector) + sigf := enh.signatureFunc(false, excludedLabels...) - outVec := Vector{} - signatureToMetricWithBuckets := map[uint64]*metricWithBuckets{} + if enh.signatureToMetricWithBuckets == nil { + enh.signatureToMetricWithBuckets = map[uint64]*metricWithBuckets{} + } else { + for _, v := range enh.signatureToMetricWithBuckets { + v.buckets = v.buckets[:0] + } + } for _, el := range inVec { upperBound, err := strconv.ParseFloat( el.Metric.Get(model.BucketLabel), 64, @@ -703,34 +641,35 @@ func funcHistogramQuantile(ev *evaluator, args Expressions) Value { // TODO(beorn7): Issue a warning somehow. continue } - hash := hashWithoutLabels(el.Metric, excludedLabels...) + hash := sigf(el.Metric) - mb, ok := signatureToMetricWithBuckets[hash] + mb, ok := enh.signatureToMetricWithBuckets[hash] if !ok { el.Metric = labels.NewBuilder(el.Metric). Del(labels.BucketLabel, labels.MetricName). Labels() mb = &metricWithBuckets{el.Metric, nil} - signatureToMetricWithBuckets[hash] = mb + enh.signatureToMetricWithBuckets[hash] = mb } mb.buckets = append(mb.buckets, bucket{upperBound, el.V}) } - for _, mb := range signatureToMetricWithBuckets { - outVec = append(outVec, Sample{ - Metric: mb.metric, - Point: Point{V: bucketQuantile(q, mb.buckets), T: ev.Timestamp}, - }) + for _, mb := range enh.signatureToMetricWithBuckets { + if len(mb.buckets) > 0 { + enh.out = append(enh.out, Sample{ + Metric: mb.metric, + Point: Point{V: bucketQuantile(q, mb.buckets)}, + }) + } } - return outVec + return enh.out } // === resets(Matrix ValueTypeMatrix) Vector === -func funcResets(ev *evaluator, args Expressions) Value { - in := ev.evalMatrix(args[0]) - out := make(Vector, 0, len(in)) +func funcResets(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + in := vals[0].(Matrix) for _, samples := range in { resets := 0 @@ -743,18 +682,16 @@ func funcResets(ev *evaluator, args Expressions) Value { prev = current } - out = append(out, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: float64(resets), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: float64(resets)}, }) } - return out + return enh.out } // === changes(Matrix ValueTypeMatrix) Vector === -func funcChanges(ev *evaluator, args Expressions) Value { - in := ev.evalMatrix(args[0]) - out := make(Vector, 0, len(in)) +func funcChanges(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + in := vals[0].(Matrix) for _, samples := range in { changes := 0 @@ -767,189 +704,214 @@ func funcChanges(ev *evaluator, args Expressions) Value { prev = current } - out = append(out, Sample{ - Metric: dropMetricName(samples.Metric), - Point: Point{V: float64(changes), T: ev.Timestamp}, + enh.out = append(enh.out, Sample{ + Point: Point{V: float64(changes)}, }) } - return out + return enh.out } // === label_replace(Vector ValueTypeVector, dst_label, replacement, src_labelname, regex ValueTypeString) Vector === -func funcLabelReplace(ev *evaluator, args Expressions) Value { +func funcLabelReplace(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { var ( - vector = ev.evalVector(args[0]) - dst = ev.evalString(args[1]).V - repl = ev.evalString(args[2]).V - src = ev.evalString(args[3]).V - regexStr = ev.evalString(args[4]).V + vector = vals[0].(Vector) + dst = args[1].(*StringLiteral).Val + repl = args[2].(*StringLiteral).Val + src = args[3].(*StringLiteral).Val + regexStr = args[4].(*StringLiteral).Val ) - regex, err := regexp.Compile("^(?:" + regexStr + ")$") - if err != nil { - ev.errorf("invalid regular expression in label_replace(): %s", regexStr) - } - if !model.LabelNameRE.MatchString(string(dst)) { - ev.errorf("invalid destination label name in label_replace(): %s", dst) + if enh.regex == nil { + var err error + enh.regex, err = regexp.Compile("^(?:" + regexStr + ")$") + if err != nil { + panic(fmt.Errorf("invalid regular expression in label_replace(): %s", regexStr)) + } + if !model.LabelNameRE.MatchString(string(dst)) { + panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst)) + } + enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) } outSet := make(map[uint64]struct{}, len(vector)) - for i := range vector { - el := &vector[i] - - srcVal := el.Metric.Get(src) - indexes := regex.FindStringSubmatchIndex(srcVal) - // If there is no match, no replacement should take place. - if indexes == nil { - continue - } - res := regex.ExpandString([]byte{}, repl, srcVal, indexes) - - lb := labels.NewBuilder(el.Metric).Del(dst) - if len(res) > 0 { - lb.Set(dst, string(res)) + for _, el := range vector { + h := el.Metric.Hash() + var outMetric labels.Labels + if l, ok := enh.dmn[h]; ok { + outMetric = l + } else { + srcVal := el.Metric.Get(src) + indexes := enh.regex.FindStringSubmatchIndex(srcVal) + if indexes == nil { + // If there is no match, no replacement should take place. + outMetric = el.Metric + enh.dmn[h] = outMetric + } else { + res := enh.regex.ExpandString([]byte{}, repl, srcVal, indexes) + + lb := labels.NewBuilder(el.Metric).Del(dst) + if len(res) > 0 { + lb.Set(dst, string(res)) + } + outMetric = lb.Labels() + enh.dmn[h] = outMetric + } } - el.Metric = lb.Labels() - h := el.Metric.Hash() - if _, ok := outSet[h]; ok { - ev.errorf("duplicated label set in output of label_replace(): %s", el.Metric) + outHash := outMetric.Hash() + if _, ok := outSet[outHash]; ok { + panic(fmt.Errorf("duplicated label set in output of label_replace(): %s", el.Metric)) } else { - outSet[h] = struct{}{} + enh.out = append(enh.out, + Sample{ + Metric: outMetric, + Point: Point{V: el.Point.V}, + }) + outSet[outHash] = struct{}{} } } - - return vector + return enh.out } // === Vector(s Scalar) Vector === -func funcVector(ev *evaluator, args Expressions) Value { - return Vector{ +func funcVector(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return append(enh.out, Sample{ Metric: labels.Labels{}, - Point: Point{V: ev.evalFloat(args[0]), T: ev.Timestamp}, - }, - } + Point: Point{V: vals[0].(Vector)[0].V}, + }) } // === label_join(vector model.ValVector, dest_labelname, separator, src_labelname...) Vector === -func funcLabelJoin(ev *evaluator, args Expressions) Value { +func funcLabelJoin(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { var ( - vector = ev.evalVector(args[0]) - dst = ev.evalString(args[1]).V - sep = ev.evalString(args[2]).V + vector = vals[0].(Vector) + dst = args[1].(*StringLiteral).Val + sep = args[2].(*StringLiteral).Val srcLabels = make([]string, len(args)-3) ) + + if enh.dmn == nil { + enh.dmn = make(map[uint64]labels.Labels, len(enh.out)) + } + for i := 3; i < len(args); i++ { - src := ev.evalString(args[i]).V + src := args[i].(*StringLiteral).Val if !model.LabelName(src).IsValid() { - ev.errorf("invalid source label name in label_join(): %s", src) + panic(fmt.Errorf("invalid source label name in label_join(): %s", src)) } srcLabels[i-3] = src } if !model.LabelName(dst).IsValid() { - ev.errorf("invalid destination label name in label_join(): %s", dst) + panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst)) } outSet := make(map[uint64]struct{}, len(vector)) - for i := range vector { - el := &vector[i] + srcVals := make([]string, len(srcLabels)) + for _, el := range vector { + h := el.Metric.Hash() + var outMetric labels.Labels + if l, ok := enh.dmn[h]; ok { + outMetric = l + } else { - srcVals := make([]string, len(srcLabels)) - for i, src := range srcLabels { - srcVals[i] = el.Metric.Get(src) - } + for i, src := range srcLabels { + srcVals[i] = el.Metric.Get(src) + } - lb := labels.NewBuilder(el.Metric) + lb := labels.NewBuilder(el.Metric) - strval := strings.Join(srcVals, sep) - if strval == "" { - lb.Del(dst) - } else { - lb.Set(dst, strval) - } + strval := strings.Join(srcVals, sep) + if strval == "" { + lb.Del(dst) + } else { + lb.Set(dst, strval) + } - el.Metric = lb.Labels() - h := el.Metric.Hash() + outMetric = lb.Labels() + enh.dmn[h] = outMetric + } + outHash := outMetric.Hash() - if _, exists := outSet[h]; exists { - ev.errorf("duplicated label set in output of label_join(): %s", el.Metric) + if _, exists := outSet[outHash]; exists { + panic(fmt.Errorf("duplicated label set in output of label_join(): %s", el.Metric)) } else { - outSet[h] = struct{}{} + enh.out = append(enh.out, Sample{ + Metric: outMetric, + Point: Point{V: el.Point.V}, + }) + outSet[outHash] = struct{}{} } } - return vector + return enh.out } // Common code for date related functions. -func dateWrapper(ev *evaluator, args Expressions, f func(time.Time) float64) Value { - var v Vector - if len(args) == 0 { - v = Vector{ +func dateWrapper(vals []Value, enh *EvalNodeHelper, f func(time.Time) float64) Vector { + if len(vals) == 0 { + return append(enh.out, Sample{ Metric: labels.Labels{}, - Point: Point{V: float64(ev.Timestamp) / 1000, T: ev.Timestamp}, - }, - } - } else { - v = ev.evalVector(args[0]) + Point: Point{V: f(time.Unix(enh.ts/1000, 0).UTC())}, + }) } - for i := range v { - el := &v[i] - el.Metric = dropMetricName(el.Metric) + for _, el := range vals[0].(Vector) { t := time.Unix(int64(el.V), 0).UTC() - el.V = f(t) + enh.out = append(enh.out, Sample{ + Metric: enh.dropMetricName(el.Metric), + Point: Point{V: f(t)}, + }) } - return v + return enh.out } // === days_in_month(v Vector) Scalar === -func funcDaysInMonth(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcDaysInMonth(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(32 - time.Date(t.Year(), t.Month(), 32, 0, 0, 0, 0, time.UTC).Day()) }) } // === day_of_month(v Vector) Scalar === -func funcDayOfMonth(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcDayOfMonth(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Day()) }) } // === day_of_week(v Vector) Scalar === -func funcDayOfWeek(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcDayOfWeek(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Weekday()) }) } // === hour(v Vector) Scalar === -func funcHour(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcHour(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Hour()) }) } // === minute(v Vector) Scalar === -func funcMinute(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcMinute(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Minute()) }) } // === month(v Vector) Scalar === -func funcMonth(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcMonth(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Month()) }) } // === year(v Vector) Scalar === -func funcYear(ev *evaluator, args Expressions) Value { - return dateWrapper(ev, args, func(t time.Time) float64 { +func funcYear(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return dateWrapper(vals, enh, func(t time.Time) float64 { return float64(t.Year()) }) } diff --git a/promql/functions_test.go b/promql/functions_test.go index eda22b375b6..19680eb7f76 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -23,63 +23,6 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -func BenchmarkHoltWinters4Week5Min(b *testing.B) { - input := ` -clear -load 5m - http_requests{path="/foo"} 0+10x8064 - -eval instant at 4w holt_winters(http_requests[4w], 0.3, 0.3) - {path="/foo"} 80640 -` - - bench := NewBenchmark(b, input) - bench.Run() - -} - -func BenchmarkHoltWinters1Week5Min(b *testing.B) { - input := ` -clear -load 5m - http_requests{path="/foo"} 0+10x2016 - -eval instant at 1w holt_winters(http_requests[1w], 0.3, 0.3) - {path="/foo"} 20160 -` - - bench := NewBenchmark(b, input) - bench.Run() -} - -func BenchmarkHoltWinters1Day1Min(b *testing.B) { - input := ` -clear -load 1m - http_requests{path="/foo"} 0+10x1440 - -eval instant at 1d holt_winters(http_requests[1d], 0.3, 0.3) - {path="/foo"} 14400 -` - - bench := NewBenchmark(b, input) - bench.Run() -} - -func BenchmarkChanges1Day1Min(b *testing.B) { - input := ` -clear -load 1m - http_requests{path="/foo"} 0+10x1440 - -eval instant at 1d changes(http_requests[1d]) - {path="/foo"} 1440 -` - - bench := NewBenchmark(b, input) - bench.Run() -} - func TestDeriv(t *testing.T) { // https://github.com/prometheus/prometheus/issues/2674#issuecomment-315439393 // This requires more precision than the usual test system offers, diff --git a/promql/test.go b/promql/test.go index 6bd07cf7ec0..0b512881c6a 100644 --- a/promql/test.go +++ b/promql/test.go @@ -160,7 +160,7 @@ func (t *Test) parseEval(lines []string, i int) (int, *evalCmd, error) { } ts := testStartTime.Add(time.Duration(offset)) - cmd := newEvalCmd(expr, ts, ts, 0) + cmd := newEvalCmd(expr, ts) switch mod { case "ordered": cmd.ordered = true @@ -301,11 +301,9 @@ func (cmd *loadCmd) append(a storage.Appender) error { // evalCmd is a command that evaluates an expression for the given time (range) // and expects a specific result. type evalCmd struct { - expr string - start, end time.Time - interval time.Duration + expr string + start time.Time - instant bool fail, ordered bool metrics map[uint64]labels.Labels @@ -321,13 +319,10 @@ func (e entry) String() string { return fmt.Sprintf("%d: %s", e.pos, e.vals) } -func newEvalCmd(expr string, start, end time.Time, interval time.Duration) *evalCmd { +func newEvalCmd(expr string, start time.Time) *evalCmd { return &evalCmd{ - expr: expr, - start: start, - end: end, - interval: interval, - instant: start == end && interval == 0, + expr: expr, + start: start, metrics: map[uint64]labels.Labels{}, expected: map[uint64]entry{}, @@ -354,37 +349,9 @@ func (ev *evalCmd) expect(pos int, m labels.Labels, vals ...sequenceValue) { func (ev *evalCmd) compareResult(result Value) error { switch val := result.(type) { case Matrix: - if ev.instant { - return fmt.Errorf("received range result on instant evaluation") - } - seen := map[uint64]bool{} - for pos, v := range val { - fp := v.Metric.Hash() - if _, ok := ev.metrics[fp]; !ok { - return fmt.Errorf("unexpected metric %s in result", v.Metric) - } - exp := ev.expected[fp] - if ev.ordered && exp.pos != pos+1 { - return fmt.Errorf("expected metric %s with %v at position %d but was at %d", v.Metric, exp.vals, exp.pos, pos+1) - } - for i, expVal := range exp.vals { - if !almostEqual(expVal.value, v.Points[i].V) { - return fmt.Errorf("expected %v for %s but got %v", expVal, v.Metric, v.Points) - } - } - seen[fp] = true - } - for fp, expVals := range ev.expected { - if !seen[fp] { - return fmt.Errorf("expected metric %s with %v not found", ev.metrics[fp], expVals) - } - } + return fmt.Errorf("received range result on instant evaluation") case Vector: - if !ev.instant { - return fmt.Errorf("received instant result on range evaluation") - } - seen := map[uint64]bool{} for pos, v := range val { fp := v.Metric.Hash() @@ -464,8 +431,7 @@ func (t *Test) exec(tc testCommand) error { } case *evalCmd: - qry, _ := ParseExpr(cmd.expr) - q := t.queryEngine.newQuery(t.storage, qry, cmd.start, cmd.end, cmd.interval) + q, _ := t.queryEngine.NewInstantQuery(t.storage, cmd.expr, cmd.start) res := q.Exec(t.context) if res.Err != nil { if cmd.fail { @@ -473,6 +439,7 @@ func (t *Test) exec(tc testCommand) error { } return fmt.Errorf("error evaluating query %q: %s", cmd.expr, res.Err) } + defer q.Close() if res.Err == nil && cmd.fail { return fmt.Errorf("expected error evaluating query but got none") } @@ -482,6 +449,37 @@ func (t *Test) exec(tc testCommand) error { return fmt.Errorf("error in %s %s: %s", cmd, cmd.expr, err) } + // Check query returns same result in range mode, + /// by checking against the middle step. + q, _ = t.queryEngine.NewRangeQuery(t.storage, cmd.expr, cmd.start.Add(-time.Minute), cmd.start.Add(time.Minute), time.Minute) + rangeRes := q.Exec(t.context) + if rangeRes.Err != nil { + return fmt.Errorf("error evaluating query %q in range mode: %s", cmd.expr, rangeRes.Err) + } + defer q.Close() + if cmd.ordered { + // Ordering isn't defined for range queries. + return nil + } + mat := rangeRes.Value.(Matrix) + vec := make(Vector, 0, len(mat)) + for _, series := range mat { + for _, point := range series.Points { + if point.T == timeMilliseconds(cmd.start) { + vec = append(vec, Sample{Metric: series.Metric, Point: point}) + break + } + } + } + if _, ok := res.Value.(Scalar); ok { + err = cmd.compareResult(Scalar{V: vec[0].Point.V}) + } else { + err = cmd.compareResult(vec) + } + if err != nil { + return fmt.Errorf("error in %s %s rande mode: %s", cmd, cmd.expr, err) + } + default: panic("promql.Test.exec: unknown test command type") } diff --git a/promql/test_test.go b/promql/test_test.go deleted file mode 100644 index 5de250749f7..00000000000 --- a/promql/test_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2015 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, softwar -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package promql - -// RunAsBenchmark runs the test in benchmark mode. -// This will not count any loads or non eval functions. -func (t *Test) RunAsBenchmark(b *Benchmark) error { - for _, cmd := range t.cmds { - - switch cmd.(type) { - // Only time the "eval" command. - case *evalCmd: - err := t.exec(cmd) - if err != nil { - return err - } - default: - if b.iterCount == 0 { - b.b.StopTimer() - err := t.exec(cmd) - if err != nil { - return err - } - b.b.StartTimer() - } - } - } - return nil -} diff --git a/promql/testdata/operators.test b/promql/testdata/operators.test index 1adb598c165..7bac547801a 100644 --- a/promql/testdata/operators.test +++ b/promql/testdata/operators.test @@ -22,6 +22,19 @@ eval instant at 50m 2 - SUM(http_requests) BY (job) {job="api-server"} -998 {job="app-server"} -2598 +eval instant at 50m -http_requests{job="api-server",instance="0",group="production"} + {job="api-server",instance="0",group="production"} -100 + +eval instant at 50m +http_requests{job="api-server",instance="0",group="production"} + http_requests{job="api-server",instance="0",group="production"} 100 + +eval instant at 50m - - - SUM(http_requests) BY (job) + {job="api-server"} -1000 + {job="app-server"} -2600 + +eval instant at 50m - - - 1 + -1 + eval instant at 50m 1000 / SUM(http_requests) BY (job) {job="api-server"} 1 {job="app-server"} 0.38461538461538464 diff --git a/storage/buffer.go b/storage/buffer.go index 77476c61460..7df4027c555 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -30,23 +30,30 @@ type BufferedSeriesIterator struct { // of the current element and the duration of delta before. func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { bit := &BufferedSeriesIterator{ - it: it, - buf: newSampleRing(delta, 16), - lastTime: math.MinInt64, - ok: true, + buf: newSampleRing(delta, 16), } - it.Next() + bit.Reset(it) return bit } +// Reset re-uses the buffer with a new iterator. +func (b *BufferedSeriesIterator) Reset(it SeriesIterator) { + b.it = it + b.lastTime = math.MinInt64 + b.ok = true + b.buf.reset() + it.Next() +} + // PeekBack returns the nth previous element of the iterator. If there is none buffered, // ok is false. func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) { return b.buf.nthLast(n) } -// Buffer returns an iterator over the buffered data. +// Buffer returns an iterator over the buffered data. Invalidates previously +// returned iterators. func (b *BufferedSeriesIterator) Buffer() SeriesIterator { return b.buf.iterator() } @@ -118,6 +125,8 @@ type sampleRing struct { i int // position of most recent element in ring buffer f int // position of first element in ring buffer l int // number of elements in buffer + + it sampleRingIterator } func newSampleRing(delta int64, sz int) *sampleRing { @@ -133,8 +142,11 @@ func (r *sampleRing) reset() { r.f = 0 } +// Returns the current iterator. Invalidates previously retuned iterators. func (r *sampleRing) iterator() SeriesIterator { - return &sampleRingIterator{r: r, i: -1} + r.it.r = r + r.it.i = -1 + return &r.it } type sampleRingIterator struct { diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index 9c0c42b325c..3fd593cb924 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -23,7 +23,6 @@ const ( ResultSortTime QueryPreparationTime InnerEvalTime - ResultAppendTime ExecQueueTime ExecTotalTime ) @@ -39,8 +38,6 @@ func (s QueryTiming) String() string { return "Query preparation time" case InnerEvalTime: return "Inner eval time" - case ResultAppendTime: - return "Result append time" case ExecQueueTime: return "Exec queue wait time" case ExecTotalTime: @@ -56,7 +53,6 @@ type queryTimings struct { ResultSortTime float64 `json:"resultSortTime"` QueryPreparationTime float64 `json:"queryPreparationTime"` InnerEvalTime float64 `json:"innerEvalTime"` - ResultAppendTime float64 `json:"resultAppendTime"` ExecQueueTime float64 `json:"execQueueTime"` ExecTotalTime float64 `json:"execTotalTime"` } @@ -81,8 +77,6 @@ func NewQueryStats(tg *TimerGroup) *QueryStats { qt.QueryPreparationTime = timer.Duration() case InnerEvalTime: qt.InnerEvalTime = timer.Duration() - case ResultAppendTime: - qt.ResultAppendTime = timer.Duration() case ExecQueueTime: qt.ExecQueueTime = timer.Duration() case ExecTotalTime: diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 503d6687258..c9f5ebe0259 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -105,7 +105,7 @@ func setCORS(w http.ResponseWriter) { } } -type apiFunc func(r *http.Request) (interface{}, *apiError) +type apiFunc func(r *http.Request) (interface{}, *apiError, func()) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. @@ -156,13 +156,17 @@ func (api *API) Register(r *route.Router) { wrap := func(f apiFunc) http.HandlerFunc { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { setCORS(w) - if data, err := f(r); err != nil { + data, err, finalizer := f(r) + if err != nil { respondError(w, err, data) } else if data != nil { respond(w, data) } else { w.WriteHeader(http.StatusNoContent) } + if finalizer != nil { + finalizer() + } }) return api.ready(httputil.CompressionHandler{ Handler: hf, @@ -200,17 +204,17 @@ type queryData struct { Stats *stats.QueryStats `json:"stats,omitempty"` } -func (api *API) options(r *http.Request) (interface{}, *apiError) { - return nil, nil +func (api *API) options(r *http.Request) (interface{}, *apiError, func()) { + return nil, nil, nil } -func (api *API) query(r *http.Request) (interface{}, *apiError) { +func (api *API) query(r *http.Request) (interface{}, *apiError, func()) { var ts time.Time if t := r.FormValue("time"); t != "" { var err error ts, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { ts = api.now() @@ -221,7 +225,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -230,20 +234,20 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: - return nil, &apiError{errorCanceled, res.Err} + return nil, &apiError{errorCanceled, res.Err}, qry.Close case promql.ErrQueryTimeout: - return nil, &apiError{errorTimeout, res.Err} + return nil, &apiError{errorTimeout, res.Err}, qry.Close case promql.ErrStorage: - return nil, &apiError{errorInternal, res.Err} + return nil, &apiError{errorInternal, res.Err}, qry.Close } - return nil, &apiError{errorExec, res.Err} + return nil, &apiError{errorExec, res.Err}, qry.Close } // Optional stats field in response if parameter "stats" is not empty. @@ -256,38 +260,38 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil + }, nil, qry.Close } -func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { +func (api *API) queryRange(r *http.Request) (interface{}, *apiError, func()) { start, err := parseTime(r.FormValue("start")) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } end, err := parseTime(r.FormValue("end")) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } if end.Before(start) { err := errors.New("end timestamp must not be before start time") - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } step, err := parseDuration(r.FormValue("step")) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } if step <= 0 { err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. if end.Sub(start)/step > 11000 { err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } ctx := r.Context() @@ -295,7 +299,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -304,18 +308,18 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: - return nil, &apiError{errorCanceled, res.Err} + return nil, &apiError{errorCanceled, res.Err}, qry.Close case promql.ErrQueryTimeout: - return nil, &apiError{errorTimeout, res.Err} + return nil, &apiError{errorTimeout, res.Err}, qry.Close } - return nil, &apiError{errorExec, res.Err} + return nil, &apiError{errorExec, res.Err}, qry.Close } // Optional stats field in response if parameter "stats" is not empty. @@ -328,28 +332,28 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { ResultType: res.Value.Type(), Result: res.Value, Stats: qs, - }, nil + }, nil, qry.Close } -func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { +func (api *API) labelValues(r *http.Request) (interface{}, *apiError, func()) { ctx := r.Context() name := route.Param(ctx, "name") if !model.LabelNameRE.MatchString(name) { - return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} + return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}, nil } q, err := api.Queryable.Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } defer q.Close() vals, err := q.LabelValues(name) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } - return vals, nil + return vals, nil, nil } var ( @@ -357,10 +361,10 @@ var ( maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999) ) -func (api *API) series(r *http.Request) (interface{}, *apiError) { +func (api *API) series(r *http.Request) (interface{}, *apiError, func()) { r.ParseForm() if len(r.Form["match[]"]) == 0 { - return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} + return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil } var start time.Time @@ -368,7 +372,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var err error start, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { start = minTime @@ -379,7 +383,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { var err error end, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { end = maxTime @@ -389,14 +393,14 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } matcherSets = append(matcherSets, matchers) } q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } defer q.Close() @@ -404,7 +408,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { for _, mset := range matcherSets { s, err := q.Select(nil, mset...) if err != nil { - return nil, &apiError{errorExec, err} + return nil, &apiError{errorExec, err}, nil } sets = append(sets, s) } @@ -415,14 +419,14 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { metrics = append(metrics, set.At().Labels()) } if set.Err() != nil { - return nil, &apiError{errorExec, set.Err()} + return nil, &apiError{errorExec, set.Err()}, nil } - return metrics, nil + return metrics, nil, nil } -func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { - return nil, &apiError{errorInternal, fmt.Errorf("not implemented")} +func (api *API) dropSeries(r *http.Request) (interface{}, *apiError, func()) { + return nil, &apiError{errorInternal, fmt.Errorf("not implemented")}, nil } // Target has the information for one target. @@ -451,7 +455,7 @@ type TargetDiscovery struct { DroppedTargets []*DroppedTarget `json:"droppedTargets"` } -func (api *API) targets(r *http.Request) (interface{}, *apiError) { +func (api *API) targets(r *http.Request) (interface{}, *apiError, func()) { tActive := api.targetRetriever.TargetsActive() tDropped := api.targetRetriever.TargetsDropped() res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))} @@ -479,7 +483,7 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) { DiscoveredLabels: t.DiscoveredLabels().Map(), } } - return res, nil + return res, nil, nil } // AlertmanagerDiscovery has all the active Alertmanagers. @@ -493,7 +497,7 @@ type AlertmanagerTarget struct { URL string `json:"url"` } -func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError) { +func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError, func()) { urls := api.alertmanagerRetriever.Alertmanagers() droppedURLS := api.alertmanagerRetriever.DroppedAlertmanagers() ams := &AlertmanagerDiscovery{ActiveAlertmanagers: make([]*AlertmanagerTarget, len(urls)), DroppedAlertmanagers: make([]*AlertmanagerTarget, len(droppedURLS))} @@ -503,22 +507,22 @@ func (api *API) alertmanagers(r *http.Request) (interface{}, *apiError) { for i, url := range droppedURLS { ams.DroppedAlertmanagers[i] = &AlertmanagerTarget{URL: url.String()} } - return ams, nil + return ams, nil, nil } type prometheusConfig struct { YAML string `json:"yaml"` } -func (api *API) serveConfig(r *http.Request) (interface{}, *apiError) { +func (api *API) serveConfig(r *http.Request) (interface{}, *apiError, func()) { cfg := &prometheusConfig{ YAML: api.config().String(), } - return cfg, nil + return cfg, nil, nil } -func (api *API) serveFlags(r *http.Request) (interface{}, *apiError) { - return api.flagsMap, nil +func (api *API) serveFlags(r *http.Request) (interface{}, *apiError, func()) { + return api.flagsMap, nil, nil } func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { @@ -598,18 +602,18 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { } } -func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { +func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError, func()) { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")} + return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")} + return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil } r.ParseForm() if len(r.Form["match[]"]) == 0 { - return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} + return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")}, nil } var start time.Time @@ -617,7 +621,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { var err error start, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { start = minTime @@ -628,7 +632,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { var err error end, err = parseTime(t) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } } else { end = maxTime @@ -637,7 +641,7 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, &apiError{errorBadData, err} + return nil, &apiError{errorBadData, err}, nil } var selector tsdbLabels.Selector @@ -646,22 +650,22 @@ func (api *API) deleteSeries(r *http.Request) (interface{}, *apiError) { } if err := db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), selector...); err != nil { - return nil, &apiError{errorInternal, err} + return nil, &apiError{errorInternal, err}, nil } } - return nil, nil + return nil, nil, nil } -func (api *API) snapshot(r *http.Request) (interface{}, *apiError) { +func (api *API) snapshot(r *http.Request) (interface{}, *apiError, func()) { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")} + return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil } skipHead, _ := strconv.ParseBool(r.FormValue("skip_head")) db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")} + return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil } var ( @@ -672,31 +676,31 @@ func (api *API) snapshot(r *http.Request) (interface{}, *apiError) { dir = filepath.Join(snapdir, name) ) if err := os.MkdirAll(dir, 0777); err != nil { - return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)} + return nil, &apiError{errorInternal, fmt.Errorf("create snapshot directory: %s", err)}, nil } if err := db.Snapshot(dir, !skipHead); err != nil { - return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)} + return nil, &apiError{errorInternal, fmt.Errorf("create snapshot: %s", err)}, nil } return struct { Name string `json:"name"` - }{name}, nil + }{name}, nil, nil } -func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError) { +func (api *API) cleanTombstones(r *http.Request) (interface{}, *apiError, func()) { if !api.enableAdmin { - return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")} + return nil, &apiError{errorUnavailable, errors.New("Admin APIs disabled")}, nil } db := api.db() if db == nil { - return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")} + return nil, &apiError{errorUnavailable, errors.New("TSDB not ready")}, nil } if err := db.CleanTombstones(); err != nil { - return nil, &apiError{errorInternal, err} + return nil, &apiError{errorInternal, err}, nil } - return nil, nil + return nil, nil, nil } func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index d935f12ed02..06e1a4ac635 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -530,7 +530,7 @@ func TestEndpoints(t *testing.T) { if err != nil { t.Fatal(err) } - resp, apiErr := test.endpoint(req.WithContext(ctx)) + resp, apiErr, _ := test.endpoint(req.WithContext(ctx)) if apiErr != nil { if test.errType == errorNone { t.Fatalf("Unexpected error: %s", apiErr)