diff --git a/promql/engine.go b/promql/engine.go index 283538ccd52..f01eb0f5c77 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -513,24 +513,32 @@ func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration { func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, storage.Warnings, error) { var maxOffset time.Duration + Inspect(s.Expr, func(node Node, path []Node) error { + var nodeOffset time.Duration subqOffset := ng.cumulativeSubqueryOffset(path) switch n := node.(type) { case *VectorSelector: - if maxOffset < LookbackDelta+subqOffset { - maxOffset = LookbackDelta + subqOffset - } - if n.Offset+LookbackDelta+subqOffset > maxOffset { - maxOffset = n.Offset + LookbackDelta + subqOffset + nodeOffset += LookbackDelta + subqOffset + if n.Offset > 0 { + nodeOffset += n.Offset } case *MatrixSelector: - if maxOffset < n.Range+subqOffset { - maxOffset = n.Range + subqOffset + nodeOffset += n.Range + subqOffset + if n.Offset > 0 { + nodeOffset += n.Offset } - if n.Offset+n.Range+subqOffset > maxOffset { - maxOffset = n.Offset + n.Range + subqOffset + // Include an extra LookbackDelta iff this is the argument to an + // extended range function. Extended ranges include one extra + // point, this is how far back we need to look for it. + f, ok := getFunction(extractFuncFromPath(path)) + if ok && f.ExtRange { + nodeOffset += LookbackDelta } } + if maxOffset < nodeOffset { + maxOffset = nodeOffset + } return nil }) @@ -583,6 +591,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev // For all matrix queries we want to ensure that we have (end-start) + range selected // this way we have `range` data before the start time params.Start = params.Start - durationMilliseconds(n.Range) + // Include an extra LookbackDelta iff this is the argument to an + // extended range function. Extended ranges include one extra + // point, this is how far back we need to look for it. + f, ok := getFunction(params.Func) + if ok && f.ExtRange { + params.Start = params.Start - durationMilliseconds(LookbackDelta) + } if n.Offset > 0 { offsetMilliseconds := durationMilliseconds(n.Offset) params.Start = params.Start - offsetMilliseconds @@ -979,7 +994,15 @@ func (ev *evaluator) eval(expr Expr) Value { mat := make(Matrix, 0, len(sel.series)) // Output matrix. offset := durationMilliseconds(sel.Offset) selRange := durationMilliseconds(sel.Range) + bufferRange := selRange stepRange := selRange + // Include an extra LookbackDelta iff this is an extended + // range function. Extended ranges include one extra point, + // this is how far back we need to look for it. + if e.Func.ExtRange { + bufferRange += durationMilliseconds(LookbackDelta) + stepRange += durationMilliseconds(LookbackDelta) + } if stepRange > ev.interval { stepRange = ev.interval } @@ -989,7 +1012,7 @@ func (ev *evaluator) eval(expr Expr) Value { inArgs[matrixArgIndex] = inMatrix enh := &EvalNodeHelper{out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. - it := storage.NewBuffer(selRange) + it := storage.NewBuffer(bufferRange) for i, s := range sel.series { points = points[:0] it.Reset(s.Iterator()) @@ -1014,7 +1037,7 @@ func (ev *evaluator) eval(expr Expr) Value { maxt := ts - offset mint := maxt - selRange // Evaluate the matrix selector for this series for this step. - points = ev.matrixIterSlice(it, mint, maxt, points) + points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points) if len(points) == 0 { continue } @@ -1271,7 +1294,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { Metric: node.series[i].Labels(), } - ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) + ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16)) if len(ss.Points) > 0 { matrix = append(matrix, ss) @@ -1287,23 +1310,38 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { // // As an optimization, the matrix vector may already contain points of the same // time series from the evaluation of an earlier step (with lower mint and maxt -// values). Any such points falling before mint are discarded; points that fall -// into the [mint, maxt] range are retained; only points with later timestamps -// are populated from the iterator. -func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point { - if len(out) > 0 && out[len(out)-1].T >= mint { +// values). Any such points falling before mint (except the last, when extRange +// is true) are discarded; points that fall into the [mint, maxt] range are +// retained; only points with later timestamps are populated from the iterator. +func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point { + extMint := mint - durationMilliseconds(LookbackDelta) + if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) { // There is an overlap between previous and current ranges, retain common // points. In most such cases: // (a) the overlap is significantly larger than the eval step; and/or // (b) the number of samples is relatively small. // so a linear search will be as fast as a binary search. var drop int - for drop = 0; out[drop].T < mint; drop++ { + if !extRange { + for drop = 0; out[drop].T < mint; drop++ { + } + // Only append points with timestamps after the last timestamp we have. + mint = out[len(out)-1].T + 1 + } else { + // This is an argument to an extended range function, first go past mint. + for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ { + } + // Then, go back one sample if within LookbackDelta of mint. + if drop > 0 && out[drop-1].T >= extMint { + drop-- + } + if out[len(out)-1].T >= mint { + // Only append points with timestamps after the last timestamp we have. + mint = out[len(out)-1].T + 1 + } } copy(out, out[drop:]) out = out[:len(out)-drop] - // Only append points with timestamps after the last timestamp we have. - mint = out[len(out)-1].T + 1 } else { out = out[:0] } @@ -1316,18 +1354,35 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m } buf := it.Buffer() + appendedPointBeforeMint := len(out) > 0 for buf.Next() { t, v := buf.At() if value.IsStaleNaN(v) { continue } - // Values in the buffer are guaranteed to be smaller than maxt. - if t >= mint { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) + if !extRange { + // Values in the buffer are guaranteed to be smaller than maxt. + if t >= mint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + out = append(out, Point{T: t, V: v}) + ev.currentSamples++ + } + } else { + // This is the argument to an extended range function: if any point + // exists at or before range start, add it and then keep replacing + // it with later points while not yet (strictly) inside the range. + if t > mint || !appendedPointBeforeMint { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + out = append(out, Point{T: t, V: v}) + ev.currentSamples++ + appendedPointBeforeMint = true + } else { + out[len(out)-1] = Point{T: t, V: v} } - out = append(out, Point{T: t, V: v}) - ev.currentSamples++ } } // The seeked sample might also be in the range. diff --git a/promql/functions.go b/promql/functions.go index 4bb3f8f436d..223d6dca7fd 100644 --- a/promql/functions.go +++ b/promql/functions.go @@ -14,7 +14,9 @@ package promql import ( + "fmt" "math" + "os" "regexp" "sort" "strconv" @@ -34,6 +36,7 @@ type Function struct { ArgTypes []ValueType Variadic int ReturnType ValueType + ExtRange bool // vals is a list of the evaluated arguments for the function call. // For range vectors it will be a Matrix with one series, instant vectors a @@ -139,6 +142,71 @@ func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCou return enh.out } +func extendedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector { + ms := args[0].(*MatrixSelector) + matrix := vals[0].(Matrix) + + var ( + rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset) + rangeEnd = enh.ts - durationMilliseconds(ms.Offset) + ) + + for _, samples := range matrix { + points := samples.Points + if len(points) < 2 { + continue + } + sampledRange := float64(points[len(points)-1].T - points[0].T) + averageInterval := sampledRange / float64(len(points)-1) + + firstPoint := 0 + // If the point before the range is too far from rangeStart, drop it. + if float64(rangeStart-points[0].T) > averageInterval { + if len(points) < 3 { + continue + } + firstPoint = 1 + sampledRange = float64(points[len(points)-1].T - points[1].T) + averageInterval = sampledRange / float64(len(points)-2) + } + + var ( + counterCorrection float64 + lastValue float64 + ) + if isCounter { + for i := firstPoint; i < len(points); i++ { + sample := points[i] + if sample.V < lastValue { + counterCorrection += lastValue + } + lastValue = sample.V + } + } + resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection + + // Duration between last sample and boundary of range. + durationToEnd := float64(rangeEnd - points[len(points)-1].T) + + // If the points cover the whole range (i.e. they start just before the + // range start and end just before the range end) adjust the value from + // the sampled range to the requested range. + if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval { + adjustToRange := float64(durationMilliseconds(ms.Range)) + resultValue = resultValue * (adjustToRange / sampledRange) + } + + if isRate { + resultValue = resultValue / ms.Range.Seconds() + } + + enh.out = append(enh.out, Sample{ + Point: Point{V: resultValue}, + }) + } + return enh.out +} + // === delta(Matrix ValueTypeMatrix) Vector === func funcDelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { return extrapolatedRate(vals, args, enh, false, false) @@ -154,6 +222,21 @@ func funcIncrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { return extrapolatedRate(vals, args, enh, true, false) } +// === xdelta(Matrix ValueTypeMatrix) Vector === +func funcXdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extendedRate(vals, args, enh, false, false) +} + +// === xrate(node ValueTypeMatrix) Vector === +func funcXrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extendedRate(vals, args, enh, true, true) +} + +// === xincrease(node ValueTypeMatrix) Vector === +func funcXincrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { + return extendedRate(vals, args, enh, true, false) +} + // === irate(node ValueTypeMatrix) Vector === func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector { return instantValue(vals, enh.out, true) @@ -1186,6 +1269,27 @@ var functions = map[string]*Function{ ReturnType: ValueTypeVector, Call: funcVector, }, + "xdelta": { + Name: "xdelta", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + Call: funcXdelta, + ExtRange: true, + }, + "xincrease": { + Name: "xincrease", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + Call: funcXincrease, + ExtRange: true, + }, + "xrate": { + Name: "xrate", + ArgTypes: []ValueType{ValueTypeMatrix}, + ReturnType: ValueTypeVector, + Call: funcXrate, + ExtRange: true, + }, "year": { Name: "year", ArgTypes: []ValueType{ValueTypeVector}, @@ -1195,6 +1299,24 @@ var functions = map[string]*Function{ }, } +func init() { + // REPLACE_RATE_FUNCS replaces the default rate extrapolation functions + // with xrate functions. This allows for a drop-in replacement and Grafana + // auto-completion, Prometheus tooling, Thanos, etc. should still work as expected. + if os.Getenv("REPLACE_RATE_FUNCS") == "1" { + functions["delta"] = functions["xdelta"] + functions["increase"] = functions["xincrease"] + functions["rate"] = functions["xrate"] + functions["delta"].Name = "delta" + functions["increase"].Name = "increase" + functions["rate"].Name = "rate" + delete(functions, "xdelta") + delete(functions, "xincrease") + delete(functions, "xrate") + fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).") + } +} + // getFunction returns a predefined Function object for the given name. func getFunction(name string) (*Function, bool) { function, ok := functions[name] diff --git a/promql/testdata/functions.test b/promql/testdata/functions.test index f4d8fed6cab..45d07d65bc7 100644 --- a/promql/testdata/functions.test +++ b/promql/testdata/functions.test @@ -1,3 +1,138 @@ +# Comparison of rate vs xrate. + +load 5s + http_requests{path="/foo"} 1 1 1 2 2 2 2 2 3 3 3 + http_requests{path="/bar"} 1 2 3 4 5 6 7 8 9 10 11 + + +# +# Timeseries starts inside range, (presumably) goes on after range end. +# + +# 1. Reference eval, aligned with collection. +eval instant at 25s rate(http_requests[50s]) + {path="/foo"} .022 + {path="/bar"} .12 + +eval instant at 25s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + +# 2. Eval 1 second earlier compared to (1). +# * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase); +# * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample). +# XXX Seeing ~20% jump for path="/foo" +eval instant at 24s rate(http_requests[50s]) + {path="/foo"} .0265 + {path="/bar"} .116 + +eval instant at 24s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .08 + +# 3. Eval 1 second later compared to (1). +# * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase). +# * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase). +# XXX Higher instead of lower for both. +eval instant at 26s rate(http_requests[50s]) + {path="/foo"} .0228 + {path="/bar"} .124 + +eval instant at 26s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + + +# +# Timeseries starts before range, ends within range. +# + +# 4. Reference eval, aligned with collection. +eval instant at 75s rate(http_requests[50s]) + {path="/foo"} .022 + {path="/bar"} .11 + +eval instant at 75s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + +# 5. Eval 1s earlier compared to (4). +# * path="/foo" rate should be same or fractionally lower ("longer" sample, same actual increase). +# * path="/bar" rate should be same or fractionally lower ("longer" sample, same actual increase). +# XXX Higher instead of lower for both. +eval instant at 74s rate(http_requests[50s]) + {path="/foo"} .0228 + {path="/bar"} .114 + +# XXX Higher instead of lower for {path="/bar"}. +eval instant at 74s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .12 + +# 6. Eval 1s later compared to (4). Rate/increase (should be) fractionally smaller. +# * path="/foo" rate should be same or fractionally higher ("shorter" sample, same actual increase); +# * path="/bar" rate should be same or fractionally lower (80% the increase, 80/96% range covered by sample). +# XXX Seeing ~20% jump for path="/foo", decrease instead of increase for path="/bar". +eval instant at 76s rate(http_requests[50s]) + {path="/foo"} .0265 + {path="/bar"} .106 + +eval instant at 76s xrate(http_requests[50s]) + {path="/foo"} .02 + {path="/bar"} .1 + +# +# Evaluation of 10 second rate every 10 seconds, not aligned with collection. +# + +eval instant at 9s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 19s rate(http_requests[10s]) + {path="/foo"} 0.2 + {path="/bar"} 0.2 + +eval instant at 29s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 39s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +# XXX Missed an increase in path="/foo" between timestamps 35 and 40 (both in this eval and the one before). +eval instant at 49s rate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 9s xrate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.1 + +eval instant at 19s xrate(http_requests[10s]) + {path="/foo"} 0.1 + {path="/bar"} 0.2 + +eval instant at 29s xrate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +eval instant at 39s xrate(http_requests[10s]) + {path="/foo"} 0 + {path="/bar"} 0.2 + +# XXX Sees the increase in path="/foo" between timestamps 35 and 40. +eval instant at 49s xrate(http_requests[10s]) + {path="/foo"} .1 + {path="/bar"} 0.2 + +clear + + + + + # Testdata for resets() and changes(). load 5m http_requests{path="/foo"} 1 2 3 0 1 0 0 1 2 0 @@ -62,23 +197,57 @@ eval instant at 15m changes(x[15m]) clear -# Tests for increase(). -load 5m +# Tests for increase()/xincrease()/xrate(). +load 5s http_requests{path="/foo"} 0+10x10 - http_requests{path="/bar"} 0+10x5 0+10x5 + http_requests{path="/bar"} 0+10x5 0+10x4 # Tests for increase(). -eval instant at 50m increase(http_requests[50m]) +eval instant at 50s increase(http_requests[50s]) + {path="/foo"} 100 + {path="/bar"} 90 + +eval instant at 50s increase(http_requests[100s]) {path="/foo"} 100 {path="/bar"} 90 -eval instant at 50m increase(http_requests[100m]) +# Tests for xincrease(). +eval instant at 50s xincrease(http_requests[50s]) {path="/foo"} 100 {path="/bar"} 90 +eval instant at 50s xincrease(http_requests[5s]) + {path="/foo"} 10 + {path="/bar"} 10 + +eval instant at 50s xincrease(http_requests[3s]) + {path="/foo"} 6 + {path="/bar"} 6 + +eval instant at 49s xincrease(http_requests[3s]) + +# Tests for xrate(). +eval instant at 50s xrate(http_requests[50s]) + {path="/foo"} 2 + {path="/bar"} 1.8 + +eval instant at 50s xrate(http_requests[100s]) + {path="/foo"} 1 + {path="/bar"} 0.9 + +eval instant at 50s xrate(http_requests[5s]) + {path="/foo"} 2 + {path="/bar"} 2 + +eval instant at 50s xrate(http_requests[3s]) + {path="/foo"} 2 + {path="/bar"} 2 + +eval instant at 49s xrate(http_requests[3s]) + clear -# Test for increase() with counter reset. +# Test for increase()/xincrease with counter reset. # When the counter is reset, it always starts at 0. # So the sequence 3 2 (decreasing counter = reset) is interpreted the same as 3 0 1 2. # Prometheus assumes it missed the intermediate values 0 and 1. @@ -86,7 +255,10 @@ load 5m http_requests{path="/foo"} 0 1 2 3 2 3 4 eval instant at 30m increase(http_requests[30m]) - {path="/foo"} 7 + {path="/foo"} 7 + +eval instant at 30m xincrease(http_requests[30m]) + {path="/foo"} 7 clear @@ -106,15 +278,27 @@ eval instant at 30m irate(http_requests[50m]) clear -# Tests for delta(). +# Tests for delta()/xdelta(). load 5m - http_requests{path="/foo"} 0 50 100 150 200 - http_requests{path="/bar"} 200 150 100 50 0 + http_requests{path="/foo"} 0 50 300 150 200 + http_requests{path="/bar"} 200 150 300 50 0 eval instant at 20m delta(http_requests[20m]) {path="/foo"} 200 {path="/bar"} -200 +eval instant at 20m xdelta(http_requests[20m]) + {path="/foo"} 200 + {path="/bar"} -200 + +eval instant at 20m xdelta(http_requests[19m]) + {path="/foo"} 190 + {path="/bar"} -190 + +eval instant at 20m xdelta(http_requests[1m]) + {path="/foo"} 10 + {path="/bar"} -10 + clear # Tests for idelta().