Skip to content

Commit

Permalink
8316580: HttpClient with StructuredTaskScope does not close when a ta…
Browse files Browse the repository at this point in the history
…sk fails

Reviewed-by: djelinski
  • Loading branch information
dfuch committed Sep 28, 2023
1 parent 065203d commit fc98998
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,23 +283,25 @@ public String toString() {
}
}

static void registerPending(PendingRequest pending) {
static <T> CompletableFuture<T> registerPending(PendingRequest pending, CompletableFuture<T> res) {
// shortcut if cf is already completed: no need to go through the trouble of
// registering it
if (pending.cf.isDone()) return;
if (pending.cf.isDone()) return res;

var client = pending.client;
var cf = pending.cf;
var id = pending.id;
boolean added = client.pendingRequests.add(pending);
// this may immediately remove `pending` from the set is the cf is already completed
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
var ref = res.whenComplete((r,t) -> client.pendingRequests.remove(pending));
pending.ref = ref;
assert added : "request %d was already added".formatted(id);
// should not happen, unless the selector manager has already
// exited abnormally
if (client.selmgr.isClosed()) {
pending.abort(client.selmgr.selectorClosedException());
}
return ref;
}

static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
Expand Down Expand Up @@ -931,8 +933,9 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();
} catch (InterruptedException ie) {
if (cf != null )
if (cf != null) {
cf.cancel(true);
}
throw ie;
} catch (ExecutionException e) {
final Throwable throwable = e.getCause();
Expand Down Expand Up @@ -1052,19 +1055,23 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}

// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}

// The mexCf is the Cf we need to abort if the SelectorManager thread
// is aborted.
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
registerPending(pending);
return res;
} catch(Throwable t) {
res = registerPending(pending, res);

if (exchangeExecutor != null) {
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
return res.isDone() ? res
: res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
} else {
// make a defensive copy that can be safely canceled
// by the caller
return res.isDone() ? res : res.copy();
}
} catch (Throwable t) {
requestUnreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class MultiExchange<T> implements Cancelable {
Exchange<T> previous;
volatile Throwable retryCause;
volatile boolean expiredOnce;
volatile HttpResponse<T> response = null;
volatile HttpResponse<T> response;

// Maximum number of times a request will be retried/redirected
// for any reason
Expand Down Expand Up @@ -278,13 +278,18 @@ public void cancel(IOException cause) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.cancelled;
boolean firstCancel = false;
if (!cancelled && mayInterruptIfRunning) {
if (interrupted.get() == null) {
interrupted.compareAndSet(null,
firstCancel = interrupted.compareAndSet(null,
new CancellationException("Request cancelled"));
debug.log("multi exchange recording: " + interrupted.get());
} else {
debug.log("multi exchange recorded: " + interrupted.get());
}
if (debug.on()) {
if (firstCancel) {
debug.log("multi exchange recording: " + interrupted.get());
} else {
debug.log("multi exchange recorded: " + interrupted.get());
}
}
this.cancelled = true;
var exchange = getExchange();
Expand Down Expand Up @@ -377,17 +382,30 @@ private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T>
}).exceptionallyCompose(this::whenCancelled);
}

// returns a CancellationExcpetion that wraps the given cause
// if cancel(boolean) was called, the given cause otherwise
private Throwable wrapIfCancelled(Throwable cause) {
CancellationException interrupt = interrupted.get();
if (interrupt == null) return cause;

var cancel = new CancellationException(interrupt.getMessage());
// preserve the stack trace of the original exception to
// show where the call to cancel(boolean) came from
cancel.setStackTrace(interrupt.getStackTrace());
cancel.initCause(Utils.getCancelCause(cause));
return cancel;
}

// if the request failed because the multi exchange was cancelled,
// make sure the reported exception is wrapped in CancellationException
private CompletableFuture<HttpResponse<T>> whenCancelled(Throwable t) {
CancellationException x = interrupted.get();
if (x != null) {
// make sure to fail with CancellationException if cancel(true)
// was called.
t = x.initCause(Utils.getCancelCause(t));
var x = wrapIfCancelled(t);
if (x instanceof CancellationException) {
if (debug.on()) {
debug.log("MultiExchange interrupted with: " + t.getCause());
debug.log("MultiExchange interrupted with: " + x.getCause());
}
}
return MinimalFuture.failedFuture(t);
return MinimalFuture.failedFuture(x);
}

static class NullSubscription implements Flow.Subscription {
Expand Down

1 comment on commit fc98998

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.