Skip to content

Commit

Permalink
8277969: HttpClient SelectorManager shuts down when custom Executor r…
Browse files Browse the repository at this point in the history
…ejects a task

Reviewed-by: jpai, michaelm
  • Loading branch information
dfuch committed Apr 20, 2022
1 parent 6c6d522 commit 5291ec8
Show file tree
Hide file tree
Showing 24 changed files with 1,862 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
package jdk.internal.net.http;

import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -26,11 +26,12 @@
package jdk.internal.net.http;

import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.ResponseInfo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
Expand Down Expand Up @@ -66,7 +67,7 @@ final Exchange<T> getExchange() {
return exchange;
}

HttpClient client() {
HttpClientImpl client() {
return exchange.client();
}

Expand Down Expand Up @@ -181,6 +182,22 @@ abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor);

/**
* Creates and wraps an {@link HttpResponse.BodySubscriber} from a {@link
* HttpResponse.BodyHandler} for the given {@link ResponseInfo}.
* An {@code HttpBodySubscriberWrapper} wraps a response body subscriber and makes
* sure its completed/onError methods are called only once, and that its onSusbscribe
* is called before onError. This is useful when errors occur asynchronously, and
* most typically when the error occurs before the {@code BodySubscriber} has
* subscribed.
* @param handler a body handler
* @param response a response info
* @return a new {@code HttpBodySubscriberWrapper} to handle the response
*/
HttpBodySubscriberWrapper<T> createResponseSubscriber(HttpResponse.BodyHandler<T> handler, ResponseInfo response) {
return new HttpBodySubscriberWrapper<>(handler.apply(response));
}

/**
* Ignore/consume the body.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -497,7 +497,8 @@ void onReadError(Throwable ex) {
final Throwable t = (recorded == null ? ex : recorded);
if (debug.on())
debug.log("recorded " + t + "\n\t delegate: " + delegate
+ "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
+ "\n\t queue.isEmpty: " + queue.isEmpty()
+ "\n\tstopRequested: " + stopRequested, ex);
if (Log.errors()) {
Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -27,10 +27,10 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand All @@ -39,15 +39,16 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail;

/**
* Encapsulates one HTTP/1.1 request/response exchange.
Expand Down Expand Up @@ -78,16 +79,18 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();

/** The write publisher, responsible for writing the complete request ( both
* headers and body ( if any ). */
* headers and body ( if any )). */
private final Http1Publisher writePublisher = new Http1Publisher();

/** Completed when the header have been published, or there is an error */
private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>();
/** Completed when the body has been published, or there is an error */
private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();

/** The subscriber to the request's body published. Maybe null. */
private volatile Http1BodySubscriber bodySubscriber;
/** The subscriber to the request's body published. May be null. */
private volatile Http1RequestBodySubscriber bodySubscriber;
/** The subscriber to the response's body received. May be null. */
private volatile BodySubscriber<T> responseSubscriber;

enum State { INITIAL,
HEADERS,
Expand Down Expand Up @@ -117,12 +120,12 @@ public String toString() {
* concrete implementations: {@link Http1Request.StreamSubscriber}, and
* {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
* fixed length bodies, respectively. */
abstract static class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
abstract static class Http1RequestBodySubscriber implements Flow.Subscriber<ByteBuffer> {
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
private volatile Flow.Subscription subscription;
volatile boolean complete;
private final Logger debug;
Http1BodySubscriber(Logger debug) {
Http1RequestBodySubscriber(Logger debug) {
assert debug != null;
this.debug = debug;
}
Expand Down Expand Up @@ -159,8 +162,8 @@ final void cancelSubscription() {
}
}

static Http1BodySubscriber completeSubscriber(Logger debug) {
return new Http1BodySubscriber(debug) {
static Http1RequestBodySubscriber completeSubscriber(Logger debug) {
return new Http1RequestBodySubscriber(debug) {
@Override public void onSubscribe(Flow.Subscription subscription) { error(); }
@Override public void onNext(ByteBuffer item) { error(); }
@Override public void onError(Throwable throwable) { error(); }
Expand All @@ -173,6 +176,34 @@ private void error() {
}
}

/**
* The Http1AsyncReceiver ensures that all calls to
* the subscriber, including onSubscribe, occur sequentially.
* There could however be some race conditions that could happen
* in case of unexpected errors thrown at unexpected places, which
* may cause onError to be called multiple times.
* The Http1BodySubscriber will ensure that the user subscriber
* is actually completed only once - and only after it is
* subscribed.
* @param <U> The type of response.
*/
static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrapper<U> {
final Http1Exchange<U> exchange;
Http1ResponseBodySubscriber(BodySubscriber<U> userSubscriber, Http1Exchange<U> exchange) {
super(userSubscriber);
this.exchange = exchange;
}

@Override
protected void complete(Throwable t) {
try {
exchange.responseSubscriberCompleted(this);
} finally {
super.complete(t);
}
}
}

@Override
public String toString() {
return "HTTP/1.1 " + request.toString();
Expand Down Expand Up @@ -217,6 +248,28 @@ private void connectFlows(HttpConnection connection) {
asyncReceiver.subscriber());
}

// The Http1ResponseBodySubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
Throwable failed = null;
synchronized (lock) {
failed = this.failed;
if (failed == null) {
this.responseSubscriber = subscriber;
}
}
if (failed != null) {
subscriber.onError(failed);
} else {
client.registerSubscriber(subscriber);
}
}

void responseSubscriberCompleted(HttpBodySubscriberWrapper<T> subscriber) {
client.subscriberCompleted(subscriber);
}

@Override
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// create the response before sending the request headers, so that
Expand Down Expand Up @@ -321,12 +374,12 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
if (debug.on()) debug.log("bodySubscriber is %s",
bodySubscriber == null ? null : bodySubscriber.getClass());
if (bodySubscriber == null) {
bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1BodySubscriber.COMPLETED);
bodySubscriber = Http1RequestBodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1RequestBodySubscriber.COMPLETED);
} else {
// start
bodySubscriber.whenSubscribed
.thenAccept((s) -> cancelIfFailed(s))
.thenAccept(this::cancelIfFailed)
.thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
Expand Down Expand Up @@ -370,15 +423,24 @@ CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor)
{
BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(),
response.responseHeaders(),
HTTP_1_1));
var responseInfo = new ResponseInfoImpl(response.responseCode(),
response.responseHeaders(), HTTP_1_1);
BodySubscriber<T> bs = createResponseSubscriber(handler, responseInfo);
CompletableFuture<T> bodyCF = response.readBody(bs,
returnConnectionToPool,
executor);
return bodyCF;
}

@Override
Http1ResponseBodySubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
BodySubscriber<T> subscriber = handler.apply(response);
Http1ResponseBodySubscriber<T> bs =
new Http1ResponseBodySubscriber<T>(subscriber, this);
registerResponseSubscriber(bs);
return bs;
}

@Override
CompletableFuture<Void> ignoreBody() {
return response.ignoreBody(executor);
Expand Down Expand Up @@ -430,8 +492,10 @@ void cancel(IOException cause) {
private void cancelImpl(Throwable cause) {
LinkedList<CompletableFuture<?>> toComplete = null;
int count = 0;
Throwable error;
Throwable error = null;
BodySubscriber<?> subscriber;
synchronized (lock) {
subscriber = responseSubscriber;
if ((error = failed) == null) {
failed = error = cause;
}
Expand Down Expand Up @@ -464,6 +528,15 @@ private void cancelImpl(Throwable cause) {
operations.clear();
}
}

// complete subscriber if needed
if (subscriber != null && error != null) {
var failure = error;
if (client.isSelectorThread()) {
executor.execute(() -> subscriber.onError(failure));
} else subscriber.onError(failure);
}

try {
Log.logError("Http1Exchange.cancel: count=" + count);
if (toComplete != null) {
Expand Down Expand Up @@ -598,7 +671,7 @@ private DataPair getOutgoing() {
headersSentCF.completeAsync(() -> this, exec);
break;
case BODY:
if (dp.data == Http1BodySubscriber.COMPLETED) {
if (dp.data == Http1RequestBodySubscriber.COMPLETED) {
synchronized (lock) {
state = State.COMPLETING;
}
Expand Down Expand Up @@ -709,7 +782,7 @@ public void run() {
writeScheduler.stop();
} else {
List<ByteBuffer> data = dp.data;
if (data == Http1BodySubscriber.COMPLETED) {
if (data == Http1RequestBodySubscriber.COMPLETED) {
synchronized (lock) {
assert state == State.COMPLETING : "Unexpected state:" + state;
state = State.COMPLETED;
Expand Down Expand Up @@ -754,7 +827,8 @@ public void cancel() {
}
}

HttpClient client() {
@Override
final HttpClientImpl client() {
return client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -38,7 +38,7 @@
import java.util.function.BiPredicate;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import jdk.internal.net.http.Http1Exchange.Http1BodySubscriber;
import jdk.internal.net.http.Http1Exchange.Http1RequestBodySubscriber;
import jdk.internal.net.http.common.HttpHeadersBuilder;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
Expand Down Expand Up @@ -314,8 +314,8 @@ List<ByteBuffer> headers() {
return List.of(b);
}

Http1BodySubscriber continueRequest() {
Http1BodySubscriber subscriber;
Http1RequestBodySubscriber continueRequest() {
Http1RequestBodySubscriber subscriber;
if (streaming) {
subscriber = new StreamSubscriber();
requestPublisher.subscribe(subscriber);
Expand All @@ -329,7 +329,7 @@ Http1BodySubscriber continueRequest() {
return subscriber;
}

final class StreamSubscriber extends Http1BodySubscriber {
final class StreamSubscriber extends Http1RequestBodySubscriber {

StreamSubscriber() { super(debug); }

Expand Down Expand Up @@ -392,7 +392,7 @@ public void onComplete() {
}
}

final class FixedContentSubscriber extends Http1BodySubscriber {
final class FixedContentSubscriber extends Http1RequestBodySubscriber {

private volatile long contentWritten;
FixedContentSubscriber() { super(debug); }
Expand Down
Loading

3 comments on commit 5291ec8

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varada1110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/backport jdk17u-dev

@openjdk
Copy link

@openjdk openjdk bot commented on 5291ec8 Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varada1110 Could not automatically backport 5291ec8d to openjdk/jdk17u-dev due to conflicts in the following files:

  • src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
  • src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java
  • src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
  • src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java
  • src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
  • src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java

Please fetch the appropriate branch/commit and manually resolve these conflicts by using the following commands in your personal fork of openjdk/jdk17u-dev. Note: these commands are just some suggestions and you can use other equivalent commands you know.

# Fetch the up-to-date version of the target branch
$ git fetch --no-tags https://git.openjdk.org/jdk17u-dev.git master:master

# Check out the target branch and create your own branch to backport
$ git checkout master
$ git checkout -b varada1110-backport-5291ec8d

# Fetch the commit you want to backport
$ git fetch --no-tags https://git.openjdk.org/jdk.git 5291ec8d56b0e89aa96c3d53d9dcf093480cf48f

# Backport the commit
$ git cherry-pick --no-commit 5291ec8d56b0e89aa96c3d53d9dcf093480cf48f
# Resolve conflicts now

# Commit the files you have modified
$ git add files/with/resolved/conflicts
$ git commit -m 'Backport 5291ec8d56b0e89aa96c3d53d9dcf093480cf48f'

Once you have resolved the conflicts as explained above continue with creating a pull request towards the openjdk/jdk17u-dev with the title Backport 5291ec8d56b0e89aa96c3d53d9dcf093480cf48f.

Please sign in to comment.