Skip to content

Commit

Permalink
[apache#1608][part-7] improvement(doc): add doc and optimize reassign…
Browse files Browse the repository at this point in the history
… config options
  • Loading branch information
zuston committed May 11, 2024
1 parent 313d4e0 commit fbe96f1
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetry =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
taskBlockSendFailureRetry || rssResubmitStage || blockIdSelfManagedEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetryEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);

// The feature of partition reassign is exclusive with multiple replicas and stage retry.
if (taskBlockSendFailureRetryEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ private RssShuffleWriter(
this.blockFailSentRetryEnabled =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
+ RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(),
RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue());
this.blockFailSentRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ public class RssClientConf {
"This option is only valid when the remote storage path is specified. If true, "
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath.");

public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
public static final ConfigOption<Boolean> RSS_CLIENT_REASSIGN_ENABLED =
ConfigOptions.key("rss.client.reassign.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Expand Down
18 changes: 17 additions & 1 deletion docs/client_guide/spark_client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,20 @@ Other configuration:
|---|---|---|
|spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
|spark.rss.client.access.retry.interval.ms|20000|The interval between retries fallback to SortShuffleManager|
|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager|
|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager|

### Partition reassign in one shuffle attempt

To achieve better task stability, a partition reassignment mechanism is introduced that requests replacement shuffle servers, in order to overcome server instability
(unhealthy servers or servers under high memory pressure) within a single shuffle attempt. At the current stage, this feature is not compatible with the stage retry and multiple replicas mechanisms (more tests should be added).

Use the following configs to enable this feature:

```bash
# whether to enable reassign mechanism
spark.rss.client.reassign.enabled true
# The max reassign server num for one partition when using partition reassign mechanism.
spark.rss.client.reassign.maxReassignServerNum 10
# The block retry max times when partition reassign is enabled.
spark.rss.client.reassign.blockRetryMaxTimes 1
```
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;

/** This class is to basic test the mechanism of partition block data reassignment */
public class PartitionBlockDataReassignBasicTest extends SparkSQLTest {
Expand Down Expand Up @@ -105,7 +105,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set(
"spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
String.valueOf(grpcShuffleServers.size()));
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY;

/** This class is to test the partition reassign mechanism of multiple retries. */
Expand Down Expand Up @@ -86,7 +86,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "10");

// simulate the grpc servers has different free memory
Expand Down

0 comments on commit fbe96f1

Please sign in to comment.