Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat gen4: allow pushing aggregations inside derived tables #10128

Merged
merged 2 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions go/vt/vtgate/planbuilder/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,30 @@ func (hp *horizonPlanning) pushAggregation(
grouping []abstract.GroupBy,
aggregations []abstract.Aggr,
ignoreOutputOrder bool,
) (groupingOffsets []offsets, outputAggrsOffset [][]offsets, err error) {
) (output logicalPlan, groupingOffsets []offsets, outputAggrsOffset [][]offsets, err error) {
switch plan := plan.(type) {
case *routeGen4:
output = plan
groupingOffsets, outputAggrsOffset, err = pushAggrOnRoute(ctx, plan, aggregations, grouping, ignoreOutputOrder)
if err != nil {
return nil, nil, err
}
return

case *joinGen4:
return hp.pushAggrOnJoin(ctx, grouping, aggregations, plan)
output = plan
groupingOffsets, outputAggrsOffset, err = hp.pushAggrOnJoin(ctx, plan, grouping, aggregations)
return

case *semiJoin:
return hp.pushAggrOnSemiJoin(ctx, grouping, aggregations, plan, ignoreOutputOrder)
output = plan
groupingOffsets, outputAggrsOffset, err = hp.pushAggrOnSemiJoin(ctx, plan, grouping, aggregations, ignoreOutputOrder)
return

case *simpleProjection:
// we just remove the simpleProjection. We are doing an OA on top anyway, so no need to clean up the output columns
return hp.pushAggregation(ctx, plan.input, grouping, aggregations, ignoreOutputOrder)

default:
return nil, nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "using aggregation on top of a %T plan is not yet supported", plan)
err = vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "using aggregation on top of a %T plan is not yet supported", plan)
return
}
}

Expand Down Expand Up @@ -195,9 +202,9 @@ vtgate level, we can offload most of the work to MySQL, and at the vtgate just s
*/
func (hp *horizonPlanning) pushAggrOnJoin(
ctx *plancontext.PlanningContext,
join *joinGen4,
grouping []abstract.GroupBy,
aggregations []abstract.Aggr,
join *joinGen4,
) ([]offsets, [][]offsets, error) {
// First we separate aggregations according to which side the dependencies are coming from
lhsAggrs, rhsAggrs, err := splitAggregationsToLeftAndRight(ctx, aggregations, join)
Expand All @@ -220,15 +227,16 @@ func (hp *horizonPlanning) pushAggrOnJoin(
}

// Next we push the aggregations to both sides
lhsOffsets, lhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Left, lhsGrouping, lhsAggrs, true)
newLHS, lhsOffsets, lhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Left, lhsGrouping, lhsAggrs, true)
if err != nil {
return nil, nil, err
}

rhsOffsets, rhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Right, rhsGrouping, rhsAggrs, true)
newRHS, rhsOffsets, rhsAggrOffsets, err := hp.filteredPushAggregation(ctx, join.Right, rhsGrouping, rhsAggrs, true)
if err != nil {
return nil, nil, err
}
join.Left, join.Right = newLHS, newRHS

// Next, we have to pass through the grouping values through the join and the projection we add on top
// We added new groupings to the LHS because of the join condition, so we don't want to pass through everything,
Expand Down Expand Up @@ -283,9 +291,9 @@ That way we get the aggregation grouped by the column we need to use to decide i
*/
func (hp *horizonPlanning) pushAggrOnSemiJoin(
ctx *plancontext.PlanningContext,
join *semiJoin,
grouping []abstract.GroupBy,
aggregations []abstract.Aggr,
join *semiJoin,
ignoreOutputOrder bool,
) ([]offsets, [][]offsets, error) {
// We need to group by the columns used in the join condition.
Expand All @@ -296,10 +304,11 @@ func (hp *horizonPlanning) pushAggrOnSemiJoin(
}

totalGrouping := append(grouping, lhsCols...)
groupingOffsets, aggrParams, err := hp.pushAggregation(ctx, join.lhs, totalGrouping, aggregations, ignoreOutputOrder)
newLeft, groupingOffsets, aggrParams, err := hp.pushAggregation(ctx, join.lhs, totalGrouping, aggregations, ignoreOutputOrder)
if err != nil {
return nil, nil, err
}
join.lhs = newLeft

outputGroupings := make([]offsets, 0, len(grouping))
for idx := range grouping {
Expand All @@ -321,7 +330,7 @@ func (hp *horizonPlanning) filteredPushAggregation(
grouping []abstract.GroupBy,
aggregations []*abstract.Aggr,
ignoreOutputOrder bool,
) (groupingOffsets []offsets, outputAggrs [][]offsets, err error) {
) (out logicalPlan, groupingOffsets []offsets, outputAggrs [][]offsets, err error) {
used := make([]bool, len(aggregations))
var aggrs []abstract.Aggr

Expand All @@ -331,9 +340,9 @@ func (hp *horizonPlanning) filteredPushAggregation(
aggrs = append(aggrs, *aggr)
}
}
groupingOffsets, pushedAggrs, err := hp.pushAggregation(ctx, plan, grouping, aggrs, ignoreOutputOrder)
newplan, groupingOffsets, pushedAggrs, err := hp.pushAggregation(ctx, plan, grouping, aggrs, ignoreOutputOrder)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
idx := 0
for _, b := range used {
Expand All @@ -344,7 +353,7 @@ func (hp *horizonPlanning) filteredPushAggregation(
outputAggrs = append(outputAggrs, pushedAggrs[idx])
idx++
}
return groupingOffsets, outputAggrs, nil
return newplan, groupingOffsets, outputAggrs, nil
}

func isMinOrMax(in engine.AggregateOpcode) bool {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,13 @@ func (hp *horizonPlanning) planAggrUsingOA(
oa.preProcess = true
}

groupingOffsets, aggrParamOffsets, err := hp.pushAggregation(ctx, plan, grouping, aggrs, false)
newPlan, groupingOffsets, aggrParamOffsets, err := hp.pushAggregation(ctx, plan, grouping, aggrs, false)
if err != nil {
return nil, err
}

plan = newPlan

_, isRoute := plan.(*routeGen4)
needsProj := !isRoute
var aggPlan = plan
Expand Down
52 changes: 52 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3895,3 +3895,55 @@ Gen4 plan same as above
"select count(distinct user_id, name) from user"
"unsupported: only one expression allowed inside aggregates: count(distinct user_id, `name`)"
Gen4 error: aggregate functions take a single argument 'count(distinct user_id, `name`)'

"select sum(col) from (select user.col as col, 32 from user join user_extra) t"
"unsupported: cross-shard query with aggregates"
{
"QueryType": "SELECT",
"Original": "select sum(col) from (select user.col as col, 32 from user join user_extra) t",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "sum(0) AS sum(col)",
"Inputs": [
{
"OperatorType": "Projection",
"Expressions": [
"[COLUMN 2] * [COLUMN 3] as sum(col)"
],
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "L:0,L:1,L:2,R:0",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.col as col, 32, sum(col) from `user` where 1 != 1",
"Query": "select `user`.col as col, 32, sum(col) from `user`",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*) from user_extra where 1 != 1",
"Query": "select count(*) from user_extra",
"Table": "user_extra"
}
]
}
]
}
]
}
}
Loading