Skip to content

Commit

Permalink
Use alternative approach: Remove CopyingEncoder and make the encoder …
Browse files Browse the repository at this point in the history
…suitable for both use cases
  • Loading branch information
lhotari committed May 22, 2024
1 parent 1ffda4c commit aa1543f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -112,10 +113,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
} else {
ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
}
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
} else {
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);

if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());
Expand All @@ -128,7 +127,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
// auto-read.
ch.pipeline().addLast("flowController", new FlowControlHandler());
ServerCnx cnx = newServerCnx(pulsar, listenerName);
ChannelHandler cnx = newServerCnx(pulsar, listenerName);
ch.pipeline().addLast("handler", cnx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
Expand Down Expand Up @@ -146,12 +147,11 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier<ClientCnx
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true));

// Setup channel except for the SsHandler for TLS enabled connections
ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);

ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
ch.pipeline().addLast("handler", clientCnxSupplier.get());
ch.pipeline().addLast("handler", (ChannelHandler) clientCnxSupplier.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -109,7 +108,8 @@ public ReferenceCounted touch(Object hint) {
}

public static final Encoder ENCODER = new Encoder();
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
@Deprecated
public static final Encoder COPYING_ENCODER = ENCODER;

@Sharable
@SuppressWarnings("checkstyle:JavadocType")
Expand All @@ -122,9 +122,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
// Write each buffer individually on the socket. The retain() here is needed to preserve the fact that
// ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased and it
// gets written multiple times, the individual buffers refcount should be reflected as well.
// .asReadOnly() is needed to prevent SslHandler from modifying the input buffers.
try {
ctx.write(b.getFirst().retainedDuplicate(), ctx.voidPromise());
ctx.write(b.getSecond().retainedDuplicate(), promise);
ctx.write(b.getFirst().asReadOnly().retain(), ctx.voidPromise());
ctx.write(b.getSecond().asReadOnly().retain(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
Expand All @@ -133,56 +134,4 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
}

@Sharable
@SuppressWarnings("checkstyle:JavadocType")
public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;

ChannelPromise compositePromise = ctx.newPromise();
compositePromise.addListener(future -> {
// release the ByteBufPair after the write operation is completed
ReferenceCountUtil.safeRelease(b);
// complete the promise passed as an argument unless it's a void promise
if (!promise.isVoid()) {
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});

// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise());
ctx.write(nioBufferCopy(b.getSecond()), compositePromise);
} else {
ctx.write(msg, promise);
}
}

// Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method.
// This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is
// not thread safe when the ByteBuf instance is shared across multiple threads.
// This method works around the issue.
// Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation.
// This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after
// the write method completes.
private ByteBuf nioBufferCopy(ByteBuf buf) {
// avoid calling nioBufferCount() for performance reasons on CompositeByteBuf
// there's a similar optimization in Netty's SslHandler.wrap method where this is explained
if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) {
return Unpooled.wrappedBuffer(buf.nioBuffers());
} else {
// Single buffer, no need to wrap it in an array as the nioBuffers() method would do
return Unpooled.wrappedBuffer(buf.nioBuffer());
}
}
}

}

0 comments on commit aa1543f

Please sign in to comment.