Skip to content

Commit

Permalink
Http2StreamChannelBootstrap can be shared (#2257)
Browse files Browse the repository at this point in the history
Http2StreamFrameToHttpObjectCodec can be shared
Http2StreamBridgeClientHandler can be shared

Related to #2151 and #2262
  • Loading branch information
violetagg committed Jun 8, 2022
1 parent dae5de8 commit 14b2bfc
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
Expand Down Expand Up @@ -300,8 +301,7 @@ else if (p.state != null) {
return;
}

HttpClientConfig.openStream(channel, this, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue)
.addListener(this);
http2StreamChannelBootstrap(channel).open().addListener(this);
}

@Override
Expand Down Expand Up @@ -354,6 +354,10 @@ public void operationComplete(Future<Http2StreamChannel> future) {
}
}
else {
Http2ConnectionProvider.registerClose(ch, this);
HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())),
opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue);

ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
obs.onStateChange(ops, STREAM_CONFIGURED);
Expand Down Expand Up @@ -420,6 +424,27 @@ else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == n
}
return false;
}

static final AttributeKey<Http2StreamChannelBootstrap> HTTP2_STREAM_CHANNEL_BOOTSTRAP =
AttributeKey.valueOf("http2StreamChannelBootstrap");

static Http2StreamChannelBootstrap http2StreamChannelBootstrap(Channel channel) {
Http2StreamChannelBootstrap http2StreamChannelBootstrap;

for (;;) {
http2StreamChannelBootstrap = channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP).get();
if (http2StreamChannelBootstrap == null) {
http2StreamChannelBootstrap = new Http2StreamChannelBootstrap(channel);
}
else {
return http2StreamChannelBootstrap;
}
if (channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP)
.compareAndSet(null, http2StreamChannelBootstrap)) {
return http2StreamChannelBootstrap;
}
}
}
}

static final class PendingConnectionObserver implements ConnectionObserver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,11 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.ChannelOperations;

import static reactor.netty.ReactorNetty.format;

/**
* This handler is intended to work together with {@link Http2StreamFrameToHttpObjectCodec}
Expand All @@ -35,33 +31,14 @@
* @author Violeta Georgieva
* @since 1.0.0
*/
@ChannelHandler.Sharable
final class Http2StreamBridgeClientHandler extends ChannelDuplexHandler {

final ConnectionObserver observer;
final ChannelOperations.OnSetup opsFactory;

Http2StreamBridgeClientHandler(ConnectionObserver listener, ChannelOperations.OnSetup opsFactory) {
this.observer = listener;
this.opsFactory = opsFactory;
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
if (HttpClientOperations.log.isDebugEnabled()) {
HttpClientOperations.log.debug(format(ctx.channel(), "New HTTP/2 stream"));
}

ChannelOperations<?, ?> ops = opsFactory.create(Connection.from(ctx.channel()), observer, null);
if (ops != null) {
ops.bind();
}
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,13 @@
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.concurrent.Future;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.ChannelPipelineConfigurer;
Expand Down Expand Up @@ -504,6 +501,65 @@ Http2Settings http2Settings() {
return settings;
}

static void addStreamHandlers(
Channel ch,
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
@Nullable ChannelMetricsRecorder metricsRecorder,
long responseTimeoutMillis,
@Nullable Function<String, String> uriTagValue) {

if (HttpClientOperations.log.isDebugEnabled()) {
HttpClientOperations.log.debug(format(ch, "New HTTP/2 stream"));
}

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
.addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER);

if (acceptGzip) {
pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

ChannelOperations.addReactiveBridge(ch, opsFactory, obs);

if (metricsRecorder != null) {
if (metricsRecorder instanceof HttpClientMetricsRecorder) {
ChannelHandler handler;
Channel parent = ch.parent();
ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler);
if (existingHandler != null) {
// This use case can happen only in HTTP/2 clear text connection upgrade
parent.pipeline().remove(NettyPipeline.HttpMetricsHandler);
handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ?
new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) :
new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler);
}
else {
handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ?
new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) :
new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue);
}
pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler);
}
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
}

if (log.isDebugEnabled()) {
log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline());
}

ChannelOperations<?, ?> ops = opsFactory.create(Connection.from(ch), obs, null);
if (ops != null) {
ops.bind();
}
}

static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder,
Http2Settings http2Settings, ConnectionObserver observer) {
Http2FrameCodecBuilder http2FrameCodecBuilder =
Expand Down Expand Up @@ -608,20 +664,6 @@ static void configureHttp11Pipeline(ChannelPipeline p,
}
}

static Future<Http2StreamChannel> openStream(
Channel channel,
Http2ConnectionProvider.DisposableAcquire owner,
ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel);
bootstrap.option(ChannelOption.AUTO_READ, false);
bootstrap.handler(new H2Codec(owner, observer, opsFactory, acceptGzip, metricsRecorder, uriTagValue));
return bootstrap.open();
}

static final Pattern FOLLOW_REDIRECT_CODES = Pattern.compile("30[12378]");

static final BiPredicate<HttpClientRequest, HttpClientResponse> FOLLOW_REDIRECT_PREDICATE =
Expand All @@ -639,6 +681,12 @@ static Future<Http2StreamChannel> openStream(

static final int h11orH2C = h11 | h2c;

static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT =
new Http2StreamFrameToHttpObjectCodec(false);

static final Http2StreamBridgeClientHandler HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER =
new Http2StreamBridgeClientHandler();

static final Logger log = Loggers.getLogger(HttpClientConfig.class);

static final LoggingHandler LOGGING_HANDLER =
Expand Down Expand Up @@ -706,6 +754,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
}

static final class H2Codec extends ChannelInitializer<Channel> {

final boolean acceptGzip;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
Expand Down Expand Up @@ -759,55 +808,14 @@ static final class H2Codec extends ChannelInitializer<Channel> {
protected void initChannel(Channel ch) {
if (observer != null && opsFactory != null && owner != null) {
Http2ConnectionProvider.registerClose(ch, owner);
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory);
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory,
acceptGzip, metricsRecorder, responseTimeoutMillis, uriTagValue);
}
else {
// Handle server pushes (inbound streams)
// TODO this is not supported
}
}

void addStreamHandlers(Channel ch, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(false))
.addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeClientHandler(obs, opsFactory));

if (acceptGzip) {
pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

ChannelOperations.addReactiveBridge(ch, opsFactory, obs);

if (metricsRecorder != null) {
if (metricsRecorder instanceof HttpClientMetricsRecorder) {
ChannelHandler handler;
Channel parent = ch.parent();
ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler);
if (existingHandler != null) {
// This use case can happen only in HTTP/2 clear text connection upgrade
parent.pipeline().remove(NettyPipeline.HttpMetricsHandler);
handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ?
new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler) :
new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler);
}
else {
handler = metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder ?
new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue) :
new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue);
}
pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler);
}
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
}

if (log.isDebugEnabled()) {
log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline());
}
}
}

static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ static void addStreamHandlers(Channel ch,
if (accessLogEnabled) {
pipeline.addLast(NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H2.create(accessLog));
}
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(true))
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
.addLast(NettyPipeline.HttpTrafficHandler,
new Http2StreamBridgeServerHandler(compressPredicate, decoder, encoder, formDecoderProvider,
forwardedHeaderHandler, listener, mapHandle));
Expand Down Expand Up @@ -664,6 +664,9 @@ static void configureHttp11Pipeline(ChannelPipeline p,

static final int h11orH2C = h11 | h2c;

static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT =
new Http2StreamFrameToHttpObjectCodec(true);

static final Logger log = Loggers.getLogger(HttpServerConfig.class);

static final LoggingHandler LOGGING_HANDLER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ private void doTestResponseTimeout(HttpClient client, long expectedTimeout)
timeout.set(((ReadTimeoutHandler) handler).getReaderIdleTimeInMillis());
}
})
.doOnDisconnected(conn -> onDisconnected.set(handlerAvailable.test(conn)));
.doOnDisconnected(conn -> onDisconnected.set(conn.channel().isActive() && handlerAvailable.test(conn)));

Mono<String> response =
localClient.get()
Expand Down

0 comments on commit 14b2bfc

Please sign in to comment.