Skip to content

Commit

Permalink
Rename PrioritizeUtilizationExecutionPolicy as PhasedExecutionPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Jan 11, 2022
1 parent 1dd01de commit cb40d37
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import java.util.Collection;

public class PrioritizeUtilizationExecutionPolicy
public class PhasedExecutionPolicy
implements ExecutionPolicy
{
@Override
public ExecutionSchedule createExecutionSchedule(Collection<StageExecution> stages)
{
return PrioritizeUtilizationExecutionSchedule.forStages(stages);
return PhasedExecutionSchedule.forStages(stages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@
* Schedules stages choosing to order to provide the best resource utilization.
* This means that stages which output won't be consumed (e.g. join probe side) will
* not be scheduled until dependent stages finish (e.g. join build source stages).
* Contrary to {@link LegacyPhasedExecutionPolicy}, {@link PrioritizeUtilizationExecutionSchedule} will
* Contrary to {@link LegacyPhasedExecutionPolicy}, {@link PhasedExecutionSchedule} will
* schedule multiple source stages in order to fully utilize IO.
*/
public class PrioritizeUtilizationExecutionSchedule
public class PhasedExecutionSchedule
implements ExecutionSchedule
{
/**
Expand All @@ -85,14 +85,14 @@ public class PrioritizeUtilizationExecutionSchedule
@GuardedBy("this")
private SettableFuture<Void> rescheduleFuture = SettableFuture.create();

public static PrioritizeUtilizationExecutionSchedule forStages(Collection<StageExecution> stages)
public static PhasedExecutionSchedule forStages(Collection<StageExecution> stages)
{
PrioritizeUtilizationExecutionSchedule schedule = new PrioritizeUtilizationExecutionSchedule(stages);
PhasedExecutionSchedule schedule = new PhasedExecutionSchedule(stages);
schedule.init(stages);
return schedule;
}

private PrioritizeUtilizationExecutionSchedule(Collection<StageExecution> stages)
private PhasedExecutionSchedule(Collection<StageExecution> stages)
{
fragmentDependency = new DefaultDirectedGraph<>(new FragmentsEdgeFactory());
fragmentTopology = new DefaultDirectedGraph<>(new FragmentsEdgeFactory());
Expand Down Expand Up @@ -424,7 +424,7 @@ public FragmentSubGraph visitRemoteSource(RemoteSourceNode node, PlanFragmentId
@Override
public FragmentSubGraph visitExchange(ExchangeNode node, PlanFragmentId currentFragmentId)
{
checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the prioritize utilization scheduler");
checkArgument(node.getScope() == LOCAL, "Only local exchanges are supported in the phased execution scheduler");
return visitPlan(node, currentFragmentId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
import io.trino.execution.scheduler.policy.LegacyPhasedExecutionPolicy;
import io.trino.execution.scheduler.policy.PrioritizeUtilizationExecutionPolicy;
import io.trino.execution.scheduler.policy.PhasedExecutionPolicy;
import io.trino.failuredetector.FailureDetectorModule;
import io.trino.memory.ClusterMemoryManager;
import io.trino.memory.ForMemoryManager;
Expand Down Expand Up @@ -286,7 +286,7 @@ protected void setup(Binder binder)
MapBinder<String, ExecutionPolicy> executionPolicyBinder = newMapBinder(binder, String.class, ExecutionPolicy.class);
executionPolicyBinder.addBinding("all-at-once").to(AllAtOnceExecutionPolicy.class);
executionPolicyBinder.addBinding("legacy-phased").to(LegacyPhasedExecutionPolicy.class);
executionPolicyBinder.addBinding("prioritize-utilization").to(PrioritizeUtilizationExecutionPolicy.class);
executionPolicyBinder.addBinding("phased").to(PhasedExecutionPolicy.class);

install(new QueryExecutionFactoryModule());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.trino.execution.TaskStatus;
import io.trino.execution.scheduler.StageExecution;
import io.trino.execution.scheduler.TaskLifecycleListener;
import io.trino.execution.scheduler.policy.PrioritizeUtilizationExecutionSchedule.FragmentsEdge;
import io.trino.execution.scheduler.policy.PhasedExecutionSchedule.FragmentsEdge;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.sql.planner.PlanFragment;
Expand Down Expand Up @@ -53,7 +53,7 @@
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;

public class TestPrioritizeUtilizationExecutionSchedule
public class TestPhasedExecutionSchedule
{
@Test
public void testPartitionedJoin()
Expand All @@ -66,7 +66,7 @@ public void testPartitionedJoin()
TestingStageExecution probeStage = new TestingStageExecution(probeFragment);
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(buildStage, probeStage, joinStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(buildStage, probeStage, joinStage));
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();

// single dependency between build and probe stages
Expand Down Expand Up @@ -103,7 +103,7 @@ public void testBroadcastSourceJoin()
TestingStageExecution buildStage = new TestingStageExecution(buildFragment);
TestingStageExecution joinSourceStage = new TestingStageExecution(joinSourceFragment);

PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(buildStage, joinSourceStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(buildStage, joinSourceStage));
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();

// single dependency between build and join stages
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testAggregation()
TestingStageExecution buildStage = new TestingStageExecution(buildFragment);
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage));
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(sourceStage, aggregationStage, buildStage, joinStage));
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();

// aggregation and source stage should start immediately, join stage should wait for build stage to complete
Expand All @@ -152,7 +152,7 @@ public void testStageWithBroadcastAndPartitionedJoin()
TestingStageExecution probeStage = new TestingStageExecution(probeFragment);
TestingStageExecution joinStage = new TestingStageExecution(joinFragment);

PrioritizeUtilizationExecutionSchedule schedule = PrioritizeUtilizationExecutionSchedule.forStages(ImmutableSet.of(
PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(
broadcastBuildStage, partitionedBuildStage, probeStage, joinStage));
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();

Expand Down Expand Up @@ -181,7 +181,7 @@ public void testStageWithBroadcastAndPartitionedJoin()
joinFragment.getId());
}

private Set<PlanFragmentId> getActiveFragments(PrioritizeUtilizationExecutionSchedule schedule)
private Set<PlanFragmentId> getActiveFragments(PhasedExecutionSchedule schedule)
{
return schedule.getActiveStages().stream()
.map(stage -> stage.getFragment().getId())
Expand Down

0 comments on commit cb40d37

Please sign in to comment.