From a502e0777c3c3442bb97e347497931ce82912e9f Mon Sep 17 00:00:00 2001 From: Haozhun Jin Date: Thu, 25 Jun 2015 09:41:41 -0700 Subject: [PATCH] Wrap PreferredProperties in a new context in AddExchange --- .../planner/optimizations/AddExchanges.java | 152 ++++++++++-------- 1 file changed, 87 insertions(+), 65 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 2c1552eeaff4..2837f45dd096 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -132,12 +132,32 @@ public PlanNode optimize(PlanNode plan, Session session, Map types boolean distributedJoinEnabled = SystemSessionProperties.isDistributedJoinEnabled(session, distributedJoins); boolean redistributeWrites = SystemSessionProperties.isRedistributeWrites(session, this.redistributeWrites); boolean preferStreamingOperators = SystemSessionProperties.preferStreamingOperators(session, false); - PlanWithProperties result = plan.accept(new Rewriter(symbolAllocator, idAllocator, symbolAllocator, session, distributedIndexJoins, distributedJoinEnabled, preferStreamingOperators, redistributeWrites), PreferredProperties.any()); + PlanWithProperties result = plan.accept(new Rewriter(symbolAllocator, idAllocator, symbolAllocator, session, distributedIndexJoins, distributedJoinEnabled, preferStreamingOperators, redistributeWrites), new Context(PreferredProperties.any())); return result.getNode(); } + private static class Context + { + private PreferredProperties preferredProperties; + + Context(PreferredProperties preferredProperties) + { + this.preferredProperties = preferredProperties; + } + + Context withPreferredProperties(PreferredProperties preferredProperties) + { + return new Context(preferredProperties); + } + + PreferredProperties getPreferredProperties() + { + return preferredProperties; + } + } + private class Rewriter - extends PlanVisitor + extends PlanVisitor { private final SymbolAllocator allocator; private final PlanNodeIdAllocator idAllocator; @@ -161,24 +181,24 @@ public Rewriter(SymbolAllocator allocator, PlanNodeIdAllocator idAllocator, Symb } @Override - protected PlanWithProperties visitPlan(PlanNode node, PreferredProperties preferred) + protected PlanWithProperties visitPlan(PlanNode node, Context context) { - return rebaseAndDeriveProperties(node, planChild(node, preferred)); + return rebaseAndDeriveProperties(node, planChild(node, context)); } @Override - public PlanWithProperties visitProject(ProjectNode node, PreferredProperties preferred) + public PlanWithProperties visitProject(ProjectNode node, Context context) { Map identities = computeIdentityTranslations(node.getAssignments()); - PreferredProperties translatedPreferred = preferred.translate(symbol -> Optional.ofNullable(identities.get(symbol))); + PreferredProperties translatedPreferred = context.getPreferredProperties().translate(symbol -> Optional.ofNullable(identities.get(symbol))); - return rebaseAndDeriveProperties(node, planChild(node, translatedPreferred)); + return rebaseAndDeriveProperties(node, planChild(node, context.withPreferredProperties(translatedPreferred))); } @Override - public PlanWithProperties visitOutput(OutputNode node, PreferredProperties preferred) + public PlanWithProperties visitOutput(OutputNode node, Context context) { - PlanWithProperties child = planChild(node, PreferredProperties.any()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.any())); if (child.getProperties().isDistributed()) { child = withDerivedProperties( @@ -190,7 +210,7 @@ public PlanWithProperties visitOutput(OutputNode node, PreferredProperties prefe } @Override - public PlanWithProperties visitAggregation(AggregationNode node, PreferredProperties preferred) + public PlanWithProperties visitAggregation(AggregationNode node, Context context) { boolean decomposable = node.getFunctions() .values().stream() @@ -200,9 +220,9 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper PreferredProperties preferredProperties = node.getGroupBy().isEmpty() ? PreferredProperties.any() - : PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getGroupBy()), Optional.of(node.getGroupBy()), grouped(node.getGroupBy())); + : PreferredProperties.derivePreferences(context.getPreferredProperties(), ImmutableSet.copyOf(node.getGroupBy()), Optional.of(node.getGroupBy()), grouped(node.getGroupBy())); - PlanWithProperties child = planChild(node, preferredProperties); + PlanWithProperties child = planChild(node, context.withPreferredProperties(preferredProperties)); if (!child.getProperties().isDistributed()) { // If already unpartitioned, just drop the single aggregation back on @@ -296,10 +316,10 @@ private PlanWithProperties splitAggregation(AggregationNode node, PlanWithProper } @Override - public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProperties preferred) + public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, Context context) { - PreferredProperties preferredChildProperties = PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getDistinctSymbols()), Optional.of(node.getDistinctSymbols()), grouped(node.getDistinctSymbols())); - PlanWithProperties child = node.getSource().accept(this, preferredChildProperties); + PreferredProperties preferredChildProperties = PreferredProperties.derivePreferences(context.getPreferredProperties(), ImmutableSet.copyOf(node.getDistinctSymbols()), Optional.of(node.getDistinctSymbols()), grouped(node.getDistinctSymbols())); + PlanWithProperties child = node.getSource().accept(this, context.withPreferredProperties(preferredChildProperties)); if (!child.getProperties().isDistributed() || !child.getProperties().isPartitionedOn(node.getDistinctSymbols())) { @@ -316,7 +336,7 @@ public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProp } @Override - public PlanWithProperties visitWindow(WindowNode node, PreferredProperties preferred) + public PlanWithProperties visitWindow(WindowNode node, Context context) { List> desiredProperties = new ArrayList<>(); if (!node.getPartitionBy().isEmpty()) { @@ -326,7 +346,9 @@ public PlanWithProperties visitWindow(WindowNode node, PreferredProperties prefe desiredProperties.add(new SortingProperty<>(symbol, node.getOrderings().get(symbol))); } - PlanWithProperties child = planChild(node, PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getPartitionBy()), desiredProperties)); + PlanWithProperties child = planChild( + node, + context.withPreferredProperties(PreferredProperties.derivePreferences(context.getPreferredProperties(), ImmutableSet.copyOf(node.getPartitionBy()), desiredProperties))); if (!child.getProperties().isPartitionedOn(node.getPartitionBy())) { if (node.getPartitionBy().isEmpty()) { @@ -376,10 +398,10 @@ public PlanWithProperties visitWindow(WindowNode node, PreferredProperties prefe } @Override - public PlanWithProperties visitRowNumber(RowNumberNode node, PreferredProperties preferred) + public PlanWithProperties visitRowNumber(RowNumberNode node, Context context) { if (node.getPartitionBy().isEmpty()) { - PlanWithProperties child = planChild(node, PreferredProperties.undistributed()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.undistributed())); if (child.getProperties().isDistributed()) { child = withDerivedProperties( @@ -390,7 +412,7 @@ public PlanWithProperties visitRowNumber(RowNumberNode node, PreferredProperties return rebaseAndDeriveProperties(node, child); } - PlanWithProperties child = planChild(node, PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy()))); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.derivePreferences(context.getPreferredProperties(), ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy())))); // TODO: add config option/session property to force parallel plan if child is unpartitioned and window has a PARTITION BY clause if (!child.getProperties().isPartitionedOn(node.getPartitionBy())) { @@ -409,7 +431,7 @@ public PlanWithProperties visitRowNumber(RowNumberNode node, PreferredProperties } @Override - public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredProperties preferred) + public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, Context context) { PreferredProperties preferredChildProperties; Function addExchange; @@ -419,11 +441,11 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredPr addExchange = partial -> gatheringExchange(idAllocator.getNextId(), partial); } else { - preferredChildProperties = PreferredProperties.derivePreferences(preferred, ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy())); + preferredChildProperties = PreferredProperties.derivePreferences(context.getPreferredProperties(), ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy())); addExchange = partial -> partitionedExchange(idAllocator.getNextId(), partial, Optional.of(node.getPartitionBy()), node.getHashSymbol()); } - PlanWithProperties child = planChild(node, preferredChildProperties); + PlanWithProperties child = planChild(node, context.withPreferredProperties(preferredChildProperties)); if (!child.getProperties().isPartitionedOn(node.getPartitionBy())) { // add exchange + push function to child child = withDerivedProperties( @@ -446,9 +468,9 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredPr } @Override - public PlanWithProperties visitTopN(TopNNode node, PreferredProperties preferred) + public PlanWithProperties visitTopN(TopNNode node, Context context) { - PlanWithProperties child = planChild(node, PreferredProperties.any()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.any())); if (child.getProperties().isDistributed()) { child = withDerivedProperties( @@ -464,9 +486,9 @@ public PlanWithProperties visitTopN(TopNNode node, PreferredProperties preferred } @Override - public PlanWithProperties visitSort(SortNode node, PreferredProperties preferred) + public PlanWithProperties visitSort(SortNode node, Context context) { - PlanWithProperties child = planChild(node, PreferredProperties.undistributed()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.undistributed())); if (child.getProperties().isDistributed()) { child = withDerivedProperties( @@ -478,9 +500,9 @@ public PlanWithProperties visitSort(SortNode node, PreferredProperties preferred } @Override - public PlanWithProperties visitLimit(LimitNode node, PreferredProperties preferred) + public PlanWithProperties visitLimit(LimitNode node, Context context) { - PlanWithProperties child = planChild(node, PreferredProperties.any()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.any())); if (child.getProperties().isDistributed()) { child = withDerivedProperties( @@ -496,9 +518,9 @@ public PlanWithProperties visitLimit(LimitNode node, PreferredProperties preferr } @Override - public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, PreferredProperties preferred) + public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, Context context) { - PlanWithProperties child = planChild(node, PreferredProperties.any()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.any())); if (child.getProperties().isDistributed()) { child = withDerivedProperties( @@ -516,23 +538,23 @@ public PlanWithProperties visitDistinctLimit(DistinctLimitNode node, PreferredPr } @Override - public PlanWithProperties visitFilter(FilterNode node, PreferredProperties preferred) + public PlanWithProperties visitFilter(FilterNode node, Context context) { if (node.getSource() instanceof TableScanNode) { - return planTableScan((TableScanNode) node.getSource(), node.getPredicate(), preferred); + return planTableScan((TableScanNode) node.getSource(), node.getPredicate(), context); } - return rebaseAndDeriveProperties(node, planChild(node, preferred)); + return rebaseAndDeriveProperties(node, planChild(node, context)); } @Override - public PlanWithProperties visitTableScan(TableScanNode node, PreferredProperties preferred) + public PlanWithProperties visitTableScan(TableScanNode node, Context context) { - return planTableScan(node, BooleanLiteral.TRUE_LITERAL, preferred); + return planTableScan(node, BooleanLiteral.TRUE_LITERAL, context); } @Override - public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProperties context) + public PlanWithProperties visitTableWriter(TableWriterNode node, Context context) { PlanWithProperties source = node.getSource().accept(this, context); if (redistributeWrites) { @@ -544,7 +566,7 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProper return rebaseAndDeriveProperties(node, source); } - private PlanWithProperties planTableScan(TableScanNode node, Expression predicate, PreferredProperties preferred) + private PlanWithProperties planTableScan(TableScanNode node, Expression predicate, Context context) { // don't include non-deterministic predicates Expression deterministicPredicate = stripNonDeterministicConjuncts(predicate); @@ -617,7 +639,7 @@ private PlanWithProperties planTableScan(TableScanNode node, Expression predicat }) .collect(toList()); - return pickPlan(possiblePlans, preferred); + return pickPlan(possiblePlans, context); } private Predicate layoutHasAllNeededOutputs(TableScanNode node) @@ -629,13 +651,13 @@ private Predicate layoutHasAllNeededOutputs(TableScanNode nod /** * possiblePlans should be provided in layout preference order */ - private PlanWithProperties pickPlan(List possiblePlans, PreferredProperties preferred) + private PlanWithProperties pickPlan(List possiblePlans, Context context) { checkArgument(!possiblePlans.isEmpty()); if (preferStreamingOperators) { possiblePlans = new ArrayList<>(possiblePlans); - Collections.sort(possiblePlans, Comparator.comparing(PlanWithProperties::getProperties, streamingExecutionPreference(preferred))); // stable sort; is Collections.min() guaranteed to be stable? + Collections.sort(possiblePlans, Comparator.comparing(PlanWithProperties::getProperties, streamingExecutionPreference(context.getPreferredProperties()))); // stable sort; is Collections.min() guaranteed to be stable? } return possiblePlans.get(0); @@ -660,15 +682,15 @@ private boolean shouldPrune(Expression predicate, Map assi } @Override - public PlanWithProperties visitValues(ValuesNode node, PreferredProperties preferred) + public PlanWithProperties visitValues(ValuesNode node, Context context) { return new PlanWithProperties(node, ActualProperties.undistributed()); } @Override - public PlanWithProperties visitTableCommit(TableCommitNode node, PreferredProperties preferred) + public PlanWithProperties visitTableCommit(TableCommitNode node, Context context) { - PlanWithProperties child = planChild(node, PreferredProperties.any()); + PlanWithProperties child = planChild(node, context.withPreferredProperties(PreferredProperties.any())); if (child.getProperties().isDistributed() || !child.getProperties().isCoordinatorOnly()) { child = withDerivedProperties( gatheringExchange(idAllocator.getNextId(), child.getNode()), @@ -679,7 +701,7 @@ public PlanWithProperties visitTableCommit(TableCommitNode node, PreferredProper } @Override - public PlanWithProperties visitJoin(JoinNode node, PreferredProperties preferred) + public PlanWithProperties visitJoin(JoinNode node, Context context) { List leftSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getLeft); List rightSymbols = Lists.transform(node.getCriteria(), JoinNode.EquiJoinClause::getRight); @@ -690,8 +712,8 @@ public PlanWithProperties visitJoin(JoinNode node, PreferredProperties preferred if (distributedJoins || node.getType() == FULL || node.getType() == RIGHT) { // The implementation of full outer join only works if the data is hash partitioned. See LookupJoinOperators#buildSideOuterJoinUnvisitedPositions - left = node.getLeft().accept(this, PreferredProperties.hashPartitioned(leftSymbols)); - right = node.getRight().accept(this, PreferredProperties.hashPartitioned(rightSymbols)); + left = node.getLeft().accept(this, context.withPreferredProperties(PreferredProperties.hashPartitioned(leftSymbols))); + right = node.getRight().accept(this, context.withPreferredProperties(PreferredProperties.hashPartitioned(rightSymbols))); // force partitioning if (!left.getProperties().isHashPartitionedOn(leftSymbols)) { @@ -709,8 +731,8 @@ public PlanWithProperties visitJoin(JoinNode node, PreferredProperties preferred else { // It can only be INNER or LEFT here. Therefore, no flipping is necessary even though the below code assumes the node is not RIGHT. - left = node.getLeft().accept(this, PreferredProperties.any()); - right = node.getRight().accept(this, PreferredProperties.any()); + left = node.getLeft().accept(this, context.withPreferredProperties(PreferredProperties.any())); + right = node.getRight().accept(this, context.withPreferredProperties(PreferredProperties.any())); if (!left.getProperties().isDistributed() && right.getProperties().isDistributed()) { // force single-node join @@ -742,10 +764,10 @@ else if (left.getProperties().isDistributed() && !(left.getProperties().isHashPa } @Override - public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties preferred) + public PlanWithProperties visitSemiJoin(SemiJoinNode node, Context context) { - PlanWithProperties source = node.getSource().accept(this, PreferredProperties.any()); - PlanWithProperties filteringSource = node.getFilteringSource().accept(this, PreferredProperties.any()); + PlanWithProperties source = node.getSource().accept(this, context.withPreferredProperties(PreferredProperties.any())); + PlanWithProperties filteringSource = node.getFilteringSource().accept(this, context.withPreferredProperties(PreferredProperties.any())); // make filtering source match requirements of source if (source.getProperties().isDistributed()) { @@ -772,23 +794,23 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p } @Override - public PlanWithProperties visitIndexJoin(IndexJoinNode node, PreferredProperties preferredProperties) + public PlanWithProperties visitIndexJoin(IndexJoinNode node, Context context) { List joinColumns = Lists.transform(node.getCriteria(), IndexJoinNode.EquiJoinClause::getProbe); // Only prefer grouping on join columns if no parent local property preferences - List> desiredLocalProperties = preferredProperties.getLocalProperties().isEmpty() ? grouped(joinColumns) : ImmutableList.of(); + List> desiredLocalProperties = context.getPreferredProperties().getLocalProperties().isEmpty() ? grouped(joinColumns) : ImmutableList.of(); - PlanWithProperties probeSource = node.getProbeSource().accept(this, PreferredProperties.derivePreferences(preferredProperties, ImmutableSet.copyOf(joinColumns), desiredLocalProperties)); + PlanWithProperties probeSource = node.getProbeSource().accept(this, context.withPreferredProperties(PreferredProperties.derivePreferences(context.getPreferredProperties(), ImmutableSet.copyOf(joinColumns), desiredLocalProperties))); ActualProperties probeProperties = probeSource.getProperties(); - PlanWithProperties indexSource = node.getIndexSource().accept(this, PreferredProperties.any()); + PlanWithProperties indexSource = node.getIndexSource().accept(this, context.withPreferredProperties(PreferredProperties.any())); // TODO: allow repartitioning if unpartitioned to increase parallelism if (distributedIndexJoins && probeProperties.isDistributed()) { // Force partitioned exchange if we are not effectively partitioned on the join keys, or if the probe is currently executing as a single stream // and the repartitioning will make a difference. - boolean parentPartitioningPreferences = preferredProperties.getGlobalProperties() + boolean parentPartitioningPreferences = context.getPreferredProperties().getGlobalProperties() .flatMap(PreferredProperties.Global::getPartitioningProperties) .isPresent(); boolean enableSinglePartitionRedistribute = !parentPartitioningPreferences || !preferStreamingOperators; @@ -807,15 +829,15 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, PreferredProperties } @Override - public PlanWithProperties visitIndexSource(IndexSourceNode node, PreferredProperties context) + public PlanWithProperties visitIndexSource(IndexSourceNode node, Context context) { return new PlanWithProperties(node, ActualProperties.undistributed()); } @Override - public PlanWithProperties visitUnion(UnionNode node, PreferredProperties preferred) + public PlanWithProperties visitUnion(UnionNode node, Context context) { - if (!preferred.getGlobalProperties().isPresent() || !preferred.getGlobalProperties().get().isHashPartitioned()) { + if (!context.getPreferredProperties().getGlobalProperties().isPresent() || !context.getPreferredProperties().getGlobalProperties().get().isHashPartitioned()) { // first, classify children into partitioned and unpartitioned List unpartitionedChildren = new ArrayList<>(); List> unpartitionedOutputLayouts = new ArrayList<>(); @@ -825,7 +847,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties preferr List sources = node.getSources(); for (int i = 0; i < sources.size(); i++) { - PlanWithProperties child = sources.get(i).accept(this, PreferredProperties.any()); + PlanWithProperties child = sources.get(i).accept(this, context.withPreferredProperties(PreferredProperties.any())); if (!child.getProperties().isDistributed()) { unpartitionedChildren.add(child.getNode()); unpartitionedOutputLayouts.add(node.sourceOutputLayout(i)); @@ -870,7 +892,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties preferr } // hash partition the sources - List hashingColumns = preferred.getGlobalProperties().get().getPartitioningProperties().get().getHashingOrder().get(); + List hashingColumns = context.getPreferredProperties().getGlobalProperties().get().getPartitioningProperties().get().getHashingOrder().get(); ImmutableList.Builder partitionedSources = ImmutableList.builder(); ImmutableListMultimap.Builder outputToSourcesMapping = ImmutableListMultimap.builder(); @@ -882,7 +904,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties preferr } List sourceHashColumns = hashColumnsBuilder.build(); - PlanWithProperties source = node.getSources().get(sourceIndex).accept(this, PreferredProperties.hashPartitioned(sourceHashColumns)); + PlanWithProperties source = node.getSources().get(sourceIndex).accept(this, context.withPreferredProperties(PreferredProperties.hashPartitioned(sourceHashColumns))); if (!source.getProperties().isHashPartitionedOn(sourceHashColumns)) { source = withDerivedProperties( partitionedExchange( @@ -901,9 +923,9 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties preferr return new PlanWithProperties(new UnionNode(node.getId(), partitionedSources.build(), outputToSourcesMapping.build()), ActualProperties.hashPartitioned(hashingColumns)); } - private PlanWithProperties planChild(PlanNode node, PreferredProperties preferred) + private PlanWithProperties planChild(PlanNode node, Context context) { - return getOnlyElement(node.getSources()).accept(this, preferred); + return getOnlyElement(node.getSources()).accept(this, context); } private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, PlanWithProperties child)