Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution: add timing information #290

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 48 additions & 16 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type Opts struct {
// This will default to false.
EnableXFunctions bool

// EnableAnalysis enables query analysis.
EnableAnalysis bool

// FallbackEngine
Engine v1.QueryEngine
}
Expand Down Expand Up @@ -211,6 +214,7 @@ func New(opts Opts) *compatibilityEngine {
timeout: opts.Timeout,
metrics: metrics,
extLookbackDelta: opts.ExtLookbackDelta,
enableAnalysis: opts.EnableAnalysis,
}
}

Expand All @@ -226,6 +230,8 @@ type compatibilityEngine struct {
timeout time.Duration
metrics *engineMetrics

enableAnalysis bool

extLookbackDelta time.Duration
}

Expand Down Expand Up @@ -260,7 +266,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta)
exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
Expand All @@ -275,12 +281,13 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que
}

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
ts: ts,
t: InstantQuery,
resultSort: resultSort,
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
ts: ts,
t: InstantQuery,
resultSort: resultSort,
debugWriter: e.debugWriter,
}, nil
}

Expand Down Expand Up @@ -311,7 +318,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query
})
lplan = lplan.Optimize(e.logicalOptimizers)

exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta)
exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
Expand All @@ -326,18 +333,23 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query
}

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
t: RangeQuery,
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
t: RangeQuery,
debugWriter: e.debugWriter,
}, nil
}

type ExplainableQuery interface {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can call it ObservableQuery now because it's not just Explain() anymore.

promql.Query

Explain() *ExplainOutputNode
Profile()
Analyze() *AnalyzeOutputNode
}

type AnalyzeOutputNode struct {
// TODO: Add fields.
}

type ExplainOutputNode struct {
Expand All @@ -358,8 +370,11 @@ func (q *Query) Explain() *ExplainOutputNode {
return explainVector(q.exec)
}

func (q *Query) Profile() {
// TODO(bwplotka): Return profile.
// Analyze returns human-readable query analysis of the created exector.
// This must always be called after Exec, in order to populate telemetry data.
func (q *Query) Analyze() *AnalyzeOutputNode {
// TODO: Implement by calling Analyze on ObservableVectorOperators.
return &AnalyzeOutputNode{}
}

func explainVector(v model.VectorOperator) *ExplainOutputNode {
Expand Down Expand Up @@ -478,13 +493,17 @@ type compatibilityQuery struct {
t QueryType
resultSort resultSorter

cancel context.CancelFunc
cancel context.CancelFunc
debugWriter io.Writer
}

func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
// Handle case with strings early on as this does not need us to process samples.
switch e := q.expr.(type) {
case *parser.StringLiteral:
if q.debugWriter != nil {
analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "")
}
return &promql.Result{Value: promql.String{V: e.Val, T: q.ts.UnixMilli()}}
}
ret = &promql.Result{
Expand Down Expand Up @@ -567,6 +586,9 @@ loop:
}
sort.Sort(resultMatrix)
ret.Value = resultMatrix
if q.debugWriter != nil {
analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "")
}
return ret
}

Expand Down Expand Up @@ -610,6 +632,10 @@ loop:
}

ret.Value = result

if q.debugWriter != nil {
analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "")
}
return ret
}

Expand Down Expand Up @@ -688,6 +714,12 @@ func recoverEngine(logger log.Logger, expr parser.Expr, errp *error) {
}
}

// Useful for local testing.

func analyze(w io.Writer, o model.ObservableVectorOperator, indent, indentNext string) {
// TODO: Implement.
}

func explain(w io.Writer, o model.VectorOperator, indent, indentNext string) {
me, next := o.Explain()
_, _ = w.Write([]byte(indent))
Expand Down
3 changes: 2 additions & 1 deletion execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const stepsBatch = 10

// New creates new physical query execution for a given query expression which represents logical plan.
// TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan.
func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration) (model.VectorOperator, error) {
func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, enableAnalysis bool) (model.VectorOperator, error) {
opts := &query.Options{
Context: ctx,
Start: mint,
Expand All @@ -61,6 +61,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min
LookbackDelta: lookbackDelta,
StepsBatch: stepsBatch,
ExtLookbackDelta: extLookbackDelta,
EnableAnalysis: enableAnalysis,
}
selectorPool := engstore.NewSelectorPool(queryable)
hints := storage.SelectHints{
Expand Down
1 change: 1 addition & 0 deletions execution/function/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type scalarFunctionOperator struct {
pool *model.VectorPool
next model.VectorOperator
model.TimingInformation
}

func (o *scalarFunctionOperator) Explain() (me string, next []model.VectorOperator) {
Expand Down
24 changes: 24 additions & 0 deletions execution/model/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,34 @@ package model

import (
"context"
"time"

"github.com/prometheus/prometheus/model/labels"
)

type NoopTimingInformation struct{}

func (ti *NoopTimingInformation) AddCPUTimeTaken(t time.Duration) {}

type TimingInformation struct {
CPUTime time.Duration
}

func (ti *TimingInformation) AddCPUTimeTaken(t time.Duration) {
ti.CPUTime += t
}

type OperatorTelemetry interface {
AddCPUTimeTaken(time.Duration)
}

type ObservableVectorOperator interface {
VectorOperator
OperatorTelemetry

Analyze() (*TimingInformation, []ObservableVectorOperator)
}

// VectorOperator performs operations on series in step by step fashion.
type VectorOperator interface {
// Next yields vectors of samples from all series for one or more execution steps.
Expand Down
10 changes: 9 additions & 1 deletion execution/scan/literal_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ type numberLiteralSelector struct {
once sync.Once

val float64
t model.OperatorTelemetry
}

func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val float64) *numberLiteralSelector {
return &numberLiteralSelector{
op := &numberLiteralSelector{
vectorPool: pool,
numSteps: opts.NumSteps(),
mint: opts.Start.UnixMilli(),
Expand All @@ -39,6 +40,13 @@ func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val f
currentStep: opts.Start.UnixMilli(),
val: val,
}

op.t = &model.NoopTimingInformation{}
if opts.EnableAnalysis {
op.t = &model.TimingInformation{}
}

return op
}

func (o *numberLiteralSelector) Explain() (me string, next []model.VectorOperator) {
Expand Down
3 changes: 2 additions & 1 deletion query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type Options struct {
LookbackDelta time.Duration
ExtLookbackDelta time.Duration

StepsBatch int64
StepsBatch int64
EnableAnalysis bool
}

func (o *Options) NumSteps() int {
Expand Down
Loading