diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http3ExchangeImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http3ExchangeImpl.java index ff1e024673cb8..1970c0dcdb758 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http3ExchangeImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http3ExchangeImpl.java @@ -1336,6 +1336,9 @@ void onReaderReset() { Http3Error resetError = Http3Error.fromCode(errorCode) .orElse(Http3Error.H3_REQUEST_CANCELLED); if (!requestSent || !responseReceived) { + if (!responseReceived && resetError == Http3Error.H3_REQUEST_REJECTED) { + exchange.markUnprocessedByPeer(); + } cancelImpl(new IOException("Stream %s reset by peer: %s" .formatted(streamId(), resetReason)), resetError); diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java index ec621f7f95576..ebda1d30cb38a 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java @@ -25,13 +25,13 @@ package jdk.internal.net.http; -import java.io.IOError; import java.io.IOException; import java.lang.ref.WeakReference; import java.net.ConnectException; import java.net.http.HttpClient.Version; import java.net.http.HttpConnectTimeoutException; import java.net.http.StreamLimitException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; import java.util.ListIterator; @@ -396,6 +396,7 @@ private HttpResponse setNewResponse(HttpRequest request, Response r, T body, private CompletableFuture> responseAsync0(CompletableFuture start) { + AtomicReference> effectiveResponseHandlerRef = new AtomicReference<>(); return start.thenCompose( _ -> { // this is the first attempt to have the request processed by the server attempts.set(1); @@ -412,11 +413,76 @@ private HttpResponse setNewResponse(HttpRequest request, Response r, T body, } else return handleNoBody(r, exch); } - return exch.readBodyAsync(responseHandler) + HttpResponse.BodyHandler effectiveResponseHandler = effectiveResponseHandlerRef.get(); + if (effectiveResponseHandler == null) { + effectiveResponseHandlerRef.set(effectiveResponseHandler = responseTimerEvent != null + ? new TimerCancellingBodyHandlerWrapper(responseHandler) + : responseHandler); + } + return exch.readBodyAsync(effectiveResponseHandler) .thenApply((T body) -> setNewResponse(r.request, r, body, exch)); }).exceptionallyCompose(this::whenCancelled); } + /** + * Decorates a {@link HttpResponse.BodyHandler} to {@link #cancelTimer()} at completion. + */ + private final class TimerCancellingBodyHandlerWrapper implements HttpResponse.BodyHandler { + + private final HttpResponse.BodyHandler delegate; + + private TimerCancellingBodyHandlerWrapper(HttpResponse.BodyHandler delegate) { + this.delegate = delegate; + } + + @Override + public BodySubscriber apply(HttpResponse.ResponseInfo responseInfo) { + BodySubscriber subscriber = delegate.apply(responseInfo); + return new TimerCancellingBodySubscriberWrapper(subscriber); + } + + } + + /** + * Decorates a {@link HttpResponse.BodySubscriber} to {@link #cancelTimer()} at completion. + */ + private final class TimerCancellingBodySubscriberWrapper implements HttpResponse.BodySubscriber { + + private final HttpResponse.BodySubscriber delegate; + + private TimerCancellingBodySubscriberWrapper(BodySubscriber delegate) { + this.delegate = delegate; + } + + @Override + public CompletionStage getBody() { + return delegate.getBody(); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + delegate.onSubscribe(subscription); + } + + @Override + public void onNext(List buffers) { + delegate.onNext(buffers); + } + + @Override + public void onError(Throwable throwable) { + cancelTimer(); + delegate.onError(throwable); + } + + @Override + public void onComplete() { + cancelTimer(); + delegate.onComplete(); + } + + } + // returns a CancellationException that wraps the given cause // if cancel(boolean) was called, the given cause otherwise private Throwable wrapIfCancelled(Throwable cause) { @@ -467,6 +533,8 @@ private CompletableFuture retryRequest() { private CompletableFuture responseAsyncImpl(final boolean applyReqFilters) { if (currentreq.timeout().isPresent()) { + // Retried/Forwarded requests should reset the timer, if present + cancelTimer(); responseTimerEvent = ResponseTimerEvent.of(this); client.registerTimer(responseTimerEvent); } @@ -502,7 +570,6 @@ private CompletableFuture responseAsyncImpl(final boolean applyReqFilt } return completedFuture(response); } else { - cancelTimer(); setNewResponse(currentreq, response, null, exch); if (currentreq.isWebSocket()) { // need to close the connection and open a new one. @@ -520,11 +587,18 @@ private CompletableFuture responseAsyncImpl(final boolean applyReqFilt } }) .handle((response, ex) -> { // 5. handle errors and cancel any timer set - cancelTimer(); if (ex == null) { assert response != null; return completedFuture(response); } + + // Cancel the timer. Note that we only do so if the + // response has completed exceptionally. That is, we don't + // cancel the timer if there are no exceptions, since the + // response body might still get consumed, and it is + // still subject to the response timer. + cancelTimer(); + // all exceptions thrown are handled here final RetryContext retryCtx = checkRetryEligible(ex, exch); assert retryCtx != null : "retry context is null"; diff --git a/test/jdk/java/net/httpclient/TimeoutResponseBodyTest.java b/test/jdk/java/net/httpclient/TimeoutResponseBodyTest.java new file mode 100644 index 0000000000000..91783b613a855 --- /dev/null +++ b/test/jdk/java/net/httpclient/TimeoutResponseBodyTest.java @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import jdk.internal.net.http.common.Logger; +import jdk.internal.net.http.common.Utils; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.InputStream; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.fail; + +/* + * @test id=retriesDisabled + * @bug 8208693 + * @summary Verifies `HttpRequest::timeout` is effective for *response body* + * timeouts when all retry mechanisms are disabled. + * + * @library /test/lib + * /test/jdk/java/net/httpclient/lib + * @build TimeoutResponseTestSupport + * + * @run junit/othervm + * -Djdk.httpclient.auth.retrylimit=0 + * -Djdk.httpclient.disableRetryConnect + * -Djdk.httpclient.redirects.retrylimit=0 + * -Dtest.requestTimeoutMillis=1000 + * TimeoutResponseBodyTest + */ + +/* + * @test id=retriesEnabledForResponseFailure + * @bug 8208693 + * @summary Verifies `HttpRequest::timeout` is effective for *response body* + * timeouts, where some initial responses are intentionally configured + * to fail to trigger retries. + * + * @library /test/lib + * /test/jdk/java/net/httpclient/lib + * @build TimeoutResponseTestSupport + * + * @run junit/othervm + * -Djdk.httpclient.auth.retrylimit=0 + * -Djdk.httpclient.disableRetryConnect + * -Djdk.httpclient.redirects.retrylimit=3 + * -Dtest.requestTimeoutMillis=1000 + * -Dtest.responseFailureWaitDurationMillis=600 + * TimeoutResponseBodyTest + */ + +/** + * Verifies {@link HttpRequest#timeout() HttpRequest.timeout()} is effective + * for response body timeouts. + * + * @implNote + * + * Using a response body subscriber (i.e., {@link InputStream}) of type that + * allows gradual consumption of the response body after successfully building + * an {@link HttpResponse} instance to ensure timeouts are propagated even + * after the {@code HttpResponse} construction. + *

+ * Each test is provided a pristine ephemeral client to avoid any unexpected + * effects due to pooling. + */ +class TimeoutResponseBodyTest extends TimeoutResponseTestSupport { + + private static final Logger LOGGER = Utils.getDebugLogger( + TimeoutResponseBodyTest.class.getSimpleName()::toString, Utils.DEBUG); + + /** + * Tests timeouts using + * {@link HttpClient#send(HttpRequest, HttpResponse.BodyHandler) HttpClient::send} + * against a server blocking without delivering the response body. + */ + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSendOnMissingBody(ServerRequestPair pair) throws Exception { + + ServerRequestPair.SERVER_HANDLER_BEHAVIOUR = + ServerRequestPair.ServerHandlerBehaviour.BLOCK_BEFORE_BODY_DELIVERY; + + try (HttpClient client = pair.createClientWithEstablishedConnection()) { + assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> { + LOGGER.log("Sending the request"); + HttpResponse response = client.send( + pair.request(), HttpResponse.BodyHandlers.ofInputStream()); + LOGGER.log("Consuming the obtained response"); + verifyResponseBodyDoesNotArrive(response); + }); + } + + } + + /** + * Tests timeouts using + * {@link HttpClient#sendAsync(HttpRequest, HttpResponse.BodyHandler) HttpClient::sendAsync} + * against a server blocking without delivering the response body. + */ + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSendAsyncOnMissingBody(ServerRequestPair pair) throws Exception { + + ServerRequestPair.SERVER_HANDLER_BEHAVIOUR = + ServerRequestPair.ServerHandlerBehaviour.BLOCK_BEFORE_BODY_DELIVERY; + + try (HttpClient client = pair.createClientWithEstablishedConnection()) { + assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> { + LOGGER.log("Sending the request asynchronously"); + CompletableFuture> responseFuture = client.sendAsync( + pair.request(), HttpResponse.BodyHandlers.ofInputStream()); + LOGGER.log("Obtaining the response"); + HttpResponse response = responseFuture.get(); + LOGGER.log("Consuming the obtained response"); + verifyResponseBodyDoesNotArrive(response); + }); + } + + } + + private static void verifyResponseBodyDoesNotArrive(HttpResponse response) { + assertEquals(200, response.statusCode()); + assertThrowsHttpTimeoutException(() -> { + try (InputStream responseBodyStream = response.body()) { + int readByte = responseBodyStream.read(); + fail("Unexpected read byte: " + readByte); + } + }); + } + + /** + * Tests timeouts using + * {@link HttpClient#send(HttpRequest, HttpResponse.BodyHandler) HttpClient::send} + * against a server delivering the response body very slowly. + */ + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSendOnSlowBody(ServerRequestPair pair) throws Exception { + + ServerRequestPair.SERVER_HANDLER_BEHAVIOUR = + ServerRequestPair.ServerHandlerBehaviour.DELIVER_BODY_SLOWLY; + + try (HttpClient client = pair.createClientWithEstablishedConnection()) { + assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> { + LOGGER.log("Sending the request"); + HttpResponse response = client.send( + pair.request(), HttpResponse.BodyHandlers.ofInputStream()); + LOGGER.log("Consuming the obtained response"); + verifyResponseBodyArrivesSlow(response); + }); + } + + } + + /** + * Tests timeouts using + * {@link HttpClient#sendAsync(HttpRequest, HttpResponse.BodyHandler) HttpClient::sendAsync} + * against a server delivering the response body very slowly. + */ + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSendAsyncOnSlowBody(ServerRequestPair pair) throws Exception { + + ServerRequestPair.SERVER_HANDLER_BEHAVIOUR = + ServerRequestPair.ServerHandlerBehaviour.DELIVER_BODY_SLOWLY; + + try (HttpClient client = pair.createClientWithEstablishedConnection()) { + assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> { + LOGGER.log("Sending the request asynchronously"); + CompletableFuture> responseFuture = client.sendAsync( + pair.request(), HttpResponse.BodyHandlers.ofInputStream()); + LOGGER.log("Obtaining the response"); + HttpResponse response = responseFuture.get(); + LOGGER.log("Consuming the obtained response"); + verifyResponseBodyArrivesSlow(response); + }); + } + + } + + private static void verifyResponseBodyArrivesSlow(HttpResponse response) { + assertEquals(200, response.statusCode()); + assertThrowsHttpTimeoutException(() -> { + try (InputStream responseBodyStream = response.body()) { + int i = 0; + int l = ServerRequestPair.CONTENT_LENGTH; + for (; i < l; i++) { + LOGGER.log("Reading byte %s/%s", i, l); + int readByte = responseBodyStream.read(); + if (readByte < 0) { + break; + } + assertEquals(i, readByte); + } + fail("Should not have reached here! (i=%s)".formatted(i)); + } + }); + } + +} diff --git a/test/jdk/java/net/httpclient/TimeoutResponseHeaderTest.java b/test/jdk/java/net/httpclient/TimeoutResponseHeaderTest.java new file mode 100644 index 0000000000000..b6db5a492a51c --- /dev/null +++ b/test/jdk/java/net/httpclient/TimeoutResponseHeaderTest.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import jdk.internal.net.http.common.Logger; +import jdk.internal.net.http.common.Utils; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +/* + * @test id=retriesDisabled + * @bug 8208693 + * @summary Verifies `HttpRequest::timeout` is effective for *response header* + * timeouts when all retry mechanisms are disabled. + * + * @library /test/jdk/java/net/httpclient/lib + * /test/lib + * @build TimeoutResponseTestSupport + * + * @run junit/othervm + * -Djdk.httpclient.auth.retrylimit=0 + * -Djdk.httpclient.disableRetryConnect + * -Djdk.httpclient.redirects.retrylimit=0 + * -Dtest.requestTimeoutMillis=1000 + * TimeoutResponseHeaderTest + */ + +/* + * @test id=retriesEnabledForResponseFailure + * @bug 8208693 + * @summary Verifies `HttpRequest::timeout` is effective for *response header* + * timeouts, where some initial responses are intentionally configured + * to fail to trigger retries. + * + * @library /test/jdk/java/net/httpclient/lib + * /test/lib + * @build TimeoutResponseTestSupport + * + * @run junit/othervm + * -Djdk.httpclient.auth.retrylimit=0 + * -Djdk.httpclient.disableRetryConnect + * -Djdk.httpclient.redirects.retrylimit=3 + * -Dtest.requestTimeoutMillis=1000 + * -Dtest.responseFailureWaitDurationMillis=600 + * TimeoutResponseHeaderTest + */ + +/** + * Verifies {@link HttpRequest#timeout() HttpRequest.timeout()} is effective + * for response header timeouts. + */ +class TimeoutResponseHeaderTest extends TimeoutResponseTestSupport { + + private static final Logger LOGGER = Utils.getDebugLogger( + TimeoutResponseHeaderTest.class.getSimpleName()::toString, Utils.DEBUG); + + static { + ServerRequestPair.SERVER_HANDLER_BEHAVIOUR = + ServerRequestPair.ServerHandlerBehaviour.BLOCK_BEFORE_HEADER_DELIVERY; + } + + /** + * Tests timeouts using + * {@link HttpClient#send(HttpRequest, HttpResponse.BodyHandler) HttpClient::send} + * against a server blocking without delivering any response headers. + */ + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSend(ServerRequestPair pair) throws Exception { + try (HttpClient client = pair.createClientWithEstablishedConnection()) { + assertTimeoutPreemptively( + REQUEST_TIMEOUT.multipliedBy(2), + () -> assertThrowsHttpTimeoutException(() -> { + LOGGER.log("Sending the request"); + client.send(pair.request(), HttpResponse.BodyHandlers.discarding()); + })); + } + } + + /** + * Tests timeouts using + * {@link HttpClient#sendAsync(HttpRequest, HttpResponse.BodyHandler) HttpClient::sendAsync} + * against a server blocking without delivering any response headers. + */ + @ParameterizedTest + @MethodSource("serverRequestPairs") + void testSendAsync(ServerRequestPair pair) throws Exception { + try (HttpClient client = pair.createClientWithEstablishedConnection()) { + assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> { + LOGGER.log("Sending the request asynchronously"); + CompletableFuture> responseFuture = + client.sendAsync(pair.request(), HttpResponse.BodyHandlers.discarding()); + assertThrowsHttpTimeoutException(() -> { + LOGGER.log("Obtaining the response"); + responseFuture.get(); + }); + }); + } + } + +} diff --git a/test/jdk/java/net/httpclient/TimeoutResponseTestSupport.java b/test/jdk/java/net/httpclient/TimeoutResponseTestSupport.java new file mode 100644 index 0000000000000..de6b83fbe0d09 --- /dev/null +++ b/test/jdk/java/net/httpclient/TimeoutResponseTestSupport.java @@ -0,0 +1,413 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import jdk.httpclient.test.lib.common.HttpServerAdapters; +import jdk.internal.net.http.common.Logger; +import jdk.internal.net.http.common.Utils; +import jdk.internal.net.http.frame.ErrorFrame; +import jdk.internal.net.http.http3.Http3Error; +import jdk.test.lib.net.SimpleSSLContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.function.Executable; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpClient.Version; +import java.net.http.HttpOption; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpTimeoutException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static java.net.http.HttpClient.Builder.NO_PROXY; +import static java.net.http.HttpOption.Http3DiscoveryMode.HTTP_3_URI_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Utilities for {@code TimeoutResponse*Test}s. + * + * @see TimeoutResponseBodyTest Server response body timeout tests + * @see TimeoutResponseHeaderTest Server response header timeout tests + * @see TimeoutBasic Server connection timeout tests + */ +public class TimeoutResponseTestSupport { + + private static final String CLASS_NAME = TimeoutResponseTestSupport.class.getSimpleName(); + + private static final Logger LOGGER = Utils.getDebugLogger(CLASS_NAME::toString, Utils.DEBUG); + + private static final SSLContext SSL_CONTEXT = createSslContext(); + + protected static final Duration REQUEST_TIMEOUT = + Duration.ofMillis(Long.parseLong(System.getProperty("test.requestTimeoutMillis"))); + + static { + assertTrue( + REQUEST_TIMEOUT.isPositive(), + "was expecting `test.requestTimeoutMillis > 0`, found: " + REQUEST_TIMEOUT); + } + + protected static final int RETRY_LIMIT = + Integer.parseInt(System.getProperty("jdk.httpclient.redirects.retrylimit", "0")); + + private static final long RESPONSE_Failure_WAIT_DURATION_MILLIS = + Long.parseLong(System.getProperty("test.responseFailureWaitDurationMillis", "0")); + + static { + if (RETRY_LIMIT > 0) { + + // Verify that response failure wait duration is provided + if (RESPONSE_Failure_WAIT_DURATION_MILLIS <= 0) { + String message = String.format( + "`jdk.httpclient.redirects.retrylimit` (%s) is greater than zero. " + + "`test.responseFailureWaitDurationMillis` (%s) must be greater than zero too.", + RETRY_LIMIT, RESPONSE_Failure_WAIT_DURATION_MILLIS); + throw new AssertionError(message); + } + + // Verify that the total response failure waits exceed the request timeout + Duration totalResponseFailureWaitDuration = Duration + .ofMillis(RESPONSE_Failure_WAIT_DURATION_MILLIS) + .multipliedBy(RETRY_LIMIT); + if (totalResponseFailureWaitDuration.compareTo(REQUEST_TIMEOUT) <= 0) { + String message = ("`test.responseFailureWaitDurationMillis * jdk.httpclient.redirects.retrylimit` (%s * %s = %s) " + + "must be greater than `test.requestTimeoutMillis` (%s)") + .formatted( + RESPONSE_Failure_WAIT_DURATION_MILLIS, + RETRY_LIMIT, + totalResponseFailureWaitDuration, + REQUEST_TIMEOUT); + throw new AssertionError(message); + } + + } + } + + protected static final ServerRequestPair + HTTP1 = ServerRequestPair.of(Version.HTTP_1_1, false), + HTTPS1 = ServerRequestPair.of(Version.HTTP_1_1, true), + HTTP2 = ServerRequestPair.of(Version.HTTP_2, false), + HTTPS2 = ServerRequestPair.of(Version.HTTP_2, true), + HTTP3 = ServerRequestPair.of(Version.HTTP_3, true); + + private static SSLContext createSslContext() { + try { + return new SimpleSSLContext().get(); + } catch (IOException exception) { + throw new UncheckedIOException(exception); + } + } + + protected record ServerRequestPair( + HttpServerAdapters.HttpTestServer server, + HttpRequest request, + boolean secure) { + + private static final ExecutorService EXECUTOR = Executors.newVirtualThreadPerTaskExecutor(); + + private static final CountDownLatch SHUT_DOWN_LATCH = new CountDownLatch(1); + + private static final AtomicInteger SERVER_COUNTER = new AtomicInteger(); + + /** + * An arbitrary content length to cause the client wait for it. + * It just needs to be greater than zero, and big enough to trigger a timeout when delivered slowly. + */ + public static final int CONTENT_LENGTH = 1234; + + public enum ServerHandlerBehaviour { + BLOCK_BEFORE_HEADER_DELIVERY, + BLOCK_BEFORE_BODY_DELIVERY, + DELIVER_BODY_SLOWLY + } + + public static volatile ServerHandlerBehaviour SERVER_HANDLER_BEHAVIOUR; + + public static volatile int SERVER_HANDLER_PENDING_FAILURE_COUNT = 0; + + private static ServerRequestPair of(Version version, boolean secure) { + + // Create the server and the request URI + SSLContext sslContext = secure ? SSL_CONTEXT : null; + String serverId = "" + SERVER_COUNTER.getAndIncrement(); + HttpServerAdapters.HttpTestServer server = createServer(version, sslContext); + server.getVersion(); + String handlerPath = "/%s/".formatted(CLASS_NAME); + String requestUriScheme = secure ? "https" : "http"; + URI requestUri = URI.create("%s://%s%s-".formatted(requestUriScheme, server.serverAuthority(), handlerPath)); + + // Register the request handler + server.addHandler(createServerHandler(serverId), handlerPath); + + // Create the request + HttpRequest request = createRequestBuilder(requestUri, version).timeout(REQUEST_TIMEOUT).build(); + + // Create the pair + ServerRequestPair pair = new ServerRequestPair(server, request, secure); + pair.server.start(); + LOGGER.log("Server[%s] is started at `%s`", serverId, server.serverAuthority()); + return pair; + + } + + private static HttpServerAdapters.HttpTestServer createServer( + Version version, + SSLContext sslContext) { + try { + return switch (version) { + case HTTP_1_1, HTTP_2 -> HttpServerAdapters.HttpTestServer.create(version, sslContext, EXECUTOR); + case HTTP_3 -> HttpServerAdapters.HttpTestServer.create(HTTP_3_URI_ONLY, sslContext, EXECUTOR); + }; + } catch (IOException exception) { + throw new UncheckedIOException(exception); + } + } + + private static HttpServerAdapters.HttpTestHandler createServerHandler(String serverId) { + return (exchange) -> { + String connectionKey = exchange.getConnectionKey(); + LOGGER.log( + "Server[%s] has received request %s", + serverId, Map.of("connectionKey", connectionKey)); + try (exchange) { + + // Short-circuit on `HEAD` requests. + // They are used for admitting established connections to the pool. + if ("HEAD".equals(exchange.getRequestMethod())) { + LOGGER.log( + "Server[%s] is responding to the `HEAD` request %s", + serverId, Map.of("connectionKey", connectionKey)); + exchange.sendResponseHeaders(200, 0); + return; + } + + // Short-circuit if instructed to fail + synchronized (ServerRequestPair.class) { + if (SERVER_HANDLER_PENDING_FAILURE_COUNT > 0) { + LOGGER.log( + "Server[%s] is prematurely failing as instructed %s", + serverId, + Map.of( + "connectionKey", connectionKey, + "SERVER_HANDLER_PENDING_FAILURE_COUNT", SERVER_HANDLER_PENDING_FAILURE_COUNT)); + // Closing the exchange will trigger an `END_STREAM` without a headers frame. + // This is a protocol violation, hence we must reset the stream first. + // We are doing so using by rejecting the stream, which is known to make the client retry. + if (Version.HTTP_2.equals(exchange.getExchangeVersion())) { + exchange.resetStream(ErrorFrame.REFUSED_STREAM); + } else if (Version.HTTP_3.equals(exchange.getExchangeVersion())) { + exchange.resetStream(Http3Error.H3_REQUEST_REJECTED.code()); + } + SERVER_HANDLER_PENDING_FAILURE_COUNT--; + return; + } + } + + switch (SERVER_HANDLER_BEHAVIOUR) { + + case BLOCK_BEFORE_HEADER_DELIVERY: + sleepIndefinitely(serverId, connectionKey); + break; + + case BLOCK_BEFORE_BODY_DELIVERY: + sendResponseHeaders(serverId, exchange, connectionKey); + sleepIndefinitely(serverId, connectionKey); + break; + + case DELIVER_BODY_SLOWLY: + sendResponseHeaders(serverId, exchange, connectionKey); + sendResponseBodySlowly(serverId, exchange, connectionKey); + break; + } + + } catch (Exception exception) { + String message = String.format( + "Server[%s] has failed! %s", + serverId, Map.of("connectionKey", connectionKey)); + LOGGER.log(System.Logger.Level.ERROR, message, exception); + if (exception instanceof InterruptedException) { + // Restore the interrupt + Thread.currentThread().interrupt(); + } + throw new RuntimeException(message, exception); + } + }; + } + + private static void sleepIndefinitely(String serverId, String connectionKey) throws InterruptedException { + LOGGER.log("Server[%s] is sleeping %s", serverId, Map.of("connectionKey", connectionKey)); + SHUT_DOWN_LATCH.await(); + } + + private static void sendResponseHeaders( + String serverId, + HttpServerAdapters.HttpTestExchange exchange, + String connectionKey) + throws IOException { + LOGGER.log("Server[%s] is sending headers %s", serverId, Map.of("connectionKey", connectionKey)); + exchange.sendResponseHeaders(200, CONTENT_LENGTH); + // Force the headers to be flushed + exchange.getResponseBody().flush(); + } + + private static void sendResponseBodySlowly( + String serverId, + HttpServerAdapters.HttpTestExchange exchange, + String connectionKey) + throws Exception { + Duration perBytePauseDuration = Duration.ofMillis(100); + assertTrue( + perBytePauseDuration.multipliedBy(CONTENT_LENGTH).compareTo(REQUEST_TIMEOUT) > 0, + "Per-byte pause duration (%s) must be long enough to exceed the timeout (%s) when delivering the content (%s bytes)".formatted( + perBytePauseDuration, REQUEST_TIMEOUT, CONTENT_LENGTH)); + try (OutputStream responseBody = exchange.getResponseBody()) { + for (int i = 0; i < CONTENT_LENGTH; i++) { + LOGGER.log( + "Server[%s] is sending the body %s/%s %s", + serverId, i, CONTENT_LENGTH, Map.of("connectionKey", connectionKey)); + responseBody.write(i); + responseBody.flush(); + Thread.sleep(perBytePauseDuration); + } + throw new AssertionError("Delivery should never have succeeded due to timeout!"); + } catch (IOException _) { + // Client's timeout mechanism is expected to short-circuit and cut the stream. + // Hence, discard I/O failures. + } + } + + public HttpClient createClientWithEstablishedConnection() throws IOException, InterruptedException { + Version version = server.getVersion(); + HttpClient client = HttpServerAdapters + .createClientBuilderFor(version) + .version(version) + .sslContext(SSL_CONTEXT) + .proxy(NO_PROXY) + .build(); + // Ensure an established connection is admitted to the pool. This + // helps to cross out any possibilities of a timeout before a + // request makes it to the server handler. For instance, consider + // HTTP/1.1 to HTTP/2 upgrades, or long-running TLS handshakes. + HttpRequest headRequest = createRequestBuilder(request.uri(), version).HEAD().build(); + client.send(headRequest, HttpResponse.BodyHandlers.discarding()); + return client; + } + + private static HttpRequest.Builder createRequestBuilder(URI uri, Version version) { + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(uri).version(version); + if (Version.HTTP_3.equals(version)) { + requestBuilder.setOption(HttpOption.H3_DISCOVERY, HttpOption.Http3DiscoveryMode.HTTP_3_URI_ONLY); + } + return requestBuilder; + } + + @Override + public String toString() { + Version version = server.getVersion(); + String versionString = version.toString(); + return switch (version) { + case HTTP_1_1, HTTP_2 -> secure ? versionString.replaceFirst("_", "S_") : versionString; + case HTTP_3 -> versionString; + }; + } + + } + + @AfterAll + static void closeServers() { + + // Terminate all handlers before shutting down the server, which would block otherwise. + ServerRequestPair.SHUT_DOWN_LATCH.countDown(); + ServerRequestPair.EXECUTOR.shutdown(); + + // Shut down servers + Exception[] exceptionRef = {null}; + serverRequestPairs() + .forEach(pair -> { + try { + pair.server.stop(); + } catch (Exception exception) { + if (exceptionRef[0] == null) { + exceptionRef[0] = exception; + } else { + exceptionRef[0].addSuppressed(exception); + } + } + }); + if (exceptionRef[0] != null) { + throw new RuntimeException("failed closing one or more server resources", exceptionRef[0]); + } + + } + + /** + * Configures how many times the handler should fail. + */ + @BeforeEach + void resetServerHandlerFailureIndex() { + ServerRequestPair.SERVER_HANDLER_PENDING_FAILURE_COUNT = Math.max(0, RETRY_LIMIT - 1); + } + + /** + * Ensures that the handler has failed as many times as instructed. + */ + @AfterEach + void verifyServerHandlerFailureIndex() { + assertEquals(0, ServerRequestPair.SERVER_HANDLER_PENDING_FAILURE_COUNT); + } + + protected static Stream serverRequestPairs() { + return Stream.of(HTTP1, HTTPS1, HTTP2, HTTPS2, HTTP3); + } + + protected static void assertThrowsHttpTimeoutException(Executable executable) { + Exception rootException = assertThrows(Exception.class, executable); + // Due to intricacies involved in the way exceptions are generated and + // nested, there is no bullet-proof way to determine at which level of + // the causal chain an `HttpTimeoutException` will show up. Hence, we + // scan through the entire causal chain. + Throwable exception = rootException; + while (exception != null) { + if (exception instanceof HttpTimeoutException) { + return; + } + exception = exception.getCause(); + } + throw new AssertionError("was expecting an `HttpTimeoutException` in the causal chain", rootException); + } + +}