From ea601a83d85fc2104bf86f44d3ff005d61814931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 27 Jun 2023 14:31:52 +0300 Subject: [PATCH 1/2] execution: add timing information MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let's try using embedding to capture observability information for each operator instead of wrapping operator inside of operator. Main reasoning for this approach instead of the other way is to be able to catch more granular data like memory allocations. With wrapping we would have to have cross-references between operators and this seems cleaner to me. Need to explore this further to see how it looks like. Signed-off-by: Giedrius Statkevičius --- execution/function/scalar.go | 1 + execution/model/operator.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/execution/function/scalar.go b/execution/function/scalar.go index 4335b6fe..57cbf1be 100644 --- a/execution/function/scalar.go +++ b/execution/function/scalar.go @@ -15,6 +15,7 @@ import ( type scalarFunctionOperator struct { pool *model.VectorPool next model.VectorOperator + model.TimingInformation } func (o *scalarFunctionOperator) Explain() (me string, next []model.VectorOperator) { diff --git a/execution/model/operator.go b/execution/model/operator.go index 40d04eb6..d64175bb 100644 --- a/execution/model/operator.go +++ b/execution/model/operator.go @@ -5,10 +5,30 @@ package model import ( "context" + "time" "github.com/prometheus/prometheus/model/labels" ) +type NoopTimingInformation struct{} + +func (ti *NoopTimingInformation) AddCPUTimeTaken(t time.Duration) {} + +type TimingInformation struct { + CPUTime time.Duration +} + +func (ti *TimingInformation) AddCPUTimeTaken(t time.Duration) { + ti.CPUTime += t +} + +type ObservableVectorOperator interface { + VectorOperator + AddCPUTimeTaken(time.Duration) + + Analyze() (*TimingInformation, []ObservableVectorOperator) +} + // VectorOperator performs operations on series in step by step fashion. type VectorOperator interface { // Next yields vectors of samples from all series for one or more execution steps. From 4db3acad2957b12b9d61ecd0a4e7fba0faea216f Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 3 Jul 2023 12:00:30 +0530 Subject: [PATCH 2/2] Hook ObservableVectorOperator into ExplainableQuery Signed-off-by: Saswata Mukherjee --- engine/engine.go | 64 ++++++++++++++++++++++-------- execution/execution.go | 3 +- execution/model/operator.go | 6 ++- execution/scan/literal_selector.go | 10 ++++- query/options.go | 3 +- 5 files changed, 66 insertions(+), 20 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index f8130ca4..bbd3e43a 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -69,6 +69,9 @@ type Opts struct { // This will default to false. EnableXFunctions bool + // EnableAnalysis enables query analysis. + EnableAnalysis bool + // FallbackEngine Engine v1.QueryEngine } @@ -211,6 +214,7 @@ func New(opts Opts) *compatibilityEngine { timeout: opts.Timeout, metrics: metrics, extLookbackDelta: opts.ExtLookbackDelta, + enableAnalysis: opts.EnableAnalysis, } } @@ -226,6 +230,8 @@ type compatibilityEngine struct { timeout time.Duration metrics *engineMetrics + enableAnalysis bool + extLookbackDelta time.Duration } @@ -260,7 +266,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta) + exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -275,12 +281,13 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - expr: expr, - ts: ts, - t: InstantQuery, - resultSort: resultSort, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + ts: ts, + t: InstantQuery, + resultSort: resultSort, + debugWriter: e.debugWriter, }, nil } @@ -311,7 +318,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta) + exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) @@ -326,10 +333,11 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - expr: expr, - t: RangeQuery, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + t: RangeQuery, + debugWriter: e.debugWriter, }, nil } @@ -337,7 +345,11 @@ type ExplainableQuery interface { promql.Query Explain() *ExplainOutputNode - Profile() + Analyze() *AnalyzeOutputNode +} + +type AnalyzeOutputNode struct { + // TODO: Add fields. } type ExplainOutputNode struct { @@ -358,8 +370,11 @@ func (q *Query) Explain() *ExplainOutputNode { return explainVector(q.exec) } -func (q *Query) Profile() { - // TODO(bwplotka): Return profile. +// Analyze returns human-readable query analysis of the created exector. +// This must always be called after Exec, in order to populate telemetry data. +func (q *Query) Analyze() *AnalyzeOutputNode { + // TODO: Implement by calling Analyze on ObservableVectorOperators. + return &AnalyzeOutputNode{} } func explainVector(v model.VectorOperator) *ExplainOutputNode { @@ -478,13 +493,17 @@ type compatibilityQuery struct { t QueryType resultSort resultSorter - cancel context.CancelFunc + cancel context.CancelFunc + debugWriter io.Writer } func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { // Handle case with strings early on as this does not need us to process samples. switch e := q.expr.(type) { case *parser.StringLiteral: + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return &promql.Result{Value: promql.String{V: e.Val, T: q.ts.UnixMilli()}} } ret = &promql.Result{ @@ -567,6 +586,9 @@ loop: } sort.Sort(resultMatrix) ret.Value = resultMatrix + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return ret } @@ -610,6 +632,10 @@ loop: } ret.Value = result + + if q.debugWriter != nil { + analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") + } return ret } @@ -688,6 +714,12 @@ func recoverEngine(logger log.Logger, expr parser.Expr, errp *error) { } } +// Useful for local testing. + +func analyze(w io.Writer, o model.ObservableVectorOperator, indent, indentNext string) { + // TODO: Implement. +} + func explain(w io.Writer, o model.VectorOperator, indent, indentNext string) { me, next := o.Explain() _, _ = w.Write([]byte(indent)) diff --git a/execution/execution.go b/execution/execution.go index 58e2916d..3d0ee945 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -52,7 +52,7 @@ const stepsBatch = 10 // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. -func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration) (model.VectorOperator, error) { +func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, enableAnalysis bool) (model.VectorOperator, error) { opts := &query.Options{ Context: ctx, Start: mint, @@ -61,6 +61,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min LookbackDelta: lookbackDelta, StepsBatch: stepsBatch, ExtLookbackDelta: extLookbackDelta, + EnableAnalysis: enableAnalysis, } selectorPool := engstore.NewSelectorPool(queryable) hints := storage.SelectHints{ diff --git a/execution/model/operator.go b/execution/model/operator.go index d64175bb..ee08687b 100644 --- a/execution/model/operator.go +++ b/execution/model/operator.go @@ -22,9 +22,13 @@ func (ti *TimingInformation) AddCPUTimeTaken(t time.Duration) { ti.CPUTime += t } +type OperatorTelemetry interface { + AddCPUTimeTaken(time.Duration) +} + type ObservableVectorOperator interface { VectorOperator - AddCPUTimeTaken(time.Duration) + OperatorTelemetry Analyze() (*TimingInformation, []ObservableVectorOperator) } diff --git a/execution/scan/literal_selector.go b/execution/scan/literal_selector.go index 802f067a..c62ac65a 100644 --- a/execution/scan/literal_selector.go +++ b/execution/scan/literal_selector.go @@ -27,10 +27,11 @@ type numberLiteralSelector struct { once sync.Once val float64 + t model.OperatorTelemetry } func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val float64) *numberLiteralSelector { - return &numberLiteralSelector{ + op := &numberLiteralSelector{ vectorPool: pool, numSteps: opts.NumSteps(), mint: opts.Start.UnixMilli(), @@ -39,6 +40,13 @@ func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val f currentStep: opts.Start.UnixMilli(), val: val, } + + op.t = &model.NoopTimingInformation{} + if opts.EnableAnalysis { + op.t = &model.TimingInformation{} + } + + return op } func (o *numberLiteralSelector) Explain() (me string, next []model.VectorOperator) { diff --git a/query/options.go b/query/options.go index 0143eea9..fed4fa26 100644 --- a/query/options.go +++ b/query/options.go @@ -16,7 +16,8 @@ type Options struct { LookbackDelta time.Duration ExtLookbackDelta time.Duration - StepsBatch int64 + StepsBatch int64 + EnableAnalysis bool } func (o *Options) NumSteps() int {