Skip to content

Commit

Permalink
[apache#1596][FOLLOWUP] fix(netty): Send failed responses only when t…
Browse files Browse the repository at this point in the history
…he channel is writable
  • Loading branch information
rickyma committed Apr 11, 2024
1 parent 1431c8a commit 88b2564
Showing 1 changed file with 31 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -564,43 +564,45 @@ public void operationComplete(ChannelFuture future) {
+ ", "
+ cause.getMessage();
LOG.error(errorMsg, future.cause());
RpcResponse errorResponse;
if (request instanceof GetLocalShuffleDataRequest) {
errorResponse =
new GetLocalShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
} else if (request instanceof GetLocalShuffleIndexRequest) {
errorResponse =
new GetLocalShuffleIndexResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
Unpooled.EMPTY_BUFFER,
0L);
} else if (request instanceof GetMemoryShuffleDataRequest) {
errorResponse =
new GetMemoryShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
} else {
LOG.error("Cannot handle request {}", request.type());
return;
if (future.channel().isWritable()) {
RpcResponse errorResponse;
if (request instanceof GetLocalShuffleDataRequest) {
errorResponse =
new GetLocalShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
} else if (request instanceof GetLocalShuffleIndexRequest) {
errorResponse =
new GetLocalShuffleIndexResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
Unpooled.EMPTY_BUFFER,
0L);
} else if (request instanceof GetMemoryShuffleDataRequest) {
errorResponse =
new GetMemoryShuffleDataResponse(
request.getRequestId(),
StatusCode.INTERNAL_ERROR,
errorMsg,
Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
} else {
LOG.error("Cannot handle request {}", request.type());
return;
}
client.getChannel().writeAndFlush(errorResponse);
}
client.getChannel().writeAndFlush(errorResponse);
LOG.error(
"Failed to execute {} for {}. Took {} ms and could not retrieve {} bytes of data",
request.getOperationType(),
requestInfo,
readTime,
dataSize);
} else {
LOG.info(
LOG.debug(
"Successfully executed {} for {}. Took {} ms and retrieved {} bytes of data",
request.getOperationType(),
requestInfo,
Expand Down

0 comments on commit 88b2564

Please sign in to comment.