-
Notifications
You must be signed in to change notification settings - Fork 5.8k
8302635: Race condition in HttpBodySubscriberWrapper when cancelling request #12587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
👋 Welcome back dfuchs! A progress list of the required criteria for merging this PR into |
Webrevs
|
static final AtomicLong IDS = new AtomicLong(); | ||
final long id = IDS.incrementAndGet(); | ||
final BodySubscriber<T> userSubscriber; | ||
final AtomicBoolean completed = new AtomicBoolean(); | ||
final AtomicBoolean subscribed = new AtomicBoolean(); | ||
volatile int state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello Daniel, should we make this private
and all the newly introduced state values, too? Or maybe you want to leave it package private to be consistent with some other fields here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
@@ -125,34 +172,139 @@ private void propagateError(Throwable t) { | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't fully wrapped my head around the possible flow of the user subscription, but is there a possiblity where this call to onError(...)
here results in an reentrant call to this propagateError()
? For that matter, not just reentrant but perhaps from a different thread concurrently? The reason I ask is, should we call this onError just once? The java.util.concurrent.Flow.Subscriber.onError(Throwable)
method says this:
Method invoked upon an unrecoverable error encountered by a
Publisher or Subscription, after which no other Subscriber
methods are invoked by the Subscription.
So I'm wondering if we should maintain some state to only invoke it once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the purpose of this class: complete() and propagateError() should ensure that onError() is only called once in the wrapped subscriber. The markCompleted() call should ensure that, even if there is a reentrant call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right - the error propagation through propagateError()
always happens through the complete()
method of this wrapper class and that method has the necessary state management to call this only once.
static final AtomicLong IDS = new AtomicLong(); | ||
final long id = IDS.incrementAndGet(); | ||
final BodySubscriber<T> userSubscriber; | ||
final AtomicBoolean completed = new AtomicBoolean(); | ||
final AtomicBoolean subscribed = new AtomicBoolean(); | ||
volatile int state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java
Outdated
Show resolved
Hide resolved
@@ -125,34 +172,139 @@ private void propagateError(Throwable t) { | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the purpose of this class: complete() and propagateError() should ensure that onError() is only called once in the wrapped subscriber. The markCompleted() call should ensure that, even if there is a reentrant call.
…ttpBodySubscriberWrapper.java make `state` private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes look good to me. HttpBodySubscriberWrapper
and Http1Exchange
files will need a copyright year update, before integrating.
@dfuch This change now passes all automated pre-integration checks. ℹ️ This project also has non-automated pre-integration requirements. Please see the file CONTRIBUTING.md for details. After integration, the commit message for the final commit will be:
You can use pull request commands such as /summary, /contributor and /issue to adjust it as needed. At the time when this comment was updated there had been 18 new commits pushed to the
As there are no conflicts, your changes will automatically be rebased on top of these commits when integrating. If you prefer to avoid this automatic rebasing, please check the documentation for the /integrate command for further details. ➡️ To integrate this PR with the above commit message to the |
…m/dfuch/jdk into HttpBodySubscriberWrapper-8302635
|
/integrate |
Going to push as commit edf238b.
Your commit was automatically rebased without conflicts. |
The HttpBodySubscriberWrapper is a class that ensures that a subscriber will be subscribed to before it is completed. It also provides hooks to its two subclasses (one for HTTP/1, one for HTTP/2) that allows subclasses to register the susbscriber with the HttpClient at subscription time, and to unregister it when it is eventualy completed, or when the subscription is cancelled.
There is however a race condition that can happen when a subscription is cancelled: it can happen that unregister is called before register. The CancelRequestTest has been observed failing once or twice on personal jobs. Though the particular mechanics of this race is hard to understand, the logs of the tests have brought sufficient evidence that this is what was happening.
The symptom is finding one subscriber still registered after completion of the exchange:
test CancelRequestTest.testGetSendAsync("https://localhost:42711/https1/x/same/interrupt", true, true): failure
java.lang.AssertionError: WARNING: tracker for HttpClientImpl(13) has outstanding operations:
Pending HTTP Requests: 0
Pending HTTP/1.1 operations: 0
Pending HTTP/2 streams: 0
Pending WebSocket operations: 0
Pending TCP connections: 0
Pending Subscribers: 1
Total pending operations: 0
Facade referenced: true
Selector alive: true
The proposed fix hoist special hooks for register/unregister in the superclass, merges all various volatile boolean states into a single int state, and protect the state changes to subscribed/register/unregister by the same subscription lock.
If cancelling the subscription happens at around the same time that the subscriber is subscribed this ensures that the subscriber won't be removed from the map before it is added.
Progress
Issue
Reviewers
Reviewing
Using
git
Checkout this PR locally:
$ git fetch https://git.openjdk.org/jdk pull/12587/head:pull/12587
$ git checkout pull/12587
Update a local copy of the PR:
$ git checkout pull/12587
$ git pull https://git.openjdk.org/jdk pull/12587/head
Using Skara CLI tools
Checkout this PR locally:
$ git pr checkout 12587
View PR using the GUI difftool:
$ git pr show -t 12587
Using diff file
Download this PR as a diff file:
https://git.openjdk.org/jdk/pull/12587.diff