Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When in graceful shutdown, send GO_AWAY to notify the client to stop opening streams #2758

Merged
merged 1 commit into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.DisposableServer;
import reactor.netty.FutureMono;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.internal.util.MapUtils;
Expand Down Expand Up @@ -530,12 +531,15 @@ public final void dispose() {
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void disposeNow(Duration timeout) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
if (isDisposed()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server has been disposed"));
}
return;
}
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
dispose();
Mono<Void> terminateSignals = Mono.empty();
if (config.channelGroup != null && config.channelGroup.size() > 0) {
Expand All @@ -544,10 +548,21 @@ public void disposeNow(Duration timeout) {
// Wait for the running requests to finish
for (Channel channel : config.channelGroup) {
Channel parent = channel.parent();
// For TCP and HTTP/1.1 the channel parent is the ServerChannel
boolean isParentServerChannel = parent instanceof ServerChannel;
List<Mono<Void>> monos =
MapUtils.computeIfAbsent(channelsToMono,
parent instanceof ServerChannel ? channel : parent,
key -> new ArrayList<>());
isParentServerChannel ? channel : parent,
key -> {
List<Mono<Void>> list = new ArrayList<>();
if (!isParentServerChannel) {
// In case of HTTP/2 Reactor Netty will send GO_AWAY with lastStreamId to notify the
// client to stop opening streams, the actual CLOSE will happen when all
// streams up to lastStreamId are closed
list.add(FutureMono.from(key.close()));
}
return list;
});
ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
if (ops != null) {
monos.add(ops.onTerminate().doFinally(sig -> ops.dispose()));
Expand All @@ -558,16 +573,12 @@ public void disposeNow(Duration timeout) {
List<Mono<Void>> monos = entry.getValue();
if (monos.isEmpty()) {
// At this point there are no running requests for this channel
// This can happen for TCP and HTTP/1.1
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
terminateSignals =
Mono.when(monos)
// At this point there are no running requests for this channel
// "FutureReturnValueIgnored" this is deliberate
.doFinally(sig -> channel.close())
.and(terminateSignals);
terminateSignals = Mono.when(monos).and(terminateSignals);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ static void configureH2Pipeline(ChannelPipeline p,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -522,6 +523,12 @@ static void configureH2Pipeline(ChannelPipeline p,
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (p.get(NettyPipeline.LoggingHandler) != null) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
"reactor.netty.http.server.h2"));
Expand Down Expand Up @@ -552,6 +559,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
HttpRequestDecoderSpec decoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -570,7 +578,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
decoder.allowDuplicateContentLengths());

Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate,
cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, formDecoderProvider,
cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider,
forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder,
minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());

Expand Down Expand Up @@ -880,6 +888,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
boolean debug,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -903,6 +912,12 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (debug) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(
LogLevel.DEBUG,
Expand Down Expand Up @@ -949,6 +964,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -971,6 +987,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
this.cookieDecoder = initializer.cookieDecoder;
this.cookieEncoder = initializer.cookieEncoder;
this.decoder = initializer.decoder;
this.enableGracefulShutdown = initializer.enableGracefulShutdown;
this.formDecoderProvider = initializer.formDecoderProvider;
this.forwardedHeaderHandler = initializer.forwardedHeaderHandler;
this.http2Settings = initializer.http2Settings;
Expand All @@ -995,7 +1012,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder,
formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout,
enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout,
listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand Down Expand Up @@ -1024,6 +1041,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -1048,6 +1066,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
this.cookieDecoder = config.cookieDecoder;
this.cookieEncoder = config.cookieEncoder;
this.decoder = config.decoder;
this.enableGracefulShutdown = config.channelGroup() != null;
this.formDecoderProvider = config.formDecoderProvider;
this.forwardedHeaderHandler = config.forwardedHeaderHandler;
this.http2Settings = config.http2Settings();
Expand Down Expand Up @@ -1115,6 +1134,7 @@ else if ((protocols & h2) == h2) {
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand All @@ -1139,6 +1159,7 @@ else if ((protocols & h2) == h2) {
cookieDecoder,
cookieEncoder,
decoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down Expand Up @@ -1180,6 +1201,7 @@ else if ((protocols & h2c) == h2c) {
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down