Skip to content

Commit

Permalink
Merge #2758 into 1.1.6
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 5, 2023
2 parents 70e7216 + c1b9631 commit 95a8f47
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.FutureMono;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.util.MapUtils;
Expand Down Expand Up @@ -530,12 +531,15 @@ public final void dispose() {
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void disposeNow(Duration timeout) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
if (isDisposed()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server has been disposed"));
}
return;
}
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
dispose();
Mono<Void> terminateSignals = Mono.empty();
if (config.channelGroup != null && config.channelGroup.size() > 0) {
Expand All @@ -544,10 +548,21 @@ public void disposeNow(Duration timeout) {
// Wait for the running requests to finish
for (Channel channel : config.channelGroup) {
Channel parent = channel.parent();
// For TCP and HTTP/1.1 the channel parent is the ServerChannel
boolean isParentServerChannel = parent instanceof ServerChannel;
List<Mono<Void>> monos =
MapUtils.computeIfAbsent(channelsToMono,
parent instanceof ServerChannel ? channel : parent,
key -> new ArrayList<>());
isParentServerChannel ? channel : parent,
key -> {
List<Mono<Void>> list = new ArrayList<>();
if (!isParentServerChannel) {
// In case of HTTP/2 Reactor Netty will send GO_AWAY with lastStreamId to notify the
// client to stop opening streams, the actual CLOSE will happen when all
// streams up to lastStreamId are closed
list.add(FutureMono.from(key.close()));
}
return list;
});
ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
if (ops != null) {
monos.add(ops.onTerminate().doFinally(sig -> ops.dispose()));
Expand All @@ -558,16 +573,12 @@ public void disposeNow(Duration timeout) {
List<Mono<Void>> monos = entry.getValue();
if (monos.isEmpty()) {
// At this point there are no running requests for this channel
// This can happen for TCP and HTTP/1.1
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
terminateSignals =
Mono.when(monos)
// At this point there are no running requests for this channel
// "FutureReturnValueIgnored" this is deliberate
.doFinally(sig -> channel.close())
.and(terminateSignals);
terminateSignals = Mono.when(monos).and(terminateSignals);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ static void configureH2Pipeline(ChannelPipeline p,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -538,6 +539,12 @@ static void configureH2Pipeline(ChannelPipeline p,
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (p.get(NettyPipeline.LoggingHandler) != null) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
"reactor.netty.http.server.h2"));
Expand Down Expand Up @@ -569,6 +576,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
HttpRequestDecoderSpec decoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -587,7 +595,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
decoder.allowDuplicateContentLengths());

Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate,
cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, formDecoderProvider,
cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider,
forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder,
minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());

Expand Down Expand Up @@ -912,6 +920,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean debug,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -935,6 +944,12 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (debug) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(
LogLevel.DEBUG,
Expand Down Expand Up @@ -981,6 +996,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -1003,6 +1019,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
this.cookieDecoder = initializer.cookieDecoder;
this.cookieEncoder = initializer.cookieEncoder;
this.decoder = initializer.decoder;
this.enableGracefulShutdown = initializer.enableGracefulShutdown;
this.formDecoderProvider = initializer.formDecoderProvider;
this.forwardedHeaderHandler = initializer.forwardedHeaderHandler;
this.http2Settings = initializer.http2Settings;
Expand All @@ -1027,7 +1044,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder,
formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout,
enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout,
listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand Down Expand Up @@ -1056,6 +1073,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -1080,6 +1098,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
this.cookieDecoder = config.cookieDecoder;
this.cookieEncoder = config.cookieEncoder;
this.decoder = config.decoder;
this.enableGracefulShutdown = config.channelGroup() != null;
this.formDecoderProvider = config.formDecoderProvider;
this.forwardedHeaderHandler = config.forwardedHeaderHandler;
this.http2Settings = config.http2Settings();
Expand Down Expand Up @@ -1147,6 +1166,7 @@ else if ((protocols & h2) == h2) {
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand All @@ -1171,6 +1191,7 @@ else if ((protocols & h2) == h2) {
cookieDecoder,
cookieEncoder,
decoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down Expand Up @@ -1212,6 +1233,7 @@ else if ((protocols & h2c) == h2c) {
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down

0 comments on commit 95a8f47

Please sign in to comment.