Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,9 @@ void onReaderReset() {
Http3Error resetError = Http3Error.fromCode(errorCode)
.orElse(Http3Error.H3_REQUEST_CANCELLED);
if (!requestSent || !responseReceived) {
if (!responseReceived && resetError == Http3Error.H3_REQUEST_REJECTED) {
exchange.markUnprocessedByPeer();
}
Comment on lines +1339 to +1341
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, server handler's exchange.resetStream(Http3Error.H3_REQUEST_REJECTED.code()) results in client to fail the request, instead of retrying it. I am not sure if H3_REQUEST_REJECTED is the only error code we should guard against.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figured the problem is more convoluted than it appears. @jaikiran will soon land a PR adressing this issue. I will hold this PR until then.

cancelImpl(new IOException("Stream %s reset by peer: %s"
.formatted(streamId(), resetReason)),
resetError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

package jdk.internal.net.http;

import java.io.IOError;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.ConnectException;
import java.net.http.HttpClient.Version;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.StreamLimitException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.ListIterator;
Expand Down Expand Up @@ -396,6 +396,7 @@ private HttpResponse<T> setNewResponse(HttpRequest request, Response r, T body,

private CompletableFuture<HttpResponse<T>>
responseAsync0(CompletableFuture<Void> start) {
AtomicReference<HttpResponse.BodyHandler<T>> effectiveResponseHandlerRef = new AtomicReference<>();
return start.thenCompose( _ -> {
// this is the first attempt to have the request processed by the server
attempts.set(1);
Expand All @@ -412,11 +413,76 @@ private HttpResponse<T> setNewResponse(HttpRequest request, Response r, T body,
} else
return handleNoBody(r, exch);
}
return exch.readBodyAsync(responseHandler)
HttpResponse.BodyHandler<T> effectiveResponseHandler = effectiveResponseHandlerRef.get();
if (effectiveResponseHandler == null) {
effectiveResponseHandlerRef.set(effectiveResponseHandler = responseTimerEvent != null
? new TimerCancellingBodyHandlerWrapper(responseHandler)
: responseHandler);
}
return exch.readBodyAsync(effectiveResponseHandler)
.thenApply((T body) -> setNewResponse(r.request, r, body, exch));
}).exceptionallyCompose(this::whenCancelled);
}

/**
* Decorates a {@link HttpResponse.BodyHandler} to {@link #cancelTimer()} at completion.
*/
private final class TimerCancellingBodyHandlerWrapper implements HttpResponse.BodyHandler<T> {

private final HttpResponse.BodyHandler<T> delegate;

private TimerCancellingBodyHandlerWrapper(HttpResponse.BodyHandler<T> delegate) {
this.delegate = delegate;
}

@Override
public BodySubscriber<T> apply(HttpResponse.ResponseInfo responseInfo) {
BodySubscriber<T> subscriber = delegate.apply(responseInfo);
return new TimerCancellingBodySubscriberWrapper(subscriber);
}

}

/**
* Decorates a {@link HttpResponse.BodySubscriber} to {@link #cancelTimer()} at completion.
*/
private final class TimerCancellingBodySubscriberWrapper implements HttpResponse.BodySubscriber<T> {

private final HttpResponse.BodySubscriber<T> delegate;

private TimerCancellingBodySubscriberWrapper(BodySubscriber<T> delegate) {
this.delegate = delegate;
}

@Override
public CompletionStage<T> getBody() {
return delegate.getBody();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
delegate.onSubscribe(subscription);
}

@Override
public void onNext(List<ByteBuffer> buffers) {
delegate.onNext(buffers);
}

@Override
public void onError(Throwable throwable) {
cancelTimer();
delegate.onError(throwable);
}

@Override
public void onComplete() {
cancelTimer();
delegate.onComplete();
}

}

// returns a CancellationException that wraps the given cause
// if cancel(boolean) was called, the given cause otherwise
private Throwable wrapIfCancelled(Throwable cause) {
Expand Down Expand Up @@ -467,6 +533,8 @@ private CompletableFuture<Response> retryRequest() {

private CompletableFuture<Response> responseAsyncImpl(final boolean applyReqFilters) {
if (currentreq.timeout().isPresent()) {
// Retried/Forwarded requests should reset the timer, if present
cancelTimer();
responseTimerEvent = ResponseTimerEvent.of(this);
client.registerTimer(responseTimerEvent);
}
Expand Down Expand Up @@ -502,7 +570,6 @@ private CompletableFuture<Response> responseAsyncImpl(final boolean applyReqFilt
}
return completedFuture(response);
} else {
cancelTimer();
setNewResponse(currentreq, response, null, exch);
if (currentreq.isWebSocket()) {
// need to close the connection and open a new one.
Expand All @@ -520,11 +587,18 @@ private CompletableFuture<Response> responseAsyncImpl(final boolean applyReqFilt
} })
.handle((response, ex) -> {
// 5. handle errors and cancel any timer set
cancelTimer();
if (ex == null) {
assert response != null;
return completedFuture(response);
}

// Cancel the timer. Note that we only do so if the
// response has completed exceptionally. That is, we don't
// cancel the timer if there are no exceptions, since the
// response body might still get consumed, and it is
// still subject to the response timer.
cancelTimer();

// all exceptions thrown are handled here
final RetryContext retryCtx = checkRetryEligible(ex, exch);
assert retryCtx != null : "retry context is null";
Expand Down
225 changes: 225 additions & 0 deletions test/jdk/java/net/httpclient/TimeoutResponseBodyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.InputStream;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.fail;

/*
* @test id=retriesDisabled
* @bug 8208693
* @summary Verifies `HttpRequest::timeout` is effective for *response body*
* timeouts when all retry mechanisms are disabled.
*
* @library /test/lib
* /test/jdk/java/net/httpclient/lib
* @build TimeoutResponseTestSupport
*
* @run junit/othervm
* -Djdk.httpclient.auth.retrylimit=0
* -Djdk.httpclient.disableRetryConnect
* -Djdk.httpclient.redirects.retrylimit=0
* -Dtest.requestTimeoutMillis=1000
* TimeoutResponseBodyTest
*/

/*
* @test id=retriesEnabledForResponseFailure
* @bug 8208693
* @summary Verifies `HttpRequest::timeout` is effective for *response body*
* timeouts, where some initial responses are intentionally configured
* to fail to trigger retries.
*
* @library /test/lib
* /test/jdk/java/net/httpclient/lib
* @build TimeoutResponseTestSupport
*
* @run junit/othervm
* -Djdk.httpclient.auth.retrylimit=0
* -Djdk.httpclient.disableRetryConnect
* -Djdk.httpclient.redirects.retrylimit=3
* -Dtest.requestTimeoutMillis=1000
* -Dtest.responseFailureWaitDurationMillis=600
* TimeoutResponseBodyTest
*/

/**
* Verifies {@link HttpRequest#timeout() HttpRequest.timeout()} is effective
* for <b>response body</b> timeouts.
*
* @implNote
*
* Using a response body subscriber (i.e., {@link InputStream}) of type that
* allows gradual consumption of the response body after successfully building
* an {@link HttpResponse} instance to ensure timeouts are propagated even
* after the {@code HttpResponse} construction.
* <p>
* Each test is provided a pristine ephemeral client to avoid any unexpected
* effects due to pooling.
*/
class TimeoutResponseBodyTest extends TimeoutResponseTestSupport {

private static final Logger LOGGER = Utils.getDebugLogger(
TimeoutResponseBodyTest.class.getSimpleName()::toString, Utils.DEBUG);

/**
* Tests timeouts using
* {@link HttpClient#send(HttpRequest, HttpResponse.BodyHandler) HttpClient::send}
* against a server blocking without delivering the response body.
*/
@ParameterizedTest
@MethodSource("serverRequestPairs")
void testSendOnMissingBody(ServerRequestPair pair) throws Exception {

ServerRequestPair.SERVER_HANDLER_BEHAVIOUR =
ServerRequestPair.ServerHandlerBehaviour.BLOCK_BEFORE_BODY_DELIVERY;

try (HttpClient client = pair.createClientWithEstablishedConnection()) {
assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> {
LOGGER.log("Sending the request");
HttpResponse<InputStream> response = client.send(
pair.request(), HttpResponse.BodyHandlers.ofInputStream());
LOGGER.log("Consuming the obtained response");
verifyResponseBodyDoesNotArrive(response);
});
}

}

/**
* Tests timeouts using
* {@link HttpClient#sendAsync(HttpRequest, HttpResponse.BodyHandler) HttpClient::sendAsync}
* against a server blocking without delivering the response body.
*/
@ParameterizedTest
@MethodSource("serverRequestPairs")
void testSendAsyncOnMissingBody(ServerRequestPair pair) throws Exception {

ServerRequestPair.SERVER_HANDLER_BEHAVIOUR =
ServerRequestPair.ServerHandlerBehaviour.BLOCK_BEFORE_BODY_DELIVERY;

try (HttpClient client = pair.createClientWithEstablishedConnection()) {
assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> {
LOGGER.log("Sending the request asynchronously");
CompletableFuture<HttpResponse<InputStream>> responseFuture = client.sendAsync(
pair.request(), HttpResponse.BodyHandlers.ofInputStream());
LOGGER.log("Obtaining the response");
HttpResponse<InputStream> response = responseFuture.get();
LOGGER.log("Consuming the obtained response");
verifyResponseBodyDoesNotArrive(response);
});
}

}

private static void verifyResponseBodyDoesNotArrive(HttpResponse<InputStream> response) {
assertEquals(200, response.statusCode());
assertThrowsHttpTimeoutException(() -> {
try (InputStream responseBodyStream = response.body()) {
int readByte = responseBodyStream.read();
fail("Unexpected read byte: " + readByte);
}
});
}

/**
* Tests timeouts using
* {@link HttpClient#send(HttpRequest, HttpResponse.BodyHandler) HttpClient::send}
* against a server delivering the response body very slowly.
*/
@ParameterizedTest
@MethodSource("serverRequestPairs")
void testSendOnSlowBody(ServerRequestPair pair) throws Exception {

ServerRequestPair.SERVER_HANDLER_BEHAVIOUR =
ServerRequestPair.ServerHandlerBehaviour.DELIVER_BODY_SLOWLY;

try (HttpClient client = pair.createClientWithEstablishedConnection()) {
assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> {
LOGGER.log("Sending the request");
HttpResponse<InputStream> response = client.send(
pair.request(), HttpResponse.BodyHandlers.ofInputStream());
LOGGER.log("Consuming the obtained response");
verifyResponseBodyArrivesSlow(response);
});
}

}

/**
* Tests timeouts using
* {@link HttpClient#sendAsync(HttpRequest, HttpResponse.BodyHandler) HttpClient::sendAsync}
* against a server delivering the response body very slowly.
*/
@ParameterizedTest
@MethodSource("serverRequestPairs")
void testSendAsyncOnSlowBody(ServerRequestPair pair) throws Exception {

ServerRequestPair.SERVER_HANDLER_BEHAVIOUR =
ServerRequestPair.ServerHandlerBehaviour.DELIVER_BODY_SLOWLY;

try (HttpClient client = pair.createClientWithEstablishedConnection()) {
assertTimeoutPreemptively(REQUEST_TIMEOUT.multipliedBy(2), () -> {
LOGGER.log("Sending the request asynchronously");
CompletableFuture<HttpResponse<InputStream>> responseFuture = client.sendAsync(
pair.request(), HttpResponse.BodyHandlers.ofInputStream());
LOGGER.log("Obtaining the response");
HttpResponse<InputStream> response = responseFuture.get();
LOGGER.log("Consuming the obtained response");
verifyResponseBodyArrivesSlow(response);
});
}

}

private static void verifyResponseBodyArrivesSlow(HttpResponse<InputStream> response) {
assertEquals(200, response.statusCode());
assertThrowsHttpTimeoutException(() -> {
try (InputStream responseBodyStream = response.body()) {
int i = 0;
int l = ServerRequestPair.CONTENT_LENGTH;
for (; i < l; i++) {
LOGGER.log("Reading byte %s/%s", i, l);
int readByte = responseBodyStream.read();
if (readByte < 0) {
break;
}
assertEquals(i, readByte);
}
fail("Should not have reached here! (i=%s)".formatted(i));
}
});
}

}
Loading