Rename StageExecutionStrategy to StageExecutionDescriptor
In a later commit, StageExecutionStrategy will be added as an enum that
indicates the stage execution strategy. Besides UNGROUPED_EXECUTION,
GROUPED_EXECUTION can be further divided into a fixed lifespan schedule
and a dynamic lifespan schedule.

StageExecutionDescriptor will contain more stage execution details, such
as the execution strategy for each individual scan node.
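
A hedged sketch of how that future enum might look (the grouped-execution
constant names are assumptions inferred from the fixed/dynamic lifespan
split above; only UNGROUPED_EXECUTION is named by this commit message):

public enum StageExecutionStrategy
{
    // Named in the commit message.
    UNGROUPED_EXECUTION,
    // Hypothetical names for the two grouped-execution variants described above.
    FIXED_LIFESPAN_SCHEDULE_GROUPED_EXECUTION,
    DYNAMIC_LIFESPAN_SCHEDULE_GROUPED_EXECUTION
}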

Extracted-From: https://github.com/prestodb/presto
wenleix authored and electrum committed Feb 5, 2019
1 parent 7ff3b40 commit c2f23eb
Showing 18 changed files with 88 additions and 85 deletions.
@@ -35,7 +35,7 @@
import io.prestosql.operator.DriverStats;
import io.prestosql.operator.PipelineContext;
import io.prestosql.operator.PipelineExecutionStrategy;
-import io.prestosql.operator.StageExecutionStrategy;
+import io.prestosql.operator.StageExecutionDescriptor;
import io.prestosql.operator.TaskContext;
import io.prestosql.sql.planner.LocalExecutionPlanner.LocalExecutionPlan;
import io.prestosql.sql.planner.plan.PlanNodeId;
@@ -221,7 +221,7 @@ private SqlTaskExecution(
taskContext,
localExecutionPlan.getDriverFactories().stream()
.collect(toImmutableMap(DriverFactory::getPipelineId, DriverFactory::getPipelineExecutionStrategy)));
-this.schedulingLifespanManager = new SchedulingLifespanManager(localExecutionPlan.getPartitionedSourceOrder(), localExecutionPlan.getStageExecutionStrategy(), this.status);
+this.schedulingLifespanManager = new SchedulingLifespanManager(localExecutionPlan.getPartitionedSourceOrder(), localExecutionPlan.getStageExecutionDescriptor(), this.status);

checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(partitionedSources),
"Fragment is partitioned, but not all partitioned drivers were found");
@@ -763,7 +763,7 @@ private static class SchedulingLifespanManager
// Note that different drivers in a task may have different pipelineExecutionStrategy.

private final List<PlanNodeId> sourceStartOrder;
-private final StageExecutionStrategy stageExecutionStrategy;
+private final StageExecutionDescriptor stageExecutionDescriptor;
private final Status status;

private final Map<Lifespan, SchedulingLifespan> lifespans = new HashMap<>();
@@ -774,10 +774,10 @@ private static class SchedulingLifespanManager

private int maxScheduledPlanNodeOrdinal;

-public SchedulingLifespanManager(List<PlanNodeId> sourceStartOrder, StageExecutionStrategy stageExecutionStrategy, Status status)
+public SchedulingLifespanManager(List<PlanNodeId> sourceStartOrder, StageExecutionDescriptor stageExecutionDescriptor, Status status)
{
this.sourceStartOrder = ImmutableList.copyOf(sourceStartOrder);
-this.stageExecutionStrategy = stageExecutionStrategy;
+this.stageExecutionDescriptor = stageExecutionDescriptor;
this.status = requireNonNull(status, "status is null");
}

@@ -872,7 +872,7 @@ public Optional<PlanNodeId> getSchedulingPlanNode()
// i.e. One of the following bullet points is true:
// * The execution strategy of the plan node is grouped. And lifespan represents a driver group.
// * The execution strategy of the plan node is ungrouped. And lifespan is task wide.
-if (manager.stageExecutionStrategy.isGroupedExecution(manager.sourceStartOrder.get(schedulingPlanNodeOrdinal)) != lifespan.isTaskWide()) {
+if (manager.stageExecutionDescriptor.isGroupedExecution(manager.sourceStartOrder.get(schedulingPlanNodeOrdinal)) != lifespan.isTaskWide()) {
return Optional.of(manager.sourceStartOrder.get(schedulingPlanNodeOrdinal));
}
// This lifespan is incompatible with the plan node. As a result, this method should either
@@ -77,7 +77,7 @@ public SqlTaskExecution create(Session session, QueryContext queryContext, TaskS
fragment.getRoot(),
TypeProvider.copyOf(fragment.getSymbols()),
fragment.getPartitioningScheme(),
-fragment.getStageExecutionStrategy(),
+fragment.getStageExecutionDescriptor(),
fragment.getPartitionedSources(),
outputBuffer);
}
@@ -26,7 +26,7 @@
import io.prestosql.execution.scheduler.group.FixedLifespanScheduler;
import io.prestosql.execution.scheduler.group.LifespanScheduler;
import io.prestosql.metadata.Split;
-import io.prestosql.operator.StageExecutionStrategy;
+import io.prestosql.operator.StageExecutionDescriptor;
import io.prestosql.spi.Node;
import io.prestosql.spi.connector.ConnectorPartitionHandle;
import io.prestosql.split.SplitSource;
@@ -66,7 +66,7 @@ public class FixedSourcePartitionedScheduler
public FixedSourcePartitionedScheduler(
SqlStageExecution stage,
Map<PlanNodeId, SplitSource> splitSources,
-StageExecutionStrategy stageExecutionStrategy,
+StageExecutionDescriptor stageExecutionDescriptor,
List<PlanNodeId> schedulingOrder,
List<Node> nodes,
BucketNodeMap bucketNodeMap,
@@ -91,7 +91,7 @@ public FixedSourcePartitionedScheduler(

ArrayList<SourceScheduler> sourceSchedulers = new ArrayList<>();
checkArgument(
-partitionHandles.equals(ImmutableList.of(NOT_PARTITIONED)) != stageExecutionStrategy.isAnyScanGroupedExecution(),
+partitionHandles.equals(ImmutableList.of(NOT_PARTITIONED)) != stageExecutionDescriptor.isAnyScanGroupedExecution(),
"PartitionHandles should be [NOT_PARTITIONED] if and only if all scan nodes use ungrouped execution strategy");
int nodeCount = nodes.size();
int concurrentLifespans;
@@ -106,7 +106,7 @@ public FixedSourcePartitionedScheduler(
Optional<LifespanScheduler> groupedLifespanScheduler = Optional.empty();
for (PlanNodeId planNodeId : schedulingOrder) {
SplitSource splitSource = splitSources.get(planNodeId);
-boolean groupedExecutionForScanNode = stageExecutionStrategy.isGroupedExecution(planNodeId);
+boolean groupedExecutionForScanNode = stageExecutionDescriptor.isGroupedExecution(planNodeId);
SourceScheduler sourceScheduler = newSourcePartitionedSchedulerAsSourceScheduler(
stage,
planNodeId,
@@ -115,14 +115,14 @@
Math.max(splitBatchSize / concurrentLifespans, 1),
groupedExecutionForScanNode);

-if (stageExecutionStrategy.isAnyScanGroupedExecution() && !groupedExecutionForScanNode) {
+if (stageExecutionDescriptor.isAnyScanGroupedExecution() && !groupedExecutionForScanNode) {
sourceScheduler = new AsGroupedSourceScheduler(sourceScheduler);
}
sourceSchedulers.add(sourceScheduler);

if (firstPlanNode) {
firstPlanNode = false;
-if (!stageExecutionStrategy.isAnyScanGroupedExecution()) {
+if (!stageExecutionDescriptor.isAnyScanGroupedExecution()) {
sourceScheduler.startLifespan(Lifespan.taskWide(), NOT_PARTITIONED);
sourceScheduler.noMoreLifespans();
}
@@ -327,7 +327,7 @@ private List<SqlStageExecution> createStages(
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);

-checkArgument(!plan.getFragment().getStageExecutionStrategy().isAnyScanGroupedExecution());
+checkArgument(!plan.getFragment().getStageExecutionDescriptor().isAnyScanGroupedExecution());
stageSchedulers.put(stageId, newSourcePartitionedSchedulerAsStageScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
bucketToPartition = Optional.of(new int[1]);
}
@@ -341,7 +341,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
List<PlanNodeId> schedulingOrder = plan.getFragment().getPartitionedSources();
ConnectorId connectorId = partitioningHandle.getConnectorId().orElseThrow(IllegalStateException::new);
List<ConnectorPartitionHandle> connectorPartitionHandles;
-boolean groupedExecutionForStage = plan.getFragment().getStageExecutionStrategy().isAnyScanGroupedExecution();
+boolean groupedExecutionForStage = plan.getFragment().getStageExecutionDescriptor().isAnyScanGroupedExecution();
if (groupedExecutionForStage) {
connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
@@ -378,7 +378,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
stageSchedulers.put(stageId, new FixedSourcePartitionedScheduler(
stage,
splitSources,
-plan.getFragment().getStageExecutionStrategy(),
+plan.getFragment().getStageExecutionDescriptor(),
schedulingOrder,
stageNodeList,
bucketNodeMap,
@@ -24,25 +24,25 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

-public class StageExecutionStrategy
+public class StageExecutionDescriptor
{
private final Set<PlanNodeId> groupedExecutionScanNodes;

-private StageExecutionStrategy(Set<PlanNodeId> groupedExecutionScanNodes)
+private StageExecutionDescriptor(Set<PlanNodeId> groupedExecutionScanNodes)
{
this.groupedExecutionScanNodes = groupedExecutionScanNodes;
}

-public static StageExecutionStrategy ungroupedExecution()
+public static StageExecutionDescriptor ungroupedExecution()
{
-return new StageExecutionStrategy(ImmutableSet.of());
+return new StageExecutionDescriptor(ImmutableSet.of());
}

-public static StageExecutionStrategy groupedExecution(List<PlanNodeId> capableScanNodes)
+public static StageExecutionDescriptor groupedExecution(List<PlanNodeId> capableScanNodes)
{
requireNonNull(capableScanNodes, "capableScanNodes is null");
checkArgument(!capableScanNodes.isEmpty());
-return new StageExecutionStrategy(ImmutableSet.copyOf(capableScanNodes));
+return new StageExecutionDescriptor(ImmutableSet.copyOf(capableScanNodes));
}

public boolean isAnyScanGroupedExecution()
Expand All @@ -56,10 +56,10 @@ public boolean isGroupedExecution(PlanNodeId scanNodeId)
}

@JsonCreator
-public static StageExecutionStrategy jsonCreator(
+public static StageExecutionDescriptor jsonCreator(
@JsonProperty("groupedExecutionScanNodes") Set<PlanNodeId> groupedExecutionCapableScanNodes)
{
-return new StageExecutionStrategy(ImmutableSet.copyOf(requireNonNull(groupedExecutionCapableScanNodes, "groupedExecutionScanNodes is null")));
+return new StageExecutionDescriptor(ImmutableSet.copyOf(requireNonNull(groupedExecutionCapableScanNodes, "groupedExecutionScanNodes is null")));
}

@JsonProperty("groupedExecutionScanNodes")
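A hedged usage sketch of the renamed class's factory methods from the hunk
above; the plan node id and the harness class are hypothetical, and the
imports are taken from elsewhere in this diff:

import com.google.common.collect.ImmutableList;
import io.prestosql.operator.StageExecutionDescriptor;
import io.prestosql.sql.planner.plan.PlanNodeId;

public class StageExecutionDescriptorExample
{
    public static void main(String[] args)
    {
        // All scans ungrouped: the descriptor holds no grouped-execution scan nodes.
        StageExecutionDescriptor ungrouped = StageExecutionDescriptor.ungroupedExecution();
        System.out.println(ungrouped.isAnyScanGroupedExecution()); // false

        // One hypothetical scan node ("scan-1") capable of grouped execution.
        StageExecutionDescriptor grouped = StageExecutionDescriptor.groupedExecution(
                ImmutableList.of(new PlanNodeId("scan-1")));
        // Per-node query, as used by the schedulers in this commit
        // (assumes PlanNodeId equality is value-based).
        System.out.println(grouped.isGroupedExecution(new PlanNodeId("scan-1"))); // true
    }
}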
@@ -17,7 +17,7 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.prestosql.Session;
-import io.prestosql.operator.StageExecutionStrategy;
+import io.prestosql.operator.StageExecutionDescriptor;
import io.prestosql.split.SampledSplitSource;
import io.prestosql.split.SplitManager;
import io.prestosql.split.SplitSource;
@@ -106,7 +106,7 @@ private StageExecutionPlan doPlan(SubPlan root, Session session, ImmutableList.B
PlanFragment currentFragment = root.getFragment();

// get splits for this fragment, this is lazy so split assignments aren't actually calculated here
-Map<PlanNodeId, SplitSource> splitSources = currentFragment.getRoot().accept(new Visitor(session, currentFragment.getStageExecutionStrategy(), allSplitSources), null);
+Map<PlanNodeId, SplitSource> splitSources = currentFragment.getRoot().accept(new Visitor(session, currentFragment.getStageExecutionDescriptor(), allSplitSources), null);

// create child stages
ImmutableList.Builder<StageExecutionPlan> dependencies = ImmutableList.builder();
@@ -124,13 +124,13 @@ private final class Visitor
extends PlanVisitor<Map<PlanNodeId, SplitSource>, Void>
{
private final Session session;
-private final StageExecutionStrategy stageExecutionStrategy;
+private final StageExecutionDescriptor stageExecutionDescriptor;
private final ImmutableList.Builder<SplitSource> splitSources;

-private Visitor(Session session, StageExecutionStrategy stageExecutionStrategy, ImmutableList.Builder<SplitSource> allSplitSources)
+private Visitor(Session session, StageExecutionDescriptor stageExecutionDescriptor, ImmutableList.Builder<SplitSource> allSplitSources)
{
this.session = session;
-this.stageExecutionStrategy = stageExecutionStrategy;
+this.stageExecutionDescriptor = stageExecutionDescriptor;
this.splitSources = allSplitSources;
}

@@ -147,7 +147,7 @@ public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void cont
SplitSource splitSource = splitManager.getSplits(
session,
node.getLayout().get(),
-stageExecutionStrategy.isGroupedExecution(node.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING);
+stageExecutionDescriptor.isGroupedExecution(node.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING);

splitSources.add(splitSource);

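The visitTableScan change above is where the per-scan-node detail in the
descriptor takes effect at split-enumeration time: each table scan
independently selects grouped or ungrouped split scheduling. A simplified
restatement (not a drop-in method; GROUPED_SCHEDULING and
UNGROUPED_SCHEDULING are the constants the surrounding file already
static-imports):

// Per-scan decision, as in the hunk above.
if (stageExecutionDescriptor.isGroupedExecution(node.getId())) {
    // Splits for this scan are enumerated per driver group.
    return splitManager.getSplits(session, node.getLayout().get(), GROUPED_SCHEDULING);
}
// Otherwise the whole scan is scheduled task-wide.
return splitManager.getSplits(session, node.getLayout().get(), UNGROUPED_SCHEDULING);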
