Skip to content

Commit

Permalink
Remove deprecated grouped execution properties
Browse files Browse the repository at this point in the history
grouped_execution_for_aggregation and grouped_execution_for_join have
been replaced by a single property grouped_execution.

dynamic_schedule_for_grouped_execution is enabled by default and doesn't
need a replacement.
  • Loading branch information
rschlussel committed Oct 16, 2020
1 parent 029ae8d commit cf398d7
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.spi.security.SelectedRole.Type.ROLE;
Expand Down Expand Up @@ -214,7 +214,7 @@ public static DistributedQueryRunner createMaterializingQueryRunner(Iterable<Tpc
"query.exchange-materialization-strategy", "ALL",
"query.hash-partition-count", "11",
"colocated-joins-enabled", "true",
"grouped-execution-for-aggregation-enabled", "true"),
"grouped-execution-enabled", "true"),
Optional.empty());
}

Expand Down Expand Up @@ -285,7 +285,7 @@ public static Session createMaterializeExchangesSession(Optional<SelectedRole> r
.setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.ALL.toString())
.setSystemProperty(HASH_PARTITION_COUNT, "13")
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION, "true")
.setCatalog(HIVE_CATALOG)
.setSchema(TPCH_SCHEMA)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private void setUp()
.put("optimizer.join-reordering-strategy", "ELIMINATE_CROSS_JOINS")
.put("query.hash-partition-count", "11")
.put("colocated-joins-enabled", "true")
.put("grouped-execution-for-aggregation-enabled", "true")
.put("grouped-execution-enabled", "true")
.build(),
Optional.empty());
queryRunner.installPlugin(new TestingEventListenerPlugin(generatedEvents));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@

import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN;
import static com.facebook.presto.SystemSessionProperties.CONCURRENT_LIFESPANS_PER_NODE;
import static com.facebook.presto.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.MAX_STAGE_RETRIES;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
Expand Down Expand Up @@ -438,8 +437,7 @@ private static Session createRecoverableSession(int writerConcurrency)
return testSessionBuilder()
.setIdentity(identity)
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true")
.setSystemProperty(GROUPED_EXECUTION, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
.setSystemProperty(RECOVERABLE_GROUPED_EXECUTION, "true")
.setSystemProperty(SCALE_WRITERS, "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ public final class SystemSessionProperties
public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog";
public static final String EXCHANGE_MATERIALIZATION_STRATEGY = "exchange_materialization_strategy";
public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct";
public static final String GROUPED_EXECUTION_FOR_AGGREGATION = "grouped_execution_for_aggregation";
public static final String GROUPED_EXECUTION_FOR_JOIN = "grouped_execution_for_join";
public static final String GROUPED_EXECUTION = "grouped_execution";
public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution";
public static final String RECOVERABLE_GROUPED_EXECUTION = "recoverable_grouped_execution";
public static final String MAX_FAILED_TASK_PERCENTAGE = "max_failed_task_percentage";
public static final String MAX_STAGE_RETRIES = "max_stage_retries";
Expand Down Expand Up @@ -257,26 +254,11 @@ public SystemSessionProperties(
"Use streaming instead of materialization for mark distinct with materialized exchange enabled",
queryManagerConfig.getUseStreamingExchangeForMarkDistinct(),
false),
booleanProperty(
GROUPED_EXECUTION_FOR_AGGREGATION,
"Use grouped execution for aggregation when possible",
featuresConfig.isGroupedExecutionForAggregationEnabled(),
false),
booleanProperty(
GROUPED_EXECUTION_FOR_JOIN,
"Use grouped execution for foin when possible",
featuresConfig.isGroupedExecutionForJoinEnabled(),
false),
booleanProperty(
GROUPED_EXECUTION,
"Use grouped execution when possible",
featuresConfig.isGroupedExecutionEnabled(),
false),
booleanProperty(
DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION,
"Experimental: Use dynamic schedule for grouped execution when possible",
featuresConfig.isDynamicScheduleForGroupedExecutionEnabled(),
false),
doubleProperty(
MAX_FAILED_TASK_PERCENTAGE,
"Max percentage of failed tasks that are retryable for recoverable dynamic scheduling",
Expand Down Expand Up @@ -967,26 +949,11 @@ public static boolean isUseStreamingExchangeForMarkDistinctEnabled(Session sessi
return session.getSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, Boolean.class);
}

public static boolean isGroupedExecutionForAggregationEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, Boolean.class) && isGroupedExecutionEnabled(session);
}

public static boolean isGroupedExecutionForJoinEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION_FOR_JOIN, Boolean.class) && isGroupedExecutionEnabled(session);
}

public static boolean isGroupedExecutionEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION, Boolean.class);
}

public static boolean isDynamicScheduleForGroupedExecution(Session session)
{
return session.getSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, Boolean.class);
}

public static boolean isRecoverableGroupedExecutionEnabled(Session session)
{
return session.getSystemProperty(RECOVERABLE_GROUPED_EXECUTION, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ public class FeaturesConfig
private JoinDistributionType joinDistributionType = PARTITIONED;
private DataSize joinMaxBroadcastTableSize;
private boolean colocatedJoinsEnabled = true;
private boolean groupedExecutionForAggregationEnabled = true;
private boolean groupedExecutionForJoinEnabled = true;
private boolean groupedExecutionEnabled = true;
private boolean dynamicScheduleForGroupedExecution = true;
private boolean recoverableGroupedExecutionEnabled;
private double maxFailedTaskPercentage = 0.3;
private int maxStageRetries;
Expand Down Expand Up @@ -403,32 +400,6 @@ public FeaturesConfig setJoinMaxBroadcastTableSize(DataSize joinMaxBroadcastTabl
return this;
}

public boolean isGroupedExecutionForAggregationEnabled()
{
return groupedExecutionForAggregationEnabled;
}

@Config("grouped-execution-for-aggregation-enabled")
@ConfigDescription("Use grouped execution for aggregation when possible")
public FeaturesConfig setGroupedExecutionForAggregationEnabled(boolean groupedExecutionForAggregationEnabled)
{
this.groupedExecutionForAggregationEnabled = groupedExecutionForAggregationEnabled;
return this;
}

public boolean isGroupedExecutionForJoinEnabled()
{
return groupedExecutionForJoinEnabled;
}

@Config("grouped-execution-for-join-enabled")
@ConfigDescription("Use grouped execution for join when possible")
public FeaturesConfig setGroupedExecutionForJoinEnabled(boolean groupedExecutionForJoinEnabled)
{
this.groupedExecutionForJoinEnabled = groupedExecutionForJoinEnabled;
return this;
}

public boolean isGroupedExecutionEnabled()
{
return groupedExecutionEnabled;
Expand All @@ -442,19 +413,6 @@ public FeaturesConfig setGroupedExecutionEnabled(boolean groupedExecutionEnabled
return this;
}

public boolean isDynamicScheduleForGroupedExecutionEnabled()
{
return dynamicScheduleForGroupedExecution;
}

@Config("dynamic-schedule-for-grouped-execution")
@ConfigDescription("Experimental: Use dynamic schedule for grouped execution when possible")
public FeaturesConfig setDynamicScheduleForGroupedExecutionEnabled(boolean dynamicScheduleForGroupedExecution)
{
this.dynamicScheduleForGroupedExecution = dynamicScheduleForGroupedExecution;
return this;
}

public boolean isRecoverableGroupedExecutionEnabled()
{
return recoverableGroupedExecutionEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount;
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.isDynamicScheduleForGroupedExecution;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForJoinEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand Down Expand Up @@ -252,8 +250,7 @@ private SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, boolea
PlanFragment fragment = subPlan.getFragment();
GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager), null);
if (properties.isSubTreeUseful()) {
boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)
&& isDynamicScheduleForGroupedExecution(session);
boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE);
BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, fragment.getPartitioning(), preferDynamic);
if (bucketNodeMap.isDynamic()) {
/*
Expand Down Expand Up @@ -981,16 +978,14 @@ private static class GroupedExecutionTagger
private final Session session;
private final Metadata metadata;
private final NodePartitioningManager nodePartitioningManager;
private final boolean groupedExecutionForAggregation;
private final boolean groupedExecutionForJoin;
private final boolean groupedExecutionEnabled;

public GroupedExecutionTagger(Session session, Metadata metadata, NodePartitioningManager nodePartitioningManager)
{
this.session = requireNonNull(session, "session is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.groupedExecutionForAggregation = isGroupedExecutionForAggregationEnabled(session);
this.groupedExecutionForJoin = isGroupedExecutionForJoinEnabled(session);
this.groupedExecutionEnabled = isGroupedExecutionEnabled(session);
}

@Override
Expand All @@ -1008,7 +1003,7 @@ public GroupedExecutionProperties visitJoin(JoinNode node, Void context)
GroupedExecutionProperties left = node.getLeft().accept(this, null);
GroupedExecutionProperties right = node.getRight().accept(this, null);

if (!node.getDistributionType().isPresent() || !groupedExecutionForJoin) {
if (!node.getDistributionType().isPresent() || !groupedExecutionEnabled) {
// This is possible when the optimizers is invoked with `forceSingleNode` set to true.
return GroupedExecutionProperties.notCapable();
}
Expand Down Expand Up @@ -1074,7 +1069,7 @@ public GroupedExecutionProperties visitJoin(JoinNode node, Void context)
public GroupedExecutionProperties visitAggregation(AggregationNode node, Void context)
{
GroupedExecutionProperties properties = node.getSource().accept(this, null);
if (groupedExecutionForAggregation && properties.isCurrentNodeCapable()) {
if (groupedExecutionEnabled && properties.isCurrentNodeCapable()) {
switch (node.getStep()) {
case SINGLE:
case FINAL:
Expand Down Expand Up @@ -1108,7 +1103,7 @@ public GroupedExecutionProperties visitTopNRowNumber(TopNRowNumberNode node, Voi
private GroupedExecutionProperties processWindowFunction(PlanNode node)
{
GroupedExecutionProperties properties = getOnlyElement(node.getSources()).accept(this, null);
if (groupedExecutionForAggregation && properties.isCurrentNodeCapable()) {
if (groupedExecutionEnabled && properties.isCurrentNodeCapable()) {
return new GroupedExecutionProperties(true, true, properties.capableTableScanNodes, properties.totalLifespans, properties.recoveryEligible);
}
return GroupedExecutionProperties.notCapable();
Expand All @@ -1118,7 +1113,7 @@ private GroupedExecutionProperties processWindowFunction(PlanNode node)
public GroupedExecutionProperties visitMarkDistinct(MarkDistinctNode node, Void context)
{
GroupedExecutionProperties properties = getOnlyElement(node.getSources()).accept(this, null);
if (groupedExecutionForAggregation && properties.isCurrentNodeCapable()) {
if (groupedExecutionEnabled && properties.isCurrentNodeCapable()) {
return new GroupedExecutionProperties(true, true, properties.capableTableScanNodes, properties.totalLifespans, properties.recoveryEligible);
}
return GroupedExecutionProperties.notCapable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ public void testDefaults()
.setDistributedIndexJoinsEnabled(false)
.setJoinDistributionType(PARTITIONED)
.setJoinMaxBroadcastTableSize(null)
.setGroupedExecutionForAggregationEnabled(true)
.setGroupedExecutionForJoinEnabled(true)
.setDynamicScheduleForGroupedExecutionEnabled(true)
.setGroupedExecutionEnabled(true)
.setRecoverableGroupedExecutionEnabled(false)
.setMaxFailedTaskPercentage(0.3)
Expand Down Expand Up @@ -181,10 +178,7 @@ public void testExplicitPropertyMappings()
.put("distributed-index-joins-enabled", "true")
.put("join-distribution-type", "BROADCAST")
.put("join-max-broadcast-table-size", "42GB")
.put("grouped-execution-for-aggregation-enabled", "false")
.put("grouped-execution-for-join-enabled", "false")
.put("grouped-execution-enabled", "false")
.put("dynamic-schedule-for-grouped-execution", "false")
.put("recoverable-grouped-execution-enabled", "true")
.put("max-failed-task-percentage", "0.8")
.put("max-stage-retries", "10")
Expand Down Expand Up @@ -273,10 +267,7 @@ public void testExplicitPropertyMappings()
.setDistributedIndexJoinsEnabled(true)
.setJoinDistributionType(BROADCAST)
.setJoinMaxBroadcastTableSize(new DataSize(42, GIGABYTE))
.setGroupedExecutionForAggregationEnabled(false)
.setGroupedExecutionForJoinEnabled(false)
.setGroupedExecutionEnabled(false)
.setDynamicScheduleForGroupedExecutionEnabled(false)
.setRecoverableGroupedExecutionEnabled(true)
.setMaxFailedTaskPercentage(0.8)
.setMaxStageRetries(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getPartitioningProviderCatalog;
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isDynamicScheduleForGroupedExecution;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForJoinEnabled;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
import static com.facebook.presto.SystemSessionProperties.isScaleWriters;
Expand All @@ -48,12 +45,7 @@ public void verify(SparkContext sparkContext, Session session)
verify(!isDistributedSortEnabled(session), "distributed sort is not supported");
verify(getExchangeMaterializationStrategy(session) == NONE, "exchange materialization is not supported");
verify(getPartitioningProviderCatalog(session).equals(GlobalSystemConnector.NAME), "partitioning provider other that system is not supported");
verify(!isGroupedExecutionForAggregationEnabled(session) &&
!isRecoverableGroupedExecutionEnabled(session) &&
!isDynamicScheduleForGroupedExecution(session) &&
!isGroupedExecutionForJoinEnabled(session) &&
!isGroupedExecutionEnabled(session),
"grouped execution is not supported");
verify(!isRecoverableGroupedExecutionEnabled(session) && !isGroupedExecutionEnabled(session), "grouped execution is not supported");
verify(!isRedistributeWrites(session), "redistribute writes is not supported");
verify(!isScaleWriters(session), "scale writes is not supported");
verify(!isForceSingleNodeOutput(session), "force single node output is expected to be disabled");
Expand Down Expand Up @@ -91,7 +83,6 @@ public static void setDefaults(FeaturesConfig config)
config.setDistributedSortEnabled(false);
config.setGroupedExecutionEnabled(false);
config.setRecoverableGroupedExecutionEnabled(false);
config.setDynamicScheduleForGroupedExecutionEnabled(false);
config.setColocatedJoinsEnabled(true);
config.setRedistributeWrites(false);
config.setScaleWriters(false);
Expand Down

0 comments on commit cf398d7

Please sign in to comment.