From 6765d04a5bb9628ec067a5b2ae74f99dcbf2ff3b Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Wed, 16 Aug 2023 17:45:24 +0200 Subject: [PATCH 1/3] execution,engine: respect configured max samples Signed-off-by: Michael Hoffmann --- engine/engine.go | 6 ++-- engine/engine_test.go | 56 +++++++++++++++++++++++++++-- execution/execution.go | 58 ++++++++++++++++--------------- execution/limits/limits.go | 35 +++++++++++++++++++ execution/remote/operator.go | 5 +-- execution/scan/matrix_selector.go | 43 +++++++++++++++++++---- execution/scan/vector_selector.go | 15 ++++++-- go.mod | 2 +- 8 files changed, 175 insertions(+), 45 deletions(-) create mode 100644 execution/limits/limits.go diff --git a/engine/engine.go b/engine/engine.go index 12858ba6..5d27e226 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -217,6 +217,7 @@ func New(opts Opts) *compatibilityEngine { timeout: opts.Timeout, metrics: metrics, extLookbackDelta: opts.ExtLookbackDelta, + maxSamples: opts.MaxSamples, enableAnalysis: opts.EnableAnalysis, } } @@ -234,6 +235,7 @@ type compatibilityEngine struct { metrics *engineMetrics extLookbackDelta time.Duration + maxSamples int enableAnalysis bool } @@ -268,7 +270,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) + exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.maxSamples, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -320,7 +322,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.enableAnalysis) + exec, 
err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.maxSamples, e.enableAnalysis) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) diff --git a/engine/engine_test.go b/engine/engine_test.go index afb5b08a..4e43f510 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -2787,8 +2787,8 @@ func TestInstantQuery(t *testing.T) { { name: "duplicate label set", load: `load 5m - testmetric1{src="a",dst="b"} 0 - testmetric2{src="a",dst="b"} 1`, +testmetric1{src="a",dst="b"} 0 +testmetric2{src="a",dst="b"} 1`, query: "changes({__name__=~'testmetric1|testmetric2'}[5m])", }, { @@ -3661,7 +3661,7 @@ func TestInstantQuery(t *testing.T) { if hasNaNs(oldResult) { t.Log("Applying comparison with NaN equality.") equalsWithNaNs(t, oldResult, newResult) - } else if oldResult.Err != nil { + } else if oldResult.Err != nil && newResult.Err != nil { testutil.Equals(t, oldResult.Err.Error(), newResult.Err.Error()) } else { testutil.Equals(t, oldResult, newResult) @@ -3675,6 +3675,56 @@ func TestInstantQuery(t *testing.T) { } } +func TestQueryLimits(t *testing.T) { + load := `load 1s + example{foo="bar"} 1+0x3600 + example{foo="baz"} 1+0x3600 + ` + test, err := promql.NewTest(t, load) + testutil.Ok(t, err) + defer test.Close() + testutil.Ok(t, test.Run()) + + ctx := test.Context() + + newEngine := engine.New(engine.Opts{ + EngineOpts: promql.EngineOpts{ + MaxSamples: 10, + Timeout: 10 * time.Second, + }}) + + t.Run("one series too many samples", func(t *testing.T) { + query := `sum_over_time(example{foo="bar"}[20s])` + q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(20, 0)) + testutil.Ok(t, err) + + newResult := q1.Exec(ctx) + testutil.NotOk(t, newResult.Err) + testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error()) + }) + t.Run("two series 
too many samples", func(t *testing.T) { + query := `sum_over_time(example[10s])` + q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(5, 0)) + testutil.Ok(t, err) + + newResult := q1.Exec(ctx) + testutil.NotOk(t, newResult.Err) + testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error()) + }) + t.Run("range query should only account for samples at each step", func(t *testing.T) { + query := `sum(example)` + start := time.Unix(0, 0) + end := start.Add(time.Hour) + step := time.Second + + q1, err := newEngine.NewRangeQuery(ctx, test.Queryable(), nil, query, start, end, step) + testutil.Ok(t, err) + + newResult := q1.Exec(ctx) + testutil.Ok(t, newResult.Err) + }) +} + func TestQueryCancellation(t *testing.T) { twelveHours := int64(12 * time.Hour.Seconds()) diff --git a/execution/execution.go b/execution/execution.go index 8dd84bfb..50e8499e 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -32,6 +32,7 @@ import ( "github.com/thanos-io/promql-engine/execution/binary" "github.com/thanos-io/promql-engine/execution/exchange" "github.com/thanos-io/promql-engine/execution/function" + "github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/noop" "github.com/thanos-io/promql-engine/execution/parse" @@ -49,7 +50,7 @@ const stepsBatch = 10 // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. 
-func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, enableAnalysis bool) (model.VectorOperator, error) { +func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, maxSamples int, enableAnalysis bool) (model.VectorOperator, error) { opts := &query.Options{ Context: ctx, Start: mint, @@ -67,11 +68,12 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min // TODO(fpetkovski): Adjust the step for sub-queries once they are supported. Step: step.Milliseconds(), } + limits := limits.NewLimits(maxSamples) - return newOperator(expr, selectorPool, opts, hints) + return newOperator(expr, selectorPool, opts, hints, limits) } -func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { +func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { switch e := expr.(type) { case *parser.NumberLiteral: return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, e.Val), nil @@ -81,14 +83,14 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O hints.Start = start hints.End = end filter := storage.GetSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, hints) - return newShardedVectorSelector(filter, opts, e.Offset) + return newShardedVectorSelector(filter, opts, e.Offset, limits) case *logicalplan.FilteredSelector: start, end := getTimeRangesForVectorSelector(e.VectorSelector, opts, 0) hints.Start = start hints.End = end selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints) - return newShardedVectorSelector(selector, opts, e.Offset) + return 
newShardedVectorSelector(selector, opts, e.Offset, limits) case *parser.Call: hints.Func = e.Func.Name @@ -97,10 +99,10 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O for i := range e.Args { switch t := e.Args[i].(type) { case *parser.MatrixSelector: - return newRangeVectorFunction(e, t, storage, opts, hints) + return newRangeVectorFunction(e, t, storage, opts, hints, limits) } } - return newInstantVectorFunction(e, storage, opts, hints) + return newInstantVectorFunction(e, storage, opts, hints, limits) case *parser.AggregateExpr: hints.Func = e.Op.String() @@ -108,13 +110,13 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O hints.By = !e.Without var paramOp model.VectorOperator - next, err := newOperator(e.Expr, storage, opts, hints) + next, err := newOperator(e.Expr, storage, opts, hints, limits) if err != nil { return nil, err } if e.Param != nil && e.Param.Type() != parser.ValueTypeString { - paramOp, err = newOperator(e.Param, storage, opts, hints) + paramOp, err = newOperator(e.Param, storage, opts, hints, limits) if err != nil { return nil, err } @@ -134,16 +136,16 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O case *parser.BinaryExpr: if e.LHS.Type() == parser.ValueTypeScalar || e.RHS.Type() == parser.ValueTypeScalar { - return newScalarBinaryOperator(e, storage, opts, hints) + return newScalarBinaryOperator(e, storage, opts, hints, limits) } - return newVectorBinaryOperator(e, storage, opts, hints) + return newVectorBinaryOperator(e, storage, opts, hints, limits) case *parser.ParenExpr: - return newOperator(e.Expr, storage, opts, hints) + return newOperator(e.Expr, storage, opts, hints, limits) case *parser.UnaryExpr: - next, err := newOperator(e.Expr, storage, opts, hints) + next, err := newOperator(e.Expr, storage, opts, hints, limits) if err != nil { return nil, err } @@ -163,7 +165,7 @@ func newOperator(expr parser.Expr, storage 
*engstore.SelectorPool, opts *query.O case *parser.NumberLiteral: return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, t.Val), nil } - next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints) + next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints, limits) if err != nil { return nil, err } @@ -180,7 +182,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O operators := make([]model.VectorOperator, len(e.Expressions)) for i, expr := range e.Expressions { - operator, err := newOperator(expr, storage, opts, hints) + operator, err := newOperator(expr, storage, opts, hints, limits) if err != nil { return nil, err } @@ -202,7 +204,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O // We need to set the lookback for the selector to 0 since the remote query already applies one lookback. selectorOpts := *opts selectorOpts.LookbackDelta = 0 - remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), e.QueryRangeStart, &selectorOpts) + remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), e.QueryRangeStart, &selectorOpts, limits) return exchange.NewConcurrent(remoteExec, 2), nil case logicalplan.Noop: return noop.NewOperator(), nil @@ -224,7 +226,7 @@ func unpackVectorSelector(t *parser.MatrixSelector) (*parser.VectorSelector, []* } } -func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { +func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { // TODO(saswatamcode): Range vector result might need new operator // before it can be non-nested. 
https://github.com/thanos-io/promql-engine/issues/39 vs, filters, err := unpackVectorSelector(t) @@ -250,7 +252,7 @@ func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *e operators := make([]model.VectorOperator, 0, numShards) for i := 0; i < numShards; i++ { - operator, err := scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, e, opts, t.Range, vs.Offset, i, numShards) + operator, err := scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, e, opts, t.Range, vs.Offset, i, numShards, limits) if err != nil { return nil, err } @@ -260,14 +262,14 @@ func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *e return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil } -func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { +func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { nextOperators := make([]model.VectorOperator, 0, len(e.Args)) for i := range e.Args { // Strings don't need an operator if e.Args[i].Type() == parser.ValueTypeString { continue } - next, err := newOperator(e.Args[i], storage, opts, hints) + next, err := newOperator(e.Args[i], storage, opts, hints, limits) if err != nil { return nil, err } @@ -277,7 +279,7 @@ func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, op return function.NewFunctionOperator(e, nextOperators, stepsBatch, opts) } -func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration) (model.VectorOperator, error) { +func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration, limits *limits.Limits) (model.VectorOperator, error) { numShards := runtime.GOMAXPROCS(0) / 2 if numShards < 1 { numShards 
= 1 @@ -286,31 +288,31 @@ func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Opti for i := 0; i < numShards; i++ { operator := exchange.NewConcurrent( scan.NewVectorSelector( - model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards), 2) + model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards, limits), 2) operators = append(operators, operator) } return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil } -func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { - leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints) +func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { + leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints, limits) if err != nil { return nil, err } - rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints) + rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints, limits) if err != nil { return nil, err } return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) } -func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { - lhs, err := newOperator(e.LHS, selectorPool, opts, hints) +func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { + lhs, err := newOperator(e.LHS, selectorPool, opts, hints, limits) if err != nil { return nil, err } - rhs, err := newOperator(e.RHS, selectorPool, opts, hints) + rhs, err := newOperator(e.RHS, selectorPool, opts, hints, 
limits) if err != nil { return nil, err } diff --git a/execution/limits/limits.go b/execution/limits/limits.go new file mode 100644 index 00000000..b925a3eb --- /dev/null +++ b/execution/limits/limits.go @@ -0,0 +1,35 @@ +package limits + +import ( + "sync" + + "github.com/efficientgo/core/errors" + "go.uber.org/atomic" +) + +type Limits struct { + maxSamples int + + curSamplesPerTimestamp sync.Map +} + +func NewLimits(maxSamples int) *Limits { + return &Limits{ + maxSamples: maxSamples, + } +} + +func (l *Limits) AccountSamplesForTimestamp(t int64, n int) error { + if l.maxSamples == 0 { + return nil + } + v, _ := l.curSamplesPerTimestamp.LoadOrStore(t, atomic.NewInt64(0)) + av := v.(*atomic.Int64) + + if cur := av.Load(); cur+int64(n) > int64(l.maxSamples) { + return errors.New("query processing would load too many samples into memory in query execution") + } + + av.Add(int64(n)) + return nil +} diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 3e232fa2..2b21add7 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/scan" engstore "github.com/thanos-io/promql-engine/execution/storage" @@ -27,14 +28,14 @@ type Execution struct { model.OperatorTelemetry } -func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options) *Execution { +func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options, limits *limits.Limits) *Execution { storage := newStorageFromQuery(query, opts) e := &Execution{ storage: storage, query: query, opts: opts, queryRangeStart: queryRangeStart, - vectorSelector: scan.NewVectorSelector(pool, storage, opts, 0, 0, 1), + vectorSelector: 
scan.NewVectorSelector(pool, storage, opts, 0, 0, 1, limits), } e.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { diff --git a/execution/scan/matrix_selector.go b/execution/scan/matrix_selector.go index 5f7cdf71..0b66c853 100644 --- a/execution/scan/matrix_selector.go +++ b/execution/scan/matrix_selector.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/promql-engine/execution/function" + "github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" engstore "github.com/thanos-io/promql-engine/execution/storage" @@ -57,6 +58,8 @@ type matrixSelector struct { // Lookback delta for extended range functions. extLookbackDelta int64 model.OperatorTelemetry + + limits *limits.Limits } // NewMatrixSelector creates operator which selects vector of series over time. @@ -67,6 +70,7 @@ func NewMatrixSelector( opts *query.Options, selectRange, offset time.Duration, shard, numShard int, + limits *limits.Limits, ) (model.VectorOperator, error) { call, ok := rangeVectorFuncs[funcExpr.Func.Name] @@ -94,6 +98,8 @@ func NewMatrixSelector( numShards: numShard, extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(), + + limits: limits, } m.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { @@ -166,9 +172,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { var err error if !o.isExtFunction { - rangeSamples, err = selectPoints(series.samples, mint, maxt, o.scanners[i].previousSamples) + rangeSamples, err = selectPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples) } else { - rangeSamples, err = selectExtPoints(series.samples, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs) + rangeSamples, err = selectExtPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples, 
o.extLookbackDelta, &o.scanners[i].metricAppearedTs) } if err != nil { @@ -271,8 +277,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. -// TODO(fpetkovski): Add max samples limit. -func selectPoints(it *storage.BufferedSeriesIterator, mint, maxt int64, out []sample) ([]sample, error) { +func selectPoints(it *storage.BufferedSeriesIterator, limits *limits.Limits, ts, mint, maxt int64, out []sample) ([]sample, error) { if len(out) > 0 && out[len(out)-1].T >= mint { // There is an overlap between previous and current ranges, retain common // points. In most such cases: @@ -309,6 +314,9 @@ loop: continue loop } if t >= mint { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, H: fh}) } case chunkenc.ValFloat: @@ -318,6 +326,9 @@ loop: } // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, F: v}) } } @@ -328,11 +339,17 @@ loop: case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: t, fh := it.AtFloatHistogram() if t == maxt && !value.IsStaleNaN(fh.Sum) { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, H: fh}) } case chunkenc.ValFloat: t, v := it.At() if t == maxt && !value.IsStaleNaN(v) { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, F: v}) } } @@ -348,8 +365,7 @@ loop: // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. 
-// TODO(fpetkovski): Add max samples limit. -func selectExtPoints(it *storage.BufferedSeriesIterator, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) { +func selectExtPoints(it *storage.BufferedSeriesIterator, limits *limits.Limits, ts, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) { extMint := mint - extLookbackDelta if len(out) > 0 && out[len(out)-1].T >= mint { @@ -402,6 +418,9 @@ loop: *metricAppearedTs = &t } if t >= mint { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, H: fh}) } case chunkenc.ValFloat: @@ -417,9 +436,15 @@ loop: // exists at or before range start, add it and then keep replacing // it with later points while not yet (strictly) inside the range. if t >= mint || !appendedPointBeforeMint { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, F: v}) appendedPointBeforeMint = true } else { + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out[len(out)-1] = sample{T: t, F: v} } @@ -434,6 +459,9 @@ loop: if *metricAppearedTs == nil { *metricAppearedTs = &t } + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, H: fh}) } case chunkenc.ValFloat: @@ -442,6 +470,9 @@ loop: if *metricAppearedTs == nil { *metricAppearedTs = &t } + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return out, err + } out = append(out, sample{T: t, F: v}) } } diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index 4b2c4dd8..f0397a31 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -12,6 +12,7 @@ import ( "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/tsdb/chunkenc" + 
"github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" engstore "github.com/thanos-io/promql-engine/execution/storage" "github.com/thanos-io/promql-engine/query" @@ -48,6 +49,8 @@ type vectorSelector struct { shard int numShards int model.OperatorTelemetry + + limits *limits.Limits } // NewVectorSelector creates operator which selects vector of series. @@ -57,6 +60,7 @@ func NewVectorSelector( queryOpts *query.Options, offset time.Duration, shard, numShards int, + limits *limits.Limits, ) model.VectorOperator { o := &vectorSelector{ storage: selector, @@ -72,6 +76,8 @@ func NewVectorSelector( shard: shard, numShards: numShards, + + limits: limits, } o.OperatorTelemetry = &model.NoopTelemetry{} if queryOpts.EnableAnalysis { @@ -131,7 +137,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { ) for currStep := 0; currStep < o.numSteps && seriesTs <= o.maxt; currStep++ { - _, v, h, ok, err := selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) + _, v, h, ok, err := selectPoint(series.samples, o.limits, seriesTs, o.lookbackDelta, o.offset) if err != nil { return nil, err } @@ -180,13 +186,16 @@ func (o *vectorSelector) loadSeries(ctx context.Context) error { return err } -// TODO(fpetkovski): Add max samples limit. 
-func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { +func selectPoint(it *storage.MemoizedSeriesIterator, limits *limits.Limits, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { refTime := ts - offset var t int64 var v float64 var fh *histogram.FloatHistogram + if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + return t, v, fh, false, err + } + valueType := it.Seek(refTime) switch valueType { case chunkenc.ValNone: diff --git a/go.mod b/go.mod index 62675831..985652e4 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/prometheus/common v0.42.0 github.com/prometheus/prometheus v0.44.1-0.20230522123707-905a0bd63a12 github.com/stretchr/testify v1.8.2 + go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.2.1 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 gonum.org/v1/gonum v0.12.0 @@ -65,7 +66,6 @@ require ( go.opentelemetry.io/otel v1.14.0 // indirect go.opentelemetry.io/otel/metric v0.37.0 // indirect go.opentelemetry.io/otel/trace v1.14.0 // indirect - go.uber.org/atomic v1.10.0 // indirect golang.org/x/net v0.9.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect golang.org/x/sync v0.1.0 // indirect From cf9d7f97af9cb3fae1745e744208de80a5867dca Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Thu, 17 Aug 2023 15:51:18 +0200 Subject: [PATCH 2/3] test: experiment with slices of atomic integers instead of sync map Signed-off-by: Michael Hoffmann --- engine/engine_test.go | 99 +++++++++++++++---------------- execution/execution.go | 2 +- execution/limits/limits.go | 74 +++++++++++++++++++---- execution/scan/matrix_selector.go | 31 +++++----- execution/scan/vector_selector.go | 16 ++--- 5 files changed, 135 insertions(+), 87 deletions(-) diff --git a/engine/engine_test.go b/engine/engine_test.go index 4e43f510..91fcf23f 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -114,6
+114,55 @@ func TestQueryExplain(t *testing.T) { } } +func TestQueryLimits(t *testing.T) { + load := `load 1s + example{foo="bar"} 1+0x3600 + example{foo="baz"} 1+0x3600 + ` + test, err := promql.NewTest(t, load) + testutil.Ok(t, err) + defer test.Close() + testutil.Ok(t, test.Run()) + + ctx := test.Context() + + newEngine := engine.New(engine.Opts{ + EngineOpts: promql.EngineOpts{ + MaxSamples: 1000, + Timeout: 10 * time.Second, + }}) + + t.Run("one series too many samples", func(t *testing.T) { + query := `sum_over_time(example{foo="bar"}[1h])` + q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(1800, 0)) + testutil.Ok(t, err) + + newResult := q1.Exec(ctx) + testutil.NotOk(t, newResult.Err) + testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error()) + }) + t.Run("two series too many samples", func(t *testing.T) { + query := `sum_over_time(example[1h])` + q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(900, 0)) + testutil.Ok(t, err) + + newResult := q1.Exec(ctx) + testutil.NotOk(t, newResult.Err) + testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error()) + }) + t.Run("range query should only account for samples at each batch", func(t *testing.T) { + query := `sum(example)` + start := time.Unix(0, 0) + end := start.Add(time.Hour) + step := time.Second + + q1, err := newEngine.NewRangeQuery(ctx, test.Queryable(), nil, query, start, end, step) + testutil.Ok(t, err) + newResult := q1.Exec(ctx) + testutil.Ok(t, newResult.Err) + }) +} + func assertExecutionTimeNonZero(t *testing.T, got *engine.AnalyzeOutputNode) bool { if got != nil { if got.OperatorTelemetry.ExecutionTimeTaken() <= 0 { @@ -3675,56 +3724,6 @@ testmetric2{src="a",dst="b"} 1`, } } -func TestQueryLimits(t *testing.T) { - load := `load 1s - example{foo="bar"} 1+0x3600 - example{foo="baz"} 1+0x3600 - ` - test, err := 
promql.NewTest(t, load) - testutil.Ok(t, err) - defer test.Close() - testutil.Ok(t, test.Run()) - - ctx := test.Context() - - newEngine := engine.New(engine.Opts{ - EngineOpts: promql.EngineOpts{ - MaxSamples: 10, - Timeout: 10 * time.Second, - }}) - - t.Run("one series too many samples", func(t *testing.T) { - query := `sum_over_time(example{foo="bar"}[20s])` - q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(20, 0)) - testutil.Ok(t, err) - - newResult := q1.Exec(ctx) - testutil.NotOk(t, newResult.Err) - testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error()) - }) - t.Run("two series too many samples", func(t *testing.T) { - query := `sum_over_time(example[10s])` - q1, err := newEngine.NewInstantQuery(ctx, test.Queryable(), nil, query, time.Unix(5, 0)) - testutil.Ok(t, err) - - newResult := q1.Exec(ctx) - testutil.NotOk(t, newResult.Err) - testutil.Equals(t, "query processing would load too many samples into memory in query execution", newResult.Err.Error()) - }) - t.Run("range query should only account for samples at each step", func(t *testing.T) { - query := `sum(example)` - start := time.Unix(0, 0) - end := start.Add(time.Hour) - step := time.Second - - q1, err := newEngine.NewRangeQuery(ctx, test.Queryable(), nil, query, start, end, step) - testutil.Ok(t, err) - - newResult := q1.Exec(ctx) - testutil.Ok(t, newResult.Err) - }) -} - func TestQueryCancellation(t *testing.T) { twelveHours := int64(12 * time.Hour.Seconds()) diff --git a/execution/execution.go b/execution/execution.go index 50e8499e..90dc7dd1 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -68,7 +68,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min // TODO(fpetkovski): Adjust the step for sub-queries once they are supported. 
Step: step.Milliseconds(), } - limits := limits.NewLimits(maxSamples) + limits := limits.NewLimits(maxSamples, opts) return newOperator(expr, selectorPool, opts, hints, limits) } diff --git a/execution/limits/limits.go b/execution/limits/limits.go index b925a3eb..ae4a9c55 100644 --- a/execution/limits/limits.go +++ b/execution/limits/limits.go @@ -1,35 +1,83 @@ package limits import ( - "sync" - "github.com/efficientgo/core/errors" "go.uber.org/atomic" + + "github.com/thanos-io/promql-engine/query" ) -type Limits struct { - maxSamples int +// We only check every 100 added samples if the limit is breached. +// Doing so for every sample would be prohibitively expensive. +const resolution = 100 - curSamplesPerTimestamp sync.Map +type Limits struct { + maxSamples int + samplesPerBatch []*atomic.Int64 } -func NewLimits(maxSamples int) *Limits { - return &Limits{ - maxSamples: maxSamples, +// NewLimits returns a pointer to a Limits struct. It can be used to +// track samples that enter the engine in one batch and limit it +// to a maximum number. +func NewLimits(maxSamples int, opts *query.Options) *Limits { + limits := &Limits{ + maxSamples: maxSamples, + samplesPerBatch: make([]*atomic.Int64, opts.NumSteps()), + } + for i := range limits.samplesPerBatch { + limits.samplesPerBatch[i] = atomic.NewInt64(0) } + return limits } -func (l *Limits) AccountSamplesForTimestamp(t int64, n int) error { +// addSamplesAndCheckLimits keeps track of the samples used for the given batch. +// It returns an error if adding n samples to the batch would exceed the configured +// maxSamples value.
+func (l *Limits) addSamplesAndCheckLimits(batch, n int) error { if l.maxSamples == 0 { return nil } - v, _ := l.curSamplesPerTimestamp.LoadOrStore(t, atomic.NewInt64(0)) - av := v.(*atomic.Int64) - if cur := av.Load(); cur+int64(n) > int64(l.maxSamples) { + if l.samplesPerBatch[batch].Load()+int64(n) > int64(l.maxSamples) { return errors.New("query processing would load too many samples into memory in query execution") } + l.samplesPerBatch[batch].Add(int64(n)) - av.Add(int64(n)) + return nil +} + +func (l *Limits) Accounter() *Accounter { + return &Accounter{ + limits: l, + resolution: resolution, + } +} + +// Accounter is used to check limits in one batch. It will only +// check if the sample is safe to add every "resolution" samples. +// It is not safe for concurrent usage! +type Accounter struct { + limits *Limits + + curBatch int + samplesAdded int + resolution int +} + +func (acc *Accounter) StartNewBatch() { + acc.curBatch++ + acc.samplesAdded = 0 +} + +func (acc *Accounter) AddSample() error { + acc.samplesAdded++ + if acc.samplesAdded%acc.resolution == 0 { + if err := acc.limits.addSamplesAndCheckLimits(acc.curBatch-1, acc.samplesAdded); err != nil { + // No need to reset samples here; if we return error; processing stops and no more + // samples will be added. + return err + } + acc.samplesAdded = 0 + } return nil } diff --git a/execution/scan/matrix_selector.go b/execution/scan/matrix_selector.go index 0b66c853..456c1cf3 100644 --- a/execution/scan/matrix_selector.go +++ b/execution/scan/matrix_selector.go @@ -59,7 +59,7 @@ type matrixSelector struct { extLookbackDelta int64 model.OperatorTelemetry - limits *limits.Limits + acc *limits.Accounter } // NewMatrixSelector creates operator which selects vector of series over time. 
@@ -99,7 +99,7 @@ func NewMatrixSelector( extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(), - limits: limits, + acc: limits.Accounter(), } m.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { @@ -144,6 +144,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { if o.currentStep > o.maxt { return nil, nil } + o.acc.StartNewBatch() if err := o.loadSeries(ctx); err != nil { return nil, err @@ -172,9 +173,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { var err error if !o.isExtFunction { - rangeSamples, err = selectPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples) + rangeSamples, err = selectPoints(series.samples, o.acc, seriesTs, mint, maxt, o.scanners[i].previousSamples) } else { - rangeSamples, err = selectExtPoints(series.samples, o.limits, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs) + rangeSamples, err = selectExtPoints(series.samples, o.acc, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs) } if err != nil { @@ -277,7 +278,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. -func selectPoints(it *storage.BufferedSeriesIterator, limits *limits.Limits, ts, mint, maxt int64, out []sample) ([]sample, error) { +func selectPoints(it *storage.BufferedSeriesIterator, acc *limits.Accounter, ts, mint, maxt int64, out []sample) ([]sample, error) { if len(out) > 0 && out[len(out)-1].T >= mint { // There is an overlap between previous and current ranges, retain common // points. 
In most such cases: @@ -314,7 +315,7 @@ loop: continue loop } if t >= mint { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -326,7 +327,7 @@ loop: } // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) @@ -339,7 +340,7 @@ loop: case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: t, fh := it.AtFloatHistogram() if t == maxt && !value.IsStaleNaN(fh.Sum) { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -347,7 +348,7 @@ loop: case chunkenc.ValFloat: t, v := it.At() if t == maxt && !value.IsStaleNaN(v) { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) @@ -365,7 +366,7 @@ loop: // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. 
-func selectExtPoints(it *storage.BufferedSeriesIterator, limits *limits.Limits, ts, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) { +func selectExtPoints(it *storage.BufferedSeriesIterator, acc *limits.Accounter, ts, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) { extMint := mint - extLookbackDelta if len(out) > 0 && out[len(out)-1].T >= mint { @@ -418,7 +419,7 @@ loop: *metricAppearedTs = &t } if t >= mint { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -436,13 +437,13 @@ loop: // exists at or before range start, add it and then keep replacing // it with later points while not yet (strictly) inside the range. if t >= mint || !appendedPointBeforeMint { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) appendedPointBeforeMint = true } else { - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out[len(out)-1] = sample{T: t, F: v} @@ -459,7 +460,7 @@ loop: if *metricAppearedTs == nil { *metricAppearedTs = &t } - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -470,7 +471,7 @@ loop: if *metricAppearedTs == nil { *metricAppearedTs = &t } - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { + if err := acc.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index f0397a31..09dfa444 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -50,7 +50,7 @@ type vectorSelector struct { 
numShards int model.OperatorTelemetry - limits *limits.Limits + acc *limits.Accounter } // NewVectorSelector creates operator which selects vector of series. @@ -77,7 +77,7 @@ func NewVectorSelector( shard: shard, numShards: numShards, - limits: limits, + acc: limits.Accounter(), } o.OperatorTelemetry = &model.NoopTelemetry{} if queryOpts.EnableAnalysis { @@ -116,6 +116,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { if o.currentStep > o.maxt { return nil, nil } + o.acc.StartNewBatch() if err := o.loadSeries(ctx); err != nil { return nil, err @@ -137,10 +138,13 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { ) for currStep := 0; currStep < o.numSteps && seriesTs <= o.maxt; currStep++ { - _, v, h, ok, err := selectPoint(series.samples, o.limits, seriesTs, o.lookbackDelta, o.offset) + _, v, h, ok, err := selectPoint(series.samples, seriesTs, o.lookbackDelta, o.offset) if err != nil { return nil, err } + if err := o.acc.AddSample(); err != nil { + return nil, err + } if ok { if h != nil { vectors[currStep].AppendHistogram(o.vectorPool, series.signature, h) @@ -186,16 +190,12 @@ func (o *vectorSelector) loadSeries(ctx context.Context) error { return err } -func selectPoint(it *storage.MemoizedSeriesIterator, limits *limits.Limits, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { +func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset int64) (int64, float64, *histogram.FloatHistogram, bool, error) { refTime := ts - offset var t int64 var v float64 var fh *histogram.FloatHistogram - if err := limits.AccountSamplesForTimestamp(ts, 1); err != nil { - return t, v, fh, false, err - } - valueType := it.Seek(refTime) switch valueType { case chunkenc.ValNone: From 8c465ad071f70079154487deaae38ee072b75da5 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Fri, 18 Aug 2023 17:01:49 +0200 Subject: [PATCH 3/3] misc: move tracker to engine, 
construct query opts in engine Signed-off-by: Michael Hoffmann --- engine/engine.go | 34 +++++++++- execution/execution.go | 105 +++++++++++++----------------- execution/limits/limits.go | 83 ----------------------- execution/remote/operator.go | 6 +- execution/scan/matrix_selector.go | 36 +++++----- execution/scan/vector_selector.go | 12 ++-- execution/tracking/tracker.go | 79 ++++++++++++++++++++++ 7 files changed, 184 insertions(+), 171 deletions(-) delete mode 100644 execution/limits/limits.go create mode 100644 execution/tracking/tracker.go diff --git a/engine/engine.go b/engine/engine.go index 5d27e226..a67435f4 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -30,9 +30,11 @@ import ( "github.com/thanos-io/promql-engine/execution" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" + "github.com/thanos-io/promql-engine/execution/tracking" "github.com/thanos-io/promql-engine/execution/warnings" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) type QueryType int @@ -47,6 +49,8 @@ const ( subsystem string = "engine" InstantQuery QueryType = 1 RangeQuery QueryType = 2 + + stepsBatch = 10 ) type Opts struct { @@ -270,7 +274,20 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta, e.extLookbackDelta, e.maxSamples, e.enableAnalysis) + qopts := &query.Options{ + Context: ctx, + Start: ts, + End: ts, + Step: 1, + LookbackDelta: e.lookbackDelta, + StepsBatch: stepsBatch, + ExtLookbackDelta: e.extLookbackDelta, + EnableAnalysis: e.enableAnalysis, + } + + tracker := tracking.NewTracker(e.maxSamples, qopts) + + exec, err := execution.New(ctx, tracker, lplan.Expr(), q, qopts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return 
e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -322,7 +339,20 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query }) lplan = lplan.Optimize(e.logicalOptimizers) - exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta, e.extLookbackDelta, e.maxSamples, e.enableAnalysis) + qopts := &query.Options{ + Context: ctx, + Start: start, + End: end, + Step: step, + LookbackDelta: e.lookbackDelta, + StepsBatch: stepsBatch, + ExtLookbackDelta: e.extLookbackDelta, + EnableAnalysis: e.enableAnalysis, + } + + tracker := tracking.NewTracker(e.maxSamples, qopts) + + exec, err := execution.New(ctx, tracker, lplan.Expr(), q, qopts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) diff --git a/execution/execution.go b/execution/execution.go index 90dc7dd1..5dd4bcc3 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -32,7 +32,6 @@ import ( "github.com/thanos-io/promql-engine/execution/binary" "github.com/thanos-io/promql-engine/execution/exchange" "github.com/thanos-io/promql-engine/execution/function" - "github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/noop" "github.com/thanos-io/promql-engine/execution/parse" @@ -40,57 +39,45 @@ import ( "github.com/thanos-io/promql-engine/execution/scan" "github.com/thanos-io/promql-engine/execution/step_invariant" engstore "github.com/thanos-io/promql-engine/execution/storage" + "github.com/thanos-io/promql-engine/execution/tracking" "github.com/thanos-io/promql-engine/execution/unary" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/parser" "github.com/thanos-io/promql-engine/query" ) -const stepsBatch = 10 - // New creates new physical query execution for a given query expression which represents logical plan. 
// TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. -func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, maxSamples int, enableAnalysis bool) (model.VectorOperator, error) { - opts := &query.Options{ - Context: ctx, - Start: mint, - End: maxt, - Step: step, - LookbackDelta: lookbackDelta, - StepsBatch: stepsBatch, - ExtLookbackDelta: extLookbackDelta, - EnableAnalysis: enableAnalysis, - } +func New(ctx context.Context, tracker *tracking.Tracker, expr parser.Expr, queryable storage.Queryable, opts *query.Options) (model.VectorOperator, error) { selectorPool := engstore.NewSelectorPool(queryable) hints := storage.SelectHints{ - Start: mint.UnixMilli(), - End: maxt.UnixMilli(), + Start: opts.Start.UnixMilli(), + End: opts.End.UnixMilli(), // TODO(fpetkovski): Adjust the step for sub-queries once they are supported. 
- Step: step.Milliseconds(), + Step: opts.Step.Milliseconds(), } - limits := limits.NewLimits(maxSamples, opts) - return newOperator(expr, selectorPool, opts, hints, limits) + return newOperator(expr, selectorPool, opts, hints, tracker) } -func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { +func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) { switch e := expr.(type) { case *parser.NumberLiteral: - return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, e.Val), nil + return scan.NewNumberLiteralSelector(model.NewVectorPool(int(opts.StepsBatch)), opts, e.Val), nil case *parser.VectorSelector: start, end := getTimeRangesForVectorSelector(e, opts, 0) hints.Start = start hints.End = end filter := storage.GetSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, hints) - return newShardedVectorSelector(filter, opts, e.Offset, limits) + return newShardedVectorSelector(filter, opts, e.Offset, tracker) case *logicalplan.FilteredSelector: start, end := getTimeRangesForVectorSelector(e.VectorSelector, opts, 0) hints.Start = start hints.End = end selector := storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints) - return newShardedVectorSelector(selector, opts, e.Offset, limits) + return newShardedVectorSelector(selector, opts, e.Offset, tracker) case *parser.Call: hints.Func = e.Func.Name @@ -99,10 +86,10 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O for i := range e.Args { switch t := e.Args[i].(type) { case *parser.MatrixSelector: - return newRangeVectorFunction(e, t, storage, opts, hints, limits) + return newRangeVectorFunction(e, t, storage, opts, hints, tracker) } } - return newInstantVectorFunction(e, storage, opts, hints, 
limits) + return newInstantVectorFunction(e, storage, opts, hints, tracker) case *parser.AggregateExpr: hints.Func = e.Op.String() @@ -110,22 +97,22 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O hints.By = !e.Without var paramOp model.VectorOperator - next, err := newOperator(e.Expr, storage, opts, hints, limits) + next, err := newOperator(e.Expr, storage, opts, hints, tracker) if err != nil { return nil, err } if e.Param != nil && e.Param.Type() != parser.ValueTypeString { - paramOp, err = newOperator(e.Param, storage, opts, hints, limits) + paramOp, err = newOperator(e.Param, storage, opts, hints, tracker) if err != nil { return nil, err } } if e.Op == parser.TOPK || e.Op == parser.BOTTOMK { - next, err = aggregate.NewKHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch, opts) + next, err = aggregate.NewKHashAggregate(model.NewVectorPool(int(opts.StepsBatch)), next, paramOp, e.Op, !e.Without, e.Grouping, int(opts.StepsBatch), opts) } else { - next, err = aggregate.NewHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch) + next, err = aggregate.NewHashAggregate(model.NewVectorPool(int(opts.StepsBatch)), next, paramOp, e.Op, !e.Without, e.Grouping, int(opts.StepsBatch)) } if err != nil { @@ -136,16 +123,16 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O case *parser.BinaryExpr: if e.LHS.Type() == parser.ValueTypeScalar || e.RHS.Type() == parser.ValueTypeScalar { - return newScalarBinaryOperator(e, storage, opts, hints, limits) + return newScalarBinaryOperator(e, storage, opts, hints, tracker) } - return newVectorBinaryOperator(e, storage, opts, hints, limits) + return newVectorBinaryOperator(e, storage, opts, hints, tracker) case *parser.ParenExpr: - return newOperator(e.Expr, storage, opts, hints, limits) + return newOperator(e.Expr, storage, opts, hints, tracker) case *parser.UnaryExpr: - 
next, err := newOperator(e.Expr, storage, opts, hints, limits) + next, err := newOperator(e.Expr, storage, opts, hints, tracker) if err != nil { return nil, err } @@ -153,7 +140,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O case parser.ADD: return next, nil case parser.SUB: - return unary.NewUnaryNegation(next, stepsBatch) + return unary.NewUnaryNegation(next, int(opts.StepsBatch)) default: // This shouldn't happen as Op was validated when parsing already // https://github.com/prometheus/prometheus/blob/v2.38.0/promql/parser/parse.go#L573. @@ -163,13 +150,13 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O case *parser.StepInvariantExpr: switch t := e.Expr.(type) { case *parser.NumberLiteral: - return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, t.Val), nil + return scan.NewNumberLiteralSelector(model.NewVectorPool(int(opts.StepsBatch)), opts, t.Val), nil } - next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints, limits) + next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints, tracker) if err != nil { return nil, err } - return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(stepsBatch, 1), next, e.Expr, opts, stepsBatch) + return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(int(opts.StepsBatch), 1), next, e.Expr, opts, int(opts.StepsBatch)) case logicalplan.Deduplicate: // The Deduplicate operator will deduplicate samples using a last-sample-wins strategy. 
@@ -182,14 +169,14 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O operators := make([]model.VectorOperator, len(e.Expressions)) for i, expr := range e.Expressions { - operator, err := newOperator(expr, storage, opts, hints, limits) + operator, err := newOperator(expr, storage, opts, hints, tracker) if err != nil { return nil, err } operators[i] = operator } - coalesce := exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...) - dedup := exchange.NewDedupOperator(model.NewVectorPool(stepsBatch), coalesce) + coalesce := exchange.NewCoalesce(model.NewVectorPool(int(opts.StepsBatch)), opts, operators...) + dedup := exchange.NewDedupOperator(model.NewVectorPool(int(opts.StepsBatch)), coalesce) return exchange.NewConcurrent(dedup, 2), nil case logicalplan.RemoteExecution: @@ -204,12 +191,12 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O // We need to set the lookback for the selector to 0 since the remote query already applies one lookback. 
selectorOpts := *opts selectorOpts.LookbackDelta = 0 - remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), e.QueryRangeStart, &selectorOpts, limits) + remoteExec := remote.NewExecution(qry, model.NewVectorPool(int(opts.StepsBatch)), e.QueryRangeStart, &selectorOpts, tracker) return exchange.NewConcurrent(remoteExec, 2), nil case logicalplan.Noop: return noop.NewOperator(), nil case logicalplan.UserDefinedExpr: - return e.MakeExecutionOperator(stepsBatch, model.NewVectorPool(stepsBatch), storage, opts, hints) + return e.MakeExecutionOperator(int(opts.StepsBatch), model.NewVectorPool(int(opts.StepsBatch)), storage, opts, hints) default: return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e) } @@ -226,7 +213,7 @@ func unpackVectorSelector(t *parser.MatrixSelector) (*parser.VectorSelector, []* } } -func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { +func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) { // TODO(saswatamcode): Range vector result might need new operator // before it can be non-nested. 
https://github.com/thanos-io/promql-engine/issues/39 vs, filters, err := unpackVectorSelector(t) @@ -252,34 +239,34 @@ func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *e operators := make([]model.VectorOperator, 0, numShards) for i := 0; i < numShards; i++ { - operator, err := scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, e, opts, t.Range, vs.Offset, i, numShards, limits) + operator, err := scan.NewMatrixSelector(model.NewVectorPool(int(opts.StepsBatch)), filter, e, opts, t.Range, vs.Offset, i, numShards, tracker) if err != nil { return nil, err } operators = append(operators, exchange.NewConcurrent(operator, 2)) } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil + return exchange.NewCoalesce(model.NewVectorPool(int(opts.StepsBatch)), opts, operators...), nil } -func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { +func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) { nextOperators := make([]model.VectorOperator, 0, len(e.Args)) for i := range e.Args { // Strings don't need an operator if e.Args[i].Type() == parser.ValueTypeString { continue } - next, err := newOperator(e.Args[i], storage, opts, hints, limits) + next, err := newOperator(e.Args[i], storage, opts, hints, tracker) if err != nil { return nil, err } nextOperators = append(nextOperators, next) } - return function.NewFunctionOperator(e, nextOperators, stepsBatch, opts) + return function.NewFunctionOperator(e, nextOperators, int(opts.StepsBatch), opts) } -func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration, limits *limits.Limits) (model.VectorOperator, error) { +func newShardedVectorSelector(selector 
engstore.SeriesSelector, opts *query.Options, offset time.Duration, tracker *tracking.Tracker) (model.VectorOperator, error) { numShards := runtime.GOMAXPROCS(0) / 2 if numShards < 1 { numShards = 1 @@ -288,31 +275,31 @@ func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Opti for i := 0; i < numShards; i++ { operator := exchange.NewConcurrent( scan.NewVectorSelector( - model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards, limits), 2) + model.NewVectorPool(int(opts.StepsBatch)), selector, opts, offset, i, numShards, tracker), 2) operators = append(operators, operator) } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil + return exchange.NewCoalesce(model.NewVectorPool(int(opts.StepsBatch)), opts, operators...), nil } -func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { - leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints, limits) +func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) { + leftOperator, err := newOperator(e.LHS, selectorPool, opts, hints, tracker) if err != nil { return nil, err } - rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints, limits) + rightOperator, err := newOperator(e.RHS, selectorPool, opts, hints, tracker) if err != nil { return nil, err } - return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) + return binary.NewVectorOperator(model.NewVectorPool(int(opts.StepsBatch)), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) } -func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints 
storage.SelectHints, limits *limits.Limits) (model.VectorOperator, error) { - lhs, err := newOperator(e.LHS, selectorPool, opts, hints, limits) +func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints, tracker *tracking.Tracker) (model.VectorOperator, error) { + lhs, err := newOperator(e.LHS, selectorPool, opts, hints, tracker) if err != nil { return nil, err } - rhs, err := newOperator(e.RHS, selectorPool, opts, hints, limits) + rhs, err := newOperator(e.RHS, selectorPool, opts, hints, tracker) if err != nil { return nil, err } @@ -325,7 +312,7 @@ func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.Select scalarSide = binary.ScalarSideLeft } - return binary.NewScalar(model.NewVectorPoolWithSize(stepsBatch, 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts) + return binary.NewScalar(model.NewVectorPoolWithSize(int(opts.StepsBatch), 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts) } // Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791. diff --git a/execution/limits/limits.go b/execution/limits/limits.go deleted file mode 100644 index ae4a9c55..00000000 --- a/execution/limits/limits.go +++ /dev/null @@ -1,83 +0,0 @@ -package limits - -import ( - "github.com/efficientgo/core/errors" - "go.uber.org/atomic" - - "github.com/thanos-io/promql-engine/query" -) - -// We only check every 100 added samples if the limit is breached. -// Doing so for every sample would be prohibitively expensive. -const resolution = 100 - -type Limits struct { - maxSamples int - samplesPerBatch []*atomic.Int64 -} - -// NewLimits returns a pointer to a Limits struct. It can be used to -// track samples that enter the engine in one batch and limit it -// to a maximum number. 
-func NewLimits(maxSamples int, opts *query.Options) *Limits { - limits := &Limits{ - maxSamples: maxSamples, - samplesPerBatch: make([]*atomic.Int64, opts.NumSteps()), - } - for i := range limits.samplesPerBatch { - limits.samplesPerBatch[i] = atomic.NewInt64(0) - } - return limits -} - -// AccountSamplesForTimestamp keeps track of the samples used for the batch for timestamp t. -// It will return an error if a batch wants to add use more samples then the configured -// maxSamples value. -func (l *Limits) addSamplesAndCheckLimits(batch, n int) error { - if l.maxSamples == 0 { - return nil - } - - if l.samplesPerBatch[batch].Load()+int64(n) > int64(l.maxSamples) { - return errors.New("query processing would load too many samples into memory in query execution") - } - l.samplesPerBatch[batch].Add(int64(n)) - - return nil -} - -func (l *Limits) Accounter() *Accounter { - return &Accounter{ - limits: l, - resolution: resolution, - } -} - -// Accounter is used to check limits in one batch. It will only -// check if the sample is safe to add every "resolution" samples. -// It is not safe for concurrent usage! -type Accounter struct { - limits *Limits - - curBatch int - samplesAdded int - resolution int -} - -func (acc *Accounter) StartNewBatch() { - acc.curBatch++ - acc.samplesAdded = 0 -} - -func (acc *Accounter) AddSample() error { - acc.samplesAdded++ - if acc.samplesAdded%acc.resolution == 0 { - if err := acc.limits.addSamplesAndCheckLimits(acc.curBatch-1, acc.samplesAdded); err != nil { - // No need to reset samples here; if we return error; processing stops and no more - // samples will be added. 
- return err - } - acc.samplesAdded = 0 - } - return nil -} diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 2b21add7..a2ece588 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -12,10 +12,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" - "github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/scan" engstore "github.com/thanos-io/promql-engine/execution/storage" + "github.com/thanos-io/promql-engine/execution/tracking" "github.com/thanos-io/promql-engine/query" ) @@ -28,14 +28,14 @@ type Execution struct { model.OperatorTelemetry } -func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options, limits *limits.Limits) *Execution { +func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options, tracker *tracking.Tracker) *Execution { storage := newStorageFromQuery(query, opts) e := &Execution{ storage: storage, query: query, opts: opts, queryRangeStart: queryRangeStart, - vectorSelector: scan.NewVectorSelector(pool, storage, opts, 0, 0, 1, limits), + vectorSelector: scan.NewVectorSelector(pool, storage, opts, 0, 0, 1, tracker), } e.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { diff --git a/execution/scan/matrix_selector.go b/execution/scan/matrix_selector.go index 456c1cf3..4bec7231 100644 --- a/execution/scan/matrix_selector.go +++ b/execution/scan/matrix_selector.go @@ -15,10 +15,10 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/promql-engine/execution/function" - "github.com/thanos-io/promql-engine/execution/limits" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" engstore "github.com/thanos-io/promql-engine/execution/storage" + 
"github.com/thanos-io/promql-engine/execution/tracking" "github.com/thanos-io/promql-engine/parser" "github.com/thanos-io/promql-engine/query" ) @@ -59,7 +59,7 @@ type matrixSelector struct { extLookbackDelta int64 model.OperatorTelemetry - acc *limits.Accounter + lim *tracking.Limiter } // NewMatrixSelector creates operator which selects vector of series over time. @@ -70,7 +70,7 @@ func NewMatrixSelector( opts *query.Options, selectRange, offset time.Duration, shard, numShard int, - limits *limits.Limits, + tracker *tracking.Tracker, ) (model.VectorOperator, error) { call, ok := rangeVectorFuncs[funcExpr.Func.Name] @@ -99,7 +99,7 @@ func NewMatrixSelector( extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(), - acc: limits.Accounter(), + lim: tracker.Limiter(), } m.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { @@ -144,7 +144,7 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { if o.currentStep > o.maxt { return nil, nil } - o.acc.StartNewBatch() + o.lim.StartNewBatch() if err := o.loadSeries(ctx); err != nil { return nil, err @@ -173,9 +173,9 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) { var err error if !o.isExtFunction { - rangeSamples, err = selectPoints(series.samples, o.acc, seriesTs, mint, maxt, o.scanners[i].previousSamples) + rangeSamples, err = selectPoints(series.samples, o.lim, mint, maxt, o.scanners[i].previousSamples) } else { - rangeSamples, err = selectExtPoints(series.samples, o.acc, seriesTs, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs) + rangeSamples, err = selectExtPoints(series.samples, o.lim, mint, maxt, o.scanners[i].previousSamples, o.extLookbackDelta, &o.scanners[i].metricAppearedTs) } if err != nil { @@ -278,7 +278,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { // values). 
Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. -func selectPoints(it *storage.BufferedSeriesIterator, acc *limits.Accounter, ts, mint, maxt int64, out []sample) ([]sample, error) { +func selectPoints(it *storage.BufferedSeriesIterator, lim *tracking.Limiter, mint, maxt int64, out []sample) ([]sample, error) { if len(out) > 0 && out[len(out)-1].T >= mint { // There is an overlap between previous and current ranges, retain common // points. In most such cases: @@ -315,7 +315,7 @@ loop: continue loop } if t >= mint { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -327,7 +327,7 @@ loop: } // Values in the buffer are guaranteed to be smaller than maxt. if t >= mint { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) @@ -340,7 +340,7 @@ loop: case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: t, fh := it.AtFloatHistogram() if t == maxt && !value.IsStaleNaN(fh.Sum) { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -348,7 +348,7 @@ loop: case chunkenc.ValFloat: t, v := it.At() if t == maxt && !value.IsStaleNaN(v) { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) @@ -366,7 +366,7 @@ loop: // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. 
-func selectExtPoints(it *storage.BufferedSeriesIterator, acc *limits.Accounter, ts, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) { +func selectExtPoints(it *storage.BufferedSeriesIterator, lim *tracking.Limiter, mint, maxt int64, out []sample, extLookbackDelta int64, metricAppearedTs **int64) ([]sample, error) { extMint := mint - extLookbackDelta if len(out) > 0 && out[len(out)-1].T >= mint { @@ -419,7 +419,7 @@ loop: *metricAppearedTs = &t } if t >= mint { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -437,13 +437,13 @@ loop: // exists at or before range start, add it and then keep replacing // it with later points while not yet (strictly) inside the range. if t >= mint || !appendedPointBeforeMint { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) appendedPointBeforeMint = true } else { - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out[len(out)-1] = sample{T: t, F: v} @@ -460,7 +460,7 @@ loop: if *metricAppearedTs == nil { *metricAppearedTs = &t } - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, H: fh}) @@ -471,7 +471,7 @@ loop: if *metricAppearedTs == nil { *metricAppearedTs = &t } - if err := acc.AddSample(); err != nil { + if err := lim.AddSample(); err != nil { return out, err } out = append(out, sample{T: t, F: v}) diff --git a/execution/scan/vector_selector.go b/execution/scan/vector_selector.go index 09dfa444..29cc8912 100644 --- a/execution/scan/vector_selector.go +++ b/execution/scan/vector_selector.go @@ -12,9 +12,9 @@ import ( "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/thanos-io/promql-engine/execution/limits" 
"github.com/thanos-io/promql-engine/execution/model" engstore "github.com/thanos-io/promql-engine/execution/storage" + "github.com/thanos-io/promql-engine/execution/tracking" "github.com/thanos-io/promql-engine/query" "github.com/prometheus/prometheus/model/histogram" @@ -50,7 +50,7 @@ type vectorSelector struct { numShards int model.OperatorTelemetry - acc *limits.Accounter + lim *tracking.Limiter } // NewVectorSelector creates operator which selects vector of series. @@ -60,7 +60,7 @@ func NewVectorSelector( queryOpts *query.Options, offset time.Duration, shard, numShards int, - limits *limits.Limits, + tracker *tracking.Tracker, ) model.VectorOperator { o := &vectorSelector{ storage: selector, @@ -77,7 +77,7 @@ func NewVectorSelector( shard: shard, numShards: numShards, - acc: limits.Accounter(), + lim: tracker.Limiter(), } o.OperatorTelemetry = &model.NoopTelemetry{} if queryOpts.EnableAnalysis { @@ -116,7 +116,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { if o.currentStep > o.maxt { return nil, nil } - o.acc.StartNewBatch() + o.lim.StartNewBatch() if err := o.loadSeries(ctx); err != nil { return nil, err @@ -142,7 +142,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { if err != nil { return nil, err } - if err := o.acc.AddSample(); err != nil { + if err := o.lim.AddSample(); err != nil { return nil, err } if ok { diff --git a/execution/tracking/tracker.go b/execution/tracking/tracker.go new file mode 100644 index 00000000..6e4ab873 --- /dev/null +++ b/execution/tracking/tracker.go @@ -0,0 +1,79 @@ +package tracking + +import ( + "github.com/efficientgo/core/errors" + "go.uber.org/atomic" + + "github.com/thanos-io/promql-engine/query" +) + +// Tracker can be used to track samples that enter the engine and limit them. 
+type Tracker struct { + maxSamples int + samplesPerBatch []*atomic.Int64 +} + +func NewTracker(maxSamples int, opts *query.Options) *Tracker { + res := &Tracker{ + maxSamples: maxSamples, + samplesPerBatch: make([]*atomic.Int64, opts.NumSteps()), + } + for i := range res.samplesPerBatch { + res.samplesPerBatch[i] = atomic.NewInt64(0) + } + return res +} + +func (t *Tracker) addSamplesAndCheckLimits(batch, n int) error { + if t.maxSamples == 0 { + return nil + } + + if t.samplesPerBatch[batch].Load()+int64(n) > int64(t.maxSamples) { + return errors.New("query processing would load too many samples into memory in query execution") + } + t.samplesPerBatch[batch].Add(int64(n)) + + return nil +} + +// We only check every 100 added samples if the limit is breached. +// Doing so for every sample would be prohibitively expensive. +const resolution = 100 + +// Limiter provides a Limiter for the operator to track samples with. +func (t *Tracker) Limiter() *Limiter { + return &Limiter{ + tracker: t, + resolution: resolution, + } +} + +// Limiter is used to check limits in one batch. It will only +// check if the sample is safe to add every "resolution" samples. +// It is not safe for concurrent usage! +type Limiter struct { + tracker *Tracker + + curBatch int + samplesAdded int + resolution int +} + +func (l *Limiter) StartNewBatch() { + l.curBatch++ + l.samplesAdded = 0 +} + +func (l *Limiter) AddSample() error { + l.samplesAdded++ + if l.samplesAdded%l.resolution == 0 { + if err := l.tracker.addSamplesAndCheckLimits(l.curBatch-1, l.samplesAdded); err != nil { + // No need to reset samples here; if we return error; processing stops and no more + // samples will be added. + return err + } + l.samplesAdded = 0 + } + return nil +}