Skip to content

Commit

Permalink
When in graceful shutdown, send GO_AWAY to notify the client (#2758)
Browse files Browse the repository at this point in the history
Configure Http2FrameCodec with indefinite graceful shutdown timeout
as the scheduling happens with ServerTransport#disposeNow

https://datatracker.ietf.org/doc/html/rfc9113#GOAWAY

"The GOAWAY frame (type=0x07) is used to initiate shutdown of a connection...
GOAWAY allows an endpoint to gracefully stop accepting new streams while
still finishing processing of previously established streams."

"Once the GOAWAY is sent, the sender will ignore frames sent on streams initiated
by the receiver if the stream has an identifier higher than the included last stream identifier.
Receivers of a GOAWAY frame MUST NOT open additional streams on the connection..."

"A GOAWAY frame might not immediately precede closing of the connection"

"Activity on streams numbered lower than or equal to the last stream identifier might still complete successfully.
The sender of a GOAWAY frame might gracefully shut down a connection by sending a GOAWAY frame,
maintaining the connection in an "open" state until all in-progress streams complete."

Fix #2735
  • Loading branch information
violetagg committed Apr 5, 2023
1 parent a9fe467 commit c1b9631
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.FutureMono;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.util.MapUtils;
Expand Down Expand Up @@ -530,12 +531,15 @@ public final void dispose() {
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void disposeNow(Duration timeout) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
if (isDisposed()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server has been disposed"));
}
return;
}
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
dispose();
Mono<Void> terminateSignals = Mono.empty();
if (config.channelGroup != null && config.channelGroup.size() > 0) {
Expand All @@ -544,10 +548,21 @@ public void disposeNow(Duration timeout) {
// Wait for the running requests to finish
for (Channel channel : config.channelGroup) {
Channel parent = channel.parent();
// For TCP and HTTP/1.1 the channel parent is the ServerChannel
boolean isParentServerChannel = parent instanceof ServerChannel;
List<Mono<Void>> monos =
MapUtils.computeIfAbsent(channelsToMono,
parent instanceof ServerChannel ? channel : parent,
key -> new ArrayList<>());
isParentServerChannel ? channel : parent,
key -> {
List<Mono<Void>> list = new ArrayList<>();
if (!isParentServerChannel) {
// In case of HTTP/2 Reactor Netty will send GO_AWAY with lastStreamId to notify the
// client to stop opening streams, the actual CLOSE will happen when all
// streams up to lastStreamId are closed
list.add(FutureMono.from(key.close()));
}
return list;
});
ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
if (ops != null) {
monos.add(ops.onTerminate().doFinally(sig -> ops.dispose()));
Expand All @@ -558,16 +573,12 @@ public void disposeNow(Duration timeout) {
List<Mono<Void>> monos = entry.getValue();
if (monos.isEmpty()) {
// At this point there are no running requests for this channel
// This can happen for TCP and HTTP/1.1
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
terminateSignals =
Mono.when(monos)
// At this point there are no running requests for this channel
// "FutureReturnValueIgnored" this is deliberate
.doFinally(sig -> channel.close())
.and(terminateSignals);
terminateSignals = Mono.when(monos).and(terminateSignals);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ static void configureH2Pipeline(ChannelPipeline p,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -522,6 +523,12 @@ static void configureH2Pipeline(ChannelPipeline p,
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (p.get(NettyPipeline.LoggingHandler) != null) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
"reactor.netty.http.server.h2"));
Expand Down Expand Up @@ -552,6 +559,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
HttpRequestDecoderSpec decoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -570,7 +578,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
decoder.allowDuplicateContentLengths());

Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate,
cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, formDecoderProvider,
cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider,
forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder,
minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());

Expand Down Expand Up @@ -880,6 +888,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean debug,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -903,6 +912,12 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (debug) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(
LogLevel.DEBUG,
Expand Down Expand Up @@ -949,6 +964,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -971,6 +987,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
this.cookieDecoder = initializer.cookieDecoder;
this.cookieEncoder = initializer.cookieEncoder;
this.decoder = initializer.decoder;
this.enableGracefulShutdown = initializer.enableGracefulShutdown;
this.formDecoderProvider = initializer.formDecoderProvider;
this.forwardedHeaderHandler = initializer.forwardedHeaderHandler;
this.http2Settings = initializer.http2Settings;
Expand All @@ -995,7 +1012,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder,
formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout,
enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout,
listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand Down Expand Up @@ -1024,6 +1041,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -1048,6 +1066,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
this.cookieDecoder = config.cookieDecoder;
this.cookieEncoder = config.cookieEncoder;
this.decoder = config.decoder;
this.enableGracefulShutdown = config.channelGroup() != null;
this.formDecoderProvider = config.formDecoderProvider;
this.forwardedHeaderHandler = config.forwardedHeaderHandler;
this.http2Settings = config.http2Settings();
Expand Down Expand Up @@ -1115,6 +1134,7 @@ else if ((protocols & h2) == h2) {
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand All @@ -1139,6 +1159,7 @@ else if ((protocols & h2) == h2) {
cookieDecoder,
cookieEncoder,
decoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down Expand Up @@ -1180,6 +1201,7 @@ else if ((protocols & h2c) == h2c) {
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down

0 comments on commit c1b9631

Please sign in to comment.