Skip to content

Commit

Permalink
misc: move tracker to engine, construct query opts in engine
Browse files: browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Aug 18, 2023
1 parent cf9d7f9 commit 318b752
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 171 deletions.
34 changes: 32 additions & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ import (
"github.com/thanos-io/promql-engine/execution"
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/tracking"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/parser"
"github.com/thanos-io/promql-engine/query"
)

type QueryType int
Expand All @@ -47,6 +49,8 @@ const (
subsystem string = "engine"
InstantQuery QueryType = 1
RangeQuery QueryType = 2

stepsBatch = 10
)

type Opts struct {
Expand Down Expand Up @@ -270,7 +274,20 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.maxSamples, e.enableAnalysis)
qopts := &query.Options{
Context: ctx,
Start: ts,
End: ts,
Step: 1,
LookbackDelta: e.lookbackDelta,
StepsBatch: stepsBatch,
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
}

tracker := tracking.NewTracker(e.maxSamples, qopts)

exec, err := execution.New(ctx, tracker, lplan.Expr(), q, qopts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
Expand Down Expand Up @@ -322,7 +339,20 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.maxSamples, e.enableAnalysis)
qopts := &query.Options{
Context: ctx,
Start: start,
End: end,
Step: step,
LookbackDelta: e.lookbackDelta,
StepsBatch: stepsBatch,
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
}

tracker := tracking.NewTracker(e.maxSamples, qopts)

exec, err := execution.New(ctx, tracker, lplan.Expr(), q, qopts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
Expand Down
105 changes: 46 additions & 59 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,65 +32,52 @@ import (
"github.com/thanos-io/promql-engine/execution/binary"
"github.com/thanos-io/promql-engine/execution/exchange"
"github.com/thanos-io/promql-engine/execution/function"
"github.com/thanos-io/promql-engine/execution/limits"
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/noop"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/remote"
"github.com/thanos-io/promql-engine/execution/scan"
"github.com/thanos-io/promql-engine/execution/step_invariant"
engstore "github.com/thanos-io/promql-engine/execution/storage"
"github.com/thanos-io/promql-engine/execution/tracking"
"github.com/thanos-io/promql-engine/execution/unary"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/parser"
"github.com/thanos-io/promql-engine/query"
)

const stepsBatch = 10

// New creates a new physical query execution for the given query expression, which represents the logical plan.
// TODO(bwplotka): Add a definition (this could be parameters for each execution operator) that we can optimize; it would represent the physical plan.
func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, maxSamples int, enableAnalysis bool) (model.VectorOperator, error) {
opts := &query.Options{
Context: ctx,
Start: mint,
End: maxt,
Step: step,
LookbackDelta: lookbackDelta,
StepsBatch: stepsBatch,
ExtLookbackDelta: extLookbackDelta,
EnableAnalysis: enableAnalysis,
}
func New(ctx context.Context, tracker *tracking.Tracker, expr parser.Expr, queryable storage.Queryable, opts *query.Options) (model.VectorOperator, error) {
selectorPool := engstore.NewSelectorPool(queryable)
hints := storage.SelectHints{
Start: mint.UnixMilli(),
End: maxt.UnixMilli(),
Start: opts.Start.UnixMilli(),
End: opts.End.UnixMilli(),
// TODO(fpetkovski): Adjust the step for sub-queries once they are supported.
Step: step.Milliseconds(),
Step: opts.Step.Milliseconds(),
}
limits := limits.NewLimits(maxSamples, opts)

return newOperator(expr, selectorPool, opts, hints, limits)
return newOperator(expr, selectorPool, opts, hints, tracker)
}

func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) {
func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) {
switch e := expr.(type) {
case *parser.NumberLiteral:
return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, e.Val), nil
return scan.NewNumberLiteralSelector(model.NewVectorPool(int(opts.StepsBatch)), opts, e.Val), nil

case *parser.VectorSelector:
start, end := getTimeRangesForVectorSelector(e, opts, 0)
hints.Start = start
hints.End = end
filter := storage.GetSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, hints)
return newShardedVectorSelector(filter, opts, e.Offset, limits)
return newShardedVectorSelector(filter, opts, e.Offset, tracker)

case *logicalplan.FilteredSelector:
start, end := getTimeRangesForVectorSelector(e.VectorSelector, opts, 0)
hints.Start = start
hints.End = end
selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
return newShardedVectorSelector(selector, opts, e.Offset, limits)
return newShardedVectorSelector(selector, opts, e.Offset, tracker)

case *parser.Call:
hints.Func = e.Func.Name
Expand All @@ -99,33 +86,33 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
for i := range e.Args {
switch t := e.Args[i].(type) {
case *parser.MatrixSelector:
return newRangeVectorFunction(e, t, storage, opts, hints, limits)
return newRangeVectorFunction(e, t, storage, opts, hints, tracker)
}
}
return newInstantVectorFunction(e, storage, opts, hints, limits)
return newInstantVectorFunction(e, storage, opts, hints, tracker)

case *parser.AggregateExpr:
hints.Func = e.Op.String()
hints.Grouping = e.Grouping
hints.By = !e.Without
var paramOp model.VectorOperator

next, err := newOperator(e.Expr, storage, opts, hints, limits)
next, err := newOperator(e.Expr, storage, opts, hints, tracker)
if err != nil {
return nil, err
}

if e.Param != nil && e.Param.Type() != parser.ValueTypeString {
paramOp, err = newOperator(e.Param, storage, opts, hints, limits)
paramOp, err = newOperator(e.Param, storage, opts, hints, tracker)
if err != nil {
return nil, err
}
}

if e.Op == parser.TOPK || e.Op == parser.BOTTOMK {
next, err = aggregate.NewKHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch, opts)
next, err = aggregate.NewKHashAggregate(model.NewVectorPool(int(opts.StepsBatch)), next, paramOp, e.Op, !e.Without, e.Grouping, int(opts.StepsBatch), opts)
} else {
next, err = aggregate.NewHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch)
next, err = aggregate.NewHashAggregate(model.NewVectorPool(int(opts.StepsBatch)), next, paramOp, e.Op, !e.Without, e.Grouping, int(opts.StepsBatch))
}

if err != nil {
Expand All @@ -136,24 +123,24 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O

case *parser.BinaryExpr:
if e.LHS.Type() == parser.ValueTypeScalar || e.RHS.Type() == parser.ValueTypeScalar {
return newScalarBinaryOperator(e, storage, opts, hints, limits)
return newScalarBinaryOperator(e, storage, opts, hints, tracker)
}

return newVectorBinaryOperator(e, storage, opts, hints, limits)
return newVectorBinaryOperator(e, storage, opts, hints, tracker)

case *parser.ParenExpr:
return newOperator(e.Expr, storage, opts, hints, limits)
return newOperator(e.Expr, storage, opts, hints, tracker)

case *parser.UnaryExpr:
next, err := newOperator(e.Expr, storage, opts, hints, limits)
next, err := newOperator(e.Expr, storage, opts, hints, tracker)
if err != nil {
return nil, err
}
switch e.Op {
case parser.ADD:
return next, nil
case parser.SUB:
return unary.NewUnaryNegation(next, stepsBatch)
return unary.NewUnaryNegation(next, int(opts.StepsBatch))
default:
// This shouldn't happen as Op was validated when parsing already
// https://github.com/prometheus/prometheus/blob/v2.38.0/promql/parser/parse.go#L573.
Expand All @@ -163,13 +150,13 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
case *parser.StepInvariantExpr:
switch t := e.Expr.(type) {
case *parser.NumberLiteral:
return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, t.Val), nil
return scan.NewNumberLiteralSelector(model.NewVectorPool(int(opts.StepsBatch)), opts, t.Val), nil
}
next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints, limits)
next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints, tracker)
if err != nil {
return nil, err
}
return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(stepsBatch, 1), next, e.Expr, opts, stepsBatch)
return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(int(opts.StepsBatch), 1), next, e.Expr, opts, int(opts.StepsBatch))

case logicalplan.Deduplicate:
// The Deduplicate operator will deduplicate samples using a last-sample-wins strategy.
Expand All @@ -182,14 +169,14 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O

operators := make([]model.VectorOperator, len(e.Expressions))
for i, expr := range e.Expressions {
operator, err := newOperator(expr, storage, opts, hints, limits)
operator, err := newOperator(expr, storage, opts, hints, tracker)
if err != nil {
return nil, err
}
operators[i] = operator
}
coalesce := exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...)
dedup := exchange.NewDedupOperator(model.NewVectorPool(stepsBatch), coalesce)
coalesce := exchange.NewCoalesce(model.NewVectorPool(int(opts.StepsBatch)), opts, operators...)
dedup := exchange.NewDedupOperator(model.NewVectorPool(int(opts.StepsBatch)), coalesce)
return exchange.NewConcurrent(dedup, 2), nil

case logicalplan.RemoteExecution:
Expand All @@ -204,12 +191,12 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
selectorOpts := *opts
selectorOpts.LookbackDelta = 0
remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), e.QueryRangeStart, &selectorOpts, limits)
remoteExec := remote.NewExecution(qry, model.NewVectorPool(int(opts.StepsBatch)), e.QueryRangeStart, &selectorOpts, tracker)
return exchange.NewConcurrent(remoteExec, 2), nil
case logicalplan.Noop:
return noop.NewOperator(), nil
case logicalplan.UserDefinedExpr:
return e.MakeExecutionOperator(stepsBatch, model.NewVectorPool(stepsBatch), storage, opts, hints)
return e.MakeExecutionOperator(int(opts.StepsBatch), model.NewVectorPool(int(opts.StepsBatch)), storage, opts, hints)
default:
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e)
}
Expand All @@ -226,7 +213,7 @@ func unpackVectorSelector(t *parser.MatrixSelector) (*parser.VectorSelector, []*
}
}

func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) {
func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) {
// TODO(saswatamcode): Range vector result might need new operator
// before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39
vs, filters, err := unpackVectorSelector(t)
Expand All @@ -252,34 +239,34 @@ func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *e

operators := make([]model.VectorOperator, 0, numShards)
for i := 0; i < numShards; i++ {
operator, err := scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, e, opts, t.Range, vs.Offset, i, numShards, limits)
operator, err := scan.NewMatrixSelector(model.NewVectorPool(int(opts.StepsBatch)), filter, e, opts, t.Range, vs.Offset, i, numShards, tracker)
if err != nil {
return nil, err
}
operators = append(operators, exchange.NewConcurrent(operator, 2))
}

return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil
return exchange.NewCoalesce(model.NewVectorPool(int(opts.StepsBatch)), opts, operators...), nil
}

func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) {
func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) {
nextOperators := make([]model.VectorOperator, 0, len(e.Args))
for i := range e.Args {
// Strings don't need an operator
if e.Args[i].Type() == parser.ValueTypeString {
continue
}
next, err := newOperator(e.Args[i], storage, opts, hints, limits)
next, err := newOperator(e.Args[i], storage, opts, hints, tracker)
if err != nil {
return nil, err
}
nextOperators = append(nextOperators, next)
}

return function.NewFunctionOperator(e, nextOperators, stepsBatch, opts)
return function.NewFunctionOperator(e, nextOperators, int(opts.StepsBatch), opts)
}

func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration, limits *limits.Limits) (model.VectorOperator, error) {
func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration, tracker *tracking.Tracker) (model.VectorOperator, error) {
numShards := runtime.GOMAXPROCS(0) / 2
if numShards < 1 {
numShards = 1
Expand All @@ -288,31 +275,31 @@ func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Opti
for i := 0; i < numShards; i++ {
operator := exchange.NewConcurrent(
scan.NewVectorSelector(
model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards, limits), 2)
model.NewVectorPool(int(opts.StepsBatch)), selector, opts, offset, i, numShards, tracker), 2)
operators = append(operators, operator)
}

return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil
return exchange.NewCoalesce(model.NewVectorPool(int(opts.StepsBatch)), opts, operators...), nil
}

func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) {
leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints, limits)
func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) {
leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints, tracker)
if err != nil {
return nil, err
}
rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints, limits)
rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints, tracker)
if err != nil {
return nil, err
}
return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts)
return binary.NewVectorOperator(model.NewVectorPool(int(opts.StepsBatch)), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts)
}

func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) {
lhs, err := newOperator(e.LHS, selectorPool, opts, hints, limits)
func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) {
lhs, err := newOperator(e.LHS, selectorPool, opts, hints, tracker)
if err != nil {
return nil, err
}
rhs, err := newOperator(e.RHS, selectorPool, opts, hints, limits)
rhs, err := newOperator(e.RHS, selectorPool, opts, hints, tracker)
if err != nil {
return nil, err
}
Expand All @@ -325,7 +312,7 @@ func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.Select
scalarSide = binary.ScalarSideLeft
}

return binary.NewScalar(model.NewVectorPoolWithSize(stepsBatch, 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts)
return binary.NewScalar(model.NewVectorPoolWithSize(int(opts.StepsBatch), 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts)
}

// Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791.
Expand Down
Loading

0 comments on commit 318b752

Please sign in to comment.