Skip to content

Commit

Permalink
[apache#1608][part-4] feat(server)(spark3): best effort to resend whe…
Browse files Browse the repository at this point in the history
…n server is inactive
  • Loading branch information
zuston committed Apr 23, 2024
1 parent 45ad0b8 commit 1958525
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -921,16 +921,16 @@ protected void registerShuffleServers(
serverToPartitionRanges.entrySet();
entries.stream()
.forEach(
entry -> {
shuffleWriteClient.registerShuffle(
entry.getKey(),
appId,
shuffleId,
entry.getValue(),
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite);
});
entry ->
shuffleWriteClient.registerShuffle(
entry.getKey(),
appId,
shuffleId,
entry.getValue(),
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
taskBlockSendFailureRetryEnabled));
LOG.info(
"Finish register shuffleId["
+ shuffleId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,34 @@ SendShuffleDataResult sendShuffleData(

void registerApplicationInfo(String appId, long timeoutMs, String user);

/**
 * Registers a shuffle on the given server without opting in to block-failure reassignment.
 *
 * <p>Convenience overload that forwards every argument to the eight-argument {@code
 * registerShuffle} and passes {@code false} for {@code blockFailureReassignEnabled}.
 */
default void registerShuffle(
    ShuffleServerInfo shuffleServerInfo,
    String appId,
    int shuffleId,
    List<PartitionRange> partitionRanges,
    RemoteStorageInfo remoteStorage,
    ShuffleDataDistributionType dataDistributionType,
    int maxConcurrencyPerPartitionToWrite) {
  // Callers of this overload never asked for reassignment, so it stays off.
  final boolean blockFailureReassignEnabled = false;
  registerShuffle(
      shuffleServerInfo,
      appId,
      shuffleId,
      partitionRanges,
      remoteStorage,
      dataDistributionType,
      maxConcurrencyPerPartitionToWrite,
      blockFailureReassignEnabled);
}

void registerShuffle(
ShuffleServerInfo shuffleServerInfo,
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite);
int maxConcurrencyPerPartitionToWrite,
boolean blockFailureReassignEnabled);

boolean sendCommit(
Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,8 @@ public void registerShuffle(
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite) {
int maxConcurrencyPerPartitionToWrite,
boolean blockFailureReassignEnabled) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -547,7 +548,8 @@ public void registerShuffle(
remoteStorage,
user,
dataDistributionType,
maxConcurrencyPerPartitionToWrite);
maxConcurrencyPerPartitionToWrite,
blockFailureReassignEnabled);
RssRegisterShuffleResponse response =
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum StatusCode {
ACCESS_DENIED(8),
INVALID_REQUEST(9),
NO_BUFFER_FOR_HUGE_PARTITION(10),
SERVER_INACTIVE(11),
UNKNOWN(-1);

static final Map<Integer, StatusCode> VALUE_MAP =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -174,14 +175,16 @@ private ShuffleRegisterResponse doRegisterShuffle(
RemoteStorageInfo remoteStorageInfo,
String user,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite) {
int maxConcurrencyPerPartitionToWrite,
boolean blockFailureReassignEnabled) {
ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder();
reqBuilder
.setAppId(appId)
.setShuffleId(shuffleId)
.setUser(user)
.setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name()))
.setMaxConcurrencyPerPartitionToWrite(maxConcurrencyPerPartitionToWrite)
.setBlockFailureReassignEnabled(BoolValue.of(blockFailureReassignEnabled))
.addAllPartitionRanges(toShufflePartitionRanges(partitionRanges));
RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
rsBuilder.setPath(remoteStorageInfo.getPath());
Expand Down Expand Up @@ -413,7 +416,8 @@ public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest requ
request.getRemoteStorageInfo(),
request.getUser(),
request.getDataDistributionType(),
request.getMaxConcurrencyPerPartitionToWrite());
request.getMaxConcurrencyPerPartitionToWrite(),
request.isBlockFailureReassignEnabled());

RssRegisterShuffleResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ public class RssRegisterShuffleRequest {
private String user;
private ShuffleDataDistributionType dataDistributionType;
private int maxConcurrencyPerPartitionToWrite;
private boolean blockFailureReassignEnabled = false;

/**
 * Creates a register-shuffle request that also carries the block-failure-reassign flag.
 *
 * <p>The first seven arguments are handed unchanged to the seven-argument constructor; the flag
 * is stored afterwards.
 *
 * @param blockFailureReassignEnabled value reported by {@link #isBlockFailureReassignEnabled()}
 */
public RssRegisterShuffleRequest(
    String appId,
    int shuffleId,
    List<PartitionRange> partitionRanges,
    RemoteStorageInfo remoteStorageInfo,
    String user,
    ShuffleDataDistributionType dataDistributionType,
    int maxConcurrencyPerPartitionToWrite,
    boolean blockFailureReassignEnabled) {
  this(
      appId, shuffleId, partitionRanges, remoteStorageInfo,
      user, dataDistributionType, maxConcurrencyPerPartitionToWrite);
  // Overrides the field's declared default once the delegated constructor has run.
  this.blockFailureReassignEnabled = blockFailureReassignEnabled;
}

public RssRegisterShuffleRequest(
String appId,
Expand Down Expand Up @@ -109,4 +130,8 @@ public ShuffleDataDistributionType getDataDistributionType() {
public int getMaxConcurrencyPerPartitionToWrite() {
return maxConcurrencyPerPartitionToWrite;
}

/** Returns the block-failure-reassign flag stored in this request. */
public boolean isBlockFailureReassignEnabled() {
  return this.blockFailureReassignEnabled;
}
}
2 changes: 2 additions & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ message ShuffleRegisterRequest {
string user = 5;
DataDistribution shuffleDataDistribution = 6;
int32 maxConcurrencyPerPartitionToWrite = 7;
google.protobuf.BoolValue blockFailureReassignEnabled = 8;
}

enum DataDistribution {
Expand Down Expand Up @@ -305,6 +306,7 @@ enum StatusCode {
ACCESS_DENIED = 8;
INVALID_REQUEST = 9;
NO_BUFFER_FOR_HUGE_PARTITION = 10;
SERVER_INACTIVE = 11;
// add more status
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
Expand Down Expand Up @@ -194,7 +195,8 @@ public void registerShuffle(
new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
user,
shuffleDataDistributionType,
maxConcurrencyPerPartitionToWrite);
maxConcurrencyPerPartitionToWrite,
// Read the wrapped boolean, not the presence bit: has...() is true whenever the BoolValue field
// is set, and the client sets it unconditionally (even to false). An unset field yields the
// default BoolValue, whose value is false.
req.getBlockFailureReassignEnabled().getValue());

reply = ShuffleRegisterResponse.newBuilder().setStatus(result.toProto()).build();
responseObserver.onNext(reply);
Expand All @@ -204,9 +206,22 @@ public void registerShuffle(
@Override
public void sendShuffleData(
SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) {
String appId = req.getAppId();
// Fast-fail with SERVER_INACTIVE only for apps that opted in to block-failure reassignment.
// The task-info lookup is wrapped so a missing entry counts as "not opted in" instead of
// throwing. NOTE(review): getShuffleTaskInfo(appId) presumably returns null for an app that is
// not registered on this server - confirm against ShuffleTaskManager.
if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
    && java.util.Optional.ofNullable(
            shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId))
        .map(taskInfo -> taskInfo.isBlockFailureReassignEnabled())
        .orElse(false)) {
  responseObserver.onNext(
      SendShuffleDataResponse.newBuilder()
          .setStatus(StatusCode.SERVER_INACTIVE.toProto())
          .setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
          .build());
  responseObserver.onCompleted();
  return;
}

SendShuffleDataResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireBufferId();
long timestamp = req.getTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public class ShuffleSpecification {
private int maxConcurrencyPerPartitionToWrite;
private ShuffleDataDistributionType distributionType;

private boolean blockFailureReassignEnabled;

/** Returns the block-failure-reassign flag held by this specification. */
public boolean isBlockFailureReassignEnabled() {
  return this.blockFailureReassignEnabled;
}

public int getMaxConcurrencyPerPartitionToWrite() {
return maxConcurrencyPerPartitionToWrite;
}
Expand Down Expand Up @@ -52,6 +58,11 @@ public Builder dataDistributionType(ShuffleDataDistributionType distributionType
return this;
}

/**
 * Stores the block-failure-reassign flag on the specification being built.
 *
 * @return this builder, so calls can be chained
 */
public Builder blockFailureReassignEnabled(boolean blockFailureReassignEnabled) {
  specification.blockFailureReassignEnabled = blockFailureReassignEnabled;
  return this;
}

public ShuffleSpecification build() {
return specification;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public ShuffleDataDistributionType getDataDistType() {
return specification.get().getDistributionType();
}

/**
 * Reports whether block-failure reassignment is enabled in the current specification.
 *
 * @return the specification's flag, or {@code false} when no specification is present
 */
public boolean isBlockFailureReassignEnabled() {
  // NOTE(review): the holder is only shown being filled through setSpecification(...); if it can
  // be read before that call, get() yields null - treat that as "disabled" rather than throwing.
  ShuffleSpecification current = specification.get();
  return current != null && current.isBlockFailureReassignEnabled();
}

public void setSpecification(ShuffleSpecification specification) {
this.specification.set(specification);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ public StatusCode registerShuffle(
remoteStorageInfo,
user,
ShuffleDataDistributionType.NORMAL,
-1);
-1,
false);
}

public StatusCode registerShuffle(
Expand All @@ -264,7 +265,8 @@ public StatusCode registerShuffle(
RemoteStorageInfo remoteStorageInfo,
String user,
ShuffleDataDistributionType dataDistType,
int maxConcurrencyPerPartitionToWrite) {
int maxConcurrencyPerPartitionToWrite,
boolean blockFailureReassignEnabled) {
ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId);
lock.lock();
try {
Expand All @@ -277,6 +279,7 @@ public StatusCode registerShuffle(
.maxConcurrencyPerPartitionToWrite(
getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf))
.dataDistributionType(dataDistType)
.blockFailureReassignEnabled(blockFailureReassignEnabled)
.build());

partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
Expand Down Expand Up @@ -99,6 +100,24 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
RpcResponse rpcResponse;
String appId = req.getAppId();

// Fast-fail with SERVER_INACTIVE only for apps that opted in to block-failure reassignment.
// The task-info lookup is wrapped so a missing entry counts as "not opted in" instead of
// throwing. NOTE(review): getShuffleTaskInfo(appId) presumably returns null for an app that is
// not registered on this server - confirm against ShuffleTaskManager.
if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
    && java.util.Optional.ofNullable(
            shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId))
        .map(taskInfo -> taskInfo.isBlockFailureReassignEnabled())
        .orElse(false)) {
  // The request is rejected here, so release each block's data buffer before replying.
  req.getPartitionToBlocks().values().stream()
      .flatMap(Collection::stream)
      .forEach(block -> block.getData().release());
  rpcResponse =
      new RpcResponse(
          req.getRequestId(),
          StatusCode.SERVER_INACTIVE,
          "Server is inactive, status: " + shuffleServer.getServerStatus());
  client.getChannel().writeAndFlush(rpcResponse);
  return;
}

int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireId();
long timestamp = req.getTimestamp();
Expand Down

0 comments on commit 1958525

Please sign in to comment.