Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
8245462: HttpClient send throws InterruptedException when interrupted…
… but does not cancel request

Allows an HTTP operation to be cancelled by calling CompletableFuture::cancel(true)

Reviewed-by: michaelm, chegar, alanb
  • Loading branch information
dfuch committed Aug 28, 2020
1 parent 19216ac commit b02054be18321dd666636a13f73a0a6134f87815
Show file tree
Hide file tree
Showing 14 changed files with 974 additions and 49 deletions.
@@ -40,8 +40,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.net.http.HttpResponse.BodyHandler;
@@ -53,8 +51,10 @@
*
* <p> An {@code HttpClient} can be used to send {@linkplain HttpRequest
* requests} and retrieve their {@linkplain HttpResponse responses}. An {@code
* HttpClient} is created through a {@link HttpClient#newBuilder() builder}. The
* builder can be used to configure per-client state, like: the preferred
* HttpClient} is created through a {@link HttpClient.Builder builder}.
* The {@link #newBuilder() newBuilder} method returns a builder that creates
* instances of the default {@code HttpClient} implementation.
* The builder can be used to configure per-client state, like: the preferred
* protocol version ( HTTP/1.1 or HTTP/2 ), whether to follow redirects, a
* proxy, an authenticator, etc. Once built, an {@code HttpClient} is immutable,
* and can be used to send multiple requests.
@@ -165,6 +165,9 @@ public static HttpClient newHttpClient() {
/**
* Creates a new {@code HttpClient} builder.
*
* <p> Builders returned by this method create instances
* of the default {@code HttpClient} implementation.
*
* @return an {@code HttpClient.Builder}
*/
public static Builder newBuilder() {
@@ -529,6 +532,23 @@ public enum Redirect {
* response status, headers, and body ( as handled by given response body
* handler ).
*
* <p> If the operation is interrupted, the default {@code HttpClient}
* implementation attempts to cancel the HTTP exchange and
* {@link InterruptedException} is thrown.
* No guarantee is made as to exactly <em>when</em> the cancellation request
* may be taken into account. In particular, the request might still get sent
* to the server, as its processing might already have started asynchronously
* in another thread, and the underlying resources may only be released
* asynchronously.
* <ul>
* <li>With HTTP/1.1, an attempt to cancel may cause the underlying
* connection to be closed abruptly.
* <li>With HTTP/2, an attempt to cancel may cause the stream to be reset,
* or in certain circumstances, may also cause the connection to be
* closed abruptly, if, for instance, the thread is currently trying
* to write to the underlying socket.
* </ul>
*
* @param <T> the response body type
* @param request the request
* @param responseBodyHandler the response body handler
@@ -588,6 +608,24 @@ public enum Redirect {
* information.</li>
* </ul>
*
* <p> The default {@code HttpClient} implementation returns
* {@code CompletableFuture} objects that are <em>cancelable</em>.
* {@code CompletableFuture} objects {@linkplain CompletableFuture#newIncompleteFuture()
* derived} from cancelable futures are themselves <em>cancelable</em>.
* Invoking {@linkplain CompletableFuture#cancel(boolean) cancel(true)}
* on a cancelable future that is not completed, attempts to cancel the HTTP exchange
* in an effort to release underlying resources as soon as possible.
* No guarantee is made as to exactly <em>when</em> the cancellation request
* may be taken into account. In particular, the request might still get sent
* to the server, as its processing might already have started asynchronously
* in another thread, and the underlying resources may only be released
* asynchronously.
* <ul>
* <li>With HTTP/1.1, an attempt to cancel may cause the underlying connection
* to be closed abruptly.
* <li>With HTTP/2, an attempt to cancel may cause the stream to be reset.
* </ul>
*
* @param <T> the response body type
* @param request the request
* @param responseBodyHandler the response body handler
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -81,7 +81,8 @@
final AccessControlContext acc;
final MultiExchange<T> multi;
final Executor parentExecutor;
boolean upgrading; // to HTTP/2
volatile boolean upgrading; // to HTTP/2
volatile boolean upgraded; // to HTTP/2
final PushGroup<T> pushGroup;
final String dbgTag;

@@ -139,12 +140,15 @@ HttpClientImpl client() {
// exchange so that it can be aborted/timed out mid setup.
static final class ConnectionAborter {
private volatile HttpConnection connection;
private volatile boolean closeRequested;

void connection(HttpConnection connection) {
this.connection = connection;
if (closeRequested) closeConnection();
}

void closeConnection() {
closeRequested = true;
HttpConnection connection = this.connection;
this.connection = null;
if (connection != null) {
@@ -155,6 +159,11 @@ void closeConnection() {
}
}
}

void disable() {
connection = null;
closeRequested = false;
}
}

// Called for 204 response - when no body is permitted
@@ -278,6 +287,30 @@ private void checkCancelled() {
}
}

<T> CompletableFuture<T> checkCancelled(CompletableFuture<T> cf, HttpConnection connection) {
return cf.handle((r,t) -> {
if (t == null) {
if (multi.requestCancelled()) {
// if upgraded, we don't close the connection.
// cancelling will be handled by the HTTP/2 exchange
// in its own time.
if (!upgraded) {
t = getCancelCause();
if (t == null) t = new IOException("Request cancelled");
if (debug.on()) debug.log("exchange cancelled during connect: " + t);
try {
connection.close();
} catch (Throwable x) {
if (debug.on()) debug.log("Failed to close connection", x);
}
return MinimalFuture.<T>failedFuture(t);
}
}
}
return cf;
}).thenCompose(Function.identity());
}

public void h2Upgrade() {
upgrading = true;
request.setH2Upgrade(client.client2());
@@ -299,7 +332,10 @@ synchronized IOException getCancelCause() {
Throwable t = getCancelCause();
checkCancelled();
if (t != null) {
return MinimalFuture.failedFuture(t);
if (debug.on()) {
debug.log("exchange was cancelled: returned failed cf (%s)", String.valueOf(t));
}
return exchangeCF = MinimalFuture.failedFuture(t);
}

CompletableFuture<? extends ExchangeImpl<T>> cf, res;
@@ -481,11 +517,14 @@ HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
debug.log("Ignored body");
// we pass e::getBuffer to allow the ByteBuffers to accumulate
// while we build the Http2Connection
ex.upgraded();
upgraded = true;
return Http2Connection.createAsync(e.connection(),
client.client2(),
this, e::drainLeftOverBytes)
.thenCompose((Http2Connection c) -> {
boolean cached = c.offerConnection();
if (cached) connectionAborter.disable();
Stream<T> s = c.getStream(1);

if (s == null) {
@@ -520,11 +559,12 @@ HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
}
// Check whether the HTTP/1.1 was cancelled.
if (t == null) t = e.getCancelCause();
// if HTTP/1.1 exchange was timed out, don't
// try to go further.
if (t instanceof HttpTimeoutException) {
s.cancelImpl(t);
return MinimalFuture.failedFuture(t);
// if HTTP/1.1 exchange was timed out, or the request
// was cancelled don't try to go further.
if (t instanceof HttpTimeoutException || multi.requestCancelled()) {
if (t == null) t = new IOException("Request cancelled");
s.cancelImpl(t);
return MinimalFuture.failedFuture(t);
}
if (debug.on())
debug.log("Getting response async %s", s);
@@ -228,4 +228,9 @@ abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
* @return the cause for which this exchange was canceled, if available.
*/
abstract Throwable getCancelCause();

// Mark the exchange as upgraded
// Needed to handle cancellation during the upgrade from
// Http1Exchange to Stream
void upgraded() { }
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -62,6 +62,7 @@
final HttpClientImpl client;
final Executor executor;
private final Http1AsyncReceiver asyncReceiver;
private volatile boolean upgraded;

/** Records a possible cancellation raised before any operation
* has been initiated, or an error received while sending the request. */
@@ -487,10 +488,15 @@ private void cancelImpl(Throwable cause) {
}
}
} finally {
connection.close();
if (!upgraded)
connection.close();
}
}

void upgraded() {
upgraded = true;
}

private void runInline(Runnable run) {
assert !client.isSelectorThread();
run.run();
@@ -543,6 +549,16 @@ private void requestMoreBody() {
}
}

private void cancelUpstreamSubscription() {
final Executor exec = client.theExecutor();
if (debug.on()) debug.log("cancelling upstream publisher");
if (bodySubscriber != null) {
exec.execute(bodySubscriber::cancelSubscription);
} else if (debug.on()) {
debug.log("bodySubscriber is null");
}
}

// Invoked only by the publisher
// ALL tasks should execute off the Selector-Manager thread
/** Returns the next portion of the HTTP request, or the error. */
@@ -551,12 +567,7 @@ private DataPair getOutgoing() {
final DataPair dp = outgoing.pollFirst();

if (writePublisher.cancelled) {
if (debug.on()) debug.log("cancelling upstream publisher");
if (bodySubscriber != null) {
exec.execute(bodySubscriber::cancelSubscription);
} else if (debug.on()) {
debug.log("bodySubscriber is null");
}
cancelUpstreamSubscription();
headersSentCF.completeAsync(() -> this, exec);
bodySentCF.completeAsync(() -> this, exec);
return null;
@@ -642,6 +653,30 @@ String dbgString() {
return tag;
}

@SuppressWarnings("fallthrough")
private boolean checkRequestCancelled() {
if (exchange.multi.requestCancelled()) {
if (debug.on()) debug.log("request cancelled");
if (subscriber == null) {
if (debug.on()) debug.log("no subscriber yet");
return true;
}
switch (state) {
case BODY:
cancelUpstreamSubscription();
// fall trough to HEADERS
case HEADERS:
Throwable cause = getCancelCause();
if (cause == null) cause = new IOException("Request cancelled");
subscriber.onError(cause);
writeScheduler.stop();
return true;
}
}
return false;
}


final class WriteTask implements Runnable {
@Override
public void run() {
@@ -655,10 +690,13 @@ public void run() {
return;
}

if (checkRequestCancelled()) return;

if (subscriber == null) {
if (debug.on()) debug.log("no subscriber yet");
return;
}

if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
while (hasOutgoing() && demand.tryDecrement()) {
DataPair dp = getOutgoing();
@@ -683,6 +721,7 @@ public void run() {
// The next Subscriber will eventually take over.

} else {
if (checkRequestCancelled()) return;
if (debug.on())
debug.log("onNext with " + Utils.remaining(data) + " bytes");
subscriber.onNext(data);
@@ -526,6 +526,10 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
throws IOException, InterruptedException
{
CompletableFuture<HttpResponse<T>> cf = null;

// if the thread is already interrupted no need to go further.
// cf.get() would throw anyway.
if (Thread.interrupted()) throw new InterruptedException();
try {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();

0 comments on commit b02054b

Please sign in to comment.