From ba4b80fbdae50c145df3efe1fbdc1a403ea9136e Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 3 Apr 2024 14:33:06 +0800 Subject: [PATCH] [#1608][part-4] feat(server)(spark3): best effort to resend when server is inactive --- .../spark/shuffle/RssShuffleManager.java | 20 +++++++-------- .../client/api/ShuffleWriteClient.java | 22 +++++++++++++++- .../client/impl/ShuffleWriteClientImpl.java | 6 +++-- .../apache/uniffle/common/rpc/StatusCode.java | 1 + .../impl/grpc/ShuffleServerGrpcClient.java | 8 ++++-- .../request/RssRegisterShuffleRequest.java | 25 +++++++++++++++++++ proto/src/main/proto/Rss.proto | 2 ++ .../server/ShuffleServerGrpcService.java | 19 ++++++++++++-- .../uniffle/server/ShuffleSpecification.java | 11 ++++++++ .../uniffle/server/ShuffleTaskInfo.java | 4 +++ .../uniffle/server/ShuffleTaskManager.java | 7 ++++-- .../netty/ShuffleServerNettyHandler.java | 19 ++++++++++++++ 12 files changed, 125 insertions(+), 19 deletions(-) diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index e629b23657..08be90987f 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -939,16 +939,16 @@ protected void registerShuffleServers( serverToPartitionRanges.entrySet(); entries.stream() .forEach( - entry -> { - shuffleWriteClient.registerShuffle( - entry.getKey(), - appId, - shuffleId, - entry.getValue(), - remoteStorage, - dataDistributionType, - maxConcurrencyPerPartitionToWrite); - }); + entry -> + shuffleWriteClient.registerShuffle( + entry.getKey(), + appId, + shuffleId, + entry.getValue(), + remoteStorage, + dataDistributionType, + maxConcurrencyPerPartitionToWrite, + taskBlockSendFailureRetryEnabled)); LOG.info( "Finish register shuffleId[" + shuffleId diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java index 7d8f533924..b119ff1e61 100644 --- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java +++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java @@ -44,6 +44,25 @@ SendShuffleDataResult sendShuffleData( void registerApplicationInfo(String appId, long timeoutMs, String user); + default void registerShuffle( + ShuffleServerInfo shuffleServerInfo, + String appId, + int shuffleId, + List partitionRanges, + RemoteStorageInfo remoteStorage, + ShuffleDataDistributionType dataDistributionType, + int maxConcurrencyPerPartitionToWrite) { + this.registerShuffle( + shuffleServerInfo, + appId, + shuffleId, + partitionRanges, + remoteStorage, + dataDistributionType, + maxConcurrencyPerPartitionToWrite, + false); + } + void registerShuffle( ShuffleServerInfo shuffleServerInfo, String appId, @@ -51,7 +70,8 @@ void registerShuffle( List partitionRanges, RemoteStorageInfo remoteStorage, ShuffleDataDistributionType dataDistributionType, - int maxConcurrencyPerPartitionToWrite); + int maxConcurrencyPerPartitionToWrite, + boolean blockFailureReassignEnabled); boolean sendCommit( Set shuffleServerInfoSet, String appId, int shuffleId, int numMaps); diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 42e60f3a83..16fc47dc85 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -531,7 +531,8 @@ public void registerShuffle( List partitionRanges, RemoteStorageInfo remoteStorage, ShuffleDataDistributionType dataDistributionType, - int maxConcurrencyPerPartitionToWrite) { + int maxConcurrencyPerPartitionToWrite, + boolean blockFailureReassignEnabled) { String user = null; try { user = UserGroupInformation.getCurrentUser().getShortUserName(); @@ -547,7 +548,8 @@ public void registerShuffle( remoteStorage, user, dataDistributionType, - maxConcurrencyPerPartitionToWrite); + maxConcurrencyPerPartitionToWrite, + blockFailureReassignEnabled); RssRegisterShuffleResponse response = getShuffleServerClient(shuffleServerInfo).registerShuffle(request); diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java index 79e35ecabe..f5f4ef49cc 100644 --- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java +++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java @@ -35,6 +35,7 @@ public enum StatusCode { ACCESS_DENIED(8), INVALID_REQUEST(9), NO_BUFFER_FOR_HUGE_PARTITION(10), + SERVER_INACTIVE(11), UNKNOWN(-1); static final Map VALUE_MAP = diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index f20cd85f51..bc9fbe1e33 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import io.netty.buffer.Unpooled; @@ -174,7 +175,8 @@ private ShuffleRegisterResponse doRegisterShuffle( RemoteStorageInfo remoteStorageInfo, String user, ShuffleDataDistributionType dataDistributionType, - int maxConcurrencyPerPartitionToWrite) { + int maxConcurrencyPerPartitionToWrite, + boolean blockFailureReassignEnabled) { ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder(); reqBuilder .setAppId(appId) @@ -182,6 +184,7 @@ private ShuffleRegisterResponse doRegisterShuffle( .setUser(user) .setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name())) .setMaxConcurrencyPerPartitionToWrite(maxConcurrencyPerPartitionToWrite) + .setBlockFailureReassignEnabled(BoolValue.of(blockFailureReassignEnabled)) .addAllPartitionRanges(toShufflePartitionRanges(partitionRanges)); RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder(); rsBuilder.setPath(remoteStorageInfo.getPath()); @@ -433,7 +436,8 @@ public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest requ request.getRemoteStorageInfo(), request.getUser(), request.getDataDistributionType(), - request.getMaxConcurrencyPerPartitionToWrite()); + request.getMaxConcurrencyPerPartitionToWrite(), + request.isBlockFailureReassignEnabled()); RssRegisterShuffleResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java index 2cd49bb6d3..ae28564685 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java @@ -35,6 +35,27 @@ public class RssRegisterShuffleRequest { private String user; private ShuffleDataDistributionType dataDistributionType; private int maxConcurrencyPerPartitionToWrite; + private boolean blockFailureReassignEnabled = false; + + public RssRegisterShuffleRequest( + String appId, + int shuffleId, + List partitionRanges, + RemoteStorageInfo remoteStorageInfo, + String user, + ShuffleDataDistributionType dataDistributionType, + int maxConcurrencyPerPartitionToWrite, + boolean blockFailureReassignEnabled) { + this( + appId, + shuffleId, + partitionRanges, + remoteStorageInfo, + user, + dataDistributionType, + maxConcurrencyPerPartitionToWrite); + this.blockFailureReassignEnabled = blockFailureReassignEnabled; + } public RssRegisterShuffleRequest( String appId, @@ -109,4 +130,8 @@ public ShuffleDataDistributionType getDataDistributionType() { public int getMaxConcurrencyPerPartitionToWrite() { return maxConcurrencyPerPartitionToWrite; } + + public boolean isBlockFailureReassignEnabled() { + return blockFailureReassignEnabled; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 97470f4ac1..f9a2ce88be 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -184,6 +184,7 @@ message ShuffleRegisterRequest { string user = 5; DataDistribution shuffleDataDistribution = 6; int32 maxConcurrencyPerPartitionToWrite = 7; + google.protobuf.BoolValue blockFailureReassignEnabled = 8; } enum DataDistribution { @@ -305,6 +306,7 @@ enum StatusCode { ACCESS_DENIED = 8; INVALID_REQUEST = 9; NO_BUFFER_FOR_HUGE_PARTITION = 10; + SERVER_INACTIVE = 11; // add more status } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index c088242c8c..041f2fd3fe 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -40,6 +40,7 @@ import org.apache.uniffle.common.BufferSegment; import org.apache.uniffle.common.PartitionRange; import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.ShuffleDataDistributionType; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; @@ -194,7 +195,8 @@ public void registerShuffle( new RemoteStorageInfo(remoteStoragePath, remoteStorageConf), user, shuffleDataDistributionType, - maxConcurrencyPerPartitionToWrite); + maxConcurrencyPerPartitionToWrite, + req.hasBlockFailureReassignEnabled()); reply = ShuffleRegisterResponse.newBuilder().setStatus(result.toProto()).build(); responseObserver.onNext(reply); @@ -204,9 +206,22 @@ public void registerShuffle( @Override public void sendShuffleData( SendShuffleDataRequest req, StreamObserver responseObserver) { + String appId = req.getAppId(); + if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE + && shuffleServer + .getShuffleTaskManager() + .getShuffleTaskInfo(appId) + .isBlockFailureReassignEnabled()) { + responseObserver.onNext( + SendShuffleDataResponse.newBuilder() + .setStatus(StatusCode.SERVER_INACTIVE.toProto()) + .setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus()) + .build()); + responseObserver.onCompleted(); + return; + } SendShuffleDataResponse reply; - String appId = req.getAppId(); int shuffleId = req.getShuffleId(); long requireBufferId = req.getRequireBufferId(); long timestamp = req.getTimestamp(); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleSpecification.java b/server/src/main/java/org/apache/uniffle/server/ShuffleSpecification.java index 0fe8ac5b5f..3adf2210cd 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleSpecification.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleSpecification.java @@ -23,6 +23,12 @@ public class ShuffleSpecification { private int maxConcurrencyPerPartitionToWrite; private ShuffleDataDistributionType distributionType; + private boolean blockFailureReassignEnabled; + + public boolean isBlockFailureReassignEnabled() { + return blockFailureReassignEnabled; + } + public int getMaxConcurrencyPerPartitionToWrite() { return maxConcurrencyPerPartitionToWrite; } @@ -52,6 +58,11 @@ public Builder dataDistributionType(ShuffleDataDistributionType distributionType return this; } + public Builder blockFailureReassignEnabled(boolean blockFailureReassignEnabled) { + this.specification.blockFailureReassignEnabled = blockFailureReassignEnabled; + return this; + } + public ShuffleSpecification build() { return specification; } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java index e657ec4db2..67adec9c4d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java @@ -113,6 +113,10 @@ public ShuffleDataDistributionType getDataDistType() { return specification.get().getDistributionType(); } + public boolean isBlockFailureReassignEnabled() { + return specification.get().isBlockFailureReassignEnabled(); + } + public void setSpecification(ShuffleSpecification specification) { this.specification.set(specification); } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 11d6545728..49ac3fc92d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -254,7 +254,8 @@ public StatusCode registerShuffle( remoteStorageInfo, user, ShuffleDataDistributionType.NORMAL, - -1); + -1, + false); } public StatusCode registerShuffle( @@ -264,7 +265,8 @@ public StatusCode registerShuffle( RemoteStorageInfo remoteStorageInfo, String user, ShuffleDataDistributionType dataDistType, - int maxConcurrencyPerPartitionToWrite) { + int maxConcurrencyPerPartitionToWrite, + boolean blockFailureReassignEnabled) { ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId); lock.lock(); try { @@ -277,6 +279,7 @@ public StatusCode registerShuffle( .maxConcurrencyPerPartitionToWrite( getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf)) .dataDistributionType(dataDistType) + .blockFailureReassignEnabled(blockFailureReassignEnabled) .build()); partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index dbda25abc0..dbc01274eb 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.BufferSegment; +import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; @@ -99,6 +100,24 @@ public void exceptionCaught(Throwable cause, TransportClient client) { public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) { RpcResponse rpcResponse; String appId = req.getAppId(); + + if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE + && shuffleServer + .getShuffleTaskManager() + .getShuffleTaskInfo(appId) + .isBlockFailureReassignEnabled()) { + req.getPartitionToBlocks().values().stream() + .flatMap(Collection::stream) + .forEach(block -> block.getData().release()); + rpcResponse = + new RpcResponse( + req.getRequestId(), + StatusCode.SERVER_INACTIVE, + "Server is inactive, status: " + shuffleServer.getServerStatus()); + client.getChannel().writeAndFlush(rpcResponse); + return; + } + int shuffleId = req.getShuffleId(); long requireBufferId = req.getRequireId(); long timestamp = req.getTimestamp();