Skip to content

Commit

Permalink
feat gen4: allow pushing aggregations inside derived tables (#10128)
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Apr 22, 2022
1 parent b45af44 commit e91438d
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 21 deletions.
41 changes: 25 additions & 16 deletions go/vt/vtgate/planbuilder/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,30 @@ func (hp *horizonPlanning) pushAggregation(
grouping []abstract.GroupBy,
aggregations []abstract.Aggr,
ignoreOutputOrder bool,
) (groupingOffsets []offsets, outputAggrsOffset [][]offsets, err error) {
) (output logicalPlan, groupingOffsets []offsets, outputAggrsOffset [][]offsets, err error) {
switch plan := plan.(type) {
case *routeGen4:
output = plan
groupingOffsets, outputAggrsOffset, err = pushAggrOnRoute(ctx, plan, aggregations, grouping, ignoreOutputOrder)
if err != nil {
return nil, nil, err
}
return

case *joinGen4:
return hp.pushAggrOnJoin(ctx, grouping, aggregations, plan)
output = plan
groupingOffsets, outputAggrsOffset, err = hp.pushAggrOnJoin(ctx, plan, grouping, aggregations)
return

case *semiJoin:
return hp.pushAggrOnSemiJoin(ctx, grouping, aggregations, plan, ignoreOutputOrder)
output = plan
groupingOffsets, outputAggrsOffset, err = hp.pushAggrOnSemiJoin(ctx, plan, grouping, aggregations, ignoreOutputOrder)
return

case *simpleProjection:
// We simply remove the simpleProjection. An OA goes on top anyway, so there is no need to clean up the output columns.
return hp.pushAggregation(ctx, plan.input, grouping, aggregations, ignoreOutputOrder)

default:
return nil, nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "using aggregation on top of a %T plan is not yet supported", plan)
err = vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "using aggregation on top of a %T plan is not yet supported", plan)
return
}
}

Expand Down Expand Up @@ -195,9 +202,9 @@ vtgate level, we can offload most of the work to MySQL, and at the vtgate just s
*/
func (hp *horizonPlanning) pushAggrOnJoin(
ctx *plancontext.PlanningContext,
join *joinGen4,
grouping []abstract.GroupBy,
aggregations []abstract.Aggr,
join *joinGen4,
) ([]offsets, [][]offsets, error) {
// First we separate aggregations according to which side the dependencies are coming from
lhsAggrs, rhsAggrs, err := splitAggregationsToLeftAndRight(ctx, aggregations, join)
Expand All @@ -220,15 +227,16 @@ func (hp *horizonPlanning) pushAggrOnJoin(
}

// Next we push the aggregations to both sides
lhsOffsets, lhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Left, lhsGrouping, lhsAggrs, true)
newLHS, lhsOffsets, lhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Left, lhsGrouping, lhsAggrs, true)
if err != nil {
return nil, nil, err
}

rhsOffsets, rhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Right, rhsGrouping, rhsAggrs, true)
newRHS, rhsOffsets, rhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Right, rhsGrouping, rhsAggrs, true)
if err != nil {
return nil, nil, err
}
join.Left, join.Right = newLHS, newRHS

// Next, we have to pass the grouping values through the join and the projection we add on top
// We added new groupings to the LHS because of the join condition, so we don't want to pass through everything,
Expand Down Expand Up @@ -283,9 +291,9 @@ That way we get the aggregation grouped by the column we need to use to decide i
*/
func (hp *horizonPlanning) pushAggrOnSemiJoin(
ctx *plancontext.PlanningContext,
join *semiJoin,
grouping []abstract.GroupBy,
aggregations []abstract.Aggr,
join *semiJoin,
ignoreOutputOrder bool,
) ([]offsets, [][]offsets, error) {
// We need to group by the columns used in the join condition.
Expand All @@ -296,10 +304,11 @@ func (hp *horizonPlanning) pushAggrOnSemiJoin(
}

totalGrouping := append(grouping, lhsCols...)
groupingOffsets, aggrParams, err := hp.pushAggregation(ctx, join.lhs, totalGrouping, aggregations, ignoreOutputOrder)
newLeft, groupingOffsets, aggrParams, err := hp.pushAggregation(ctx, join.lhs, totalGrouping, aggregations, ignoreOutputOrder)
if err != nil {
return nil, nil, err
}
join.lhs = newLeft

outputGroupings := make([]offsets, 0, len(grouping))
for idx := range grouping {
Expand All @@ -321,7 +330,7 @@ func (hp *horizonPlanning) filteredPushAggregation(
grouping []abstract.GroupBy,
aggregations []*abstract.Aggr,
ignoreOutputOrder bool,
) (groupingOffsets []offsets, outputAggrs [][]offsets, err error) {
) (out logicalPlan, groupingOffsets []offsets, outputAggrs [][]offsets, err error) {
used := make([]bool, len(aggregations))
var aggrs []abstract.Aggr

Expand All @@ -331,9 +340,9 @@ func (hp *horizonPlanning) filteredPushAggregation(
aggrs = append(aggrs, *aggr)
}
}
groupingOffsets, pushedAggrs, err := hp.pushAggregation(ctx, plan, grouping, aggrs, ignoreOutputOrder)
newplan, groupingOffsets, pushedAggrs, err := hp.pushAggregation(ctx, plan, grouping, aggrs, ignoreOutputOrder)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
idx := 0
for _, b := range used {
Expand All @@ -344,7 +353,7 @@ func (hp *horizonPlanning) filteredPushAggregation(
outputAggrs = append(outputAggrs, pushedAggrs[idx])
idx++
}
return groupingOffsets, outputAggrs, nil
return newplan, groupingOffsets, outputAggrs, nil
}

func isMinOrMax(in engine.AggregateOpcode) bool {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,13 @@ func (hp *horizonPlanning) planAggrUsingOA(
oa.preProcess = true
}

groupingOffsets, aggrParamOffsets, err := hp.pushAggregation(ctx, plan, grouping, aggrs, false)
newPlan, groupingOffsets, aggrParamOffsets, err := hp.pushAggregation(ctx, plan, grouping, aggrs, false)
if err != nil {
return nil, err
}

plan = newPlan

_, isRoute := plan.(*routeGen4)
needsProj := !isRoute
var aggPlan = plan
Expand Down
52 changes: 52 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3895,3 +3895,55 @@ Gen4 plan same as above
"select count(distinct user_id, name) from user"
"unsupported: only one expression allowed inside aggregates: count(distinct user_id, `name`)"
Gen4 error: aggregate functions take a single argument 'count(distinct user_id, `name`)'

"select sum(col) from (select user.col as col, 32 from user join user_extra) t"
"unsupported: cross-shard query with aggregates"
{
"QueryType": "SELECT",
"Original": "select sum(col) from (select user.col as col, 32 from user join user_extra) t",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "sum(0) AS sum(col)",
"Inputs": [
{
"OperatorType": "Projection",
"Expressions": [
"[COLUMN 2] * [COLUMN 3] as sum(col)"
],
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "L:0,L:1,L:2,R:0",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.col as col, 32, sum(col) from `user` where 1 != 1",
"Query": "select `user`.col as col, 32, sum(col) from `user`",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*) from user_extra where 1 != 1",
"Query": "select count(*) from user_extra",
"Table": "user_extra"
}
]
}
]
}
]
}
}
Loading

0 comments on commit e91438d

Please sign in to comment.