From c1b9631300ccb79dd9a8670d16aa52e2bb542f42 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Wed, 5 Apr 2023 09:38:34 +0300 Subject: [PATCH] When in graceful shutdown, send GO_AWAY to notify the client (#2758) Configure Http2FrameCodec with indefinite graceful shutdown timeout as the scheduling happens with ServerTransport#disposeNow https://datatracker.ietf.org/doc/html/rfc9113#GOAWAY "The GOAWAY frame (type=0x07) is used to initiate shutdown of a connection... GOAWAY allows an endpoint to gracefully stop accepting new streams while still finishing processing of previously established streams." "Once the GOAWAY is sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection..." "A GOAWAY frame might not immediately precede closing of the connection" "Activity on streams numbered lower than or equal to the last stream identifier might still complete successfully. The sender of a GOAWAY frame might gracefully shut down a connection by sending a GOAWAY frame, maintaining the connection in an "open" state until all in-progress streams complete." Fix #2735 --- .../netty/transport/ServerTransport.java | 33 ++++++++++++------- .../netty/http/server/HttpServerConfig.java | 26 +++++++++++++-- 2 files changed, 46 insertions(+), 13 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java b/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java index f52e5b3cac..19d05c11f5 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java +++ b/reactor-netty-core/src/main/java/reactor/netty/transport/ServerTransport.java @@ -50,6 +50,7 @@ import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.DisposableServer; +import reactor.netty.FutureMono; import reactor.netty.channel.AbortedException; import reactor.netty.channel.ChannelOperations; import reactor.netty.internal.util.MapUtils; @@ -530,12 +531,15 @@ public final void dispose() { @Override @SuppressWarnings("FutureReturnValueIgnored") public void disposeNow(Duration timeout) { - if (log.isDebugEnabled()) { - log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout); - } if (isDisposed()) { + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Server has been disposed")); + } return; } + if (log.isDebugEnabled()) { + log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout); + } dispose(); Mono terminateSignals = Mono.empty(); if (config.channelGroup != null && config.channelGroup.size() > 0) { @@ -544,10 +548,21 @@ public void disposeNow(Duration timeout) { // Wait for the running requests to finish for (Channel channel : config.channelGroup) { Channel parent = channel.parent(); + // For TCP and HTTP/1.1 the channel parent is the ServerChannel + boolean isParentServerChannel = parent instanceof ServerChannel; List> monos = MapUtils.computeIfAbsent(channelsToMono, - parent instanceof ServerChannel ? channel : parent, - key -> new ArrayList<>()); + isParentServerChannel ? channel : parent, + key -> { + List> list = new ArrayList<>(); + if (!isParentServerChannel) { + // In case of HTTP/2 Reactor Netty will send GO_AWAY with lastStreamId to notify the + // client to stop opening streams, the actual CLOSE will happen when all + // streams up to lastStreamId are closed + list.add(FutureMono.from(key.close())); + } + return list; + }); ChannelOperations ops = ChannelOperations.get(channel); if (ops != null) { monos.add(ops.onTerminate().doFinally(sig -> ops.dispose())); @@ -558,16 +573,12 @@ public void disposeNow(Duration timeout) { List> monos = entry.getValue(); if (monos.isEmpty()) { // At this point there are no running requests for this channel + // This can happen for TCP and HTTP/1.1 // "FutureReturnValueIgnored" this is deliberate channel.close(); } else { - terminateSignals = - Mono.when(monos) - // At this point there are no running requests for this channel - // "FutureReturnValueIgnored" this is deliberate - .doFinally(sig -> channel.close()) - .and(terminateSignals); + terminateSignals = Mono.when(monos).and(terminateSignals); } } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 62b93f5864..03407468c6 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -503,6 +503,7 @@ static void configureH2Pipeline(ChannelPipeline p, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, + boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, Http2Settings http2Settings, @@ -522,6 +523,12 @@ static void configureH2Pipeline(ChannelPipeline p, .validateHeaders(validate) .initialSettings(http2Settings); + if (enableGracefulShutdown) { + // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + // when disposeNow(timeout) is invoked + http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); + } + if (p.get(NettyPipeline.LoggingHandler) != null) { http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG, "reactor.netty.http.server.h2")); @@ -552,6 +559,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, HttpRequestDecoderSpec decoder, + boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, Http2Settings http2Settings, @@ -570,7 +578,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, decoder.allowDuplicateContentLengths()); Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate, - cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, formDecoderProvider, + cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); @@ -880,6 +888,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, boolean debug, + boolean enableGracefulShutdown, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, Http2Settings http2Settings, @@ -903,6 +912,12 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer .validateHeaders(validate) .initialSettings(http2Settings); + if (enableGracefulShutdown) { + // Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout + // when disposeNow(timeout) is invoked + http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1); + } + if (debug) { http2FrameCodecBuilder.frameLogger(new Http2FrameLogger( LogLevel.DEBUG, @@ -949,6 +964,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpRequestDecoderSpec decoder; + final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2Settings http2Settings; @@ -971,6 +987,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler this.cookieDecoder = initializer.cookieDecoder; this.cookieEncoder = initializer.cookieEncoder; this.decoder = initializer.decoder; + this.enableGracefulShutdown = initializer.enableGracefulShutdown; this.formDecoderProvider = initializer.formDecoderProvider; this.forwardedHeaderHandler = initializer.forwardedHeaderHandler; this.http2Settings = initializer.http2Settings; @@ -995,7 +1012,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, - formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); return; } @@ -1024,6 +1041,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final ServerCookieDecoder cookieDecoder; final ServerCookieEncoder cookieEncoder; final HttpRequestDecoderSpec decoder; + final boolean enableGracefulShutdown; final HttpServerFormDecoderProvider formDecoderProvider; final BiFunction forwardedHeaderHandler; final Http2Settings http2Settings; @@ -1048,6 +1066,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig this.cookieDecoder = config.cookieDecoder; this.cookieEncoder = config.cookieEncoder; this.decoder = config.decoder; + this.enableGracefulShutdown = config.channelGroup() != null; this.formDecoderProvider = config.formDecoderProvider; this.forwardedHeaderHandler = config.forwardedHeaderHandler; this.http2Settings = config.http2Settings(); @@ -1115,6 +1134,7 @@ else if ((protocols & h2) == h2) { compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, @@ -1139,6 +1159,7 @@ else if ((protocols & h2) == h2) { cookieDecoder, cookieEncoder, decoder, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, @@ -1180,6 +1201,7 @@ else if ((protocols & h2c) == h2c) { compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, + enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings,