diff --git a/docs/asciidoc/http-server.adoc b/docs/asciidoc/http-server.adoc index 81ac06b826..c202017d5a 100644 --- a/docs/asciidoc/http-server.adoc +++ b/docs/asciidoc/http-server.adoc @@ -685,9 +685,31 @@ This section describes various timeout configuration options that can be used in Configuring a proper timeout may improve or solve issues in the communication process. The configuration options can be grouped as follows: +* <> * <> * <> +[[http-server-request-timeout]] +=== Request Timeout +The following listing shows all available request timeout configuration options. + +* `readTimeout` - the maximum time between each network-level read operation while reading a given request content (resolution: ms) +* `requestTimeout` - the maximum time for reading a given request content (resolution: ms). + +NOTE: It is always a good practice to configure a read/request timeout. + +To customize the default settings, you can configure `HttpServer` as follows: + +==== +[source,java,indent=0] +.{examplesdir}/read/timeout/Application.java +---- +include::{examplesdir}/read/timeout/Application.java[lines=18..36] +---- +<1> Configures the read timeout to 5 second. +<2> Configures the request timeout to 30 second. +==== + [[http-server-connection-timeout]] === Connection Timeout The following listing shows all available connection timeout configuration options. diff --git a/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java b/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java index f7967433be..523349b51b 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java +++ b/reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -121,6 +121,7 @@ public interface NettyPipeline { String ProxyLoggingHandler = LEFT + "proxyLoggingHandler"; String ProxyProtocolDecoder = LEFT + "proxyProtocolDecoder"; String ProxyProtocolReader = LEFT + "proxyProtocolReader"; + String ReadTimeoutHandler = LEFT + "readTimeoutHandler"; String ResponseTimeoutHandler = LEFT + "responseTimeoutHandler"; String SslHandler = LEFT + "sslHandler"; String SslLoggingHandler = LEFT + "sslLoggingHandler"; diff --git a/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/timeout/Application.java b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/timeout/Application.java new file mode 100644 index 0000000000..e567c53c30 --- /dev/null +++ b/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/read/timeout/Application.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.examples.documentation.http.server.read.timeout; + +import reactor.netty.DisposableServer; +import reactor.netty.http.server.HttpServer; + +import java.time.Duration; + +public class Application { + + public static void main(String[] args) { + DisposableServer server = + HttpServer.create() + .readTimeout(Duration.ofSeconds(5)) //<1> + .requestTimeout(Duration.ofSeconds(30)) //<2> + .handle((request, response) -> request.receive().then()) + .bindNow(); + + server.onDispose() + .block(); + } +} \ No newline at end of file diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java index d58a39f485..e1342422dc 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java @@ -16,6 +16,7 @@ package reactor.netty.http.server; import java.net.SocketAddress; +import java.time.Duration; import java.time.ZonedDateTime; import java.util.Optional; import java.util.function.BiFunction; @@ -65,6 +66,8 @@ final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implemen final ConnectionObserver listener; final BiFunction, ? super Connection, ? extends Mono> mapHandle; + final Duration readTimeout; + final Duration requestTimeout; SocketAddress remoteAddress; @@ -83,7 +86,9 @@ final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implemen @Nullable BiFunction forwardedHeaderHandler, HttpMessageLogFactory httpMessageLogFactory, ConnectionObserver listener, - @Nullable BiFunction, ? super Connection, ? extends Mono> mapHandle) { + @Nullable BiFunction, ? super Connection, ? extends Mono> mapHandle, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout) { this.compress = compress; this.cookieDecoder = decoder; this.cookieEncoder = encoder; @@ -92,6 +97,8 @@ final class Http2StreamBridgeServerHandler extends ChannelDuplexHandler implemen this.httpMessageLogFactory = httpMessageLogFactory; this.listener = listener; this.mapHandle = mapHandle; + this.readTimeout = readTimeout; + this.requestTimeout = requestTimeout; } @Override @@ -135,6 +142,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { httpMessageLogFactory, true, mapHandle, + readTimeout, + requestTimeout, secured, timestamp); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java index fcbc346b0d..a6fd2a2328 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ import reactor.netty.transport.ServerTransport; import reactor.util.Logger; import reactor.util.Loggers; +import reactor.util.annotation.Nullable; import reactor.util.context.Context; import static reactor.netty.ReactorNetty.format; @@ -739,6 +740,51 @@ public final HttpServer proxyProtocol(ProxyProtocolSupportType proxyProtocolSupp return dup; } + /** + * Specifies the maximum duration allowed between each network-level read operation while reading a given request + * content (resolution: ms). In other words, {@link io.netty.handler.timeout.ReadTimeoutHandler} is added to the + * channel pipeline after all the request headers are received, and removed from the channel pipeline after the + * content is fully received. + * If the {@code readTimeout} is {@code null}, any previous setting will be removed and no + * {@code readTimeout} will be applied. + * If the {@code readTimeout} is less than {@code 1ms}, then {@code 1ms} will be the + * {@code readTimeout}. + * + * @param readTimeout the maximum duration allowed between each network-level read operation while reading a given + * request content (resolution: ms) + * @return a new {@link HttpServer} + * @since 1.1.9 + * @see io.netty.handler.timeout.ReadTimeoutHandler + */ + public final HttpServer readTimeout(@Nullable Duration readTimeout) { + if (Objects.equals(readTimeout, configuration().readTimeout)) { + return this; + } + HttpServer dup = duplicate(); + dup.configuration().readTimeout = readTimeout; + return dup; + } + + /** + * Specifies the maximum duration for reading a given request content (resolution: ms). + * If the {@code requestTimeout} is {@code null}, any previous setting will be removed and no + * {@code requestTimeout} will be applied. + * If the {@code requestTimeout} is less than {@code 1ms}, then {@code 1ms} will be the + * {@code requestTimeout}. + * + * @param requestTimeout the maximum duration for reading a given request content (resolution: ms) + * @return a new {@link HttpServer} + * @since 1.1.9 + */ + public final HttpServer requestTimeout(@Nullable Duration requestTimeout) { + if (Objects.equals(requestTimeout, configuration().requestTimeout)) { + return this; + } + HttpServer dup = duplicate(); + dup.configuration().requestTimeout = requestTimeout; + return dup; + } + /** * Define routes for the server through the provided {@link HttpServerRoutes} builder. * diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 27d97c37f5..96c5c9dabe 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerUpgradeHandler; @@ -45,6 +46,7 @@ import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AsciiString; import reactor.core.publisher.Mono; import reactor.netty.ChannelPipelineConfigurer; @@ -74,7 +76,10 @@ import java.net.SocketAddress; import java.nio.charset.Charset; import java.time.Duration; +import java.util.List; import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Function; @@ -222,6 +227,17 @@ public ProxyProtocolSupportType proxyProtocolSupportType() { return proxyProtocolSupportType; } + /** + * Return the configured read timeout for the request or null. + * + * @return the configured read timeout for the request or null + * @since 1.1.9 + */ + @Nullable + public Duration readTimeout() { + return readTimeout; + } + /** * Returns true if that {@link HttpServer} will redirect HTTP to HTTPS by changing * the scheme only but otherwise leaving the port the same when SSL is enabled. @@ -235,6 +251,17 @@ public boolean redirectHttpToHttps() { return redirectHttpToHttps; } + /** + * Return the configured request timeout for the request or null. + * + * @return the configured request timeout for the request or null + * @since 1.1.9 + */ + @Nullable + public Duration requestTimeout() { + return requestTimeout; + } + /** * Returns the current {@link SslProvider} if that {@link HttpServer} secured via SSL * transport or null. @@ -280,7 +307,9 @@ public Function uriTagValue() { HttpProtocol[] protocols; int _protocols; ProxyProtocolSupportType proxyProtocolSupportType; + Duration readTimeout; boolean redirectHttpToHttps; + Duration requestTimeout; SslProvider sslProvider; Function uriTagValue; @@ -318,7 +347,9 @@ public Function uriTagValue() { this.protocols = parent.protocols; this._protocols = parent._protocols; this.proxyProtocolSupportType = parent.proxyProtocolSupportType; + this.readTimeout = parent.readTimeout; this.redirectHttpToHttps = parent.redirectHttpToHttps; + this.requestTimeout = parent.requestTimeout; this.sslProvider = parent.sslProvider; this.uriTagValue = parent.uriTagValue; } @@ -421,6 +452,8 @@ static void addStreamHandlers(Channel ch, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, ChannelOperations.OnSetup opsFactory, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, @Nullable Function uriTagValue) { ChannelPipeline pipeline = ch.pipeline(); if (accessLogEnabled) { @@ -429,7 +462,8 @@ static void addStreamHandlers(Channel ch, pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) .addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeServerHandler(compressPredicate, decoder, encoder, formDecoderProvider, - forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle)); + forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, + readTimeout, requestTimeout)); boolean alwaysCompress = compressPredicate == null && minCompressionSize == 0; @@ -532,6 +566,8 @@ static void configureH2Pipeline(ChannelPipeline p, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, ChannelOperations.OnSetup opsFactory, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, @Nullable Function uriTagValue, boolean validate) { p.remove(NettyPipeline.ReactiveBridge); @@ -565,7 +601,7 @@ static void configureH2Pipeline(ChannelPipeline p, .addLast(NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(new H2Codec(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, - mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue))); + mapHandle, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue))); IdleTimeoutHandler.addIdleTimeoutHandler(p, idleTimeout); @@ -599,6 +635,8 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, ChannelOperations.OnSetup opsFactory, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, @Nullable Function uriTagValue) { HttpServerCodec httpServerCodec = new HttpServerCodec(decoder.maxInitialLineLength(), decoder.maxHeaderSize(), @@ -608,20 +646,24 @@ static void configureHttp11OrH2CleartextPipeline(ChannelPipeline p, Http11OrH2CleartextCodec upgrader = new Http11OrH2CleartextCodec(accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, p.get(NettyPipeline.LoggingHandler) != null, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, listener, mapHandle, metricsRecorder, - minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); + minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue, decoder.validateHeaders()); ChannelHandler http2ServerHandler = new H2CleartextCodec(upgrader, http2SettingsSpec != null ? http2SettingsSpec.maxStreams() : null); + + HttpServerUpgradeHandler httpServerUpgradeHandler = readTimeout == null && requestTimeout == null ? + new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength()) : + new ReactorNettyHttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength(), readTimeout, requestTimeout); + CleartextHttp2ServerUpgradeHandler h2cUpgradeHandler = new CleartextHttp2ServerUpgradeHandler( - httpServerCodec, - new HttpServerUpgradeHandler(httpServerCodec, upgrader, decoder.h2cMaxContentLength()), - http2ServerHandler); + httpServerCodec, httpServerUpgradeHandler, http2ServerHandler); p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, h2cUpgradeHandler) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, - forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests)); + forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, + readTimeout, requestTimeout)); if (accessLogEnabled) { p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H1.create(accessLog)); @@ -672,6 +714,8 @@ static void configureHttp11Pipeline(ChannelPipeline p, int maxKeepAliveRequests, @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, @Nullable Function uriTagValue) { p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, @@ -681,7 +725,8 @@ static void configureHttp11Pipeline(ChannelPipeline p, .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, - forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests)); + forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, + readTimeout, requestTimeout)); if (accessLogEnabled) { p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H1.create(accessLog)); @@ -868,6 +913,8 @@ static final class H2Codec extends ChannelInitializer { final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; final ChannelOperations.OnSetup opsFactory; + final Duration readTimeout; + final Duration requestTimeout; final Function uriTagValue; H2Codec( @@ -884,6 +931,8 @@ static final class H2Codec extends ChannelInitializer { @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, ChannelOperations.OnSetup opsFactory, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, @Nullable Function uriTagValue) { this.accessLogEnabled = accessLogEnabled; this.accessLog = accessLog; @@ -898,6 +947,8 @@ static final class H2Codec extends ChannelInitializer { this.metricsRecorder = metricsRecorder; this.minCompressionSize = minCompressionSize; this.opsFactory = opsFactory; + this.readTimeout = readTimeout; + this.requestTimeout = requestTimeout; this.uriTagValue = uriTagValue; } @@ -906,7 +957,7 @@ protected void initChannel(Channel ch) { ch.pipeline().remove(this); addStreamHandlers(ch, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, metricsRecorder, - minCompressionSize, opsFactory, uriTagValue); + minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue); } } @@ -929,6 +980,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; final ChannelOperations.OnSetup opsFactory; + final Duration readTimeout; + final Duration requestTimeout; final Function uriTagValue; Http11OrH2CleartextCodec( @@ -948,6 +1001,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer @Nullable ChannelMetricsRecorder metricsRecorder, int minCompressionSize, ChannelOperations.OnSetup opsFactory, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, @Nullable Function uriTagValue, boolean validate) { this.accessLogEnabled = accessLogEnabled; @@ -985,6 +1040,8 @@ static final class Http11OrH2CleartextCodec extends ChannelInitializer this.metricsRecorder = metricsRecorder; this.minCompressionSize = minCompressionSize; this.opsFactory = opsFactory; + this.readTimeout = readTimeout; + this.requestTimeout = requestTimeout; this.uriTagValue = uriTagValue; } @@ -996,7 +1053,7 @@ protected void initChannel(Channel ch) { ch.pipeline().remove(this); addStreamHandlers(ch, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, metricsRecorder, - minCompressionSize, opsFactory, uriTagValue); + minCompressionSize, opsFactory, readTimeout, requestTimeout, uriTagValue); } @Override @@ -1055,6 +1112,8 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler final ChannelMetricsRecorder metricsRecorder; final int minCompressionSize; final ChannelOperations.OnSetup opsFactory; + final Duration readTimeout; + final Duration requestTimeout; final Function uriTagValue; H2OrHttp11Codec(HttpServerChannelInitializer initializer, ConnectionObserver listener) { @@ -1077,6 +1136,8 @@ static final class H2OrHttp11Codec extends ApplicationProtocolNegotiationHandler this.metricsRecorder = initializer.metricsRecorder; this.minCompressionSize = initializer.minCompressionSize; this.opsFactory = initializer.opsFactory; + this.readTimeout = initializer.readTimeout; + this.requestTimeout = initializer.requestTimeout; this.uriTagValue = initializer.uriTagValue; } @@ -1091,14 +1152,15 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { configureH2Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, enableGracefulShutdown, formDecoderProvider, forwardedHeaderHandler, http2SettingsSpec, httpMessageLogFactory, idleTimeout, - listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, uriTagValue, decoder.validateHeaders()); + listener, mapHandle, metricsRecorder, minCompressionSize, opsFactory, readTimeout, requestTimeout, + uriTagValue, decoder.validateHeaders()); return; } if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, - mapHandle, maxKeepAliveRequests, metricsRecorder, minCompressionSize, uriTagValue); + mapHandle, maxKeepAliveRequests, metricsRecorder, minCompressionSize, readTimeout, requestTimeout, uriTagValue); // When the server is configured with HTTP/1.1 and H2 and HTTP/1.1 is negotiated, // when channelActive event happens, this HttpTrafficHandler is still not in the pipeline, @@ -1135,6 +1197,8 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig final ProxyProtocolSupportType proxyProtocolSupportType; final boolean redirectHttpToHttps; final SslProvider sslProvider; + final Duration readTimeout; + final Duration requestTimeout; final Function uriTagValue; HttpServerChannelInitializer(HttpServerConfig config) { @@ -1157,7 +1221,9 @@ static final class HttpServerChannelInitializer implements ChannelPipelineConfig this.opsFactory = config.channelOperationsProvider(); this.protocols = config._protocols; this.proxyProtocolSupportType = config.proxyProtocolSupportType; + this.readTimeout = config.readTimeout; this.redirectHttpToHttps = config.redirectHttpToHttps; + this.requestTimeout = config.requestTimeout; this.sslProvider = config.sslProvider; this.uriTagValue = config.uriTagValue; } @@ -1202,6 +1268,8 @@ else if ((protocols & h11) == h11) { maxKeepAliveRequests, metricsRecorder, minCompressionSize, + readTimeout, + requestTimeout, uriTagValue); } else if ((protocols & h2) == h2) { @@ -1223,6 +1291,8 @@ else if ((protocols & h2) == h2) { metricsRecorder, minCompressionSize, opsFactory, + readTimeout, + requestTimeout, uriTagValue, decoder.validateHeaders()); } @@ -1249,6 +1319,8 @@ else if ((protocols & h2) == h2) { metricsRecorder, minCompressionSize, opsFactory, + readTimeout, + requestTimeout, uriTagValue); } else if ((protocols & h11) == h11) { @@ -1269,6 +1341,8 @@ else if ((protocols & h11) == h11) { maxKeepAliveRequests, metricsRecorder, minCompressionSize, + readTimeout, + requestTimeout, uriTagValue); } else if ((protocols & h2c) == h2c) { @@ -1290,6 +1364,8 @@ else if ((protocols & h2c) == h2c) { metricsRecorder, minCompressionSize, opsFactory, + readTimeout, + requestTimeout, uriTagValue, decoder.validateHeaders()); needRead = true; @@ -1313,4 +1389,87 @@ else if (proxyProtocolSupportType == ProxyProtocolSupportType.AUTO) { } } } + + static final class ReactorNettyHttpServerUpgradeHandler extends HttpServerUpgradeHandler { + + final Duration readTimeout; + final Duration requestTimeout; + + boolean requestAvailable; + Future requestTimeoutFuture; + + ReactorNettyHttpServerUpgradeHandler( + SourceCodec sourceCodec, + UpgradeCodecFactory upgradeCodecFactory, + int maxContentLength, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout) { + super(sourceCodec, upgradeCodecFactory, maxContentLength); + this.readTimeout = readTimeout; + this.requestTimeout = requestTimeout; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + // The upgrade succeeded, the handler is about to be removed from the pipeline, stop all timeouts + requestAvailable = true; + stopTimeouts(ctx); + super.handlerRemoved(ctx); + } + + @Override + protected void decode(ChannelHandlerContext ctx, HttpObject msg, List out) throws Exception { + if (msg instanceof HttpRequest) { + HttpRequest req = (HttpRequest) msg; + if (req.headers().contains(HttpHeaderNames.UPGRADE)) { + if (readTimeout != null) { + ctx.channel().pipeline().addFirst(NettyPipeline.ReadTimeoutHandler, + new ReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS)); + } + if (requestTimeout != null) { + requestTimeoutFuture = + ctx.executor().schedule(new RequestTimeoutTask(ctx), Math.max(requestTimeout.toMillis(), 1), TimeUnit.MILLISECONDS); + } + } + } + + super.decode(ctx, msg, out); + + if (!out.isEmpty()) { + // The upgrade did not succeed, the full request was created, stop all timeouts + requestAvailable = true; + stopTimeouts(ctx); + } + } + + void stopTimeouts(ChannelHandlerContext ctx) { + ChannelHandler handler = ctx.channel().pipeline().get(NettyPipeline.ReadTimeoutHandler); + if (handler != null) { + ctx.channel().pipeline().remove(handler); + } + if (requestTimeoutFuture != null) { + requestTimeoutFuture.cancel(false); + requestTimeoutFuture = null; + } + } + + final class RequestTimeoutTask implements Runnable { + + final ChannelHandlerContext ctx; + + RequestTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void run() { + if (!requestAvailable) { + ctx.fireExceptionCaught(RequestTimeoutException.INSTANCE); + //"FutureReturnValueIgnored" this is deliberate + ctx.close(); + } + } + } + } } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java index ddcad19758..5d12509702 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java @@ -20,6 +20,7 @@ import java.net.SocketAddress; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.time.ZonedDateTime; import java.util.HashSet; import java.util.List; @@ -27,6 +28,8 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.BiPredicate; @@ -68,6 +71,7 @@ import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; @@ -117,6 +121,8 @@ class HttpServerOperations extends HttpOperations, ? super Connection, ? extends Mono> mapHandle; final HttpRequest nettyRequest; final HttpResponse nettyResponse; + final Duration readTimeout; + final Duration requestTimeout; final HttpHeaders responseHeaders; final String scheme; final ZonedDateTime timestamp; @@ -124,6 +130,7 @@ class HttpServerOperations extends HttpOperations compressionPredicate; Function> paramsResolver; String path; + Future requestTimeoutFuture; Consumer trailerHeadersConsumer; volatile Context currentContext; @@ -144,6 +151,8 @@ class HttpServerOperations extends HttpOperations, ? super Connection, ? extends Mono> mapHandle, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, boolean secured, ZonedDateTime timestamp) { this(c, listener, nettyRequest, compressionPredicate, connectionInfo, decoder, encoder, formDecoderProvider, - httpMessageLogFactory, isHttp2, mapHandle, true, secured, timestamp); + httpMessageLogFactory, isHttp2, mapHandle, readTimeout, requestTimeout, true, secured, timestamp); } HttpServerOperations(Connection c, ConnectionObserver listener, HttpRequest nettyRequest, @@ -174,6 +185,8 @@ class HttpServerOperations extends HttpOperations, ? super Connection, ? extends Mono> mapHandle, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout, boolean resolvePath, boolean secured, ZonedDateTime timestamp) { @@ -196,6 +209,8 @@ class HttpServerOperations extends HttpOperations 0) { super.onInboundNext(ctx, msg); @@ -646,6 +672,11 @@ protected void onInboundNext(ChannelHandlerContext ctx, Object msg) { super.onInboundNext(ctx, msg); } if (msg instanceof LastHttpContent) { + removeHandler(NettyPipeline.ReadTimeoutHandler); + if (requestTimeoutFuture != null) { + requestTimeoutFuture.cancel(false); + requestTimeoutFuture = null; + } //force auto read to enable more accurate close selection now inbound is done channel().config().setAutoRead(true); onInboundComplete(); @@ -1020,7 +1051,7 @@ static final class FailedHttpServerRequest extends HttpServerOperations { ConnectionInfo connectionInfo) { super(c, listener, nettyRequest, null, connectionInfo, ServerCookieDecoder.STRICT, ServerCookieEncoder.STRICT, DEFAULT_FORM_DECODER_SPEC, httpMessageLogFactory, isHttp2, - null, false, secure, timestamp); + null, null, null, false, secure, timestamp); this.customResponse = nettyResponse; String tempPath = ""; try { @@ -1045,6 +1076,26 @@ public HttpResponseStatus status() { } } + + final class RequestTimeoutTask implements Runnable { + + final ChannelHandlerContext ctx; + + RequestTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void run() { + if (ctx.channel().isActive() && !(isInboundCancelled() || isInboundDisposed())) { + onInboundError(RequestTimeoutException.INSTANCE); + //"FutureReturnValueIgnored" this is deliberate + ctx.close(); + } + } + } + static final class TrailerHeaders extends DefaultHttpHeaders { static final Set DISALLOWED_TRAILER_HEADER_NAMES = new HashSet<>(14); diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java index 09412fabe3..9a3fe20a91 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpTrafficHandler.java @@ -80,6 +80,8 @@ final class HttpTrafficHandler extends ChannelDuplexHandler final BiFunction, ? super Connection, ? extends Mono> mapHandle; final int maxKeepAliveRequests; + final Duration readTimeout; + final Duration requestTimeout; ChannelHandlerContext ctx; @@ -106,7 +108,9 @@ final class HttpTrafficHandler extends ChannelDuplexHandler @Nullable Duration idleTimeout, ConnectionObserver listener, @Nullable BiFunction, ? super Connection, ? extends Mono> mapHandle, - int maxKeepAliveRequests) { + int maxKeepAliveRequests, + @Nullable Duration readTimeout, + @Nullable Duration requestTimeout) { this.listener = listener; this.formDecoderProvider = formDecoderProvider; this.forwardedHeaderHandler = forwardedHeaderHandler; @@ -117,6 +121,8 @@ final class HttpTrafficHandler extends ChannelDuplexHandler this.idleTimeout = idleTimeout; this.mapHandle = mapHandle; this.maxKeepAliveRequests = maxKeepAliveRequests; + this.readTimeout = readTimeout; + this.requestTimeout = requestTimeout; } @Override @@ -216,6 +222,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { httpMessageLogFactory, false, mapHandle, + readTimeout, + requestTimeout, secure, timestamp); } @@ -422,6 +430,8 @@ public void run() { httpMessageLogFactory, false, mapHandle, + readTimeout, + requestTimeout, secure, holder.timestamp); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/RequestTimeoutException.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/RequestTimeoutException.java new file mode 100644 index 0000000000..46865d4e5d --- /dev/null +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/RequestTimeoutException.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.http.server; + +import io.netty.channel.ChannelException; + +final class RequestTimeoutException extends ChannelException { + + static final RequestTimeoutException INSTANCE = new RequestTimeoutException(); + + private static final long serialVersionUID = 422626851161276356L; + + RequestTimeoutException() { + super(null, null, true); + } + + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } +} diff --git a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json index c51f02a8ab..d854c8a6c3 100644 --- a/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json +++ b/reactor-netty-http/src/main/resources/META-INF/native-image/io.projectreactor.netty/reactor-netty-http/reflect-config.json @@ -139,6 +139,13 @@ "name": "reactor.netty.http.server.HttpServerConfig$Http11OrH2CleartextCodec", "queryAllPublicMethods": true }, + { + "condition": { + "typeReachable": "reactor.netty.http.server.HttpServerConfig$ReactorNettyHttpServerUpgradeHandler" + }, + "name": "reactor.netty.http.server.HttpServerConfig$ReactorNettyHttpServerUpgradeHandler", + "queryAllPublicMethods": true + }, { "condition": { "typeReachable": "reactor.netty.http.server.HttpServerMetricsHandler" diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java index 01af444975..5feaa80479 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java @@ -28,6 +28,7 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2DataFrame; @@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; import reactor.core.scheduler.Schedulers; import reactor.netty.BaseHttpTest; import reactor.netty.ByteBufFlux; @@ -57,6 +59,7 @@ import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientConfig; import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.http.client.PrematureCloseException; import reactor.netty.http.server.HttpServer; import reactor.netty.http.server.HttpServerConfig; import reactor.netty.http.server.logging.AccessLog; @@ -78,6 +81,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; @@ -757,6 +761,102 @@ else if (clientProtocols.length == 2 && clientProtocols[1] == HttpProtocol.H2C) } } + @ParameterizedCompatibleCombinationsTest + void testRequestTimeout(HttpServer server, HttpClient client) throws Exception { + HttpProtocol[] serverProtocols = server.configuration().protocols(); + HttpProtocol[] clientProtocols = client.configuration().protocols(); + AtomicReference> handlerAvailable = new AtomicReference<>(new ArrayList<>(3)); + AtomicReference> onTerminate = new AtomicReference<>(new ArrayList<>(3)); + AtomicReference> timeout = new AtomicReference<>(new ArrayList<>(3)); + CountDownLatch latch = new CountDownLatch(3); + disposableServer = + server.readTimeout(Duration.ofMillis(60)) + .requestTimeout(Duration.ofMillis(150)) + .doOnChannelInit((obs, ch, addr) -> { + if ((serverProtocols.length == 2 && serverProtocols[1] == HttpProtocol.H2C) && + (clientProtocols.length == 2 && clientProtocols[1] == HttpProtocol.H2C)) { + ChannelHandler httpServerCodec = ch.pipeline().get(HttpServerCodec.class); + if (httpServerCodec != null) { + String name = ch.pipeline().context(httpServerCodec).name(); + ch.pipeline().addAfter(name, "testRequestTimeout", + new RequestTimeoutTestChannelInboundHandler(handlerAvailable, onTerminate, timeout, latch)); + } + } + }) + .handle((req, res) -> + res.withConnection(conn -> { + ChannelHandler handler = conn.channel().pipeline().get(NettyPipeline.ReadTimeoutHandler); + if (handler != null) { + handlerAvailable.get().add(true); + timeout.get().add(((ReadTimeoutHandler) handler).getReaderIdleTimeInMillis()); + } + conn.onTerminate().subscribe(null, null, () -> { + onTerminate.get().add(conn.channel().isActive() && + conn.channel().pipeline().get(NettyPipeline.ReadTimeoutHandler) != null); + latch.countDown(); + }); + }) + .send(req.receive().retain())) + .bindNow(); + + HttpClient localClient = client.port(disposableServer.port()); + + Mono response1 = + localClient.post() + .uri("/") + .send(ByteBufFlux.fromString(Flux.just("test", "ProtocolVariations", "RequestTimeout") + .delayElements(Duration.ofMillis(80)))) + .responseContent() + .aggregate() + .asString(); + + Mono response2 = + localClient.post() + .uri("/") + .send(ByteBufFlux.fromString(Flux.just("test", "Protocol", "Variations", "Request", "Timeout") + .delayElements(Duration.ofMillis(40)))) + .responseContent() + .aggregate() + .asString(); + + Mono response3 = + localClient.post() + .uri("/") + .send(ByteBufFlux.fromString(Flux.just("test", "ProtocolVariations", "RequestTimeout"))) + .responseContent() + .aggregate() + .asString(); + + List> result = + Flux.concat(response1.materialize(), response2.materialize(), response3.materialize()) + .collectList() + .block(Duration.ofSeconds(30)); + + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + + assertThat(result).isNotNull(); + + assertThat(handlerAvailable.get()).hasSize(3).allMatch(b -> b); + assertThat(onTerminate.get()).hasSize(3).allMatch(b -> !b); + assertThat(timeout.get()).hasSize(3).allMatch(l -> l == 60); + + int onNext = 0; + int onError = 0; + for (Signal signal : result) { + if (signal.isOnNext()) { + onNext++; + assertThat(signal.get()).isEqualTo("testProtocolVariationsRequestTimeout"); + } + else if (signal.getThrowable() instanceof PrematureCloseException || + signal.getThrowable().getMessage().contains("Connection reset by peer")) { + onError++; + } + } + + assertThat(onNext).isEqualTo(1); + assertThat(onError).isEqualTo(2); + } + static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter { final CountDownLatch latch = new CountDownLatch(1); @@ -795,4 +895,47 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { ctx.fireUserEventTriggered(evt); } } -} + + static final class RequestTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter { + + final AtomicReference> handlerAvailable; + final AtomicReference> onTerminate; + final AtomicReference> timeout; + final CountDownLatch latch; + + boolean added; + + RequestTimeoutTestChannelInboundHandler( + AtomicReference> handlerAvailable, + AtomicReference> onTerminate, + AtomicReference> timeout, + CountDownLatch latch) { + this.handlerAvailable = handlerAvailable; + this.onTerminate = onTerminate; + this.timeout = timeout; + this.latch = latch; + } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (!added && msg instanceof HttpContent) { + ChannelHandler handler = ctx.channel().pipeline().get(NettyPipeline.ReadTimeoutHandler); + if (handler != null) { + handlerAvailable.get().add(true); + timeout.get().add(((ReadTimeoutHandler) handler).getReaderIdleTimeInMillis()); + } + added = true; + } + + ctx.fireChannelRead(msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + onTerminate.get().add(ctx.channel().isActive() && + ctx.channel().pipeline().get(NettyPipeline.ReadTimeoutHandler) != null); + latch.countDown(); + + ctx.fireChannelInactive(); + } + } +} \ No newline at end of file diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java index 8c8ec9e671..0de63979b3 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -2096,6 +2096,8 @@ private void doTestStatus(HttpResponseStatus status) { ReactorNettyHttpMessageLogFactory.INSTANCE, false, null, + null, + null, false, ZonedDateTime.now(ReactorNetty.ZONE_ID_SYSTEM)); ops.status(status); @@ -3008,6 +3010,8 @@ private void doTestIsFormUrlencoded(String headerValue, boolean expectation) { ReactorNettyHttpMessageLogFactory.INSTANCE, false, null, + null, + null, false, ZonedDateTime.now(ReactorNetty.ZONE_ID_SYSTEM)); assertThat(ops.isFormUrlencoded()).isEqualTo(expectation);