Skip to content

Commit

Permalink
Merge #2758 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 5, 2023
2 parents 3aab101 + 95a8f47 commit a600768
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import reactor.netty5.Connection;
import reactor.netty5.ConnectionObserver;
import reactor.netty5.DisposableServer;
import reactor.netty5.FutureMono;
import reactor.netty5.channel.AbortedException;
import reactor.netty5.channel.ChannelOperations;
import reactor.netty5.internal.util.MapUtils;
Expand Down Expand Up @@ -546,12 +547,15 @@ public final void dispose() {

@Override
public void disposeNow(Duration timeout) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
if (isDisposed()) {
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server has been disposed"));
}
return;
}
if (log.isDebugEnabled()) {
log.debug(format(channel(), "Server is about to be disposed with timeout: {}"), timeout);
}
dispose();
Mono<Void> terminateSignals = Mono.empty();
if (config.channelGroup != null && config.channelGroup.size() > 0) {
Expand All @@ -560,10 +564,21 @@ public void disposeNow(Duration timeout) {
// Wait for the running requests to finish
for (Channel channel : config.channelGroup) {
Channel parent = channel.parent();
// For TCP and HTTP/1.1 the channel parent is the ServerChannel
boolean isParentServerChannel = parent instanceof ServerChannel;
List<Mono<Void>> monos =
MapUtils.computeIfAbsent(channelsToMono,
parent instanceof ServerChannel ? channel : parent,
key -> new ArrayList<>());
isParentServerChannel ? channel : parent,
key -> {
List<Mono<Void>> list = new ArrayList<>();
if (!isParentServerChannel) {
// In case of HTTP/2 Reactor Netty will send GO_AWAY with lastStreamId to notify the
// client to stop opening streams, the actual CLOSE will happen when all
// streams up to lastStreamId are closed
list.add(FutureMono.from(key.close()));
}
return list;
});
ChannelOperations<?, ?> ops = ChannelOperations.get(channel);
if (ops != null) {
monos.add(ops.onTerminate().doFinally(sig -> ops.dispose()));
Expand All @@ -574,16 +589,12 @@ public void disposeNow(Duration timeout) {
List<Mono<Void>> monos = entry.getValue();
if (monos.isEmpty()) {
// At this point there are no running requests for this channel
// This can happen for TCP and HTTP/1.1
// "FutureReturnValueIgnored" this is deliberate
channel.close();
}
else {
terminateSignals =
Mono.when(monos)
// At this point there are no running requests for this channel
// "FutureReturnValueIgnored" this is deliberate
.doFinally(sig -> channel.close())
.and(terminateSignals);
terminateSignals = Mono.when(monos).and(terminateSignals);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ static void configureH2Pipeline(ChannelPipeline p,
boolean accessLogEnabled,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -505,6 +506,12 @@ static void configureH2Pipeline(ChannelPipeline p,
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (p.get(NettyPipeline.LoggingHandler) != null) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(LogLevel.DEBUG,
"reactor.netty5.http.server.h2"));
Expand Down Expand Up @@ -533,6 +540,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
HttpRequestDecoderSpec decoder,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -551,7 +559,7 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p,
decoder.allowDuplicateContentLengths());

Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate,
p.get(NettyPipeline.LoggingHandler) != null, formDecoderProvider,
p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider,
forwardedHeaderHandler, http2Settings, httpMessageLogFactory, listener, mapHandle, metricsRecorder,
minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());

Expand Down Expand Up @@ -877,6 +885,7 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
boolean debug,
boolean enableGracefulShutdown,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Http2Settings http2Settings,
Expand All @@ -898,6 +907,12 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer<Channel>
.validateHeaders(validate)
.initialSettings(http2Settings);

if (enableGracefulShutdown) {
// Configure the graceful shutdown with indefinite timeout as Reactor Netty controls the timeout
// when disposeNow(timeout) is invoked
http2FrameCodecBuilder.gracefulShutdownTimeoutMillis(-1);
}

if (debug) {
http2FrameCodecBuilder.frameLogger(new Http2FrameLogger(
LogLevel.DEBUG,
Expand Down Expand Up @@ -941,6 +956,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
final Function<AccessLogArgProvider, AccessLog> accessLog;
final BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -961,6 +977,7 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler
this.accessLog = initializer.accessLog;
this.compressPredicate = compressPredicate(initializer.compressPredicate, initializer.minCompressionSize);
this.decoder = initializer.decoder;
this.enableGracefulShutdown = initializer.enableGracefulShutdown;
this.formDecoderProvider = initializer.formDecoderProvider;
this.forwardedHeaderHandler = initializer.forwardedHeaderHandler;
this.http2Settings = initializer.http2Settings;
Expand All @@ -985,7 +1002,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {

if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate,
formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, listener, mapHandle,
enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2Settings, httpMessageLogFactory, idleTimeout, listener, mapHandle,
metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders());
return;
}
Expand All @@ -1012,6 +1029,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
final Function<AccessLogArgProvider, AccessLog> accessLog;
final BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate;
final HttpRequestDecoderSpec decoder;
final boolean enableGracefulShutdown;
final HttpServerFormDecoderProvider formDecoderProvider;
final BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler;
final Http2Settings http2Settings;
Expand All @@ -1034,6 +1052,7 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig
this.accessLog = config.accessLog;
this.compressPredicate = config.compressPredicate;
this.decoder = config.decoder;
this.enableGracefulShutdown = config.channelGroup() != null;
this.formDecoderProvider = config.formDecoderProvider;
this.forwardedHeaderHandler = config.forwardedHeaderHandler;
this.http2Settings = config.http2Settings();
Expand Down Expand Up @@ -1095,6 +1114,7 @@ else if ((protocols & h2) == h2) {
accessLogEnabled,
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand All @@ -1117,6 +1137,7 @@ else if ((protocols & h2) == h2) {
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
decoder,
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down Expand Up @@ -1154,6 +1175,7 @@ else if ((protocols & h2c) == h2c) {
accessLogEnabled,
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
enableGracefulShutdown,
formDecoderProvider,
forwardedHeaderHandler,
http2Settings,
Expand Down

0 comments on commit a600768

Please sign in to comment.