Skip to content

Commit

Permalink
[apache#1608][part-7] improvement(doc): add doc and optimize reassign config options (apache#1693)
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

1. add docs about reassign mechanism
2. rename the config from "rss.client.blockSendFailureRetry.enabled" to "rss.client.reassign.enabled"

### Why are the changes needed?

Fix: apache#1608

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Not needed; this is a documentation and config-rename change.
  • Loading branch information
zuston committed May 15, 2024
1 parent 4f4f7e3 commit de4b261
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetry =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
taskBlockSendFailureRetry || rssResubmitStage || blockIdSelfManagedEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetryEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED);
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);

// The feature of partition reassign is exclusive with multiple replicas and stage retry.
if (taskBlockSendFailureRetryEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ private RssShuffleWriter(
this.blockFailSentRetryEnabled =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
+ RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.key(),
RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue());
this.blockFailSentRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ public class RssClientConf {
"This option is only valid when the remote storage path is specified. If ture, "
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath.");

public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
ConfigOptions.key("rss.client.blockSendFailureRetry.enabled")
public static final ConfigOption<Boolean> RSS_CLIENT_REASSIGN_ENABLED =
ConfigOptions.key("rss.client.reassign.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Expand Down
18 changes: 17 additions & 1 deletion docs/client_guide/spark_client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,20 @@ Other configuration:
|---|---|---|
|spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
|spark.rss.client.access.retry.interval.ms|20000|The interval between retries fallback to SortShuffleManager|
|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager|
|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager|

### Partition reassign in one shuffle attempt

To achieve better task stability, the partition reassignment mechanism has been introduced, which requests new replacement shuffle servers to overcome server instability caused by unhealthy conditions or high memory pressure in a single shuffle attempt.
Currently, this feature is not compatible with stage retry and multiple replica mechanisms (additional testing is required).

Use the following configs to enable this feature:

```bash
# whether to enable reassign mechanism
spark.rss.client.reassign.enabled true
# The max reassign server num for one partition when using partition reassign mechanism.
spark.rss.client.reassign.maxReassignServerNum 10
# The block retry max times when partition reassign is enabled.
spark.rss.client.reassign.blockRetryMaxTimes 1
```
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;

/** This class is to basic test the mechanism of partition block data reassignment */
public class PartitionBlockDataReassignBasicTest extends SparkSQLTest {
Expand Down Expand Up @@ -105,7 +105,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set(
"spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER,
String.valueOf(grpcShuffleServers.size()));
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;
import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY;

/** This class is to test the partition reassign mechanism of multiple retries. */
Expand Down Expand Up @@ -86,7 +86,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
sparkConf.set("spark." + RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "10");

// simulate the grpc servers has different free memory
Expand Down

0 comments on commit de4b261

Please sign in to comment.