Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subquery #4831

Merged
merged 37 commits into from
Dec 22, 2018
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b4a0f85
Lexer for subquery
codesome Nov 3, 2018
ed23b8c
Parser for subquery based on unary expression
codesome Nov 4, 2018
687ff22
Unit tests for unary expression based subquery
codesome Nov 4, 2018
b30086f
Parser for subquery for all expressions
codesome Nov 5, 2018
4c74315
Merge remote-tracking branch 'upstream/master' into subqueries
codesome Nov 7, 2018
0a015fc
Partially working query engine for subqueries
codesome Nov 11, 2018
7d07e42
Subquery as function argument
codesome Nov 11, 2018
68d77cf
Optional subquery step
codesome Nov 12, 2018
d803fe8
Fix duplicate label set in vector
codesome Nov 13, 2018
7bae5d9
Extra check for start time of subquery evaluation
codesome Nov 13, 2018
f38ac99
Some simple tests for subquery
codesome Nov 13, 2018
1ea2e3d
Small nits
codesome Nov 15, 2018
f81427d
offset for subquery: lexer and parser
codesome Nov 16, 2018
2afa745
Separate test for subquery selector
codesome Nov 19, 2018
37751ae
offset for subquery: engine
codesome Nov 19, 2018
c5e2e2c
Align subquery startTimestamp with the step
codesome Nov 19, 2018
8ca45ca
Fix review comments
codesome Nov 21, 2018
683545d
Fix test
codesome Nov 21, 2018
814633c
Aligned step for newEv.startTimestamp
codesome Nov 23, 2018
3c0c12b
Carry forward currentSamples from parent evaluator
codesome Nov 27, 2018
24a6966
Properly aligned start time for subquery
codesome Nov 28, 2018
73dc306
Merge remote-tracking branch 'upstream/master' into subqueries
codesome Nov 28, 2018
25a1188
More unit tests
codesome Nov 28, 2018
70cb12b
Fix review comments
codesome Nov 30, 2018
5e42e92
Merge remote-tracking branch 'upstream/master' into subqueries
codesome Nov 30, 2018
6526971
Fix review comments
codesome Dec 4, 2018
130184b
Merge remote-tracking branch 'upstream/master' into subqueries
codesome Dec 4, 2018
e0e785d
subquery.test in testdata
codesome Dec 6, 2018
7dc326d
Fix review comments
codesome Dec 15, 2018
6f6cd06
Merge remote-tracking branch 'upstream/master' into subqueries
codesome Dec 15, 2018
a29b240
Update error message
codesome Dec 18, 2018
af74e46
Fix test
codesome Dec 18, 2018
300fdc6
Docs
codesome Dec 18, 2018
fd1b008
Fix Seek and added more tests
codesome Dec 19, 2018
5ff3f1a
Fix doc
codesome Dec 20, 2018
6dd9868
Merge remote-tracking branch 'upstream/master' into subqueries
codesome Dec 20, 2018
2acacd2
Update example in the doc
codesome Dec 21, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func main() {
}

promql.LookbackDelta = time.Duration(cfg.lookbackDelta)
promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval))

logger := promlog.New(&cfg.promlogConfig)

Expand Down Expand Up @@ -654,6 +655,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
if failed {
return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
}
promql.SetDefaultEvaluationInterval(time.Duration(conf.GlobalConfig.EvaluationInterval))
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions docs/querying/basics.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,20 @@ The same works for range vectors. This returns the 5-minutes rate that

rate(http_requests_total[5m] offset 1w)

## Subquery

Subquery allows you to run an instant query for a given range and resolution. Like how range vector selector is for vector selector, subquery is for an instant query. The result of a subquery is a matrix.
codesome marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison to range vector selectors is confusing. It'd be instant vector expressions.


Syntax: `<instant_query> '[' <range> ':' [<resolution>] ']' [ offset <duration> ]`

* `<resolution>` is optional. Default is the global evaluation interval.

Examples
codesome marked this conversation as resolved.
Show resolved Hide resolved

1. `rate(http_requests_total[5m])[30m:1m]` - This returns 5-minute rate of `http_requests_total` metric for the past 30 minutes, at a resolution of 1 minute.

2. `rate(sum_over_time(http_requests_total[30s:10s])[50s:])` - This is an example of nested subquery and using default resolution for outer subquery.
codesome marked this conversation as resolved.
Show resolved Hide resolved

## Operators

Prometheus supports many binary and aggregation operators. These are described
Expand Down
15 changes: 15 additions & 0 deletions promql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ type MatrixSelector struct {
series []storage.Series
}

// SubqueryExpr represents a subquery.
type SubqueryExpr struct {
Expr Expr
Range time.Duration
Offset time.Duration
Step time.Duration
}

// NumberLiteral represents a number.
type NumberLiteral struct {
Val float64
Expand Down Expand Up @@ -153,6 +161,7 @@ type VectorSelector struct {
func (e *AggregateExpr) Type() ValueType { return ValueTypeVector }
func (e *Call) Type() ValueType { return e.Func.ReturnType }
func (e *MatrixSelector) Type() ValueType { return ValueTypeMatrix }
func (e *SubqueryExpr) Type() ValueType { return ValueTypeMatrix }
func (e *NumberLiteral) Type() ValueType { return ValueTypeScalar }
func (e *ParenExpr) Type() ValueType { return e.Expr.Type() }
func (e *StringLiteral) Type() ValueType { return ValueTypeString }
Expand All @@ -169,6 +178,7 @@ func (*AggregateExpr) expr() {}
func (*BinaryExpr) expr() {}
func (*Call) expr() {}
func (*MatrixSelector) expr() {}
func (*SubqueryExpr) expr() {}
func (*NumberLiteral) expr() {}
func (*ParenExpr) expr() {}
func (*StringLiteral) expr() {}
Expand Down Expand Up @@ -267,6 +277,11 @@ func Walk(v Visitor, node Node, path []Node) error {
return err
}

case *SubqueryExpr:
if err := Walk(v, n.Expr, path); err != nil {
return err
}

case *ParenExpr:
if err := Walk(v, n.Expr, path); err != nil {
return err
Expand Down
149 changes: 118 additions & 31 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -51,6 +52,26 @@ const (
minInt64 = -9223372036854775808
)

var (
// LookbackDelta determines the time since the last sample after which a time
// series is considered stale.
LookbackDelta = 5 * time.Minute

// DefaultEvaluationInterval is the default evaluation interval of
// a subquery in milliseconds.
DefaultEvaluationInterval int64
)

// SetDefaultEvaluationInterval sets DefaultEvaluationInterval.
func SetDefaultEvaluationInterval(ev time.Duration) {
atomic.StoreInt64(&DefaultEvaluationInterval, durationToInt64Millis(ev))
}

// GetDefaultEvaluationInterval returns the DefaultEvaluationInterval as time.Duration.
func GetDefaultEvaluationInterval() int64 {
return atomic.LoadInt64(&DefaultEvaluationInterval)
}

type engineMetrics struct {
currentQueries prometheus.Gauge
maxConcurrentQueries prometheus.Gauge
Expand Down Expand Up @@ -404,12 +425,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
if s.Start == s.End && s.Interval == 0 {
start := timeMilliseconds(s.Start)
evaluator := &evaluator{
startTimestamp: start,
endTimestamp: start,
interval: 1,
ctx: ctx,
maxSamples: ng.maxSamplesPerQuery,
logger: ng.logger,
startTimestamp: start,
endTimestamp: start,
interval: 1,
ctx: ctx,
maxSamples: ng.maxSamplesPerQuery,
defaultEvalInterval: GetDefaultEvaluationInterval(),
logger: ng.logger,
}
val, err := evaluator.Eval(s.Expr)
if err != nil {
Expand Down Expand Up @@ -445,12 +467,13 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (

// Range evaluation.
evaluator := &evaluator{
startTimestamp: timeMilliseconds(s.Start),
endTimestamp: timeMilliseconds(s.End),
interval: durationMilliseconds(s.Interval),
ctx: ctx,
maxSamples: ng.maxSamplesPerQuery,
logger: ng.logger,
startTimestamp: timeMilliseconds(s.Start),
endTimestamp: timeMilliseconds(s.End),
interval: durationMilliseconds(s.Interval),
ctx: ctx,
maxSamples: ng.maxSamplesPerQuery,
defaultEvalInterval: GetDefaultEvaluationInterval(),
logger: ng.logger,
}
val, err := evaluator.Eval(s.Expr)
if err != nil {
Expand All @@ -477,23 +500,36 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return mat, nil, warnings
}

// cumulativeSubqueryOffset returns the sum of range and offset of all subqueries in the path.
func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {
codesome marked this conversation as resolved.
Show resolved Hide resolved
var subqOffset time.Duration
for _, node := range path {
switch n := node.(type) {
case *SubqueryExpr:
subqOffset += n.Range + n.Offset
}
}
return subqOffset
}

func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error, storage.Warnings) {
var maxOffset time.Duration
Inspect(s.Expr, func(node Node, _ []Node) error {
Inspect(s.Expr, func(node Node, path []Node) error {
subqOffset := ng.cumulativeSubqueryOffset(path)
switch n := node.(type) {
case *VectorSelector:
if maxOffset < LookbackDelta {
maxOffset = LookbackDelta
if maxOffset < LookbackDelta+subqOffset {
maxOffset = LookbackDelta + subqOffset
}
if n.Offset+LookbackDelta > maxOffset {
maxOffset = n.Offset + LookbackDelta
if n.Offset+LookbackDelta+subqOffset > maxOffset {
maxOffset = n.Offset + LookbackDelta + subqOffset
}
case *MatrixSelector:
if maxOffset < n.Range {
maxOffset = n.Range
if maxOffset < n.Range+subqOffset {
maxOffset = n.Range + subqOffset
}
if n.Offset+n.Range > maxOffset {
maxOffset = n.Offset + n.Range
if n.Offset+n.Range+subqOffset > maxOffset {
maxOffset = n.Offset + n.Range + subqOffset
}
}
return nil
Expand All @@ -514,7 +550,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
params := &storage.SelectParams{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
Step: int64(s.Interval / time.Millisecond),
Step: durationToInt64Millis(s.Interval),
}

switch n := node.(type) {
Expand Down Expand Up @@ -624,9 +660,10 @@ type evaluator struct {
endTimestamp int64 // End time in milliseconds.
interval int64 // Interval in milliseconds.

maxSamples int
currentSamples int
logger log.Logger
maxSamples int
currentSamples int
defaultEvalInterval int64
logger log.Logger
}

// errorf causes a panic with the input formatted into an error.
Expand Down Expand Up @@ -839,6 +876,21 @@ func (ev *evaluator) rangeEval(f func([]Value, *EvalNodeHelper) Vector, exprs ..
return mat
}

// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *SubqueryExpr) *MatrixSelector {
val := ev.eval(subq).(Matrix)
ms := &MatrixSelector{
Range: subq.Range,
Offset: subq.Offset,
series: make([]storage.Series, 0, len(val)),
}
for _, s := range val {
ms.series = append(ms.series, NewStorageSeries(s))
}
return ms
}

// eval evaluates the given expression as the given AST expression node requires.
func (ev *evaluator) eval(expr Expr) Value {
// This is the top-level evaluation method.
Expand Down Expand Up @@ -880,10 +932,17 @@ func (ev *evaluator) eval(expr Expr) Value {
var matrixArgIndex int
var matrixArg bool
for i, a := range e.Args {
_, ok := a.(*MatrixSelector)
if ok {
if _, ok := a.(*MatrixSelector); ok {
matrixArgIndex = i
matrixArg = true
break
}
// SubqueryExpr can be used in place of MatrixSelector.
if subq, ok := a.(*SubqueryExpr); ok {
matrixArgIndex = i
matrixArg = true
// Replacing SubqueryExpr with MatrixSelector.
e.Args[i] = ev.evalSubquery(subq)
break
}
}
Expand Down Expand Up @@ -1077,11 +1136,43 @@ func (ev *evaluator) eval(expr Expr) Value {
panic(fmt.Errorf("cannot do range evaluation of matrix selector"))
}
return ev.matrixSelector(e)

case *SubqueryExpr:
offsetMillis := durationToInt64Millis(e.Offset)
rangeMillis := durationToInt64Millis(e.Range)
newEv := &evaluator{
endTimestamp: ev.endTimestamp - offsetMillis,
interval: ev.defaultEvalInterval,
ctx: ev.ctx,
currentSamples: ev.currentSamples,
maxSamples: ev.maxSamples,
defaultEvalInterval: ev.defaultEvalInterval,
logger: ev.logger,
}

if e.Step != 0 {
newEv.interval = durationToInt64Millis(e.Step)
}

// Start with the first timestamp after (ev.startTimestamp - offset - range)
// that is aligned with the step (multiple of 'newEv.interval').
newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval)
codesome marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can get rid of the if below if you write this as:

Suggested change
newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval)
newEv.startTimestamp = ((ev.startTimestamp - offsetMillis - rangeMillis - 1) / newEv.interval + 1) * newEv.interval

I would also replace all the comments on the few lines above with "Start from the first timestamp at or after `ev.startTimestamp - offset - range` that is a multiple of `interval`.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was testing this, and noticed that when the start time goes negative because of offset and range, this equation wont hold good.

if newEv.startTimestamp < (ev.startTimestamp - offsetMillis - rangeMillis) {
newEv.startTimestamp += newEv.interval
}

res := newEv.eval(e.Expr)
ev.currentSamples = newEv.currentSamples
return res
}

panic(fmt.Errorf("unhandled expression of type: %T", expr))
}

func durationToInt64Millis(d time.Duration) int64 {
return int64(d / time.Millisecond)
}

// vectorSelector evaluates a *VectorSelector expression.
func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
if err := checkForSeriesSetExpansion(node, ev.ctx); err != nil {
Expand Down Expand Up @@ -1825,10 +1916,6 @@ func shouldDropMetricName(op ItemType) bool {
}
}

// LookbackDelta determines the time since the last sample after which a time
// series is considered stale.
var LookbackDelta = 5 * time.Minute

// documentedType returns the internal type to the equivalent
// user facing terminology as defined in the documentation.
func documentedType(t ValueType) string {
Expand Down
Loading