Skip to content

Commit

Permalink
all: implement subquery expressions for instant queries
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Aug 20, 2023
1 parent 0df2a1a commit 44eabd4
Show file tree
Hide file tree
Showing 23 changed files with 478 additions and 117 deletions.
9 changes: 8 additions & 1 deletion engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ func BenchmarkInstantQuery(b *testing.B) {
name: "sort_desc",
query: `sort_desc(http_requests_total)`,
},
{
name: "subquery sum_over_time",
query: `sum_over_time(count(http_requests_total)[1h:10s])`,
},
}

for _, tc := range cases {
Expand All @@ -497,7 +501,10 @@ func BenchmarkInstantQuery(b *testing.B) {
}
})
b.Run("new_engine", func(b *testing.B) {
ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}})
ng := engine.New(engine.Opts{
EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second},
EnableSubqueries: true,
})
b.ResetTimer()
b.ReportAllocs()

Expand Down
62 changes: 43 additions & 19 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/thanos-io/promql-engine/extlabels"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/parser"
"github.com/thanos-io/promql-engine/query"
)

type QueryType int
Expand All @@ -47,6 +48,7 @@ const (
subsystem string = "engine"
InstantQuery QueryType = 1
RangeQuery QueryType = 2
stepsBatch = 10
)

type Opts struct {
Expand All @@ -72,6 +74,10 @@ type Opts struct {
// This will default to false.
EnableXFunctions bool

// EnableSubqueries enables the engine to handle subqueries without falling back to prometheus.
// This will default to false.
EnableSubqueries bool

// FallbackEngine
Engine v1.QueryEngine

Expand Down Expand Up @@ -219,6 +225,10 @@ func New(opts Opts) *compatibilityEngine {
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
enableSubqueries: opts.EnableSubqueries,
noStepSubqueryIntervalFn: func(d time.Duration) time.Duration {
return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000)
},
}
}

Expand All @@ -234,8 +244,10 @@ type compatibilityEngine struct {
timeout time.Duration
metrics *engineMetrics

extLookbackDelta time.Duration
enableAnalysis bool
extLookbackDelta time.Duration
enableAnalysis bool
enableSubqueries bool
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

func (e *compatibilityEngine) SetQueryLogger(l promql.QueryLogger) {
Expand All @@ -260,15 +272,21 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que
// the presentation layer and not when computing the results.
resultSort := newResultSort(expr)

lplan := logicalplan.New(expr, &logicalplan.Opts{
Start: ts,
End: ts,
Step: 1,
LookbackDelta: opts.LookbackDelta(),
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta(), e.extLookbackDelta, e.enableAnalysis)
qOpts := &query.Options{
Context: ctx,
Start: ts,
End: ts,
Step: 0,
StepsBatch: stepsBatch,
LookbackDelta: opts.LookbackDelta(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnableSubqueries: e.enableSubqueries,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
}

lplan := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers)
exec, err := execution.New(lplan.Expr(), q, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
Expand Down Expand Up @@ -311,15 +329,21 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}

lplan := logicalplan.New(expr, &logicalplan.Opts{
Start: start,
End: end,
Step: step,
LookbackDelta: opts.LookbackDelta(),
})
lplan = lplan.Optimize(e.logicalOptimizers)
qOpts := &query.Options{
Context: ctx,
Start: start,
End: end,
Step: step,
StepsBatch: stepsBatch,
LookbackDelta: opts.LookbackDelta(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
EnableSubqueries: false, // not yet implemented for range queries.
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
}

exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta(), e.extLookbackDelta, e.enableAnalysis)
lplan := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers)
exec, err := execution.New(lplan.Expr(), q, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
Expand Down
104 changes: 100 additions & 4 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestMain(m *testing.M) {

func TestPromqlAcceptance(t *testing.T) {
engine := engine.New(engine.Opts{
EnableSubqueries: true,
EngineOpts: promql.EngineOpts{
EnableAtModifier: true,
EnableNegativeOffset: true,
Expand Down Expand Up @@ -2775,10 +2776,11 @@ func TestInstantQuery(t *testing.T) {
// Negative offset and at modifier are enabled by default
// since Prometheus v2.33.0, so we also enable them.
opts := promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() },
}

cases := []struct {
Expand All @@ -2788,6 +2790,99 @@ func TestInstantQuery(t *testing.T) {
queryTime time.Time
sortByLabels bool // if true, the series in the result between the old and new engine should be sorted before compared
}{
{
name: "sum_over_time with subquery",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(sum by (series) (http_requests_total)[5m:1m])",
sortByLabels: true,
},
{
name: "sum_over_time with subquery with default step",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(sum by (series) (http_requests_total)[5m:])",
sortByLabels: true,
},
{
name: "sum_over_time with subquery with resolution that doesnt divide step length",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(sum by (series) (http_requests_total)[5m:22s])",
sortByLabels: true,
},
{
name: "sum_over_time with subquery with offset",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(sum by (series) (http_requests_total)[5m:1m] offset 1m)",
sortByLabels: true,
},
{
name: "sum_over_time with subquery with inner offset",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(sum by (series) (http_requests_total offset 1m)[5m:1m])",
sortByLabels: true,
},
{
name: "sum_over_time with subquery with inner @ modifier",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(sum by (series) (http_requests_total @ 10)[5m:1m])",
sortByLabels: true,
},
{
name: "sum_over_time with nested subqueries with inner @ modifier",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="2"} 2+2x50
http_requests_total{pod="nginx-4", series="3"} 5+2x50
http_requests_total{pod="nginx-5", series="1"} 8+4x50
http_requests_total{pod="nginx-6", series="2"} 2+3x50`,
queryTime: time.Unix(600, 0),
query: "sum_over_time(rate(sum by (series) (http_requests_total @ 10)[5m:1m] @0)[10m:1m])",
sortByLabels: true,
},
{
name: "sum_over_time with subquery should drop name label",
load: `load 10s
http_requests_total{pod="nginx-1", series="1"} 1+1x40
http_requests_total{pod="nginx-2", series="1"} 2+2x50`,
queryTime: time.Unix(0, 0),
query: `sum_over_time(http_requests_total{series="1"} offset 7s[1h:1m] @ 119.800)`,
sortByLabels: true,
},
{
name: "duplicate label set",
load: `load 5m
Expand Down Expand Up @@ -3638,6 +3733,7 @@ func TestInstantQuery(t *testing.T) {
EngineOpts: opts,
DisableFallback: disableFallback,
LogicalOptimizers: optimizers,
EnableSubqueries: true,
})

ctx := context.Background()
Expand Down
6 changes: 3 additions & 3 deletions engine/user_defined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ load 30s

type injectVectorSelector struct{}

func (i injectVectorSelector) Optimize(expr parser.Expr, _ *logicalplan.Opts) parser.Expr {
func (i injectVectorSelector) Optimize(expr parser.Expr, _ *query.Options) parser.Expr {
logicalplan.TraverseBottomUp(nil, &expr, func(_, current *parser.Expr) bool {
switch (*current).(type) {
case *parser.VectorSelector:
Expand All @@ -76,9 +76,9 @@ type logicalVectorSelector struct {
*parser.VectorSelector
}

func (c logicalVectorSelector) MakeExecutionOperator(stepsBatch int, vectors *model.VectorPool, selectors *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
func (c logicalVectorSelector) MakeExecutionOperator(vectors *model.VectorPool, selectors *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
return &vectorSelectorOperator{
stepsBatch: stepsBatch,
stepsBatch: opts.StepsBatch,
vectors: vectors,

mint: opts.Start.UnixMilli(),
Expand Down
9 changes: 5 additions & 4 deletions execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/parser"
"github.com/thanos-io/promql-engine/query"
"github.com/thanos-io/promql-engine/worker"
)

Expand Down Expand Up @@ -48,7 +49,7 @@ func NewHashAggregate(
aggregation parser.ItemType,
by bool,
labels []string,
stepsBatch int,
opts *query.Options,
) (model.VectorOperator, error) {
newAccumulator, err := makeAccumulatorFunc(aggregation)
if err != nil {
Expand All @@ -61,15 +62,15 @@ func NewHashAggregate(
a := &aggregate{
next: next,
paramOp: paramOp,
params: make([]float64, stepsBatch),
params: make([]float64, opts.StepsBatch),
vectorPool: points,
by: by,
aggregation: aggregation,
labels: labels,
stepsBatch: stepsBatch,
stepsBatch: opts.StepsBatch,
newAccumulator: newAccumulator,
}
a.workers = worker.NewGroup(stepsBatch, a.workerTask)
a.workers = worker.NewGroup(opts.StepsBatch, a.workerTask)
a.OperatorTelemetry = &model.TrackedTelemetry{}

return a, nil
Expand Down
3 changes: 1 addition & 2 deletions execution/aggregate/khashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func NewKHashAggregate(
aggregation parser.ItemType,
by bool,
labels []string,
stepsBatch int,
opts *query.Options,
) (model.VectorOperator, error) {
var compare func(float64, float64) bool
Expand All @@ -74,7 +73,7 @@ func NewKHashAggregate(
labels: labels,
paramOp: paramOp,
compare: compare,
params: make([]float64, stepsBatch),
params: make([]float64, opts.StepsBatch),
}
a.OperatorTelemetry = &model.NoopTelemetry{}
if opts.EnableAnalysis {
Expand Down
Loading

0 comments on commit 44eabd4

Please sign in to comment.