diff --git a/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java b/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java index f52e5b3cac..19d05c11f5 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java +++ b/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java @@ -50,6 +50,7 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; +import reactor.netty.FutureMono; import reactor.netty.channel.AbortedException; import reactor.netty.channel.ChannelOperations; import reactor.netty.internal.util.MapUtils; @@ -530,12 +531,15 @@ public final void dispose() { @Override @SuppressWarnings("FutureReturnValueIgnored") public void disposeNow(Duration timeout) { - if (log.isDebugEnabled()) { - log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout); - } if (isDisposed()) { + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Server has been disposed")); + } return; } + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout); + } dispose(); Mono terminateSignals = Mono.empty(); if (config.channelGroup != null && config.channelGroup.size() > 0) { @@ -544,10 +548,21 @@ public void disposeNow(Duration timeout) { // Wait for the running requests to finish for (Channel channel : config.channelGroup) { Channel parent = channel.parent(); + // For TCP and HTTP/1.1 the channel parent is the ServerChannel + boolean isParentServerChannel = parent instanceof ServerChannel; List> monos = MapUtils.computeIfAbsent(channelsToMono, - parent instanceof ServerChannel ? channel : parent, - key -> new ArrayList<>()); + isParentServerChannel ? channel : parent, + key -> { + List> list = new ArrayList<>(); + if (!isParentServerChannel) { + // In case of HTTP/2 Reactor Netty will send GO_AWAY with lastStreamId to notify the + // client to stop opening streams, the actual CLOSE will happen when all + // streams up to lastStreamId are closed + list.add(FutureMono.from(key.close())); + } + return list; + }); ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { monos.add(ops.onTerminate().doFinally(sig -> ops.dispose())); @@ -558,16 +573,12 @@ public void disposeNow(Duration timeout) { List> monos = entry.getValue(); if (monos.isEmpty()) { // At this point there are no running requests for this channel + // This can happen for TCP and HTTP/1.1 // "FutureReturnValueIgnored" this is deliberate channel.close(); } else { - terminateSignals = - Mono.when(monos) - // At this point there are no running requests for this channel - // "FutureReturnValueIgnored" this is deliberate - .doFinally(sig -> channel.close()) - .and(terminateSignals); + terminateSignals = Mono.when(monos).and(terminateSignals); } } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 62b93f5864..03407468c6 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -503,6 +503,7 @@ static void configureH2Pipeline(ChannelPipeline p, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, + boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, Http2Settings http2Settings, @@ -522,6 +523,12 @@ static void configureH2Pipeline(ChannelPipeline p, .validateHeaders(validate) .initialSettings(http2Settings); + if (enableGracefulShutdown) { + // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + // when disposeNow(timeout) is invoked + http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); + } + if (p.get(NettyPipeline.LoggingHandler) != null) { http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG, "reactor.netty.http.server.h2")); @@ -552,6 +559,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, HttpRequestDecoderSpec decoder, + boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, Http2Settings http2Settings, @@ -570,7 +578,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, decoder.allowDuplicateContentLengths()); Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate, - cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, formDecoderProvider, + cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); @@ -880,6 +888,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, boolean debug, + boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, Http2Settings http2Settings, @@ -903,6 +912,12 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer .validateHeaders(validate) .initialSettings(http2Settings); + if (enableGracefulShutdown) { + // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + // when disposeNow(timeout) is invoked + http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); + } + if (debug) { http2FrameCodecBuilder.frameLogger(new Http2FrameLogger( LogLevel.DEBUG, @@ -949,6 +964,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpRequestDecoderSpec decoder; + final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2Settings http2Settings; @@ -971,6 +987,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler this.cookieDecoder = initializer.cookieDecoder; this.cookieEncoder = initializer.cookieEncoder; this.decoder = initializer.decoder; + this.enableGracefulShutdown = initializer.enableGracefulShutdown; this.formDecoderProvider = initializer.formDecoderProvider; this.forwardedHeaderHandler = initializer.forwardedHeaderHandler; this.http2Settings = initializer.http2Settings; @@ -995,7 +1012,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, - formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); return; } @@ -1024,6 +1041,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpRequestDecoderSpec decoder; + final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2Settings http2Settings; @@ -1048,6 +1066,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig this.cookieDecoder = config.cookieDecoder; this.cookieEncoder = config.cookieEncoder; this.decoder = config.decoder; + this.enableGracefulShutdown = config.channelGroup() != null; this.formDecoderProvider = config.formDecoderProvider; this.forwardedHeaderHandler = config.forwardedHeaderHandler; this.http2Settings = config.http2Settings(); @@ -1115,6 +1134,7 @@ else if ((protocols & h2) == h2) { compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, @@ -1139,6 +1159,7 @@ else if ((protocols & h2) == h2) { cookieDecoder, cookieEncoder, decoder, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, @@ -1180,6 +1201,7 @@ else if ((protocols & h2c) == h2c) { compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings,