Skip to content

Commit

Permalink
Rename PhasedExecutionPolicy to LegacyPhasedExecutionPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Jan 11, 2022
1 parent 543b3d6 commit 1dd01de
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,13 @@ private void testRole(String roleParameterValue, ClientSelectedRole clientSelect
public void testSessionProperties()
throws SQLException
{
try (Connection connection = createConnection("roles=hive:admin&sessionProperties=hive.temporary_staging_directory_path:/tmp;execution_policy:phased")) {
try (Connection connection = createConnection("roles=hive:admin&sessionProperties=hive.temporary_staging_directory_path:/tmp;execution_policy:legacy-phased")) {
TrinoConnection trinoConnection = connection.unwrap(TrinoConnection.class);
assertThat(trinoConnection.getSessionProperties())
.extractingByKeys("hive.temporary_staging_directory_path", "execution_policy")
.containsExactly("/tmp", "phased");
.containsExactly("/tmp", "legacy-phased");
assertThat(listSession(connection)).containsAll(ImmutableSet.of(
"execution_policy|phased|all-at-once",
"execution_policy|legacy-phased|all-at-once",
"hive.temporary_staging_directory_path|/tmp|/tmp/presto-${USER}"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import java.util.Collection;

public class PhasedExecutionPolicy
public class LegacyPhasedExecutionPolicy
implements ExecutionPolicy
{
@Override
public ExecutionSchedule createExecutionSchedule(Collection<StageExecution> stages)
{
return new PhasedExecutionSchedule(stages);
return new LegacyPhasedExecutionSchedule(stages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@
import static java.util.function.Function.identity;

@NotThreadSafe
public class PhasedExecutionSchedule
public class LegacyPhasedExecutionSchedule
implements ExecutionSchedule
{
private final List<Set<StageExecution>> schedulePhases;
private final Set<StageExecution> activeSources = new HashSet<>();

public PhasedExecutionSchedule(Collection<StageExecution> stages)
public LegacyPhasedExecutionSchedule(Collection<StageExecution> stages)
{
List<Set<PlanFragmentId>> phases = extractPhases(stages.stream().map(StageExecution::getFragment).collect(toImmutableList()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
* Schedules stages choosing to order to provide the best resource utilization.
* This means that stages which output won't be consumed (e.g. join probe side) will
* not be scheduled until dependent stages finish (e.g. join build source stages).
* Contrary to {@link PhasedExecutionPolicy}, {@link PrioritizeUtilizationExecutionSchedule} will
* Contrary to {@link LegacyPhasedExecutionPolicy}, {@link PrioritizeUtilizationExecutionSchedule} will
* schedule multiple source stages in order to fully utilize IO.
*/
public class PrioritizeUtilizationExecutionSchedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
import io.trino.execution.scheduler.policy.PhasedExecutionPolicy;
import io.trino.execution.scheduler.policy.LegacyPhasedExecutionPolicy;
import io.trino.execution.scheduler.policy.PrioritizeUtilizationExecutionPolicy;
import io.trino.failuredetector.FailureDetectorModule;
import io.trino.memory.ClusterMemoryManager;
Expand Down Expand Up @@ -285,7 +285,7 @@ protected void setup(Binder binder)

MapBinder<String, ExecutionPolicy> executionPolicyBinder = newMapBinder(binder, String.class, ExecutionPolicy.class);
executionPolicyBinder.addBinding("all-at-once").to(AllAtOnceExecutionPolicy.class);
executionPolicyBinder.addBinding("phased").to(PhasedExecutionPolicy.class);
executionPolicyBinder.addBinding("legacy-phased").to(LegacyPhasedExecutionPolicy.class);
executionPolicyBinder.addBinding("prioritize-utilization").to(PrioritizeUtilizationExecutionPolicy.class);

install(new QueryExecutionFactoryModule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testExplicitPropertyMappings()
.put("query.remote-task.min-error-duration", "30s")
.put("query.remote-task.max-error-duration", "60s")
.put("query.remote-task.max-callback-threads", "10")
.put("query.execution-policy", "phased")
.put("query.execution-policy", "legacy-phased")
.put("query.max-run-time", "2h")
.put("query.max-execution-time", "3h")
.put("query.max-planning-time", "1h")
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testExplicitPropertyMappings()
.setRemoteTaskMinErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxCallbackThreads(10)
.setQueryExecutionPolicy("phased")
.setQueryExecutionPolicy("legacy-phased")
.setQueryMaxRunTime(new Duration(2, HOURS))
.setQueryMaxExecutionTime(new Duration(3, HOURS))
.setQueryMaxPlanningTime(new Duration(1, HOURS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import static io.trino.sql.planner.plan.JoinNode.Type.RIGHT;
import static org.testng.Assert.assertEquals;

public class TestPhasedExecutionSchedule
public class TestLegacyPhasedExecutionSchedule
{
@Test
public void testExchange()
Expand All @@ -41,7 +41,7 @@ public void testExchange()
PlanFragment cFragment = createTableScanPlanFragment("c");
PlanFragment exchangeFragment = createExchangePlanFragment("exchange", aFragment, bFragment, cFragment);

List<Set<PlanFragmentId>> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, exchangeFragment));
List<Set<PlanFragmentId>> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, exchangeFragment));
assertEquals(phases, ImmutableList.of(
ImmutableSet.of(exchangeFragment.getId()),
ImmutableSet.of(aFragment.getId()),
Expand All @@ -57,7 +57,7 @@ public void testUnion()
PlanFragment cFragment = createTableScanPlanFragment("c");
PlanFragment unionFragment = createUnionPlanFragment("union", aFragment, bFragment, cFragment);

List<Set<PlanFragmentId>> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, unionFragment));
List<Set<PlanFragmentId>> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(aFragment, bFragment, cFragment, unionFragment));
assertEquals(phases, ImmutableList.of(
ImmutableSet.of(unionFragment.getId()),
ImmutableSet.of(aFragment.getId()),
Expand All @@ -72,7 +72,7 @@ public void testJoin()
PlanFragment probeFragment = createTableScanPlanFragment("probe");
PlanFragment joinFragment = createJoinPlanFragment(INNER, "join", buildFragment, probeFragment);

List<Set<PlanFragmentId>> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment));
List<Set<PlanFragmentId>> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment));
assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId()), ImmutableSet.of(buildFragment.getId()), ImmutableSet.of(probeFragment.getId())));
}

Expand All @@ -83,7 +83,7 @@ public void testRightJoin()
PlanFragment probeFragment = createTableScanPlanFragment("probe");
PlanFragment joinFragment = createJoinPlanFragment(RIGHT, "join", buildFragment, probeFragment);

List<Set<PlanFragmentId>> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment));
List<Set<PlanFragmentId>> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment, probeFragment));
assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId()), ImmutableSet.of(buildFragment.getId()), ImmutableSet.of(probeFragment.getId())));
}

Expand All @@ -93,7 +93,7 @@ public void testBroadcastJoin()
PlanFragment buildFragment = createTableScanPlanFragment("build");
PlanFragment joinFragment = createBroadcastJoinPlanFragment("join", buildFragment);

List<Set<PlanFragmentId>> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment));
List<Set<PlanFragmentId>> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(joinFragment, buildFragment));
assertEquals(phases, ImmutableList.of(ImmutableSet.of(joinFragment.getId(), buildFragment.getId())));
}

Expand All @@ -108,7 +108,7 @@ public void testJoinWithDeepSources()
PlanFragment probeTopFragment = createExchangePlanFragment("probeTop", probeMiddleFragment);
PlanFragment joinFragment = createJoinPlanFragment(INNER, "join", buildTopFragment, probeTopFragment);

List<Set<PlanFragmentId>> phases = PhasedExecutionSchedule.extractPhases(ImmutableList.of(
List<Set<PlanFragmentId>> phases = LegacyPhasedExecutionSchedule.extractPhases(ImmutableList.of(
joinFragment,
buildTopFragment,
buildMiddleFragment,
Expand Down

0 comments on commit 1dd01de

Please sign in to comment.