Skip to content

Commit

Permalink
Implement xrate/xincrease/xdelta functions, as per prometheus#3746
Browse files Browse the repository at this point in the history
Signed-off-by: Sylvain Rabot <s.rabot@lectra.com>
  • Loading branch information
free authored and Sylvain Rabot committed May 10, 2019
1 parent d3245f1 commit 9d315c4
Show file tree
Hide file tree
Showing 3 changed files with 397 additions and 36 deletions.
107 changes: 81 additions & 26 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,24 +513,32 @@ func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {

func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, storage.Warnings, error) {
var maxOffset time.Duration

Inspect(s.Expr, func(node Node, path []Node) error {
var nodeOffset time.Duration
subqOffset := ng.cumulativeSubqueryOffset(path)
switch n := node.(type) {
case *VectorSelector:
if maxOffset < LookbackDelta+subqOffset {
maxOffset = LookbackDelta + subqOffset
}
if n.Offset+LookbackDelta+subqOffset > maxOffset {
maxOffset = n.Offset + LookbackDelta + subqOffset
nodeOffset += LookbackDelta + subqOffset
if n.Offset > 0 {
nodeOffset += n.Offset
}
case *MatrixSelector:
if maxOffset < n.Range+subqOffset {
maxOffset = n.Range + subqOffset
nodeOffset += n.Range + subqOffset
if n.Offset > 0 {
nodeOffset += n.Offset
}
if n.Offset+n.Range+subqOffset > maxOffset {
maxOffset = n.Offset + n.Range + subqOffset
// Include an extra LookbackDelta iff this is the argument to an
// extended range function. Extended ranges include one extra
// point, this is how far back we need to look for it.
f, ok := getFunction(extractFuncFromPath(path))
if ok && f.ExtRange {
nodeOffset += LookbackDelta
}
}
if maxOffset < nodeOffset {
maxOffset = nodeOffset
}
return nil
})

Expand Down Expand Up @@ -583,6 +591,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(n.Range)
// Include an extra LookbackDelta iff this is the argument to an
// extended range function. Extended ranges include one extra
// point, this is how far back we need to look for it.
f, ok := getFunction(params.Func)
if ok && f.ExtRange {
params.Start = params.Start - durationMilliseconds(LookbackDelta)
}
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds
Expand Down Expand Up @@ -979,7 +994,15 @@ func (ev *evaluator) eval(expr Expr) Value {
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
offset := durationMilliseconds(sel.Offset)
selRange := durationMilliseconds(sel.Range)
bufferRange := selRange
stepRange := selRange
// Include an extra LookbackDelta iff this is an extended
// range function. Extended ranges include one extra point,
// this is how far back we need to look for it.
if e.Func.ExtRange {
bufferRange += durationMilliseconds(LookbackDelta)
stepRange += durationMilliseconds(LookbackDelta)
}
if stepRange > ev.interval {
stepRange = ev.interval
}
Expand All @@ -989,7 +1012,7 @@ func (ev *evaluator) eval(expr Expr) Value {
inArgs[matrixArgIndex] = inMatrix
enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
// Process all the calls for one time series at a time.
it := storage.NewBuffer(selRange)
it := storage.NewBuffer(bufferRange)
for i, s := range sel.series {
points = points[:0]
it.Reset(s.Iterator())
Expand All @@ -1014,7 +1037,7 @@ func (ev *evaluator) eval(expr Expr) Value {
maxt := ts - offset
mint := maxt - selRange
// Evaluate the matrix selector for this series for this step.
points = ev.matrixIterSlice(it, mint, maxt, points)
points = ev.matrixIterSlice(it, mint, maxt, e.Func.ExtRange, points)
if len(points) == 0 {
continue
}
Expand Down Expand Up @@ -1271,7 +1294,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
Metric: node.series[i].Labels(),
}

ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
ss.Points = ev.matrixIterSlice(it, mint, maxt, false, getPointSlice(16))

if len(ss.Points) > 0 {
matrix = append(matrix, ss)
Expand All @@ -1287,23 +1310,38 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
//
// As an optimization, the matrix vector may already contain points of the same
// time series from the evaluation of an earlier step (with lower mint and maxt
// values). Any such points falling before mint are discarded; points that fall
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, out []Point) []Point {
if len(out) > 0 && out[len(out)-1].T >= mint {
// values). Any such points falling before mint (except the last, when extRange
// is true) are discarded; points that fall into the [mint, maxt] range are
// retained; only points with later timestamps are populated from the iterator.
func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, maxt int64, extRange bool, out []Point) []Point {
extMint := mint - durationMilliseconds(LookbackDelta)
if len(out) > 0 && (out[len(out)-1].T >= mint || (extRange && out[len(out)-1].T >= extMint)) {
// There is an overlap between previous and current ranges, retain common
// points. In most such cases:
// (a) the overlap is significantly larger than the eval step; and/or
// (b) the number of samples is relatively small.
// so a linear search will be as fast as a binary search.
var drop int
for drop = 0; out[drop].T < mint; drop++ {
if !extRange {
for drop = 0; out[drop].T < mint; drop++ {
}
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
} else {
// This is an argument to an extended range function, first go past mint.
for drop = 0; drop < len(out) && out[drop].T <= mint; drop++ {
}
// Then, go back one sample if within LookbackDelta of mint.
if drop > 0 && out[drop-1].T >= extMint {
drop--
}
if out[len(out)-1].T >= mint {
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
}
}
copy(out, out[drop:])
out = out[:len(out)-drop]
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
} else {
out = out[:0]
}
Expand All @@ -1316,18 +1354,35 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
}

buf := it.Buffer()
appendedPointBeforeMint := len(out) > 0
for buf.Next() {
t, v := buf.At()
if value.IsStaleNaN(v) {
continue
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
if !extRange {
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
} else {
// This is the argument to an extended range function: if any point
// exists at or before range start, add it and then keep replacing
// it with later points while not yet (strictly) inside the range.
if t > mint || !appendedPointBeforeMint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
appendedPointBeforeMint = true
} else {
out[len(out)-1] = Point{T: t, V: v}
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
}
// The seeked sample might also be in the range.
Expand Down
122 changes: 122 additions & 0 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package promql

import (
"fmt"
"math"
"os"
"regexp"
"sort"
"strconv"
Expand All @@ -34,6 +36,7 @@ type Function struct {
ArgTypes []ValueType
Variadic int
ReturnType ValueType
ExtRange bool

// vals is a list of the evaluated arguments for the function call.
// For range vectors it will be a Matrix with one series, instant vectors a
Expand Down Expand Up @@ -139,6 +142,71 @@ func extrapolatedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCou
return enh.out
}

func extendedRate(vals []Value, args Expressions, enh *EvalNodeHelper, isCounter bool, isRate bool) Vector {
ms := args[0].(*MatrixSelector)
matrix := vals[0].(Matrix)

var (
rangeStart = enh.ts - durationMilliseconds(ms.Range+ms.Offset)
rangeEnd = enh.ts - durationMilliseconds(ms.Offset)
)

for _, samples := range matrix {
points := samples.Points
if len(points) < 2 {
continue
}
sampledRange := float64(points[len(points)-1].T - points[0].T)
averageInterval := sampledRange / float64(len(points)-1)

firstPoint := 0
// If the point before the range is too far from rangeStart, drop it.
if float64(rangeStart-points[0].T) > averageInterval {
if len(points) < 3 {
continue
}
firstPoint = 1
sampledRange = float64(points[len(points)-1].T - points[1].T)
averageInterval = sampledRange / float64(len(points)-2)
}

var (
counterCorrection float64
lastValue float64
)
if isCounter {
for i := firstPoint; i < len(points); i++ {
sample := points[i]
if sample.V < lastValue {
counterCorrection += lastValue
}
lastValue = sample.V
}
}
resultValue := points[len(points)-1].V - points[firstPoint].V + counterCorrection

// Duration between last sample and boundary of range.
durationToEnd := float64(rangeEnd - points[len(points)-1].T)

// If the points cover the whole range (i.e. they start just before the
// range start and end just before the range end) adjust the value from
// the sampled range to the requested range.
if points[firstPoint].T <= rangeStart && durationToEnd < averageInterval {
adjustToRange := float64(durationMilliseconds(ms.Range))
resultValue = resultValue * (adjustToRange / sampledRange)
}

if isRate {
resultValue = resultValue / ms.Range.Seconds()
}

enh.out = append(enh.out, Sample{
Point: Point{V: resultValue},
})
}
return enh.out
}

// === delta(Matrix ValueTypeMatrix) Vector ===
func funcDelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
return extrapolatedRate(vals, args, enh, false, false)
Expand All @@ -154,6 +222,21 @@ func funcIncrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
return extrapolatedRate(vals, args, enh, true, false)
}

// === xdelta(Matrix ValueTypeMatrix) Vector ===
func funcXdelta(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
return extendedRate(vals, args, enh, false, false)
}

// === xrate(node ValueTypeMatrix) Vector ===
func funcXrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
return extendedRate(vals, args, enh, true, true)
}

// === xincrease(node ValueTypeMatrix) Vector ===
func funcXincrease(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
return extendedRate(vals, args, enh, true, false)
}

// === irate(node ValueTypeMatrix) Vector ===
func funcIrate(vals []Value, args Expressions, enh *EvalNodeHelper) Vector {
return instantValue(vals, enh.out, true)
Expand Down Expand Up @@ -1186,6 +1269,27 @@ var functions = map[string]*Function{
ReturnType: ValueTypeVector,
Call: funcVector,
},
"xdelta": {
Name: "xdelta",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
Call: funcXdelta,
ExtRange: true,
},
"xincrease": {
Name: "xincrease",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
Call: funcXincrease,
ExtRange: true,
},
"xrate": {
Name: "xrate",
ArgTypes: []ValueType{ValueTypeMatrix},
ReturnType: ValueTypeVector,
Call: funcXrate,
ExtRange: true,
},
"year": {
Name: "year",
ArgTypes: []ValueType{ValueTypeVector},
Expand All @@ -1195,6 +1299,24 @@ var functions = map[string]*Function{
},
}

func init() {
// REPLACE_RATE_FUNCS replaces the default rate extrapolation functions
// with xrate functions. This allows for a drop-in replacement and Grafana
// auto-completion, Prometheus tooling, Thanos, etc. should still work as expected.
if os.Getenv("REPLACE_RATE_FUNCS") == "1" {
functions["delta"] = functions["xdelta"]
functions["increase"] = functions["xincrease"]
functions["rate"] = functions["xrate"]
functions["delta"].Name = "delta"
functions["increase"].Name = "increase"
functions["rate"].Name = "rate"
delete(functions, "xdelta")
delete(functions, "xincrease")
delete(functions, "xrate")
fmt.Println("Successfully replaced rate & friends with xrate & friends (and removed xrate & friends function keys).")
}
}

// getFunction returns a predefined Function object for the given name.
func getFunction(name string) (*Function, bool) {
function, ok := functions[name]
Expand Down

0 comments on commit 9d315c4

Please sign in to comment.