Skip to content

Commit

Permalink
Lazily allocate ChunkedWriteHandler#queue (#13711)
Browse files Browse the repository at this point in the history
Motivation:

When building an HTTP client, a `ChunkedWriteHandler` might be added to
the pipeline just in case a `ChunkedInput` might be written, but that
might never happen.
In this case, we don't need the extra queue allocation and
ChunkedWriteHandler should just behave as a pass-through.

Modifications:

* lazily initialize the queue
* bypass work when the queue is empty

Result:

Less allocations
  • Loading branch information
slandelle committed Jan 12, 2024
1 parent 09ba8c3 commit ce0366b
Showing 1 changed file with 37 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);

private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
private Queue<PendingWrite> queue;
private volatile ChannelHandlerContext ctx;

public ChunkedWriteHandler() {
Expand All @@ -86,6 +86,16 @@ public ChunkedWriteHandler(int maxPendingWrites) {
checkPositive(maxPendingWrites, "maxPendingWrites");
}

private void allocateQueue() {
if (queue == null) {
queue = new ArrayDeque<PendingWrite>();
}
}

private boolean queueIsEmpty() {
return queue == null || queue.isEmpty();
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
Expand Down Expand Up @@ -123,7 +133,12 @@ private void resumeTransfer0(ChannelHandlerContext ctx) {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(new PendingWrite(msg, promise));
if (!queueIsEmpty() || msg instanceof ChunkedInput) {
allocateQueue();
queue.add(new PendingWrite(msg, promise));
} else {
ctx.write(msg, promise);
}
}

@Override
Expand All @@ -147,6 +162,9 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
}

private void discard(Throwable cause) {
if (queueIsEmpty()) {
return;
}
for (;;) {
PendingWrite currentWrite = queue.poll();

Expand All @@ -165,9 +183,7 @@ private void discard(Throwable cause) {
} catch (Exception e) {
closeInput(in);
currentWrite.fail(e);
if (logger.isWarnEnabled()) {
logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
}
logger.warn("ChunkedInput failed", e);
continue;
}

Expand Down Expand Up @@ -195,6 +211,11 @@ private void doFlush(final ChannelHandlerContext ctx) {
return;
}

if (queueIsEmpty()) {
ctx.flush();
return;
}

boolean requiresFlush = true;
ByteBufAllocator allocator = ctx.alloc();
while (channel.isWritable()) {
Expand All @@ -206,7 +227,7 @@ private void doFlush(final ChannelHandlerContext ctx) {

if (currentWrite.promise.isDone()) {
// This might happen e.g. in the case when a write operation
// failed, but there're still unconsumed chunks left.
// failed, but there are still unconsumed chunks left.
// Most chunked input sources would stop generating chunks
// and report end of input, but this doesn't work with any
// source wrapped in HttpChunkedInput.
Expand All @@ -228,13 +249,9 @@ private void doFlush(final ChannelHandlerContext ctx) {
try {
message = chunks.readChunk(allocator);
endOfInput = chunks.isEndOfInput();
// No need to suspend when reached at the end.
suspend = message == null && !endOfInput;

if (message == null) {
// No need to suspend when reached at the end.
suspend = !endOfInput;
} else {
suspend = false;
}
} catch (final Throwable t) {
queue.remove();

Expand Down Expand Up @@ -269,29 +286,29 @@ private void doFlush(final ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(message);
if (endOfInput) {
if (f.isDone()) {
handleEndOfInputFuture(f, currentWrite);
handleEndOfInputFuture(f, chunks, currentWrite);
} else {
// Register a listener which will close the input once the write is complete.
// This is needed because the Chunk may have some resource bound that can not
// be closed before its not written.
// be closed before it's not written.
//
// See https://github.com/netty/netty/issues/303
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
handleEndOfInputFuture(future, currentWrite);
handleEndOfInputFuture(future, chunks, currentWrite);
}
});
}
} else {
final boolean resume = !channel.isWritable();
if (f.isDone()) {
handleFuture(f, currentWrite, resume);
handleFuture(f, chunks, currentWrite, resume);
} else {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
handleFuture(future, currentWrite, resume);
handleFuture(future, chunks, currentWrite, resume);
}
});
}
Expand All @@ -314,8 +331,7 @@ public void operationComplete(ChannelFuture future) {
}
}

private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) {
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
private static void handleEndOfInputFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite) {
if (!future.isSuccess()) {
closeInput(input);
currentWrite.fail(future.cause());
Expand All @@ -329,8 +345,7 @@ private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite cu
}
}

private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) {
ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
private void handleFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite, boolean resume) {
if (!future.isSuccess()) {
closeInput(input);
currentWrite.fail(future.cause());
Expand All @@ -346,9 +361,7 @@ private static void closeInput(ChunkedInput<?> chunks) {
try {
chunks.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a chunked input.", t);
}
logger.warn("Failed to close a ChunkedInput.", t);
}
}

Expand Down

0 comments on commit ce0366b

Please sign in to comment.