Skip to content

Commit

Permalink
execution: add count_values aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Mar 31, 2024
1 parent 7939b4f commit c1dd8ba
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 4 deletions.
19 changes: 19 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
end time.Time
step time.Duration
}{
{
name: "count_values",
load: `load 30s
version{foo="bar"} 1+1x40
version{foo="baz"} 1+2x40
version{foo="quz"} 2+1x40
`,
query: `count_values("val", version)`,
},
{
name: "duplicate label fuzz",
load: `load 30s
Expand Down Expand Up @@ -2771,6 +2780,16 @@ func TestInstantQuery(t *testing.T) {
query string
queryTime time.Time
}{
{
name: "count_values",
load: `load 30s
version{foo="bar"} 1
version{foo="baz"} 1
version{foo="quz"} 2
`,
query: `count_values("val", version)`,
queryTime: time.Unix(0, 0),
},
{
name: "binary pairing early exit fuzz",
load: `load 30s
Expand Down
4 changes: 2 additions & 2 deletions engine/enginefuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
// bottomk and topk sometimes lead to random failures since their result on equal values is essentially random
promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.QUANTILE}),
promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE}),
}
ps := promqlsmith.New(rnd, seriesSet, psOpts...)

Expand Down Expand Up @@ -160,7 +160,7 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithAtModifierMaxTimestamp(180 * 1000),
// bottomk and topk sometimes lead to random failures since their result on equal values is essentially random
promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.QUANTILE}),
promqlsmith.WithEnabledAggrs([]parser.ItemType{parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE}),
}
ps := promqlsmith.New(rnd, seriesSet, psOpts...)

Expand Down
148 changes: 148 additions & 0 deletions execution/aggregate/count_values.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package aggregate

import (
"context"
"fmt"
"strconv"
"sync"

"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/query"
)

// countValuesOperator implements the PromQL count_values aggregation: for
// every evaluation step it counts how many input samples share the same value
// and emits one output series per distinct value, labelled param="<value>".
//
// The operator is fully buffering: on first use it drains the input operator,
// computes the counts for every step and only then starts emitting batches.
// This is required because the set of output series depends on the values
// seen across all steps.
//
// No grouping (by/without) is taken into account; all input samples of a step
// are aggregated into a single group.
//
// NOTE(review): the embedded OperatorTelemetry is never initialised by
// NewCountValues, so execution-time tracking is not wired up for this
// operator yet — confirm whether telemetry should be added.
type countValuesOperator struct {
	model.OperatorTelemetry

	pool  *model.VectorPool
	next  model.VectorOperator
	param string

	stepsBatch int
	curStep    int

	// ts holds the timestamp of every buffered input step.
	ts []int64
	// counts holds, for every buffered step, the number of input samples
	// per output series ID (an index into series).
	counts []map[int]int
	// series holds the output series; the slice index is the series ID.
	series []labels.Labels

	// once guards the one-time buffering of the input; initErr keeps its
	// outcome so that every caller observes a failure, not only the first.
	once    sync.Once
	initErr error
}

// NewCountValues returns an operator that evaluates count_values over the
// vectors produced by next. param is the name of the label that carries the
// formatted sample value on each output series, and opts.StepsBatch bounds
// the number of steps returned by a single call to Next.
func NewCountValues(pool *model.VectorPool, next model.VectorOperator, param string, opts *query.Options) model.VectorOperator {
	return &countValuesOperator{
		pool:       pool,
		next:       next,
		param:      param,
		stepsBatch: opts.StepsBatch,
	}
}

// Explain returns the single input operator of this aggregation.
func (c *countValuesOperator) Explain() []model.VectorOperator {
	return []model.VectorOperator{c.next}
}

// GetPool returns the vector pool used to allocate the output vectors.
func (c *countValuesOperator) GetPool() *model.VectorPool {
	return c.pool
}

// String returns a short description of the operator including the name of
// the output label.
func (c *countValuesOperator) String() string {
	return fmt.Sprintf("[countValues] '%s'", c.param)
}

// Series returns the output series, one per distinct formatted sample value
// observed in the input, in order of first appearance. The first call to
// Series or Next drains the input operator; a failure during that phase is
// returned by this and every later call.
func (c *countValuesOperator) Series(ctx context.Context) ([]labels.Labels, error) {
	if err := c.ensureInit(ctx); err != nil {
		return nil, err
	}
	return c.series, nil
}

// Next returns the next batch of at most stepsBatch step vectors. Each vector
// holds one sample per distinct value seen at that step; the sample ID is the
// index of the matching series returned by Series and the sample value is the
// number of occurrences. It returns nil, nil once all steps were emitted.
func (c *countValuesOperator) Next(ctx context.Context) ([]model.StepVector, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	if err := c.ensureInit(ctx); err != nil {
		return nil, err
	}
	if c.curStep >= len(c.ts) {
		return nil, nil
	}

	batch := c.pool.GetVectorBatch()
	for i := 0; i < c.stepsBatch && c.curStep < len(c.ts); i++ {
		sv := c.pool.GetStepVector(c.ts[c.curStep])
		for id, n := range c.counts[c.curStep] {
			sv.AppendSample(c.pool, uint64(id), float64(n))
		}
		batch = append(batch, sv)
		c.curStep++
	}
	return batch, nil
}

// ensureInit buffers the input exactly once and returns the outcome of that
// single attempt on every call. Keeping the error on the operator prevents a
// failed initialisation from later being reported as an empty result.
func (c *countValuesOperator) ensureInit(ctx context.Context) error {
	c.once.Do(func() { c.initErr = c.initSeriesOnce(ctx) })
	return c.initErr
}

// initSeriesOnce drains the input operator and computes, for every step, how
// many samples map to each output series. Samples are grouped by their
// formatted value (the eventual label value) rather than by the raw float64:
// NaN never compares equal to itself and therefore cannot be used as a map
// key, and +0/-0 compare equal although they format differently.
//
// Only the float samples of each step vector are counted.
// NOTE(review): any other sample kinds carried by a step vector (e.g. native
// histograms) are ignored here — confirm this is intended.
func (c *countValuesOperator) initSeriesOnce(ctx context.Context) error {
	// The labels of the input series are irrelevant for the result, but the
	// call is kept so that errors of the input operator surface early.
	if _, err := c.next.Series(ctx); err != nil {
		return err
	}

	var (
		ts     []int64
		counts []map[int]int
		// buf is a scratch buffer reused to format sample values without
		// allocating a string for every sample.
		buf []byte
	)
	series := make([]labels.Labels, 0)
	// seriesIDs maps a formatted sample value to its output series ID.
	seriesIDs := make(map[string]int)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		in, err := c.next.Next(ctx)
		if err != nil {
			return err
		}
		if in == nil {
			break
		}
		for i := range in {
			stepCounts := make(map[int]int)
			for _, v := range in[i].Samples {
				buf = strconv.AppendFloat(buf[:0], v, 'f', -1, 64)
				id, ok := seriesIDs[string(buf)]
				if !ok {
					// IDs are handed out in order of first appearance, which
					// keeps the output series order deterministic.
					val := string(buf)
					id = len(series)
					seriesIDs[val] = id
					series = append(series, labels.FromStrings(c.param, val))
				}
				stepCounts[id]++
			}
			ts = append(ts, in[i].T)
			counts = append(counts, stepCounts)
		}
		c.next.GetPool().PutVectors(in)
	}

	c.ts = ts
	c.counts = counts
	c.series = series

	return nil
}
6 changes: 4 additions & 2 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,17 @@ func newAggregateExpression(e *logicalplan.Aggregation, scanners storage.Scanner
if err != nil {
return nil, err
}
if e.Op == parser.COUNT_VALUES {
param := logicalplan.UnsafeUnwrapString(e.Param)
return aggregate.NewCountValues(model.NewVectorPool(opts.StepsBatch), next, param, opts), nil
}

if e.Param != nil && e.Param.ReturnType() != parser.ValueTypeString {
paramOp, err = newOperator(e.Param, scanners, opts, hints)
if err != nil {
return nil, err
}
}

if e.Op == parser.TOPK || e.Op == parser.BOTTOMK {
next, err = aggregate.NewKHashAggregate(model.NewVectorPool(opts.StepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, opts)
} else {
Expand All @@ -259,7 +262,6 @@ func newAggregateExpression(e *logicalplan.Aggregation, scanners storage.Scanner
}

return exchange.NewConcurrent(next, 2, opts), nil

}

func newBinaryExpression(e *logicalplan.Binary, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
Expand Down

0 comments on commit c1dd8ba

Please sign in to comment.