Skip to content

Commit

Permalink
Avoid extra buffer to prepend frame size (apache#3560)
Browse files Browse the repository at this point in the history
* Avoid extra buffer to prepend frame size

* Fixed checkstyle

* Fixed touch methods on ReadResponse

* Fixed frame size for protobuf requests

* Removed unwanted changes

* Fixed AuthResponse in v2 protocol

* Fixed test
  • Loading branch information
merlimat authored and yaalsn committed Jan 30, 2023
1 parent 5eb9c59 commit 759f911
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -337,11 +336,9 @@ protected void initChannel(SocketChannel ch) throws Exception {

pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));

// For ByteBufList, skip the usual LengthFieldPrepender and have the encoder itself to add it
pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
pipeline.addLast("bytebufList", ByteBufList.ENCODER);

pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));

pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
Expand Down Expand Up @@ -406,7 +403,6 @@ protected void initChannel(LocalChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));

pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,17 @@ public Object encode(Object msg, ByteBufAllocator allocator)
BookieProtocol.Request r = (BookieProtocol.Request) msg;
if (r instanceof BookieProtocol.AddRequest) {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
int totalHeaderSize = 4 // for the header
+ BookieProtocol.MASTER_KEY_LENGTH; // for the master key
ByteBuf buf = allocator.buffer(totalHeaderSize);
ByteBufList data = ar.getData();

int totalHeaderSize = 4 // for the request header
+ BookieProtocol.MASTER_KEY_LENGTH; // for the master key

int totalPayloadSize = totalHeaderSize + data.readableBytes();
ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */);
buf.writeInt(totalPayloadSize); // Frame header
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
ByteBufList data = ar.getData();

ar.recycle();
data.prepend(buf);
return data;
Expand All @@ -123,7 +128,8 @@ public Object encode(Object msg, ByteBufAllocator allocator)
totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
}

ByteBuf buf = allocator.buffer(totalHeaderSize);
ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */);
buf.writeInt(totalHeaderSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
Expand All @@ -136,7 +142,8 @@ public Object encode(Object msg, ByteBufAllocator allocator)
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage();
int totalHeaderSize = 4; // for request type
int totalSize = totalHeaderSize + am.getSerializedSize();
ByteBuf buf = allocator.buffer(totalSize);
ByteBuf buf = allocator.buffer(totalSize + 4 /* frame size */);
buf.writeInt(totalSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
ByteBufOutputStream bufStream = new ByteBufOutputStream(buf);
am.writeTo(bufStream);
Expand Down Expand Up @@ -230,14 +237,17 @@ public ResponseEnDeCoderPreV3(ExtensionRegistry extensionRegistry) {
this.extensionRegistry = extensionRegistry;
}

private static final int RESPONSE_HEADERS_SIZE = 24;

@Override
public Object encode(Object msg, ByteBufAllocator allocator)
throws Exception {
if (!(msg instanceof BookieProtocol.Response)) {
return msg;
}
BookieProtocol.Response r = (BookieProtocol.Response) msg;
ByteBuf buf = allocator.buffer(24);
ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame size */);
buf.writerIndex(4); // Leave the placeholder for the frame size
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));

try {
Expand All @@ -248,19 +258,26 @@ public Object encode(Object msg, ByteBufAllocator allocator)

BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse) r;
if (rr.hasData()) {
int frameSize = RESPONSE_HEADERS_SIZE + rr.getData().readableBytes();
buf.setInt(0, frameSize);
return ByteBufList.get(buf, rr.getData());
} else {
buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
return buf;
}
} else if (msg instanceof BookieProtocol.AddResponse) {
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size

return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
return ByteBufList.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
ByteBuf payload = Unpooled.wrappedBuffer(am.toByteArray());
int frameSize = 4 + payload.readableBytes();
buf.setInt(0, frameSize);
return ByteBufList.get(buf, payload);
} else {
LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
return msg;
Expand Down Expand Up @@ -353,24 +370,27 @@ public Object encode(Object msg, ByteBufAllocator allocator) throws Exception {

private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator allocator) {
int size = msg.getSerializedSize();
int frameSize = size + 4;

// Protobuf serialization is the last step of the netty pipeline. We used to allocate
// a heap buffer while serializing and pass it down to netty library.
// In AbstractChannel#filterOutboundMessage(), netty copies that data to a direct buffer if
// it is currently in heap (otherwise skips it and uses it directly).
// Allocating a direct buffer reducing unncessary CPU cycles for buffer copies in BK client
// and also helps alleviate pressure off the GC, since there is less memory churn.
// Bookies aren't usually CPU bound. This change improves READ_ENTRY code paths by a small factor as well.
ByteBuf buf = allocator.directBuffer(size, size);
ByteBuf buf = allocator.directBuffer(frameSize, frameSize);
buf.writeInt(size);

try {
msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.readerIndex(), size)));
msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.writerIndex(), size)));
} catch (IOException e) {
// This is in-memory serialization, should not fail
throw new RuntimeException(e);
}

// Advance writer idx
buf.writerIndex(buf.capacity());
buf.writerIndex(frameSize);
return buf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ public int refCnt() {

@Override
public ReferenceCounted retain() {
return data.retain();
data.retain();
return this;
}

@Override
Expand All @@ -481,12 +482,14 @@ public ReferenceCounted retain(int increment) {

@Override
public ReferenceCounted touch() {
return data.touch();
data.touch();
return this;
}

@Override
public ReferenceCounted touch(Object hint) {
return data.touch(hint);
data.touch(hint);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
Expand Down Expand Up @@ -585,10 +584,9 @@ protected ChannelFuture connect() {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry));
pipeline.addLast(
"bookieProtoDecoder",
Expand Down Expand Up @@ -1274,7 +1272,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
exceptionCounter.inc();
if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress());
LOG.error("Corrupted frame received from bookie: {}", ctx.channel());
ctx.close();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
* will need to be encoded on the channel. There are 2 utility encoders:
* <ul>
* <li>{@link #ENCODER}: regular encode that will write all the buffers in the {@link ByteBufList} on the channel</li>
* <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also prepend a 4 bytes size header, once, carrying
* the size of the readable bytes across all the buffers contained in the {@link ByteBufList}</li>
* </ul>
*
* <p>Example:
Expand Down Expand Up @@ -305,40 +303,20 @@ public ReferenceCounted touch(Object hint) {
/**
* Encoder for the {@link ByteBufList} that doesn't prepend any size header.
*/
public static final Encoder ENCODER = new Encoder(false);

/**
* Encoder for the {@link ByteBufList} that will prepend a 4 byte header with the size of the whole
* {@link ByteBufList} readable bytes.
*/
public static final Encoder ENCODER_WITH_SIZE = new Encoder(true);
public static final Encoder ENCODER = new Encoder();

/**
* {@link ByteBufList} encoder.
*/
@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {

private final boolean prependSize;

public Encoder(boolean prependSize) {
this.prependSize = prependSize;
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufList) {
ByteBufList b = (ByteBufList) msg;

try {
if (prependSize) {
// Prepend the frame size before writing the buffer list, so that we only have 1 single size
// header
ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4);
sizeBuffer.writeInt(b.readableBytes());
ctx.write(sizeBuffer, ctx.voidPromise());
}

// Write each buffer individually on the socket. The retain() here is needed to preserve the fact
// that ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased
// and it gets written multiple times, the individual buffers refcount should be reflected as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public void testV3ResponseDecoderNoFallback() throws Exception {
}
assertEquals(0, outList.size());

v3Decoder.channelRead(
ctx,
v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
ByteBuf serWithFrameSize = (ByteBuf) v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT);
ByteBuf ser = serWithFrameSize.slice(4, serWithFrameSize.readableBytes() - 4);
v3Decoder.channelRead(ctx, ser);
assertEquals(1, outList.size());
}

Expand Down

0 comments on commit 759f911

Please sign in to comment.