Skip to content

Commit

Permalink
8294916: Cancelling a request must eventually cause its response body…
Browse files Browse the repository at this point in the history
… subscriber to be unregistered

Reviewed-by: michaelm, jpai
  • Loading branch information
dfuch committed Oct 20, 2022
1 parent 4f994c0 commit dcd4650
Show file tree
Hide file tree
Showing 9 changed files with 644 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
Expand Down Expand Up @@ -210,11 +211,19 @@ static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrap
@Override
protected void complete(Throwable t) {
try {
exchange.responseSubscriberCompleted(this);
exchange.unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}

@Override
protected void onCancel() {
// If the subscription is cancelled the
// subscriber may or may not get completed.
// Therefore we need to unregister it
exchange.unregisterResponseSubscriber(this);
}
}

@Override
Expand Down Expand Up @@ -264,7 +273,7 @@ private void connectFlows(HttpConnection connection) {
// The Http1ResponseBodySubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
private void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
Throwable failed = null;
synchronized (lock) {
failed = this.failed;
Expand All @@ -279,8 +288,8 @@ void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
}
}

void responseSubscriberCompleted(HttpBodySubscriberWrapper<T> subscriber) {
client.subscriberCompleted(subscriber);
private void unregisterResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
client.unregisterSubscriber(subscriber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
private final AtomicLong pendingHttpRequestCount = new AtomicLong();
private final AtomicLong pendingHttp2StreamCount = new AtomicLong();
private final AtomicLong pendingTCPConnectionCount = new AtomicLong();
private final AtomicLong pendingSubscribersCount = new AtomicLong();
private final AtomicBoolean isAlive = new AtomicBoolean();

/** A Set of, deadline first, ordered timeout events. */
Expand Down Expand Up @@ -548,16 +549,26 @@ public void registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (!selmgr.isClosed()) {
synchronized (selmgr) {
if (!selmgr.isClosed()) {
subscribers.add(subscriber);
if (subscribers.add(subscriber)) {
long count = pendingSubscribersCount.incrementAndGet();
if (debug.on()) {
debug.log("body subscriber registered: " + count);
}
}
return;
}
}
}
subscriber.onError(selmgr.selectorClosedException());
}

public void subscriberCompleted(HttpBodySubscriberWrapper<?> subscriber) {
subscribers.remove(subscriber);
public void unregisterSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (subscribers.remove(subscriber)) {
long count = pendingSubscribersCount.decrementAndGet();
if (debug.on()) {
debug.log("body subscriber unregistered: " + count);
}
}
}

private void closeConnection(HttpConnection conn) {
Expand Down Expand Up @@ -627,7 +638,7 @@ final long unreference() {
final long httpCount = pendingHttpOperationsCount.decrementAndGet();
final long http2Count = pendingHttp2StreamCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
if (count == 0 && facadeRef.refersTo(null)) {
selmgr.wakeupSelector();
}
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
Expand All @@ -649,7 +660,7 @@ final long streamUnreference() {
final long http2Count = pendingHttp2StreamCount.decrementAndGet();
final long httpCount = pendingHttpOperationsCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
if (count == 0 && facadeRef.refersTo(null)) {
selmgr.wakeupSelector();
}
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
Expand All @@ -671,7 +682,7 @@ final long webSocketClose() {
final long webSocketCount = pendingWebSocketCount.decrementAndGet();
final long httpCount = pendingHttpOperationsCount.get();
final long http2Count = pendingHttp2StreamCount.get();
if (count == 0 && facade() == null) {
if (count == 0 && facadeRef.refersTo(null)) {
selmgr.wakeupSelector();
}
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
Expand All @@ -697,6 +708,7 @@ static final class HttpClientTracker implements Tracker {
final AtomicLong websocketCount;
final AtomicLong operationsCount;
final AtomicLong connnectionsCount;
final AtomicLong subscribersCount;
final Reference<?> reference;
final AtomicBoolean isAlive;
final String name;
Expand All @@ -706,6 +718,7 @@ static final class HttpClientTracker implements Tracker {
AtomicLong ws,
AtomicLong ops,
AtomicLong conns,
AtomicLong subscribers,
Reference<?> ref,
AtomicBoolean isAlive,
String name) {
Expand All @@ -715,11 +728,16 @@ static final class HttpClientTracker implements Tracker {
this.websocketCount = ws;
this.operationsCount = ops;
this.connnectionsCount = conns;
this.subscribersCount = subscribers;
this.reference = ref;
this.isAlive = isAlive;
this.name = name;
}
@Override
public long getOutstandingSubscribers() {
return subscribersCount.get();
}
@Override
public long getOutstandingOperations() {
return operationsCount.get();
}
Expand Down Expand Up @@ -759,6 +777,7 @@ public Tracker getOperationsTracker() {
pendingWebSocketCount,
pendingOperationCount,
pendingTCPConnectionCount,
pendingSubscribersCount,
facadeRef,
isAlive,
dbgTag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ private void registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscri
client().registerSubscriber(subscriber);
}

private void subscriberCompleted(Http2StreamResponseSubscriber<?> subscriber) {
client().subscriberCompleted(subscriber);
private void unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
client().unregisterSubscriber(subscriber);
}

@Override
Expand Down Expand Up @@ -1546,11 +1546,15 @@ final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U
@Override
protected void complete(Throwable t) {
try {
Stream.this.subscriberCompleted(this);
Stream.this.unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}
@Override
protected void onCancel() {
Stream.this.unregisterResponseSubscriber(this);
}
}

private static final VarHandle STREAM_STATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
Expand Down Expand Up @@ -61,12 +62,33 @@ public void cancel() { }
final BodySubscriber<T> userSubscriber;
final AtomicBoolean completed = new AtomicBoolean();
final AtomicBoolean subscribed = new AtomicBoolean();
volatile Subscription subscription;
volatile SubscriptionWrapper subscription;
volatile Throwable withError;
public HttpBodySubscriberWrapper(BodySubscriber<T> userSubscriber) {
this.userSubscriber = userSubscriber;
}

private class SubscriptionWrapper implements Subscription {
final Subscription subscription;
SubscriptionWrapper(Subscription s) {
this.subscription = Objects.requireNonNull(s);
}
@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
try {
subscription.cancel();
onCancel();
} catch (Throwable t) {
onError(t);
}
}
}

final long id() { return id; }

@Override
Expand Down Expand Up @@ -97,6 +119,14 @@ private void propagateError(Throwable t) {
}
}

/**
* Called when the subscriber cancels its subscription.
* @apiNote
* This method may be used by subclasses to perform cleanup
* actions after a subscription has been cancelled.
*/
protected void onCancel() { }

/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
Expand Down Expand Up @@ -137,12 +167,12 @@ public CompletionStage<T> getBody() {

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// race condition with propagateError: we need to wait until
// subscription is finished before calling onError;
synchronized (this) {
if (subscribed.compareAndSet(false, true)) {
userSubscriber.onSubscribe(subscription);
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
} else {
// could be already subscribed and completed
// if an unexpected error occurred before the actual
Expand All @@ -156,8 +186,9 @@ public void onSubscribe(Flow.Subscription subscription) {
@Override
public void onNext(List<ByteBuffer> item) {
if (completed.get()) {
SubscriptionWrapper subscription = this.subscription;
if (subscription != null) {
subscription.cancel();
subscription.subscription.cancel();
}
} else {
userSubscriber.onNext(item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public interface Tracker {
long getOutstandingWebSocketOperations();
// number of TCP connections still opened
long getOutstandingTcpConnections();
// number of body subscribers not yet completed or canceled
long getOutstandingSubscribers();
// Whether the facade returned to the
// user is still referenced
boolean isFacadeReferenced();
Expand Down
22 changes: 22 additions & 0 deletions test/jdk/java/net/httpclient/CancelRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ref.Reference;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
Expand Down Expand Up @@ -377,6 +378,13 @@ public void testGetSendAsync(String uri, boolean sameClient, boolean mayInterrup
assertEquals(cf2.isDone(), true);
assertEquals(cf2.isCancelled(), false);
assertEquals(latch.getCount(), 0);

var error = TRACKER.check(1,
(t) -> t.getOutstandingOperations() > 0 || t.getOutstandingSubscribers() > 0,
"subscribers for testGetSendAsync(%s)\n\t step [%s]".formatted(req.uri(), i),
false);
Reference.reachabilityFence(client);
if (error != null) throw error;
}
}

Expand Down Expand Up @@ -481,6 +489,13 @@ public Iterator<byte[]> iterator() {
assertEquals(cf2.isDone(), true);
assertEquals(cf2.isCancelled(), false);
assertEquals(latch.getCount(), 0);

var error = TRACKER.check(1,
(t) -> t.getOutstandingOperations() > 0 || t.getOutstandingSubscribers() > 0,
"subscribers for testPostSendAsync(%s)\n\t step [%s]".formatted(req.uri(), i),
false);
Reference.reachabilityFence(client);
if (error != null) throw error;
}
}

Expand Down Expand Up @@ -536,6 +551,13 @@ public void testPostInterrupt(String uri, boolean sameClient)
assertEquals(body, Stream.of(BODY.split("\\|")).collect(Collectors.joining()));
throw failed;
}

var error = TRACKER.check(1,
(t) -> t.getOutstandingOperations() > 0 || t.getOutstandingSubscribers() > 0,
"subscribers for testPostInterrupt(%s)\n\t step [%s]".formatted(req.uri(), i),
false);
Reference.reachabilityFence(client);
if (error != null) throw error;
}
}

Expand Down

3 comments on commit dcd4650

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varada1110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/backport jdk17u-dev

@openjdk
Copy link

@openjdk openjdk bot commented on dcd4650 Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varada1110 Could not automatically backport dcd46501 to openjdk/jdk17u-dev due to conflicts in the following files:

  • src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
  • src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
  • src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
  • src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java
  • src/java.net.http/share/classes/jdk/internal/net/http/common/OperationTrackers.java
  • test/jdk/java/net/httpclient/ReferenceTracker.java

Please fetch the appropriate branch/commit and manually resolve these conflicts by using the following commands in your personal fork of openjdk/jdk17u-dev. Note: these commands are just some suggestions and you can use other equivalent commands you know.

# Fetch the up-to-date version of the target branch
$ git fetch --no-tags https://git.openjdk.org/jdk17u-dev.git master:master

# Check out the target branch and create your own branch to backport
$ git checkout master
$ git checkout -b varada1110-backport-dcd46501

# Fetch the commit you want to backport
$ git fetch --no-tags https://git.openjdk.org/jdk.git dcd46501e6a25ac875d61bfbd412555b933ce34c

# Backport the commit
$ git cherry-pick --no-commit dcd46501e6a25ac875d61bfbd412555b933ce34c
# Resolve conflicts now

# Commit the files you have modified
$ git add files/with/resolved/conflicts
$ git commit -m 'Backport dcd46501e6a25ac875d61bfbd412555b933ce34c'

Once you have resolved the conflicts as explained above continue with creating a pull request towards the openjdk/jdk17u-dev with the title Backport dcd46501e6a25ac875d61bfbd412555b933ce34c.

Please sign in to comment.