Skip to content

Commit

Permalink
Minor refactor in LocalExecutionPlanner
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Oct 23, 2015
1 parent 478305e commit f471934
Showing 1 changed file with 5 additions and 5 deletions.
Expand Up @@ -702,13 +702,13 @@ public PhysicalOperation visitAggregation(AggregationNode node, LocalExecutionPl

OperatorFactory exchangeSource = createRandomDistribution(context.getNextOperatorId(), exchange);
source = new PhysicalOperation(exchangeSource, source.getLayout());
return planGroupByAggregation(node, source, context, Optional.empty());
return planGroupByAggregation(node, source, context.getNextOperatorId(), Optional.empty());
}

int aggregationConcurrency = getTaskAggregationConcurrency(session);
if (node.getStep() == Step.PARTIAL || !context.isAllowLocalParallel() || context.getDriverInstanceCount() > 1 || aggregationConcurrency <= 1) {
PhysicalOperation source = node.getSource().accept(this, context);
return planGroupByAggregation(node, source, context, Optional.empty());
return planGroupByAggregation(node, source, context.getNextOperatorId(), Optional.empty());
}

// create context for parallel operators
Expand Down Expand Up @@ -749,7 +749,7 @@ public PhysicalOperation visitAggregation(AggregationNode node, LocalExecutionPl
source = new PhysicalOperation(hashPartitionMask, source.getLayout(), source);

// plan aggregation
PhysicalOperation operation = planGroupByAggregation(node, source, parallelContext, Optional.of(defaultMaskChannel));
PhysicalOperation operation = planGroupByAggregation(node, source, parallelContext.getNextOperatorId(), Optional.of(defaultMaskChannel));

// merge parallel tasks back into a single stream
operation = addInMemoryExchange(context, operation, parallelContext);
Expand Down Expand Up @@ -1712,7 +1712,7 @@ private PhysicalOperation planGlobalAggregation(int operatorId, AggregationNode
return new PhysicalOperation(operatorFactory, outputMappings.build(), source);
}

private PhysicalOperation planGroupByAggregation(AggregationNode node, PhysicalOperation source, LocalExecutionPlanContext context, Optional<Integer> defaultMaskChannel)
private PhysicalOperation planGroupByAggregation(AggregationNode node, PhysicalOperation source, int operatorId, Optional<Integer> defaultMaskChannel)
{
List<Symbol> groupBySymbols = node.getGroupBy();

Expand Down Expand Up @@ -1752,7 +1752,7 @@ private PhysicalOperation planGroupByAggregation(AggregationNode node, PhysicalO
Optional<Integer> hashChannel = node.getHashSymbol().map(channelGetter(source));

OperatorFactory operatorFactory = new HashAggregationOperatorFactory(
context.getNextOperatorId(),
operatorId,
groupByTypes,
groupByChannels,
node.getStep(),
Expand Down

0 comments on commit f471934

Please sign in to comment.