Skip to content

Commit

Permalink
Move from ChannelOutboundHandlerAdapter to ChannelHandlerAdapter
Browse files Browse the repository at this point in the history
- There is no VoidPromise anymore so no need to handle this use case in ProxyProvider

Related to #1873
  • Loading branch information
violetagg committed May 20, 2022
1 parent 661fcbf commit e515979
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public interface NettyPipeline {
String OnChannelReadIdle = LEFT + "onChannelReadIdle";
String OnChannelWriteIdle = LEFT + "onChannelWriteIdle";
String ProxyHandler = LEFT + "proxyHandler";
String UnvoidHandler = LEFT + "unvoidHandler";
String ProxyLoggingHandler = LEFT + "proxyLoggingHandler";
String ProxyProtocolDecoder = LEFT + "proxyProtocolDecoder";
String ProxyProtocolReader = LEFT + "proxyProtocolReader";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelOutboundHandlerAdapter;
import io.netty5.channel.ChannelPromise;
import io.netty5.handler.ssl.SslHandler;
import io.netty5.util.concurrent.Future;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
Expand Down Expand Up @@ -58,7 +57,7 @@ public ChannelMetricsRecorder recorder() {
return recorder;
}

static final class ConnectMetricsHandler extends ChannelOutboundHandlerAdapter {
static final class ConnectMetricsHandler extends ChannelHandlerAdapter {

final ChannelMetricsRecorder recorder;

Expand All @@ -67,18 +66,17 @@ static final class ConnectMetricsHandler extends ChannelOutboundHandlerAdapter {
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
public Future<Void> connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress) {
long connectTimeStart = System.nanoTime();
super.connect(ctx, remoteAddress, localAddress, promise);
promise.addListener(future -> {
ctx.pipeline().remove(this);

recorder.recordConnectTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
});
return ctx.connect(remoteAddress, localAddress)
.addListener(future -> {
ctx.pipeline().remove(this);

recorder.recordConnectTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - connectTimeStart),
future.isSuccess() ? SUCCESS : ERROR);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package reactor.netty.channel;

import io.netty5.channel.ChannelHandler;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelOutboundHandlerAdapter;
import io.netty5.channel.ChannelPromise;
import io.netty5.util.concurrent.Future;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -95,7 +95,7 @@ protected void recordWrite(ChannelHandlerContext ctx, SocketAddress address, lon
}
}

static final class ContextAwareConnectMetricsHandler extends ChannelOutboundHandlerAdapter {
static final class ContextAwareConnectMetricsHandler extends ChannelHandlerAdapter {

final ContextAwareChannelMetricsRecorder recorder;

Expand All @@ -104,14 +104,13 @@ static final class ContextAwareConnectMetricsHandler extends ChannelOutboundHand
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
public Future<Void> connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress) {
long connectTimeStart = System.nanoTime();
super.connect(ctx, remoteAddress, localAddress, promise);
promise.addListener(future -> {
ctx.pipeline().remove(this);
recordConnectTime(ctx, remoteAddress, connectTimeStart, future.isSuccess() ? SUCCESS : ERROR);
});
return ctx.connect(remoteAddress, localAddress)
.addListener(future -> {
ctx.pipeline().remove(this);
recordConnectTime(ctx, remoteAddress, connectTimeStart, future.isSuccess() ? SUCCESS : ERROR);
});
}

void recordConnectTime(ChannelHandlerContext ctx, SocketAddress address, long connectTimeStart, String status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import java.util.regex.Pattern;

import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelOutboundHandlerAdapter;
import io.netty5.channel.ChannelPipeline;
import io.netty5.channel.ChannelPromise;
import io.netty5.handler.codec.http.DefaultHttpHeaders;
import io.netty5.handler.codec.http.HttpHeaders;
import io.netty5.handler.logging.LogLevel;
Expand Down Expand Up @@ -176,18 +173,6 @@ public void addProxyHandler(Channel channel) {
Objects.requireNonNull(channel, "channel");
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(NettyPipeline.ProxyHandler, newProxyHandler());
// For SOCKS proxy, the netty SOCKS handlers may register listeners in channel promises, so we need to register a
// special handler which ensures that any VoidPromise will be converted to "unvoided" promises (for support of listeners).
// Note: an example of a VoidPromise which does not support listeners is the MonoSendMany.SendManyInner promise.
if (this.type == Proxy.SOCKS4 || type == Proxy.SOCKS5) {
pipeline.addAfter(NettyPipeline.ProxyHandler, NettyPipeline.UnvoidHandler, new ChannelOutboundHandlerAdapter() {
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.write(msg, promise.unvoid());
}
});
}

if (pipeline.get(NettyPipeline.LoggingHandler) != null) {
pipeline.addBefore(NettyPipeline.ProxyHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
package reactor.netty.resources;

import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelOutboundHandlerAdapter;
import io.netty5.channel.ChannelPromise;
import io.netty5.channel.DefaultChannelPromise;
import io.netty5.handler.codec.http.HttpHeaderNames;
import io.netty5.handler.codec.http.HttpResponseStatus;
import io.netty5.handler.ssl.SslContext;
import io.netty5.handler.ssl.SslContextBuilder;
import io.netty5.handler.ssl.SslProvider;
import io.netty5.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty5.handler.ssl.util.SelfSignedCertificate;
import io.netty5.util.concurrent.Future;
import io.netty5.util.concurrent.FutureContextListener;
import io.netty5.util.concurrent.Promise;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -269,13 +270,15 @@ private void doTestSslEngineClosed(HttpClient client, AtomicInteger closeCount,
client.doOnChannelInit(
(o, c, address) ->
c.pipeline()
.addFirst(new ChannelOutboundHandlerAdapter() {
.addFirst(new ChannelHandlerAdapter() {

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress,
new TestPromise(ctx.channel(), promise, closeCount));
public Future<Void> connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress) {
Promise<Void> promise = ctx.newPromise();
ctx.connect(remoteAddress, localAddress)
.addListener(ctx.channel(), new TestPromise(promise, closeCount));
return promise.asFuture();
}
}))
.get()
Expand Down Expand Up @@ -484,32 +487,28 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie
}
}

static final class TestPromise extends DefaultChannelPromise {
static final class TestPromise implements FutureContextListener<Channel, Void> {

final ChannelPromise parent;
final Promise<Void> parent;
final AtomicInteger closeCount;

public TestPromise(Channel channel, ChannelPromise parent, AtomicInteger closeCount) {
super(channel);
public TestPromise(Promise<Void> parent, AtomicInteger closeCount) {
this.parent = parent;
this.closeCount = closeCount;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public boolean trySuccess(Void result) {
boolean r;
public void operationComplete(Channel channel, Future<? extends Void> future) {
if (closeCount.getAndDecrement() > 0) {
//"FutureReturnValueIgnored" this is deliberate
channel().close();
r = parent.trySuccess(result);
channel.close();
parent.trySuccess(null);
}
else {
r = parent.trySuccess(result);
parent.trySuccess(null);
//"FutureReturnValueIgnored" this is deliberate
channel().close();
channel.close();
}
return r;
}
}
}

0 comments on commit e515979

Please sign in to comment.