Skip to content

Commit

Permalink
Refactor PlanFragment
Browse files Browse the repository at this point in the history
Replace the materializedExchangeSource field with the outputTableWriterFragment
  • Loading branch information
arhimondr committed May 31, 2019
1 parent 7bd1426 commit effeaa6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 33 deletions.
Expand Up @@ -480,10 +480,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
processedInputPositions += stageStats.getProcessedInputPositions();
}

if (plan.isMaterializedExchangeSource()) {
writtenIntermediatePhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
}
else {
if (plan.isOutputTableWriterFragment()) {
writtenOutputPositions += stageInfo.getStageStats().getOperatorSummaries().stream()
.filter(stats -> stats.getOperatorType().equals(TableWriterOperator.class.getSimpleName()))
.mapToLong(OperatorStats::getInputPositions)
Expand All @@ -494,6 +491,9 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
.sum();
writtenOutputPhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
}
else {
writtenIntermediatePhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
}
}

stageGcStatistics.add(stageStats.getGcInfo());
Expand Down
Expand Up @@ -50,7 +50,7 @@ public class PlanFragment
private final List<RemoteSourceNode> remoteSourceNodes;
private final PartitioningScheme partitioningScheme;
private final StageExecutionDescriptor stageExecutionDescriptor;
private final boolean materializedExchangeSource;
private final boolean outputTableWriterFragment;
private final StatsAndCosts statsAndCosts;
private final Optional<String> jsonRepresentation;

Expand All @@ -63,7 +63,7 @@ public PlanFragment(
@JsonProperty("tableScanSchedulingOrder") List<PlanNodeId> tableScanSchedulingOrder,
@JsonProperty("partitioningScheme") PartitioningScheme partitioningScheme,
@JsonProperty("stageExecutionDescriptor") StageExecutionDescriptor stageExecutionDescriptor,
@JsonProperty("materializedExchangeSource") boolean materializedExchangeSource,
@JsonProperty("outputTableWriterFragment") boolean outputTableWriterFragment,
@JsonProperty("statsAndCosts") StatsAndCosts statsAndCosts,
@JsonProperty("jsonRepresentation") Optional<String> jsonRepresentation)
{
Expand All @@ -73,7 +73,7 @@ public PlanFragment(
this.partitioning = requireNonNull(partitioning, "partitioning is null");
this.tableScanSchedulingOrder = ImmutableList.copyOf(requireNonNull(tableScanSchedulingOrder, "tableScanSchedulingOrder is null"));
this.stageExecutionDescriptor = requireNonNull(stageExecutionDescriptor, "stageExecutionDescriptor is null");
this.materializedExchangeSource = materializedExchangeSource;
this.outputTableWriterFragment = outputTableWriterFragment;
this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null");
this.jsonRepresentation = requireNonNull(jsonRepresentation, "jsonRepresentation is null");

Expand Down Expand Up @@ -134,9 +134,9 @@ public StageExecutionDescriptor getStageExecutionDescriptor()
}

@JsonProperty
public boolean isMaterializedExchangeSource()
public boolean isOutputTableWriterFragment()
{
return materializedExchangeSource;
return outputTableWriterFragment;
}

@JsonProperty
Expand Down Expand Up @@ -199,17 +199,17 @@ private static void findRemoteSourceNodes(PlanNode node, Builder<RemoteSourceNod

public PlanFragment withBucketToPartition(Optional<int[]> bucketToPartition)
{
return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme.withBucketToPartition(bucketToPartition), stageExecutionDescriptor, materializedExchangeSource, statsAndCosts, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme.withBucketToPartition(bucketToPartition), stageExecutionDescriptor, outputTableWriterFragment, statsAndCosts, jsonRepresentation);
}

public PlanFragment withFixedLifespanScheduleGroupedExecution(List<PlanNodeId> capableTableScanNodes)
{
return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes), materializedExchangeSource, statsAndCosts, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes), outputTableWriterFragment, statsAndCosts, jsonRepresentation);
}

public PlanFragment withDynamicLifespanScheduleGroupedExecution(List<PlanNodeId> capableTableScanNodes)
{
return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes), materializedExchangeSource, statsAndCosts, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, tableScanSchedulingOrder, partitioningScheme, StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes), outputTableWriterFragment, statsAndCosts, jsonRepresentation);
}

@Override
Expand Down
Expand Up @@ -113,6 +113,8 @@
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.Streams.stream;
import static com.google.common.graph.Traverser.forTree;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -149,11 +151,10 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod
warningCollector,
sqlParser,
idAllocator,
new SymbolAllocator(plan.getTypes().allTypes()));
new SymbolAllocator(plan.getTypes().allTypes()),
getTableWriterNodeIds(plan.getRoot()));

FragmentProperties properties = new FragmentProperties(
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getRoot().getOutputSymbols()),
false);
FragmentProperties properties = new FragmentProperties(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getRoot().getOutputSymbols()));
if (forceSingleNode || isForceSingleNodeOutput(session)) {
properties = properties.setSingleNodeDistribution();
}
Expand Down Expand Up @@ -243,7 +244,7 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub
outputPartitioningScheme.isReplicateNullsAndAny(),
outputPartitioningScheme.getBucketToPartition()),
fragment.getStageExecutionDescriptor(),
fragment.isMaterializedExchangeSource(),
fragment.isOutputTableWriterFragment(),
fragment.getStatsAndCosts(),
fragment.getJsonRepresentation());

Expand All @@ -254,6 +255,14 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub
return new SubPlan(newFragment, childrenBuilder.build());
}

private static Set<PlanNodeId> getTableWriterNodeIds(PlanNode plan)
{
return stream(forTree(PlanNode::getSources).depthFirstPreOrder(plan))
.filter(node -> node instanceof TableWriterNode)
.map(PlanNode::getId)
.collect(toImmutableSet());
}

private static class Fragmenter
extends SimplePlanRewriter<FragmentProperties>
{
Expand All @@ -268,6 +277,7 @@ private static class Fragmenter
private final WarningCollector warningCollector;
private final SqlParser sqlParser;
private final LiteralEncoder literalEncoder;
private final Set<PlanNodeId> outputTableWriterNodeIds;
private int nextFragmentId = ROOT_FRAGMENT_ID + 1;

public Fragmenter(
Expand All @@ -278,7 +288,8 @@ public Fragmenter(
WarningCollector warningCollector,
SqlParser sqlParser,
PlanNodeIdAllocator idAllocator,
SymbolAllocator symbolAllocator)
SymbolAllocator symbolAllocator,
Set<PlanNodeId> outputTableWriterNodeIds)
{
this.session = requireNonNull(session, "session is null");
this.metadata = requireNonNull(metadata, "metadata is null");
Expand All @@ -289,6 +300,7 @@ public Fragmenter(
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
this.literalEncoder = new LiteralEncoder(metadata.getBlockEncodingSerde());
this.outputTableWriterNodeIds = ImmutableSet.copyOf(requireNonNull(outputTableWriterNodeIds, "outputTableWriterNodeIds is null"));
}

public SubPlan buildRootFragment(PlanNode root, FragmentProperties properties)
Expand All @@ -312,6 +324,17 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan

Map<Symbol, Type> fragmentSymbolTypes = filterKeys(symbolAllocator.getTypes().allTypes(), in(extractOutputSymbols(root)));
planSanityChecker.validatePlanFragment(root, session, metadata, sqlParser, TypeProvider.viewOf(fragmentSymbolTypes), warningCollector);

Set<PlanNodeId> tableWriterNodeIds = getTableWriterNodeIds(root);
boolean outputTableWriterFragment = tableWriterNodeIds.stream().anyMatch(outputTableWriterNodeIds::contains);
if (outputTableWriterFragment) {
verify(
outputTableWriterNodeIds.containsAll(tableWriterNodeIds),
"outputTableWriterNodeIds %s must include either all or none of tableWriterNodeIds %s",
outputTableWriterNodeIds,
tableWriterNodeIds);
}

PlanFragment fragment = new PlanFragment(
fragmentId,
root,
Expand All @@ -320,7 +343,7 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
schedulingOrder,
properties.getPartitioningScheme(),
StageExecutionDescriptor.ungroupedExecution(),
properties.isMaterializedExchangeSource(),
outputTableWriterFragment,
statsAndCosts.getForSubplan(root),
Optional.of(jsonFragmentPlan(root, fragmentSymbolTypes, metadata.getFunctionManager(), session)));

Expand Down Expand Up @@ -422,9 +445,7 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) {

ImmutableList.Builder<SubPlan> builder = ImmutableList.builder();
for (int sourceIndex = 0; sourceIndex < exchange.getSources().size(); sourceIndex++) {
FragmentProperties childProperties = new FragmentProperties(
partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)),
context.get().isMaterializedExchangeSource());
FragmentProperties childProperties = new FragmentProperties(partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)));
builder.add(buildSubPlan(exchange.getSources().get(sourceIndex), childProperties, context));
}

Expand Down Expand Up @@ -484,9 +505,7 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite
partitioningSymbolAssignments.getConstants(),
partitioningMetadata);

FragmentProperties writeProperties = new FragmentProperties(
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), write.getOutputSymbols()),
true);
FragmentProperties writeProperties = new FragmentProperties(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), write.getOutputSymbols()));
writeProperties.setCoordinatorOnlyDistribution();

List<SubPlan> children = ImmutableList.of(buildSubPlan(write, writeProperties, context));
Expand Down Expand Up @@ -684,15 +703,13 @@ private static class FragmentProperties
private final List<SubPlan> children = new ArrayList<>();

private final PartitioningScheme partitioningScheme;
private final boolean materializedExchangeSource;

private Optional<PartitioningHandle> partitioningHandle = Optional.empty();
private final Set<PlanNodeId> partitionedSources = new HashSet<>();

public FragmentProperties(PartitioningScheme partitioningScheme, boolean materializedExchangeSource)
public FragmentProperties(PartitioningScheme partitioningScheme)
{
this.partitioningScheme = partitioningScheme;
this.materializedExchangeSource = materializedExchangeSource;
}

public List<SubPlan> getChildren()
Expand Down Expand Up @@ -811,11 +828,6 @@ public PartitioningScheme getPartitioningScheme()
return partitioningScheme;
}

public boolean isMaterializedExchangeSource()
{
return materializedExchangeSource;
}

public PartitioningHandle getPartitioningHandle()
{
return partitioningHandle.get();
Expand Down

0 comments on commit effeaa6

Please sign in to comment.