diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index ba2d275a1e..6f5b255be9 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -211,8 +211,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) { this.rssResubmitStage = rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) && RssSparkShuffleUtils.isStageResubmitSupported(); - this.taskBlockSendFailureRetry = - rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED); + this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED); this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED); this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage || blockIdSelfManagedEnabled; diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index 983a2a0692..aeb29d16d3 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -229,7 +229,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) && RssSparkShuffleUtils.isStageResubmitSupported(); this.taskBlockSendFailureRetryEnabled = - rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED); + rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED); // The feature of partition reassign is exclusive with multiple replicas and stage retry. 
if (taskBlockSendFailureRetryEnabled) { diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 06c8772750..85c700929e 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -208,8 +208,8 @@ private RssShuffleWriter( this.blockFailSentRetryEnabled = sparkConf.getBoolean( RssSparkConfig.SPARK_RSS_CONFIG_PREFIX - + RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), - RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue()); + + RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(), + RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue()); this.blockFailSentRetryMaxTimes = RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES); } diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index 4ecd103c9c..b45abc8719 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -204,8 +204,8 @@ public class RssClientConf { "This option is only valid when the remote storage path is specified. 
If ture, " + "the remote storage conf will use the client side hadoop configuration loaded from the classpath."); - public static final ConfigOption RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED = - ConfigOptions.key("rss.client.blockSendFailureRetry.enabled") + public static final ConfigOption RSS_CLIENT_REASSIGN_ENABLED = + ConfigOptions.key("rss.client.reassign.enabled") .booleanType() .defaultValue(false) .withDescription( diff --git a/docs/client_guide/spark_client_guide.md b/docs/client_guide/spark_client_guide.md index 2f2dcbbc34..65bd6a5ccd 100644 --- a/docs/client_guide/spark_client_guide.md +++ b/docs/client_guide/spark_client_guide.md @@ -149,4 +149,20 @@ Other configuration: |---|---|---| |spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator| |spark.rss.client.access.retry.interval.ms|20000|The interval between retries fallback to SortShuffleManager| -|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager| \ No newline at end of file +|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager| + +### Partition reassign in one shuffle attempt + +To improve task stability, a partition reassign mechanism is introduced. Within a single shuffle attempt, it requests replacement shuffle servers +for servers that are unhealthy or under high memory pressure. Currently, this feature is not compatible with the stage retry or multiple replicas mechanisms (more tests are needed). + +Use the following configs to enable this feature: + +```bash +# Whether to enable the partition reassign mechanism +spark.rss.client.reassign.enabled true +# The max number of reassigned servers for one partition when the partition reassign mechanism is used. +spark.rss.client.reassign.maxReassignServerNum 10 +# The block retry max times when partition reassign is enabled. 
+spark.rss.client.reassign.blockRetryMaxTimes 1 +``` \ No newline at end of file diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java index 0a1bd15791..4dd2bab8ea 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java @@ -37,7 +37,7 @@ import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX; -import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED; +import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED; /** This class is to basic test the mechanism of partition block data reassignment */ public class PartitionBlockDataReassignBasicTest extends SparkSQLTest { @@ -105,7 +105,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) { sparkConf.set( "spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, String.valueOf(grpcShuffleServers.size())); - sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true"); + sparkConf.set("spark." 
+ RSS_CLIENT_REASSIGN_ENABLED.key(), "true"); } @Override diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java index 5ee3d7b62e..a01b695e33 100644 --- a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java @@ -38,7 +38,7 @@ import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX; -import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED; +import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED; import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY; /** This class is to test the partition reassign mechanism of multiple retries. */ @@ -86,7 +86,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) { sparkConf.set("spark.sql.shuffle.partitions", "4"); sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2"); sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1"); - sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true"); + sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true"); sparkConf.set("spark." + RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "10"); // simulate the grpc servers has different free memory