Skip to content

Commit

Permalink
feat(query): add planner rule for converting aggregate window to a pu…
Browse files Browse the repository at this point in the history
…sh down (#23586)
  • Loading branch information
jsternberg committed Jul 26, 2022
1 parent 619eb1c commit a9f751f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 71 deletions.
56 changes: 54 additions & 2 deletions query/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ func init() {
PushDownWindowAggregateRule{},
PushDownWindowForceAggregateRule{},
PushDownWindowAggregateByTimeRule{},
PushDownAggregateWindowRule{},
PushDownBareAggregateRule{},
GroupWindowAggregateTransposeRule{},
PushDownGroupAggregateRule{},
)
// TODO(lesam): re-enable MergeFilterRule once it works with complex use cases
// such as filter() |> geo.strictFilter(). See geo_merge_filter flux test.
//plan.RegisterLogicalRules(
// plan.RegisterLogicalRules(
// MergeFiltersRule{},
//)
// )
}

type FromStorageRule struct{}
Expand Down Expand Up @@ -857,6 +858,57 @@ func (PushDownWindowAggregateByTimeRule) Rewrite(ctx context.Context, pn plan.No
return plan.CreateUniquePhysicalNode(ctx, "ReadWindowAggregateByTime", windowAggregateSpec), true, nil
}

type PushDownAggregateWindowRule struct{}

func (p PushDownAggregateWindowRule) Name() string {
return "PushDownAggregateWindowRule"
}

func (p PushDownAggregateWindowRule) Pattern() plan.Pattern {
return plan.Pat(universe.AggregateWindowKind,
plan.Pat(ReadRangePhysKind))
}

func (p PushDownAggregateWindowRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
aggregateWindowSpec := pn.ProcedureSpec().(*universe.AggregateWindowProcedureSpec)
fromNode := pn.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*ReadRangePhysSpec)

if !isPushableWindow(aggregateWindowSpec.WindowSpec) {
return pn, false, nil
}

if aggregateWindowSpec.ValueCol != execute.DefaultValueColLabel {
return pn, false, nil
}

switch aggregateWindowSpec.AggregateKind {
case universe.MinKind, universe.MaxKind,
universe.MeanKind, universe.CountKind, universe.SumKind,
universe.FirstKind, universe.LastKind:
// All of these are supported.
default:
return pn, false, nil
}

windowAggregateSpec := &ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
Aggregates: []plan.ProcedureKind{
aggregateWindowSpec.AggregateKind,
},
WindowEvery: aggregateWindowSpec.WindowSpec.Window.Every,
Offset: aggregateWindowSpec.WindowSpec.Window.Offset,
CreateEmpty: aggregateWindowSpec.WindowSpec.CreateEmpty,
TimeColumn: execute.DefaultStopColLabel,
ForceAggregate: aggregateWindowSpec.ForceAggregate,
}
if aggregateWindowSpec.UseStart {
windowAggregateSpec.TimeColumn = execute.DefaultStartColLabel
}

return plan.CreateUniquePhysicalNode(ctx, "ReadWindowAggregateByTime", windowAggregateSpec), true, nil
}

// PushDownBareAggregateRule is a rule that allows pushing down of aggregates
// that are directly over a ReadRange source.
type PushDownBareAggregateRule struct{}
Expand Down
Loading

0 comments on commit a9f751f

Please sign in to comment.