From 157a4a4c80df9a950f387befbc0f803a66680ad3 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Tue, 16 Jan 2024 14:52:23 +0100 Subject: [PATCH] Save slicing HTTP 2 headers & data Motivation: DefaultHttp2FrameWriter always creates aggregate promises and slices for headers and data: both can be avoided, while also reducing the number of pipeline traversals, when the cost of creating a sliced buffer exceeds the cost of copying the data to be written. Modifications: Small headers and data are copied directly into a single buffer, without any need to create aggregate promises. Result: Faster writes of small data --- .../codec/http2/DefaultHttp2FrameWriter.java | 368 ++++++++++++------ 1 file changed, 256 insertions(+), 112 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java index 9b608921c28..fab24980236 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2FrameWriter.java @@ -71,6 +71,10 @@ */ @UnstableApi public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSizePolicy, Configuration { + + // This is an internal const used to decide when it is worthy to create a retained slice of a buffer vs + // just copying the bytes to the new properly sized buffer + private static final int COPY_SLICE_CUTOFF_BYTES = 128; private static final String STREAM_ID = "Stream ID"; private static final String STREAM_DEPENDENCY = "Stream Dependency"; /** @@ -135,13 +139,25 @@ public void close() { } @Override public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise) { - final SimpleChannelPromiseAggregator promiseAggregator = - new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); - ByteBuf frameHeader = null; 
try { verifyStreamId(streamId, STREAM_ID); verifyPadding(padding); + } catch (Throwable t) { + return handleFailure(data, null, promise, t, null); + } + if (padding == 0) { + return writeDataNoPadding(ctx, streamId, data, endStream, promise); + } else { + return writeDataWithPadding(ctx, streamId, data, padding, endStream, promise); + } + } + private ChannelFuture writeDataWithPadding(ChannelHandlerContext ctx, int streamId, ByteBuf data, + int padding, boolean endStream, ChannelPromise promise) { + final SimpleChannelPromiseAggregator promiseAggregator = + new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); + ByteBuf frameHeader = null; + try { int remainingData = data.readableBytes(); Http2Flags flags = new Http2Flags(); flags.endOfStream(false); @@ -149,109 +165,157 @@ public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf // Fast path to write frames of payload size maxFrameSize first. if (remainingData > maxFrameSize) { frameHeader = ctx.alloc().buffer(FRAME_HEADER_LENGTH); - writeFrameHeaderInternal(frameHeader, maxFrameSize, DATA, flags, streamId); - do { - // Write the header. - ctx.write(frameHeader.retainedSlice(), promiseAggregator.newPromise()); - - // Write the payload. - ctx.write(data.readRetainedSlice(maxFrameSize), promiseAggregator.newPromise()); - - remainingData -= maxFrameSize; - // Stop iterating if remainingData == maxFrameSize so we can take care of reference counts below. - } while (remainingData > maxFrameSize); + remainingData = writeDataFrames(ctx, streamId, data, frameHeader, + flags, promiseAggregator, remainingData, maxFrameSize); + if (remainingData != maxFrameSize) { + frameHeader.release(); + frameHeader = null; + } } - - if (padding == 0) { + if (remainingData == maxFrameSize) { + remainingData = 0; // Write the header. 
- if (frameHeader != null) { - frameHeader.release(); + ByteBuf lastFrame; + if (frameHeader == null) { + lastFrame = ctx.alloc().buffer(FRAME_HEADER_LENGTH); + writeFrameHeaderInternal(lastFrame, maxFrameSize, DATA, flags, streamId); + } else { + lastFrame = frameHeader.slice(); frameHeader = null; } - ByteBuf frameHeader2 = ctx.alloc().buffer(FRAME_HEADER_LENGTH); - flags.endOfStream(endStream); - writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId); - ctx.write(frameHeader2, promiseAggregator.newPromise()); + ctx.write(lastFrame, promiseAggregator.newPromise()); // Write the payload. - ByteBuf lastFrame = data.readSlice(remainingData); + lastFrame = data.readableBytes() != maxFrameSize? data.readSlice(maxFrameSize) : data; data = null; ctx.write(lastFrame, promiseAggregator.newPromise()); - } else { - if (remainingData != maxFrameSize) { - if (frameHeader != null) { - frameHeader.release(); - frameHeader = null; - } - } else { - remainingData -= maxFrameSize; - // Write the header. - ByteBuf lastFrame; - if (frameHeader == null) { - lastFrame = ctx.alloc().buffer(FRAME_HEADER_LENGTH); - writeFrameHeaderInternal(lastFrame, maxFrameSize, DATA, flags, streamId); - } else { - lastFrame = frameHeader.slice(); - frameHeader = null; - } - ctx.write(lastFrame, promiseAggregator.newPromise()); + } - // Write the payload. - lastFrame = data.readableBytes() != maxFrameSize ? data.readSlice(maxFrameSize) : data; - data = null; - ctx.write(lastFrame, promiseAggregator.newPromise()); - } + do { + int frameDataBytes = min(remainingData, maxFrameSize); + int framePaddingBytes = min(padding, max(0, maxFrameSize - 1 - frameDataBytes)); - do { - int frameDataBytes = min(remainingData, maxFrameSize); - int framePaddingBytes = min(padding, max(0, maxFrameSize - 1 - frameDataBytes)); - - // Decrement the remaining counters. - padding -= framePaddingBytes; - remainingData -= frameDataBytes; - - // Write the header. 
- ByteBuf frameHeader2 = ctx.alloc().buffer(DATA_FRAME_HEADER_LENGTH); - flags.endOfStream(endStream && remainingData == 0 && padding == 0); - flags.paddingPresent(framePaddingBytes > 0); - writeFrameHeaderInternal(frameHeader2, framePaddingBytes + frameDataBytes, DATA, flags, streamId); - writePaddingLength(frameHeader2, framePaddingBytes); - ctx.write(frameHeader2, promiseAggregator.newPromise()); - - // Write the payload. - if (data != null) { // Make sure Data is not null - if (remainingData == 0) { - ByteBuf lastFrame = data.readSlice(frameDataBytes); - data = null; - ctx.write(lastFrame, promiseAggregator.newPromise()); - } else { - ctx.write(data.readRetainedSlice(frameDataBytes), promiseAggregator.newPromise()); - } - } - // Write the frame padding. - if (paddingBytes(framePaddingBytes) > 0) { - ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)), - promiseAggregator.newPromise()); + // Decrement the remaining counters. + padding -= framePaddingBytes; + remainingData -= frameDataBytes; + + // Write the header. + ByteBuf frameHeader2 = ctx.alloc().buffer(DATA_FRAME_HEADER_LENGTH); + flags.endOfStream(endStream && remainingData == 0 && padding == 0); + flags.paddingPresent(framePaddingBytes > 0); + writeFrameHeaderInternal(frameHeader2, framePaddingBytes + frameDataBytes, DATA, flags, + streamId); + writePaddingLength(frameHeader2, framePaddingBytes); + ctx.write(frameHeader2, promiseAggregator.newPromise()); + + // Write the payload. + if (data != null) { // Make sure Data is not null + if (remainingData == 0) { + ByteBuf lastFrame = data.readSlice(frameDataBytes); + data = null; + ctx.write(lastFrame, promiseAggregator.newPromise()); + } else { + ctx.write(data.readRetainedSlice(frameDataBytes), promiseAggregator.newPromise()); } - } while (remainingData != 0 || padding != 0); - } + } + // Write the frame padding. 
+ if (paddingBytes(framePaddingBytes) > 0) { + ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)), + promiseAggregator.newPromise()); + } + } while (remainingData != 0 || padding != 0); + return promiseAggregator.doneAllocatingPromises(); } catch (Throwable cause) { - if (frameHeader != null) { + return handleFailure(data, frameHeader, promise, cause, promiseAggregator); + } + } + + private ChannelFuture writeDataNoPadding(ChannelHandlerContext ctx, int streamId, ByteBuf data, + boolean endStream, ChannelPromise promise) { + int dataLength = data.readableBytes(); + if (dataLength <= maxFrameSize && (data.hasArray() || dataLength <= COPY_SLICE_CUTOFF_BYTES)) { + return writeCopySingleFrameDataNoPadding(ctx, streamId, data, endStream, promise); + } + return writeSlicedDataNoPadding(ctx, streamId, data, endStream, promise); + } + + private ChannelFuture writeSlicedDataNoPadding(ChannelHandlerContext ctx, int streamId, ByteBuf data, + boolean endStream, ChannelPromise promise) { + SimpleChannelPromiseAggregator promiseAggregator = + new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); + ByteBuf frameHeader = null; + try { + int remainingData = data.readableBytes(); + final Http2Flags flags = new Http2Flags(); + flags.endOfStream(false); + flags.paddingPresent(false); + // Fast path to write frames of payload size maxFrameSize first. + if (remainingData > maxFrameSize) { + frameHeader = ctx.alloc().buffer(FRAME_HEADER_LENGTH); + remainingData = writeDataFrames(ctx, streamId, data, frameHeader, + flags, promiseAggregator, remainingData, maxFrameSize); frameHeader.release(); + frameHeader = null; } - // Use a try/finally here in case the data has been released before calling this method. This is not - // necessary above because we internally allocate frameHeader. + // Write the header. 
+ final ByteBuf frameHeader2 = ctx.alloc().buffer(FRAME_HEADER_LENGTH); + flags.endOfStream(endStream); + writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId); + ctx.write(frameHeader2, promiseAggregator.newPromise()); + + // Write the payload. + ByteBuf lastFrame = data.readSlice(remainingData); + data = null; + ctx.write(lastFrame, promiseAggregator.newPromise()); + return promiseAggregator.doneAllocatingPromises(); + } catch (Throwable cause) { + return handleFailure(data, frameHeader, promise, cause, promiseAggregator); + } + } + + private static int writeDataFrames(ChannelHandlerContext ctx, int streamId, ByteBuf data, ByteBuf frameHeader, + Http2Flags flags, SimpleChannelPromiseAggregator promiseAggregator, + int remainingData, int maxFrameSize) { + writeFrameHeaderInternal(frameHeader, maxFrameSize, DATA, flags, streamId); + do { + // Write the header. + ctx.write(frameHeader.retainedSlice(), promiseAggregator.newPromise()); + + // Write the payload. + ctx.write(data.readRetainedSlice(maxFrameSize), promiseAggregator.newPromise()); + + remainingData -= maxFrameSize; + // Stop iterating if remainingData == maxFrameSize so we can take care of reference counts below. 
+ } while (remainingData > maxFrameSize); + return remainingData; + } + + private ChannelFuture writeCopySingleFrameDataNoPadding(ChannelHandlerContext ctx, int streamId, ByteBuf data, + boolean endStream, ChannelPromise promise) { + try { + final int remainingData = data.readableBytes(); + assert remainingData <= maxFrameSize && (data.hasArray() || remainingData <= COPY_SLICE_CUTOFF_BYTES); + final ByteBuf frameHeader2AndLastFrame = ctx.alloc().buffer(FRAME_HEADER_LENGTH + remainingData); try { - if (data != null) { - data.release(); - } - } finally { - promiseAggregator.setFailure(cause); - promiseAggregator.doneAllocatingPromises(); + final Http2Flags flags = new Http2Flags(); + flags.paddingPresent(false); + flags.endOfStream(endStream); + writeFrameHeaderInternal(frameHeader2AndLastFrame, remainingData, DATA, flags, streamId); + // copy the last frame content + frameHeader2AndLastFrame.writeBytes(data, remainingData); + ByteBuf lastFrame = data; + // here we're in charge to release the data buffer + data = null; + lastFrame.release(); + } catch (Throwable cause) { + frameHeader2AndLastFrame.release(); + throw cause; } - return promiseAggregator; + ctx.write(frameHeader2AndLastFrame, promise); + return promise; + } catch (Throwable cause) { + return handleFailure(data, null, promise, cause, null); } - return promiseAggregator.doneAllocatingPromises(); } @Override @@ -456,24 +520,31 @@ public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, @Override public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload, ChannelPromise promise) { - SimpleChannelPromiseAggregator promiseAggregator = - new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); + // heap-based frames always got copied + final boolean copyPayload = payload.hasArray() || payload.readableBytes() <= COPY_SLICE_CUTOFF_BYTES; + SimpleChannelPromiseAggregator promiseAggregator = null; + ByteBuf 
buf = null; try { verifyStreamOrConnectionId(streamId, STREAM_ID); - ByteBuf buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH); + buf = ctx.alloc().buffer(FRAME_HEADER_LENGTH + (copyPayload? payload.readableBytes() : 0)); // Assume nothing below will throw until buf is written. That way we don't have to take care of ownership // in the catch block. writeFrameHeaderInternal(buf, payload.readableBytes(), frameType, flags, streamId); - ctx.write(buf, promiseAggregator.newPromise()); - } catch (Throwable t) { - try { + if (copyPayload || buf.maxFastWritableBytes() >= payload.readableBytes()) { + // maybe we've been lucky, even if we were not supposed to copy the frame + buf.writeBytes(payload); payload.release(); - } finally { - promiseAggregator.setFailure(t); - promiseAggregator.doneAllocatingPromises(); + payload = null; + ctx.write(buf, promise); + return promise; } - return promiseAggregator; + // Slow-path + promiseAggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); + ctx.write(buf, promiseAggregator.newPromise()); + } catch (Throwable t) { + return handleFailure(payload, buf, promise, t, promiseAggregator); } + assert !copyPayload && promiseAggregator != null; try { ctx.write(payload, promiseAggregator.newPromise()); } catch (Throwable t) { @@ -482,12 +553,36 @@ public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int s return promiseAggregator.doneAllocatingPromises(); } + private static ChannelPromise handleFailure(ByteBuf first, ByteBuf second, + ChannelPromise promise, Throwable t, + SimpleChannelPromiseAggregator promiseAggregator) { + try { + if (first != null) { + first.release(); + } + } catch (Throwable silent) { + } + try { + if (second != null) { + second.release(); + } + } catch (Throwable silent) { + } finally { + if (promiseAggregator != null) { + promiseAggregator.setFailure(t); + promiseAggregator.doneAllocatingPromises(); + return promiseAggregator; + } else { + promise.setFailure(t); + 
return promise; + } + } + } + private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, boolean hasPriority, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { ByteBuf headerBlock = null; - SimpleChannelPromiseAggregator promiseAggregator = - new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); try { verifyStreamId(streamId, STREAM_ID); if (hasPriority) { @@ -498,15 +593,70 @@ private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, // Encode the entire header block. headerBlock = ctx.alloc().buffer(); - headersEncoder.encodeHeaders(streamId, headers, headerBlock); + try { + headersEncoder.encodeHeaders(streamId, headers, headerBlock); + } catch (Http2Exception e) { + return promise.setFailure(e); + } Http2Flags flags = new Http2Flags().endOfStream(endStream).priorityPresent(hasPriority).paddingPresent(padding > 0); - - // Read the first fragment (possibly everything). int nonFragmentBytes = padding + flags.getNumPriorityBytes(); int maxFragmentLength = maxFrameSize - nonFragmentBytes; - ByteBuf fragment = headerBlock.readRetainedSlice(min(headerBlock.readableBytes(), maxFragmentLength)); + int fragmentBytes = min(headerBlock.readableBytes(), maxFragmentLength); + // if the first fragment fully consume the headerBlock, we can try making it fit into the + // header frame if is small enough: it would avoid using a promise aggregator and reduce the number + // of pipeline traversal + if (fragmentBytes <= COPY_SLICE_CUTOFF_BYTES && fragmentBytes == headerBlock.readableBytes() && + paddingBytes(padding) <= 64) { + // no need for a continuation either! + try { + int paddingBytes = paddingBytes(padding); + flags.endOfHeaders(true); + final int bufBytes = + HEADERS_FRAME_HEADER_LENGTH + fragmentBytes + (paddingBytes > 0? 
paddingBytes : 0); + ByteBuf buf = ctx.alloc().buffer(bufBytes); + int payloadLength = fragmentBytes + nonFragmentBytes; + writeFrameHeaderInternal(buf, payloadLength, HEADERS, flags, streamId); + writePaddingLength(buf, padding); + if (hasPriority) { + buf.writeInt(exclusive? (int) (0x80000000L | streamDependency) : streamDependency); + // Adjust the weight so that it fits into a single byte on the wire. + buf.writeByte(weight - 1); + } + buf.writeBytes(headerBlock, fragmentBytes); + if (paddingBytes > 0) { + // this is quite fast assuming small padding + // see https://github.com/netty/netty/pull/13693 + buf.writeZero(paddingBytes); + } + ctx.write(buf, promise); + } catch (Throwable t) { + promise.setFailure(t); + PlatformDependent.throwException(t); + } + return promise; + } else { + return writeHeadersPartsInternal(ctx, streamId, padding, hasPriority, streamDependency, + weight, exclusive, promise, headerBlock, fragmentBytes, + flags, nonFragmentBytes); + } + } finally { + if (headerBlock != null) { + headerBlock.release(); + } + } + } + + private ChannelPromise writeHeadersPartsInternal(ChannelHandlerContext ctx, int streamId, int padding, + boolean hasPriority, int streamDependency, short weight, + boolean exclusive, ChannelPromise promise, ByteBuf headerBlock, + int fragmentBytes, Http2Flags flags, int nonFragmentBytes) { + final SimpleChannelPromiseAggregator promiseAggregator = + new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor()); + try { + // Read the first fragment (possibly everything). + ByteBuf fragment = headerBlock.readRetainedSlice(fragmentBytes); // Set the end of headers flag for the first frame. flags.endOfHeaders(!headerBlock.isReadable()); @@ -517,7 +667,7 @@ private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, writePaddingLength(buf, padding); if (hasPriority) { - buf.writeInt(exclusive ? (int) (0x80000000L | streamDependency) : streamDependency); + buf.writeInt(exclusive? 
(int) (0x80000000L | streamDependency) : streamDependency); // Adjust the weight so that it fits into a single byte on the wire. buf.writeByte(weight - 1); @@ -535,16 +685,10 @@ private ChannelFuture writeHeadersInternal(ChannelHandlerContext ctx, if (!flags.endOfHeaders()) { writeContinuationFrames(ctx, streamId, headerBlock, promiseAggregator); } - } catch (Http2Exception e) { - promiseAggregator.setFailure(e); } catch (Throwable t) { promiseAggregator.setFailure(t); promiseAggregator.doneAllocatingPromises(); PlatformDependent.throwException(t); - } finally { - if (headerBlock != null) { - headerBlock.release(); - } } return promiseAggregator.doneAllocatingPromises(); }