diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java index 83dbcdf930f6..089fe09240b6 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java @@ -480,10 +480,7 @@ private QueryStats getQueryStats(Optional rootStage) processedInputPositions += stageStats.getProcessedInputPositions(); } - if (plan.isMaterializedExchangeSource()) { - writtenIntermediatePhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes(); - } - else { + if (plan.isOutputTableWriterFragment()) { writtenOutputPositions += stageInfo.getStageStats().getOperatorSummaries().stream() .filter(stats -> stats.getOperatorType().equals(TableWriterOperator.class.getSimpleName())) .mapToLong(OperatorStats::getInputPositions) @@ -494,6 +491,9 @@ private QueryStats getQueryStats(Optional rootStage) .sum(); writtenOutputPhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes(); } + else { + writtenIntermediatePhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes(); + } } stageGcStatistics.add(stageStats.getGcInfo()); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java index fa34654470c0..d3f650085eb8 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java @@ -50,7 +50,7 @@ public class PlanFragment private final List remoteSourceNodes; private final PartitioningScheme partitioningScheme; private final StageExecutionDescriptor stageExecutionDescriptor; - private final boolean materializedExchangeSource; + private final boolean outputTableWriterFragment; private final StatsAndCosts statsAndCosts; private final Optional jsonRepresentation; @@ -63,7 +63,7 @@ public PlanFragment( @JsonProperty("tableScanSchedulingOrder") List tableScanSchedulingOrder, @JsonProperty("partitioningScheme") PartitioningScheme partitioningScheme, @JsonProperty("stageExecutionDescriptor") StageExecutionDescriptor stageExecutionDescriptor, - @JsonProperty("materializedExchangeSource") boolean materializedExchangeSource, + @JsonProperty("outputTableWriterFragment") boolean outputTableWriterFragment, @JsonProperty("statsAndCosts") StatsAndCosts statsAndCosts, @JsonProperty("jsonRepresentation") Optional jsonRepresentation) { @@ -73,7 +73,7 @@ public PlanFragment( this.partitioning = requireNonNull(partitioning, "partitioning is null"); this.tableScanSchedulingOrder = ImmutableList.copyOf(requireNonNull(tableScanSchedulingOrder, "tableScanSchedulingOrder is null")); this.stageExecutionDescriptor = requireNonNull(stageExecutionDescriptor, "stageExecutionDescriptor is null"); - this.materializedExchangeSource = materializedExchangeSource; + this.outputTableWriterFragment = outputTableWriterFragment; this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null"); this.jsonRepresentation = requireNonNull(jsonRepresentation, "jsonRepresentation is null"); @@ -134,9 +134,9 @@ public StageExecutionDescriptor getStageExecutionDescriptor() } @JsonProperty - public boolean isMaterializedExchangeSource() + public boolean isOutputTableWriterFragment() { - return materializedExchangeSource; + return outputTableWriterFragment; } @JsonProperty @@ -199,17 +199,17 @@ private static void findRemoteSourceNodes(PlanNode node, Builder bucketToPartition) { - return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme.withBucketToPartition(bucketToPartition), stageExecutionDescriptor, materializedExchangeSource, statsAndCosts, jsonRepresentation); + return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme.withBucketToPartition(bucketToPartition), stageExecutionDescriptor, outputTableWriterFragment, statsAndCosts, jsonRepresentation); } public PlanFragment withFixedLifespanScheduleGroupedExecution(List capableTableScanNodes) { - return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes), materializedExchangeSource, statsAndCosts, jsonRepresentation); + return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes), outputTableWriterFragment, statsAndCosts, jsonRepresentation); } public PlanFragment withDynamicLifespanScheduleGroupedExecution(List capableTableScanNodes) { - return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes), materializedExchangeSource, statsAndCosts, jsonRepresentation); + return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes), outputTableWriterFragment, statsAndCosts, jsonRepresentation); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 69e0bdf28fb7..57441db1a614 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -113,6 +113,8 @@ import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Maps.filterKeys; +import static com.google.common.collect.Streams.stream; +import static com.google.common.graph.Traverser.forTree; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; @@ -149,11 +151,10 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod warningCollector, sqlParser, idAllocator, - new SymbolAllocator(plan.getTypes().allTypes())); + new SymbolAllocator(plan.getTypes().allTypes()), + getTableWriterNodeIds(plan.getRoot())); - FragmentProperties properties = new FragmentProperties( - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getRoot().getOutputSymbols()), - false); + FragmentProperties properties = new FragmentProperties(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getRoot().getOutputSymbols())); if (forceSingleNode || isForceSingleNodeOutput(session)) { properties = properties.setSingleNodeDistribution(); } @@ -243,7 +244,7 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub outputPartitioningScheme.isReplicateNullsAndAny(), outputPartitioningScheme.getBucketToPartition()), fragment.getStageExecutionDescriptor(), - fragment.isMaterializedExchangeSource(), + fragment.isOutputTableWriterFragment(), fragment.getStatsAndCosts(), fragment.getJsonRepresentation()); @@ -254,6 +255,14 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub return new SubPlan(newFragment, childrenBuilder.build()); } + private static Set getTableWriterNodeIds(PlanNode plan) + { + return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan)) + .filter(node -> node instanceof TableWriterNode) + .map(PlanNode::getId) + .collect(toImmutableSet()); + } + private static class Fragmenter extends SimplePlanRewriter { @@ -268,6 +277,7 @@ private static class Fragmenter private final WarningCollector warningCollector; private final SqlParser sqlParser; private final LiteralEncoder literalEncoder; + private final Set outputTableWriterNodeIds; private int nextFragmentId = ROOT_FRAGMENT_ID + 1; public Fragmenter( @@ -278,7 +288,8 @@ public Fragmenter( WarningCollector warningCollector, SqlParser sqlParser, PlanNodeIdAllocator idAllocator, - SymbolAllocator symbolAllocator) + SymbolAllocator symbolAllocator, + Set outputTableWriterNodeIds) { this.session = requireNonNull(session, "session is null"); this.metadata = requireNonNull(metadata, "metadata is null"); @@ -289,6 +300,7 @@ public Fragmenter( this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); this.literalEncoder = new LiteralEncoder(metadata.getBlockEncodingSerde()); + this.outputTableWriterNodeIds = ImmutableSet.copyOf(requireNonNull(outputTableWriterNodeIds, "outputTableWriterNodeIds is null")); } public SubPlan buildRootFragment(PlanNode root, FragmentProperties properties) @@ -312,6 +324,17 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan Map fragmentSymbolTypes = filterKeys(symbolAllocator.getTypes().allTypes(), in(extractOutputSymbols(root))); planSanityChecker.validatePlanFragment(root, session, metadata, sqlParser, TypeProvider.viewOf(fragmentSymbolTypes), warningCollector); + + Set tableWriterNodeIds = getTableWriterNodeIds(root); + boolean outputTableWriterFragment = tableWriterNodeIds.stream().anyMatch(outputTableWriterNodeIds::contains); + if (outputTableWriterFragment) { + verify( + outputTableWriterNodeIds.containsAll(tableWriterNodeIds), + "outputTableWriterNodeIds %s must include either all or none of tableWriterNodeIds %s", + outputTableWriterNodeIds, + tableWriterNodeIds); + } + PlanFragment fragment = new PlanFragment( fragmentId, root, @@ -320,7 +343,7 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan schedulingOrder, properties.getPartitioningScheme(), StageExecutionDescriptor.ungroupedExecution(), - properties.isMaterializedExchangeSource(), + outputTableWriterFragment, statsAndCosts.getForSubplan(root), Optional.of(jsonFragmentPlan(root, fragmentSymbolTypes, metadata.getFunctionManager(), session))); @@ -422,9 +445,7 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) { ImmutableList.Builder builder = ImmutableList.builder(); for (int sourceIndex = 0; sourceIndex < exchange.getSources().size(); sourceIndex++) { - FragmentProperties childProperties = new FragmentProperties( - partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)), - context.get().isMaterializedExchangeSource()); + FragmentProperties childProperties = new FragmentProperties(partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex))); builder.add(buildSubPlan(exchange.getSources().get(sourceIndex), childProperties, context)); } @@ -484,9 +505,7 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite partitioningSymbolAssignments.getConstants(), partitioningMetadata); - FragmentProperties writeProperties = new FragmentProperties( - new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), write.getOutputSymbols()), - true); + FragmentProperties writeProperties = new FragmentProperties(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), write.getOutputSymbols())); writeProperties.setCoordinatorOnlyDistribution(); List children = ImmutableList.of(buildSubPlan(write, writeProperties, context)); @@ -684,15 +703,13 @@ private static class FragmentProperties private final List children = new ArrayList<>(); private final PartitioningScheme partitioningScheme; - private final boolean materializedExchangeSource; private Optional partitioningHandle = Optional.empty(); private final Set partitionedSources = new HashSet<>(); - public FragmentProperties(PartitioningScheme partitioningScheme, boolean materializedExchangeSource) + public FragmentProperties(PartitioningScheme partitioningScheme) { this.partitioningScheme = partitioningScheme; - this.materializedExchangeSource = materializedExchangeSource; } public List getChildren() @@ -811,11 +828,6 @@ public PartitioningScheme getPartitioningScheme() return partitioningScheme; } - public boolean isMaterializedExchangeSource() - { - return materializedExchangeSource; - } - public PartitioningHandle getPartitioningHandle() { return partitioningHandle.get();