Skip to content

Commit

Permalink
Adds a OperatorTracer object to extract observability information fro…
Browse files Browse the repository at this point in the history
…m vector operators during query execution.

The OperatorTracer is passed to the VectorOperator during Series() and Next() operations, and is a container
to aggregate o11y information. Currently, the vector selector and matrix selector operators use this tracer
for calculating query samples.
  • Loading branch information
sahnib committed Apr 10, 2023
1 parent 190e5c3 commit bb4a6ec
Show file tree
Hide file tree
Showing 18 changed files with 155 additions and 110 deletions.
19 changes: 12 additions & 7 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ type compatibilityQuery struct {
t QueryType
resultSort resultSorter

querySamples *stats.QuerySamples

cancel context.CancelFunc
}

Expand All @@ -471,7 +473,8 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
defer cancel()
q.cancel = cancel

resultSeries, err := q.Query.exec.Series(ctx)
tracer := newOperatorTracer(q.opts.EnablePerStepStats)
resultSeries, err := q.Query.exec.Series(ctx, tracer)
if err != nil {
return newErrResult(ret, err)
}
Expand All @@ -489,7 +492,7 @@ loop:
case <-ctx.Done():
return newErrResult(ret, ctx.Err())
default:
r, err := q.Query.exec.Next(ctx)
r, err := q.Query.exec.Next(ctx, tracer)
if err != nil {
return newErrResult(ret, err)
}
Expand Down Expand Up @@ -612,11 +615,7 @@ func (q *compatibilityQuery) Statement() promparser.Statement { return nil }

// Stats always returns empty query stats for now to avoid panic.
func (q *compatibilityQuery) Stats() *stats.Statistics {
var enablePerStepStats bool
if q.opts != nil {
enablePerStepStats = q.opts.EnablePerStepStats
}
return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: stats.NewQuerySamples(enablePerStepStats)}
return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: q.querySamples}
}

func (q *compatibilityQuery) Close() { q.Cancel() }
Expand Down Expand Up @@ -679,3 +678,9 @@ func explain(w io.Writer, o model.VectorOperator, indent, indentNext string) {
}
}
}

func newOperatorTracer(enablePerStepStats bool) *model.OperatorTracer {
return &model.OperatorTracer{
QuerySamples: stats.NewQuerySamples(enablePerStepStats),
}
}
26 changes: 13 additions & 13 deletions execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (a *aggregate) Explain() (me string, next []model.VectorOperator) {
return fmt.Sprintf("[*aggregate] %v without (%v)", a.aggregation.String(), a.labels), ops
}

func (a *aggregate) Series(ctx context.Context) ([]labels.Labels, error) {
func (a *aggregate) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) {
var err error
a.once.Do(func() { err = a.initializeTables(ctx) })
a.once.Do(func() { err = a.initializeTables(ctx, tracer) })
if err != nil {
return nil, err
}
Expand All @@ -101,14 +101,14 @@ func (a *aggregate) GetPool() *model.VectorPool {
return a.vectorPool
}

func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
func (a *aggregate) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

in, err := a.next.Next(ctx)
in, err := a.next.Next(ctx, tracer)
if err != nil {
return nil, err
}
Expand All @@ -117,13 +117,13 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
}
defer a.next.GetPool().PutVectors(in)

a.once.Do(func() { err = a.initializeTables(ctx) })
a.once.Do(func() { err = a.initializeTables(ctx, tracer) })
if err != nil {
return nil, err
}

if a.paramOp != nil {
args, err := a.paramOp.Next(ctx)
args, err := a.paramOp.Next(ctx, tracer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,17 +156,17 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
return result, nil
}

func (a *aggregate) initializeTables(ctx context.Context) error {
func (a *aggregate) initializeTables(ctx context.Context, tracer *model.OperatorTracer) error {
var (
tables []aggregateTable
series []labels.Labels
err error
)

if a.by && len(a.labels) == 0 {
tables, series, err = a.initializeVectorizedTables(ctx)
tables, series, err = a.initializeVectorizedTables(ctx, tracer)
} else {
tables, series, err = a.initializeScalarTables(ctx)
tables, series, err = a.initializeScalarTables(ctx, tracer)
}
if err != nil {
return err
Expand All @@ -184,10 +184,10 @@ func (a *aggregate) workerTask(workerID int, arg float64, vector model.StepVecto
return table.toVector(a.vectorPool)
}

func (a *aggregate) initializeVectorizedTables(ctx context.Context) ([]aggregateTable, []labels.Labels, error) {
func (a *aggregate) initializeVectorizedTables(ctx context.Context, tracer *model.OperatorTracer) ([]aggregateTable, []labels.Labels, error) {
tables, err := newVectorizedTables(a.stepsBatch, a.aggregation)
if errors.Is(err, parse.ErrNotSupportedExpr) {
return a.initializeScalarTables(ctx)
return a.initializeScalarTables(ctx, tracer)
}

if err != nil {
Expand All @@ -197,8 +197,8 @@ func (a *aggregate) initializeVectorizedTables(ctx context.Context) ([]aggregate
return tables, []labels.Labels{{}}, nil
}

func (a *aggregate) initializeScalarTables(ctx context.Context) ([]aggregateTable, []labels.Labels, error) {
series, err := a.next.Series(ctx)
func (a *aggregate) initializeScalarTables(ctx context.Context, tracer *model.OperatorTracer) ([]aggregateTable, []labels.Labels, error) {
series, err := a.next.Series(ctx, tracer)
if err != nil {
return nil, nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions execution/aggregate/khashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func NewKHashAggregate(
return a, nil
}

func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) {
in, err := a.next.Next(ctx)
func (a *kAggregate) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) {
in, err := a.next.Next(ctx, tracer)
if err != nil {
return nil, err
}
args, err := a.paramOp.Next(ctx)
args, err := a.paramOp.Next(ctx, tracer)
if err != nil {
return nil, err
}
Expand All @@ -108,7 +108,7 @@ func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) {
return nil, errors.New("scalar argument not found")
}

a.once.Do(func() { err = a.init(ctx) })
a.once.Do(func() { err = a.init(ctx, tracer) })
if err != nil {
return nil, err
}
Expand All @@ -123,9 +123,9 @@ func (a *kAggregate) Next(ctx context.Context) ([]model.StepVector, error) {
return result, nil
}

func (a *kAggregate) Series(ctx context.Context) ([]labels.Labels, error) {
func (a *kAggregate) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) {
var err error
a.once.Do(func() { err = a.init(ctx) })
a.once.Do(func() { err = a.init(ctx, tracer) })
if err != nil {
return nil, err
}
Expand All @@ -144,8 +144,8 @@ func (a *kAggregate) Explain() (me string, next []model.VectorOperator) {
return fmt.Sprintf("[*kaggregate] %v without (%v)", a.aggregation.String(), a.labels), []model.VectorOperator{a.paramOp, a.next}
}

func (a *kAggregate) init(ctx context.Context) error {
series, err := a.next.Series(ctx)
func (a *kAggregate) init(ctx context.Context, tracer *model.OperatorTracer) error {
series, err := a.next.Series(ctx, tracer)
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions execution/binary/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,35 +83,35 @@ func (o *scalarOperator) Explain() (me string, next []model.VectorOperator) {
return fmt.Sprintf("[*scalarOperator] %s", parser.ItemTypeStr[o.opType]), []model.VectorOperator{o.next, o.scalar}
}

func (o *scalarOperator) Series(ctx context.Context) ([]labels.Labels, error) {
func (o *scalarOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) {
var err error
o.seriesOnce.Do(func() { err = o.loadSeries(ctx) })
o.seriesOnce.Do(func() { err = o.loadSeries(ctx, tracer) })
if err != nil {
return nil, err
}
return o.series, nil
}

func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) {
func (o *scalarOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

in, err := o.next.Next(ctx)
in, err := o.next.Next(ctx, tracer)
if err != nil {
return nil, err
}
if in == nil {
return nil, nil
}
o.seriesOnce.Do(func() { err = o.loadSeries(ctx) })
o.seriesOnce.Do(func() { err = o.loadSeries(ctx, tracer) })
if err != nil {
return nil, err
}

scalarIn, err := o.scalar.Next(ctx)
scalarIn, err := o.scalar.Next(ctx, tracer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -157,8 +157,8 @@ func (o *scalarOperator) GetPool() *model.VectorPool {
return o.pool
}

func (o *scalarOperator) loadSeries(ctx context.Context) error {
vectorSeries, err := o.next.Series(ctx)
func (o *scalarOperator) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error {
vectorSeries, err := o.next.Series(ctx, tracer)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions execution/binary/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,29 +83,29 @@ func (o *vectorOperator) Explain() (me string, next []model.VectorOperator) {
return fmt.Sprintf("[*vectorOperator] %s %v ignoring %v group %v", parser.ItemTypeStr[o.opType], o.matching.Card.String(), o.matching.On, o.matching.Include), []model.VectorOperator{o.lhs, o.rhs}
}

func (o *vectorOperator) Series(ctx context.Context) ([]labels.Labels, error) {
func (o *vectorOperator) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) {
var err error
o.once.Do(func() { err = o.initOutputs(ctx) })
o.once.Do(func() { err = o.initOutputs(ctx, tracer) })
if err != nil {
return nil, err
}

return o.series, nil
}

func (o *vectorOperator) initOutputs(ctx context.Context) error {
func (o *vectorOperator) initOutputs(ctx context.Context, tracer *model.OperatorTracer) error {
var highCardSide []labels.Labels
var errChan = make(chan error, 1)
go func() {
var err error
highCardSide, err = o.lhs.Series(ctx)
highCardSide, err = o.lhs.Series(ctx, tracer)
if err != nil {
errChan <- err
}
close(errChan)
}()

lowCardSide, err := o.rhs.Series(ctx)
lowCardSide, err := o.rhs.Series(ctx, tracer)
if err != nil {
return err
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (o *vectorOperator) initOutputs(ctx context.Context) error {
return nil
}

func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) {
func (o *vectorOperator) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand All @@ -166,14 +166,14 @@ func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) {
var lerrChan = make(chan error, 1)
go func() {
var err error
lhs, err = o.lhs.Next(ctx)
lhs, err = o.lhs.Next(ctx, tracer)
if err != nil {
lerrChan <- err
}
close(lerrChan)
}()

rhs, rerr := o.rhs.Next(ctx)
rhs, rerr := o.rhs.Next(ctx, tracer)
lerr := <-lerrChan
if rerr != nil {
return nil, rerr
Expand All @@ -190,7 +190,7 @@ func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) {
}

var err error
o.once.Do(func() { err = o.initOutputs(ctx) })
o.once.Do(func() { err = o.initOutputs(ctx, tracer) })
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions execution/exchange/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ func (c *coalesce) GetPool() *model.VectorPool {
return c.pool
}

func (c *coalesce) Series(ctx context.Context) ([]labels.Labels, error) {
func (c *coalesce) Series(ctx context.Context, tracer *model.OperatorTracer) ([]labels.Labels, error) {
var err error
c.once.Do(func() { err = c.loadSeries(ctx) })
c.once.Do(func() { err = c.loadSeries(ctx, tracer) })
if err != nil {
return nil, err
}
return c.series, nil
}

func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) {
func (c *coalesce) Next(ctx context.Context, tracer *model.OperatorTracer) ([]model.StepVector, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

var err error
c.once.Do(func() { err = c.loadSeries(ctx) })
c.once.Do(func() { err = c.loadSeries(ctx, tracer) })
if err != nil {
return nil, err
}
Expand All @@ -89,7 +89,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) {
go func(opIdx int, o model.VectorOperator) {
defer c.wg.Done()

in, err := o.Next(ctx)
in, err := o.Next(ctx, tracer)
if err != nil {
errChan <- err
return
Expand Down Expand Up @@ -139,7 +139,7 @@ func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) {
return out, nil
}

func (c *coalesce) loadSeries(ctx context.Context) error {
func (c *coalesce) loadSeries(ctx context.Context, tracer *model.OperatorTracer) error {
var wg sync.WaitGroup
var numSeries uint64
allSeries := make([][]labels.Labels, len(c.operators))
Expand All @@ -160,7 +160,7 @@ func (c *coalesce) loadSeries(ctx context.Context) error {
}

}()
series, err := c.operators[i].Series(ctx)
series, err := c.operators[i].Series(ctx, tracer)
if err != nil {
errChan <- err
return
Expand Down

0 comments on commit bb4a6ec

Please sign in to comment.