Fix query concurrency (#861)
* Make concurrency configurable as a physicalplan option

* Support single concurrency

The aggregators expect a final aggregation to be performed.
In the concurrency = 1 case we weren't aggregating correctly
because no final aggregation was created.

This removes the requirement that concurrency be > 1 for a
synchronizer and final aggregation to be created.

Ideally, in the future we would schedule a single aggregation that
handles being both a normal and a final aggregation, but this is a
simple fix for an edge case that isn't important at this point.
thorfour committed May 10, 2024
1 parent 35845d4 commit 7697384
Showing 2 changed files with 97 additions and 69 deletions.
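The new option threads through the engine constructor the same way the updated test wires it. Below is a minimal caller-side sketch, not a verified public API: query.NewEngine, query.WithPhysicalplanOptions, the logicalplan.TableProvider parameter type, and the arrow module version are assumptions based on the test code in this diff.

package main

import (
    "context"

    "github.com/apache/arrow/go/v16/arrow" // match the arrow major version of your frostdb dependency
    "github.com/apache/arrow/go/v16/arrow/memory"

    "github.com/polarsignals/frostdb/query"
    "github.com/polarsignals/frostdb/query/logicalplan"
    "github.com/polarsignals/frostdb/query/physicalplan"
)

// runSingleThreaded is a hypothetical helper showing the option introduced in
// this commit: forcing the physical plan to use a single scan/aggregation lane.
func runSingleThreaded(ctx context.Context, provider logicalplan.TableProvider) error {
    engine := query.NewEngine(
        memory.NewGoAllocator(),
        provider,
        query.WithPhysicalplanOptions(physicalplan.WithConcurrency(1)),
    )
    return engine.ScanTable("test").
        Aggregate(
            []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
            []logicalplan.Expr{logicalplan.Col("timestamp")},
        ).
        Execute(ctx, func(ctx context.Context, r arrow.Record) error {
            // Before this fix, concurrency = 1 skipped the synchronizer and the
            // final aggregation, so results here could be partially aggregated.
            return nil
        })
}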
query/engine_test.go: 119 changes (68 additions, 51 deletions)
@@ -12,66 +12,83 @@ import (
     "github.com/polarsignals/frostdb/dynparquet"
     schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
     "github.com/polarsignals/frostdb/query/logicalplan"
+    "github.com/polarsignals/frostdb/query/physicalplan"
 )
 
 func TestUniqueAggregation(t *testing.T) {
-    mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-    defer mem.AssertSize(t, 0)
-
-    schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
-        Name: "test",
-        Columns: []*schemapb.Column{{
-            Name: "example",
-            StorageLayout: &schemapb.StorageLayout{
-                Type: schemapb.StorageLayout_TYPE_INT64,
+    tests := map[string]struct {
+        execOptions []physicalplan.Option
+    }{
+        "no concurrency": {
+            execOptions: []physicalplan.Option{
+                physicalplan.WithConcurrency(1),
             },
-        }, {
-            Name: "timestamp",
-            StorageLayout: &schemapb.StorageLayout{
-                Type: schemapb.StorageLayout_TYPE_INT64,
-            },
-        }},
-    })
-    require.NoError(t, err)
+        },
+        "default": {
+            execOptions: []physicalplan.Option{},
+        },
+    }
+    for name, test := range tests {
+        t.Run(name, func(t *testing.T) {
+            mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+            defer mem.AssertSize(t, 0)
 
-    rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
-        Name: "example",
-        Type: arrow.PrimitiveTypes.Int64,
-    }, {
-        Name: "timestamp",
-        Type: arrow.PrimitiveTypes.Int64,
-    }}, nil))
-    defer rb.Release()
+            schema, err := dynparquet.SchemaFromDefinition(&schemapb.Schema{
+                Name: "test",
+                Columns: []*schemapb.Column{{
+                    Name: "example",
+                    StorageLayout: &schemapb.StorageLayout{
+                        Type: schemapb.StorageLayout_TYPE_INT64,
+                    },
+                }, {
+                    Name: "timestamp",
+                    StorageLayout: &schemapb.StorageLayout{
+                        Type: schemapb.StorageLayout_TYPE_INT64,
+                    },
+                }},
+            })
+            require.NoError(t, err)
 
-    rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
-    rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)
+            rb := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{{
+                Name: "example",
+                Type: arrow.PrimitiveTypes.Int64,
+            }, {
+                Name: "timestamp",
+                Type: arrow.PrimitiveTypes.Int64,
+            }}, nil))
+            defer rb.Release()
 
-    r := rb.NewRecord()
-    defer r.Release()
+            rb.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
+            rb.Field(1).(*array.Int64Builder).AppendValues([]int64{1, 1, 3}, nil)
 
-    ran := false
-    err = NewEngine(mem, &FakeTableProvider{
-        Tables: map[string]logicalplan.TableReader{
-            "test": &FakeTableReader{
-                FrostdbSchema: schema,
-                Records:       []arrow.Record{r},
-            },
-        },
-    }).ScanTable("test").
-        Aggregate(
-            []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
-            []logicalplan.Expr{logicalplan.Col("timestamp")},
-        ).
-        Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
-            require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
-            require.True(t, r.Column(1).(*array.Int64).IsNull(0))
-            require.True(t, r.Column(1).(*array.Int64).IsValid(1))
-            require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
-            ran = true
-            return nil
+            r := rb.NewRecord()
+            defer r.Release()
+
+            ran := false
+            err = NewEngine(mem, &FakeTableProvider{
+                Tables: map[string]logicalplan.TableReader{
+                    "test": &FakeTableReader{
+                        FrostdbSchema: schema,
+                        Records:       []arrow.Record{r},
+                    },
+                },
+            }, WithPhysicalplanOptions(test.execOptions...)).ScanTable("test").
+                Aggregate(
+                    []*logicalplan.AggregationFunction{logicalplan.Unique(logicalplan.Col("example"))},
+                    []logicalplan.Expr{logicalplan.Col("timestamp")},
+                ).
+                Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
+                    require.Equal(t, []int64{1, 3}, r.Column(0).(*array.Int64).Int64Values())
+                    require.True(t, r.Column(1).(*array.Int64).IsNull(0))
+                    require.True(t, r.Column(1).(*array.Int64).IsValid(1))
+                    require.Equal(t, int64(3), r.Column(1).(*array.Int64).Value(1))
+                    ran = true
+                    return nil
+                })
+            require.NoError(t, err)
+            require.True(t, ran)
         })
-    require.NoError(t, err)
-    require.True(t, ran)
+    }
 }
 
 func TestAndAggregation(t *testing.T) {
query/physicalplan/physicalplan.go: 47 changes (29 additions, 18 deletions)
@@ -17,8 +17,7 @@ import (
     "github.com/polarsignals/frostdb/recovery"
 )
 
-// TODO: Make this smarter.
-var concurrencyHardcoded = runtime.GOMAXPROCS(0)
+var defaultConcurrency = runtime.GOMAXPROCS(0)
 
 type PhysicalPlan interface {
     Callback(ctx context.Context, r arrow.Record) error
@@ -255,34 +254,47 @@ func (p *noopOperator) Draw() *Diagram {
     return p.next.Draw()
 }
 
-type execOptions struct {
+type ExecOptions struct {
     orderedAggregations bool
     overrideInput       []PhysicalPlan
     readMode            logicalplan.ReadMode
+    concurrency         int
 }
 
-type Option func(o *execOptions)
+func NewExecOptions() ExecOptions {
+    return ExecOptions{
+        concurrency: defaultConcurrency,
+    }
+}
+
+type Option func(o *ExecOptions)
 
 func WithReadMode(m logicalplan.ReadMode) Option {
-    return func(o *execOptions) {
+    return func(o *ExecOptions) {
         o.readMode = m
     }
 }
 
 func WithOrderedAggregations() Option {
-    return func(o *execOptions) {
+    return func(o *ExecOptions) {
         o.orderedAggregations = true
     }
 }
 
 // WithOverrideInput can be used to provide an input stage on top of which the
 // Build function can build the physical plan.
 func WithOverrideInput(input []PhysicalPlan) Option {
-    return func(o *execOptions) {
+    return func(o *ExecOptions) {
         o.overrideInput = input
     }
 }
 
+func WithConcurrency(concurrency int) Option {
+    return func(o *ExecOptions) {
+        o.concurrency = concurrency
+    }
+}
+
 func Build(
     ctx context.Context,
     pool memory.Allocator,
@@ -294,7 +306,7 @@ func Build(
     _, span := tracer.Start(ctx, "PhysicalPlan/Build")
     defer span.End()
 
-    execOpts := execOptions{}
+    execOpts := NewExecOptions()
     for _, o := range options {
         o(&execOpts)
     }
@@ -318,7 +330,7 @@ func Build(
             // Create noop operators since we don't know what to push the scan
             // results to. In a following node visit, these noops will have
             // SetNext called on them and push to the correct operator.
-            plans := make([]PhysicalPlan, concurrencyHardcoded)
+            plans := make([]PhysicalPlan, execOpts.concurrency)
             for i := range plans {
                 plans[i] = &noopOperator{}
             }
@@ -333,7 +345,7 @@ func Build(
             // Create noop operators since we don't know what to push the scan
             // results to. In a following node visit, these noops will have
             // SetNext called on them and push to the correct operator.
-            plans := make([]PhysicalPlan, concurrencyHardcoded)
+            plans := make([]PhysicalPlan, execOpts.concurrency)
             for i := range plans {
                 plans[i] = &noopOperator{}
             }
@@ -435,13 +447,12 @@ func Build(
                 ordered = false
             }
             var sync PhysicalPlan
-            if len(prev) > 1 {
-                // These aggregate operators need to be synchronized.
-                if ordered && len(plan.Aggregation.GroupExprs) > 0 {
-                    sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
-                } else {
-                    sync = Synchronize(len(prev))
-                }
+            // These aggregate operators need to be synchronized.
+            // NOTE: in the concurrency = 1 case we still add a synchronizer because the Aggregation operator expects a final aggregation to be performed.
+            if ordered && len(plan.Aggregation.GroupExprs) > 0 {
+                sync = NewOrderedSynchronizer(pool, len(prev), plan.Aggregation.GroupExprs)
+            } else {
+                sync = Synchronize(len(prev))
             }
             seed := maphash.MakeSeed()
             for i := 0; i < len(prev); i++ {
@@ -500,7 +511,7 @@ func Build(
 }
 
 func shouldPlanOrderedAggregate(
-    execOpts execOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation,
+    execOpts ExecOptions, oInfo *planOrderingInfo, agg *logicalplan.Aggregation,
 ) (bool, error) {
     if !execOpts.orderedAggregations {
         // Ordered aggregations disabled.
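The plumbing above is the standard Go functional-options pattern. As a self-contained sketch of just that mechanism, here is a trimmed-down version using the names from this diff; ExecOptions is reduced to the one field this commit adds, so it is an illustration rather than the package's full type.

package main

import (
    "fmt"
    "runtime"
)

// defaultConcurrency mirrors the new default in the diff: one plan per CPU.
var defaultConcurrency = runtime.GOMAXPROCS(0)

// ExecOptions is trimmed to the field this commit adds.
type ExecOptions struct {
    concurrency int
}

// NewExecOptions seeds the defaults before user-supplied options are applied.
func NewExecOptions() ExecOptions {
    return ExecOptions{concurrency: defaultConcurrency}
}

type Option func(o *ExecOptions)

func WithConcurrency(concurrency int) Option {
    return func(o *ExecOptions) { o.concurrency = concurrency }
}

func main() {
    // This is how Build applies options in the diff: start from the defaults,
    // then let each Option mutate the struct in turn.
    execOpts := NewExecOptions()
    for _, o := range []Option{WithConcurrency(1)} {
        o(&execOpts)
    }
    // Even with concurrency = 1, the planner now still inserts a synchronizer
    // so the aggregation operator sees its expected final-aggregation step.
    fmt.Println(execOpts.concurrency) // prints 1
}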
