Skip to content

Commit

Permalink
8297149: REDO JDK-8296889: Race condition when cancelling a request
Browse files Browse the repository at this point in the history
8297075: java/net/httpclient/CancelStreamedBodyTest.java fails with "java.lang.AssertionError: WARNING: tracker for HttpClientImpl(1) has outstanding operations"

Reviewed-by: jpai
  • Loading branch information
dfuch committed Nov 17, 2022
1 parent 4120db1 commit 134acab
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrap
this.exchange = exchange;
}

@Override
protected void onSubscribed() {
exchange.registerResponseSubscriber(this);
}

@Override
protected void complete(Throwable t) {
try {
Expand Down Expand Up @@ -459,7 +464,6 @@ Http1ResponseBodySubscriber<T> createResponseSubscriber(BodyHandler<T> handler,
BodySubscriber<T> subscriber = handler.apply(response);
Http1ResponseBodySubscriber<T> bs =
new Http1ResponseBodySubscriber<T>(subscriber, this);
registerResponseSubscriber(bs);
return bs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
Http2StreamResponseSubscriber<T> subscriber =
new Http2StreamResponseSubscriber<>(handler.apply(response));
registerResponseSubscriber(subscriber);
return subscriber;
}

Expand Down Expand Up @@ -1543,17 +1542,22 @@ final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U
super(subscriber);
}

@Override
protected void onSubscribed() {
registerResponseSubscriber(this);
}

@Override
protected void complete(Throwable t) {
try {
Stream.this.unregisterResponseSubscriber(this);
unregisterResponseSubscriber(this);
} finally {
super.complete(t);
}
}
@Override
protected void onCancel() {
Stream.this.unregisterResponseSubscriber(this);
unregisterResponseSubscriber(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;

Expand Down Expand Up @@ -62,6 +64,7 @@ public void cancel() { }
final BodySubscriber<T> userSubscriber;
final AtomicBoolean completed = new AtomicBoolean();
final AtomicBoolean subscribed = new AtomicBoolean();
final ReentrantLock subscriptionLock = new ReentrantLock();
volatile SubscriptionWrapper subscription;
volatile Throwable withError;
public HttpBodySubscriberWrapper(BodySubscriber<T> userSubscriber) {
Expand Down Expand Up @@ -100,16 +103,20 @@ public boolean needsExecutor() {
// subscribed yet.
private void propagateError(Throwable t) {
assert t != null;
assert completed.get();
try {
// if unsubscribed at this point, it will not
// get subscribed later - so do it now and
// propagate the error
// Race condition with onSubscribe: we need to wait until
// subscription is finished before calling onError;
synchronized (this) {
subscriptionLock.lock();
try {
if (subscribed.compareAndSet(false, true)) {
userSubscriber.onSubscribe(NOP);
}
} finally {
subscriptionLock.unlock();
}
} finally {
// if onError throws then there is nothing to do
Expand All @@ -127,6 +134,15 @@ private void propagateError(Throwable t) {
*/
protected void onCancel() { }

/**
* Called right before the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
* related actions after a subscription has been succesfully
* accepted.
*/
protected void onSubscribed() { }

/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
Expand Down Expand Up @@ -169,22 +185,23 @@ public CompletionStage<T> getBody() {
public void onSubscribe(Flow.Subscription subscription) {
// race condition with propagateError: we need to wait until
// subscription is finished before calling onError;
synchronized (this) {
subscriptionLock.lock();
try {
if (subscribed.compareAndSet(false, true)) {
onSubscribed();
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
} else {
// could be already subscribed and completed
// if an unexpected error occurred before the actual
// subscription - though that's not supposed
// happen.
assert completed.get();
subscription.cancel();
}
} finally {
subscriptionLock.unlock();
}
}

@Override
public void onNext(List<ByteBuffer> item) {
assert subscribed.get();
if (completed.get()) {
SubscriptionWrapper subscription = this.subscription;
if (subscription != null) {
Expand Down
2 changes: 1 addition & 1 deletion test/jdk/java/net/httpclient/CancelRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/*
* @test
* @bug 8245462 8229822 8254786
* @bug 8245462 8229822 8254786 8297075 8297149
* @summary Tests cancelling the request.
* @library /test/lib http2/server
* @key randomness
Expand Down
2 changes: 1 addition & 1 deletion test/jdk/java/net/httpclient/CancelStreamedBodyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/*
* @test
* @bug 8294916
* @bug 8294916 8297075 8297149
* @summary Tests that closing a streaming handler (ofInputStream()/ofLines())
* without reading all the bytes unregisters the underlying subscriber.
* @library /test/lib http2/server
Expand Down

1 comment on commit 134acab

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.