Skip to content

Commit

Permalink
Tighten check for splitting aggregation node
Browse files Browse the repository at this point in the history
When a single-step aggregation has empty and non-empty grouping sets and
runs in a partitioned stage (i.e., sits on a partitioning exchange),
there needs to be a partial below the exchange that will produce default
intermediate output for the empty grouping set.

AddExchanges currently produces a "broken" plan without the partial,
which is fixed by PushPartialAggregationThroughExchange. The current
check is too broad though, as it forces the aggregation to be split
even if it runs in a single node (i.e., sits on top of a gathering
exchange). If the aggregation is not decomposable, it's not possible
to do so, and the planning fails.

This change tightens the check to force the split only for an
aggregation on top of a partitioning exchange.
  • Loading branch information
martint committed Mar 27, 2018
1 parent 0095d1d commit 3d4d6ca
Showing 1 changed file with 12 additions and 8 deletions.
Expand Up @@ -71,11 +71,22 @@ public Pattern<AggregationNode> getPattern()
@Override
public Result apply(AggregationNode aggregationNode, Captures captures, Context context)
{
PlanNode childNode = context.getLookup().resolve(aggregationNode.getSource());
if (!(childNode instanceof ExchangeNode)) {
return Result.empty();
}

ExchangeNode exchangeNode = (ExchangeNode) childNode;

boolean decomposable = aggregationNode.isDecomposable(functionRegistry);

if (aggregationNode.getStep().equals(SINGLE) &&
aggregationNode.hasEmptyGroupingSet() &&
aggregationNode.hasNonEmptyGroupingSet()) {
aggregationNode.hasNonEmptyGroupingSet() &&
exchangeNode.getType() == REPARTITION) {
// single-step aggregation w/ empty grouping sets in a partitioned stage, so we need a partial that will produce
// the default intermediates for the empty grouping set that will be routed to the appropriate final aggregation.
// TODO: technically, AddExchanges generates a broken plan that this rule "fixes"
checkState(
decomposable,
"Distributed aggregation with empty grouping set requires partial but functions are not decomposable");
Expand All @@ -86,13 +97,6 @@ public Result apply(AggregationNode aggregationNode, Captures captures, Context
return Result.empty();
}

PlanNode childNode = context.getLookup().resolve(aggregationNode.getSource());
if (!(childNode instanceof ExchangeNode)) {
return Result.empty();
}

ExchangeNode exchangeNode = (ExchangeNode) childNode;

// partial aggregation can only be pushed through exchange that doesn't change
// the cardinality of the stream (i.e., gather or repartition)
if ((exchangeNode.getType() != GATHER && exchangeNode.getType() != REPARTITION) ||
Expand Down

0 comments on commit 3d4d6ca

Please sign in to comment.