Skip to content

Commit

Permalink
[apache#1608] improvement(spark3): Output more task level infos in dr…
Browse files Browse the repository at this point in the history
…iver side when reassigning on block sent failure
  • Loading branch information
zuston committed Jun 11, 2024
1 parent 648931c commit be0dda7
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void getPartitionToShufflerServer(
@Override
public void reassignShuffleServers(
RssProtos.ReassignServersRequest request,
StreamObserver<RssProtos.ReassignServersReponse> responseObserver) {
StreamObserver<RssProtos.ReassignServersResponse> responseObserver) {
int stageId = request.getStageId();
int stageAttemptNumber = request.getStageAttemptNumber();
int shuffleId = request.getShuffleId();
Expand All @@ -223,8 +223,8 @@ public void reassignShuffleServers(
shuffleManager.reassignAllShuffleServersForWholeStage(
stageId, stageAttemptNumber, shuffleId, numPartitions);
RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
RssProtos.ReassignServersReponse reply =
RssProtos.ReassignServersReponse.newBuilder()
RssProtos.ReassignServersResponse reply =
RssProtos.ReassignServersResponse.newBuilder()
.setStatus(code)
.setNeedReassign(needReassign)
.build();
Expand All @@ -241,6 +241,13 @@ public void reassignOnBlockSendFailure(
RssProtos.StatusCode code = RssProtos.StatusCode.INTERNAL_ERROR;
RssProtos.RssReassignOnBlockSendFailureResponse reply;
try {
LOG.info(
"Accepted reassign request on block sent failure for shuffleId: {}, stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {}",
request.getShuffleId(),
request.getStageId(),
request.getStageAttemptNumber(),
request.getTaskAttemptId(),
request.getExecutorId());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
request.getShuffleId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
Expand Down Expand Up @@ -548,11 +548,11 @@ private void throwFetchFailedIfNecessary(Exception e) {
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
RssReassignServersReponse rssReassignServersReponse =
RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersReponse.isNeedReassign());
rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,18 +1380,21 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds);
LOG.info("Finished reassign");
LOG.info("Finished the shuffle assignment request to coordinator.");
if (reassignmentHandler != null) {
response = reassignmentHandler.apply(response);
}
LOG.info(
"Register the partition->servers assignment. {}",
response.getServerToPartitionRanges());
registerShuffleServers(
id.get(), shuffleId, response.getServerToPartitionRanges(), getRemoteStorageInfo());
return response.getPartitionToServers();
},
retryInterval,
retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
throw new RssException("Errors on requesting shuffle assignment!", throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
Expand All @@ -76,7 +77,7 @@
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ReceivingFailureServer;
Expand Down Expand Up @@ -582,8 +583,18 @@ private void doReassignOnBlockSendFailure(
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
try (ShuffleManagerClient shuffleManagerClient = createShuffleManagerClient(driver, port)) {
String executorId = SparkEnv.get().executorId();
long taskAttemptId = taskContext.taskAttemptId();
int stageId = taskContext.stageId();
int stageAttemptNum = taskContext.stageAttemptNumber();
RssReassignOnBlockSendFailureRequest request =
new RssReassignOnBlockSendFailureRequest(shuffleId, failurePartitionToServers);
new RssReassignOnBlockSendFailureRequest(
shuffleId,
failurePartitionToServers,
executorId,
taskAttemptId,
stageId,
stageAttemptNum);
RssReassignOnBlockSendFailureResponse response =
shuffleManagerClient.reassignOnBlockSendFailure(request);
if (response.getStatusCode() != StatusCode.SUCCESS) {
Expand Down Expand Up @@ -815,11 +826,11 @@ private void throwFetchFailedIfNecessary(Exception e) {
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
RssReassignServersReponse rssReassignServersReponse =
RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersReponse.isNeedReassign());
rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
Expand All @@ -51,7 +51,7 @@ RssPartitionToShuffleServerResponse getPartitionToShufflerServer(
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssReportShuffleWriteFailureRequest req);

RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest req);
RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest req);

RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
Expand Down Expand Up @@ -116,11 +116,11 @@ public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
}

@Override
public RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest req) {
public RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest req) {
RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
RssProtos.ReassignServersReponse reassignServersReponse =
RssProtos.ReassignServersResponse reassignServersResponse =
getBlockingStub().reassignShuffleServers(reassignServersRequest);
return RssReassignServersReponse.fromProto(reassignServersReponse);
return RssReassignServersResponse.fromProto(reassignServersResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,24 @@
public class RssReassignOnBlockSendFailureRequest {
private int shuffleId;
private Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers;
private String executorId;
private long taskAttemptId;
private int stageId;
private int stageAttemptNumber;

public RssReassignOnBlockSendFailureRequest(
int shuffleId, Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers) {
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers,
String executorId,
long taskAttemptId,
int stageId,
int stageAttemptNum) {
this.shuffleId = shuffleId;
this.failurePartitionToServers = failurePartitionToServers;
this.executorId = executorId;
this.taskAttemptId = taskAttemptId;
this.stageId = stageId;
this.stageAttemptNumber = stageAttemptNum;
}

public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
Expand All @@ -43,6 +56,10 @@ public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
.collect(
Collectors.toMap(
Map.Entry::getKey, x -> ReceivingFailureServer.toProto(x.getValue()))))
.setExecutorId(request.executorId)
.setStageId(request.stageId)
.setStageAttemptNumber(request.stageAttemptNumber)
.setTaskAttemptId(request.taskAttemptId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.proto.RssProtos;

public class RssReassignServersReponse extends ClientResponse {
public class RssReassignServersResponse extends ClientResponse {

private boolean needReassign;

public RssReassignServersReponse(StatusCode statusCode, String message, boolean needReassign) {
public RssReassignServersResponse(StatusCode statusCode, String message, boolean needReassign) {
super(statusCode, message);
this.needReassign = needReassign;
}
Expand All @@ -33,8 +33,8 @@ public boolean isNeedReassign() {
return needReassign;
}

public static RssReassignServersReponse fromProto(RssProtos.ReassignServersReponse response) {
return new RssReassignServersReponse(
public static RssReassignServersResponse fromProto(RssProtos.ReassignServersResponse response) {
return new RssReassignServersResponse(
// todo: [issue#780] add fromProto for StatusCode issue
StatusCode.valueOf(response.getStatus().name()),
response.getMsg(),
Expand Down
10 changes: 7 additions & 3 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ service ShuffleManager {
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns (ReportShuffleWriteFailureResponse);
// Reassign the RPC interface of the ShuffleServer list
rpc reassignShuffleServers(ReassignServersRequest) returns (ReassignServersReponse);
rpc reassignShuffleServers(ReassignServersRequest) returns (ReassignServersResponse);
// Reassign on block send failure that occurs in writer
rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns (RssReassignOnBlockSendFailureResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns (ReportShuffleResultResponse);
Expand Down Expand Up @@ -609,15 +609,19 @@ message ReassignServersRequest{
int32 numPartitions = 4;
}

message ReassignServersReponse{
message ReassignServersResponse {
StatusCode status = 1;
bool needReassign = 2;
string msg = 3;
}

message RssReassignOnBlockSendFailureRequest{
message RssReassignOnBlockSendFailureRequest {
int32 shuffleId = 1;
map<int32, ReceivingFailureServers> failurePartitionToServerIds = 2;
int64 taskAttemptId = 3;
int32 stageId = 4;
int32 stageAttemptNumber = 5;
string executorId = 6;
}

message ReceivingFailureServers {
Expand Down

0 comments on commit be0dda7

Please sign in to comment.