Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8294916: Cancelling a request must eventually cause its response body subscriber to be unregistered #10659

Closed
wants to merge 9 commits into from

Conversation

dfuch
Copy link
Member

@dfuch dfuch commented Oct 11, 2022

When JDK-8277969 was implemented, a list of outstanding response subscribers was added to HttpClientImpl. A body subscriber is added to the list after being created and is removed from the list when it is completed, either successfully or exceptionally.

It appears that in the case where the subscription is cancelled before the subscriber is completed, the subscriber might remain registered in the list forever, or at least until the HttpClient gets garbage collected. This can be easily reproduced using streaming subscribers, such as BodySubscriber::ofInputStream. In the case where the input stream is closed without having read all the bytes, Subscription::cancel will be called. Whether the subscriber gets unregistered or not at that point becomes racy.

Indeed, the reactive stream specification doesn't guarantee whether onComplete or onError will be called or not after a subscriber cancels its subscription. Any cleanup that would have been performed by onComplete/onError might therefore need to be performed when the subscription is cancelled too.


Progress

  • Change must be properly reviewed (1 review required, with at least 1 Reviewer)
  • Change must not contain extraneous whitespace
  • Commit message must refer to an issue

Issue

  • JDK-8294916: Cancelling a request must eventually cause its response body subscriber to be unregistered

Reviewers

Reviewing

Using git

Checkout this PR locally:
$ git fetch https://git.openjdk.org/jdk pull/10659/head:pull/10659
$ git checkout pull/10659

Update a local copy of the PR:
$ git checkout pull/10659
$ git pull https://git.openjdk.org/jdk pull/10659/head

Using Skara CLI tools

Checkout this PR locally:
$ git pr checkout 10659

View PR using the GUI difftool:
$ git pr show -t 10659

Using diff file

Download this PR as a diff file:
https://git.openjdk.org/jdk/pull/10659.diff

@bridgekeeper
Copy link

bridgekeeper bot commented Oct 11, 2022

👋 Welcome back dfuchs! A progress list of the required criteria for merging this PR into master will be added to the body of your pull request. There are additional pull request commands available for use with this pull request.

@openjdk openjdk bot added the rfr Pull request is ready for review label Oct 11, 2022
@openjdk
Copy link

openjdk bot commented Oct 11, 2022

@dfuch The following label will be automatically applied to this pull request:

  • net

When this pull request is ready to be reviewed, an "RFR" email will be sent to the corresponding mailing list. If you would like to change these labels, use the /label pull request command.

@openjdk openjdk bot added the net net-dev@openjdk.org label Oct 11, 2022
@mlbridge
Copy link

mlbridge bot commented Oct 11, 2022

@ashr123
Copy link

ashr123 commented Oct 11, 2022

@dfuch where is the part that removes the subscriber in case HttpResponse.BodyHandlers#ofInputStream is used and the inputStream closed before the response body is received?

i.e. I expect that even if I use HttpResponse.BodyHandlers#ofInputStream with HttpClient#send in while(true) loop it will retain only 1 subscriber at all times.

Example code for clarification (not complete code):

private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();

public static void main(String[] args) throws IOException, InterruptedException {
	final URI uri = URI.create(<Some URI string>);
	final int minRedAlertEventContentLength = """
			{"cat":"1","data":[],"desc":"","id":0,"title":""}""".getBytes(StandardCharsets.UTF_8).length;

	while (true) {
		try {
			final HttpResponse<InputStream> httpResponse = HTTP_CLIENT.send(
					HttpRequest.newBuilder(uri)
							.build(),
					HttpResponse.BodyHandlers.ofInputStream()
			);

			try (InputStream httpResponseBody = httpResponse.body()) {
				if (httpResponse.statusCode() != HttpURLConnection.HTTP_OK) {
					sleep();
					continue;
				}
				if (httpResponse.headers().firstValueAsLong("Content-Length").orElse(-1) > minRedAlertEventContentLength)
					System.out.println(JSON_MAPPER.readValue(
							httpResponseBody,
							<Some class>.class
					));
			}
		} catch (IOException | InterruptedException e) {
			System.err.println(e);
		}
	}
}

@dfuch
Copy link
Member Author

dfuch commented Oct 12, 2022

That's exactly what this change is going to fix. What happens in the code you're showing here is that if the status code is not 200, then the input stream is closed without having read any bytes. This will cause the underlying subscriber to cancel its subscription, possibly before any response bytes have been received. And that is where the leak comes from, as there's no guarantee that the subscriber's onComplete/onError will ever be called in that case. With this fix, the subscriber will be taken off the list when the subscription is cancelled, and that should fix the leak.

@ashr123
Copy link

ashr123 commented Oct 12, 2022

@dfuch see

if (httpResponse.headers().firstValueAsLong("Content-Length").orElse(-1) > minRedAlertEventContentLength)

there are cases that the response code is 200 but for my reasons I don't want to read the body (for example body that contains only BOM bytes).

I guess that fixes it too 😃

thanks!!


@Override
public void cancel() {
HttpBodySubscriberWrapper.this.cancel(subscription);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Daniel, at first when I noticed this code, it looked odd to me that we aren't calling the underlying subscription.cancel(). I then noticed that the HttpBodySubscriberWrapper.cancel() actually does the cancellation of this underyling subscription. It's slightly odd to me that we are doing it there instead of here. Furthermore, the HttpBodySubscriberWrapper (which implements the Flow interface) has APIs like onComplete(), onError(). Perhaps it would be more consistent to have an onCancel in HttpBodySubscriberWrapper and then have the actual cancellation of the subscription done here in the SubscriptionWrapper and then call the onCancel(Subscription) on the HttpBodySubscriberWrapper? Something like:

class SubscriptionWrapper ... {
  @Override
  public void cancel() {
    try {
      subscription.cancel();
    } finally {
         HttpBodySubscriberWrapper.this.onCancel(subscription);
    }
  }
}
class HttpBodySubscriberWrapper {
    protected void onCancel(Subscription cancelledSubscription) {
        
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. done.

// race condition with propagateError: we need to wait until
// subscription is finished before calling onError;
synchronized (this) {
if (subscribed.compareAndSet(false, true)) {
userSubscriber.onSubscribe(subscription);
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it simpler to read, perhaps change it to:

this.subscription = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription);

It's OK with me if you prefer to have it in the current form.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to leave it like that because this.subscription is volatile.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I overlooked the fact that this was a volatile field. So this isn't just a stylistic preference and the current form you have here adds value. So yes, looks fine to me.
I additionally checked the generated bytecode to be sure that this does indeed do just one access to the volatile field (I wasn't sure what it would look like when you are writing to it as well as passing it as a method parameter).

@@ -157,7 +178,7 @@ public void onSubscribe(Flow.Subscription subscription) {
public void onNext(List<ByteBuffer> item) {
if (completed.get()) {
if (subscription != null) {
subscription.cancel();
subscription.subscription.cancel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we intentionally accessing and cancelling the underlying subscription here, instead of the wrapper? Do we intentionally want to bypass the on-cancellation logic in this flow?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. completed.get() is true here which means that we're receiving onNext after onError or onComplete have been called. Which also means that the stream is guaranteed to be unregistered (at some point). Receiving onNext after onError or onComplete have been called should not happen according to reactive stream specification. But if it does, we really want to make sure the underlying subscription is cancelled. Though I agree it's belt and braces.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for that explanation. Looks good to me.

volatile Throwable withError;
public HttpBodySubscriberWrapper(BodySubscriber<T> userSubscriber) {
this.userSubscriber = userSubscriber;
}

class SubscriptionWrapper implements Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this can be made private class?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

public Object[][] alltests() {
String[] uris = uris();
Object[][] result = new Object[uris.length * 2][];
//Object[][] result = new Object[uris.length][];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Thanks.

//Object[][] result = new Object[uris.length][];
int i = 0;
for (boolean sameClient : List.of(false, true)) {
//if (!sameClient) continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

.executor(executor)
.sslContext(sslContext)
.build();
return shared ? client : TRACKER.track(client);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of this method looks a bit odd to me. We seem to be creating a new client and then checking if the caller passed true for shared and if they did then we are returning a brand new client?

Looking at the usage of this makeNewClient() method, we should perhaps not even accept a parameter for this method and instead just return a newly created client?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - let me see what I can do. What happens here is that we have a big hammer, the TRACKER, that checks shutdown for all clients. If we track the shared client we can't use the hammer properly because until teardown() is called the shared client will remain alive. But in this particular, test - we still need to track some properties of the shared client, so I believe we should track it. But we don't want to track that it's shutdown (gah). I will need to add some changes to the ReferenceTracker to support this.

try (InputStream is = t.getRequestBody()) {
req = is.readAllBytes();
}
t.sendResponseHeaders(200, -1); // chunked/variable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR - but this is extra confusing that our HttpTestExchange expects -1 for chunked responses whereas the actual server com.sun.net.httpserver.HttpExchange expects 0 for chunked responses.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sigh. Yes - I am to blame. The com.sun.httpserver API always frustrated me because I couldn't do sendResponseHeaders(200, resp.length);. I always had to special case resp.length == 0 ? -1 : resp.length . So when I created the adapters I fixed it there. But I agree - if you don't know the history it adds some confusion. I'm not sure I want to change it ;-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I want to change it ;-)

I wasn't expecting this to be changed :) At this point, it's now become a habit to pay extra attention to the value we pass for this method.

} catch (InterruptedException x) {
// OK
}
out.printf("Server wrote %d bytes%n", req.length);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to move this printf before the call to sleep, since that's when the write completes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

try (InputStream is = client.send(req, BodyHandlers.ofInputStream()).body()) {
for (int k = 0; k < j; k++) {
read = is.read();
assertEquals(read, BODY.charAt(k));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Daniel, I don't follow this loop. The number of times it is iterating seems to be dependent on a statically decided value of j (which can't exceed 2). Each iteration it reads 1 byte and compares that byte with the pre-defined response content. Shouldn't this loop instead be doing k < BODY.length();?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I think with a fresher mind, I see what this loop does - it intentionally doesn't read the entire response content and instead closes the stream before the whole response is read, to trigger the original issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - the first time around we don't read anything and we cancel right away. The second time around we read something and then we cancel.

HttpClient client = null;
uri = uri + "/testInputStream";
out.printf("%n%s testInputStream(%s, %b)%n", now(), uri, sameClient);
for (int i=0; i< ITERATION_COUNT; i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reconsider running each test more than once? Here it looks like we are running 3 * 2 = 6 times for testing this use case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Standard procedure for the HttpClient. The first request is somewhat special because it's the first. It may trigger creation of the connection, protocol upgrade, etc... The second should find the connection in the pool - it can be special as well in the sense that it's the first time we fetch something from the pool ;-) . So in many tests we test the same things 3 times (granted: we could probably reduce the ITERATION_COUNT to 2, but having it go up to 3 helps convince ourselves that yes, it didn't pass the first time only by chance).

// lets split the response in several chunks...
String msg = (req != null && req.length != 0)
? new String(req, UTF_8)
: BODY;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see, none of the test requests send a request body. So perhaps we can simplify this part to just always send this static BODY content without these additional checks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - I have changed that code to return 418 (I'm a teapot) if any bytes are received.

@dfuch
Copy link
Member Author

dfuch commented Oct 17, 2022

I have updated the PR. There's a somewhat larger change in how tests can now track HttpClients. I noticed that ReferenceTracker::check(gracedelay) wasn't actually checking that all client selector managers threads had exited (as I thought it did) so I changed it to do that, and the SmallTimeout test started failing, so I had to fix that test to null out references and actually wait until all tasks had been executed.


var error = TRACKER.check(1,
(t) -> t.getOutstandingOperations() > 0 || t.getOutstandingSubscribers() > 0,
"subscribers for testInputStream(%s)\n\t step [%s]".formatted(req.uri(), i),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Daniel,
Is this a copy/paste error - the reference to testInputStream method name? Same with other places that have changed in this file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gah! Yes - thanks for noticing.

@@ -79,6 +83,7 @@ static HttpResult of(HttpRequest request, Throwable t) {
public static void main(String[] args) throws Exception {
HttpClient client = HttpClient.newHttpClient();
ReferenceTracker.INSTANCE.track(client);
Reference<HttpClient> reference = new WeakReference<>(client);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the use of WeakReference here. I see that we have a ReachabilityFence for the client in the finally block of this test where we then null out client. So, if I'm understanding this right, this WeakReference is essentially a no-op. i.e. we probably don't need it because we are anyway holding on to the client for the lifetime of this test program?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference is passed to another thread but should remain alive until that other thread has terminated - which is ensured by waiting for the executor to shutdown. What happens here is that the test failed because the other threads started in this block were keeping the reference alive. I'm using a reference here because passing client to a lambda makes client final and that prevents me from nulling out the client variable before calling gc. Using a weak reference solves the issue (since I don't have to null it and it won't keep the client alive).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still unclear as to why the other threads were keeping the reference alive though?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I see the comment above about ReferenceTracker::check(gracedelay) Never mind

@@ -174,6 +181,9 @@ public static void main(String[] args) throws Exception {
checkReturn(requests);

executor.shutdownNow();
if (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this timeout value (whatever value we decide) introduce any potential intermittent failures, especially on CI systems? Would it perhaps be better to just call executor.close() after that call to executor.shutdownNow(), so that if the tasks really don't complete, then the jtreg test timeout will make it fail with a timeout and we don't have to decide what timeout is a good timeout?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

onCancel();
} catch (Throwable t) {
onError(t);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making this change. Do you think, we should only call onError() if subscription.cancel() fails? In its current form above, it will end up calling the onError() if the onCancel() fails for whatever reason.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onCancel is not supposed to fail - and we really want the cleanup to be performed so I believe it's fair to call onError in that case.

@jaikiran
Copy link
Member

Thank you for the changes to the new test CancelStreamedBodyTest. They looks fine to me.

@@ -627,7 +638,7 @@ final long unreference() {
final long httpCount = pendingHttpOperationsCount.decrementAndGet();
final long http2Count = pendingHttp2StreamCount.get();
final long webSocketCount = pendingWebSocketCount.get();
if (count == 0 && facade() == null) {
if (count == 0 && facadeRef.refersTo(null)) {
selmgr.wakeupSelector();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice there is only one remaining reference to the facade() method. It might make sense to get rid of it and replace the call site with facadeRef.refersTo

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only remaining call I see is here:

        return new BuilderImpl(this.facade(), proxySelector);

and that call needs to return the facade - we can't replace it with refersTo. I'm not seeing any other calls.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I mean to say facadeRef.get() there. It's not a big deal though.

@dfuch
Copy link
Member Author

dfuch commented Oct 18, 2022

I have updated the PR.

Copy link
Member

@Michael-Mc-Mahon Michael-Mc-Mahon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine to me.

@openjdk
Copy link

openjdk bot commented Oct 19, 2022

@dfuch This change now passes all automated pre-integration checks.

ℹ️ This project also has non-automated pre-integration requirements. Please see the file CONTRIBUTING.md for details.

After integration, the commit message for the final commit will be:

8294916: Cancelling a request must eventually cause its response body subscriber to be unregistered

Reviewed-by: michaelm, jpai

You can use pull request commands such as /summary, /contributor and /issue to adjust it as needed.

At the time when this comment was updated there had been 2 new commits pushed to the master branch:

  • e27bea0: 8290011: IGV: Remove dead code and cleanup
  • d37ce4c: 8290368: Introduce LDAP and RMI protocol-specific object factory filters to JNDI implementation

Please see this link for an up-to-date comparison between the source branch of this pull request and the master branch.
As there are no conflicts, your changes will automatically be rebased on top of these commits when integrating. If you prefer to avoid this automatic rebasing, please check the documentation for the /integrate command for further details.

➡️ To integrate this PR with the above commit message to the master branch, type /integrate in a new comment.

@openjdk openjdk bot added the ready Pull request is ready to be integrated label Oct 19, 2022
Copy link
Member

@jaikiran jaikiran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the updates, Daniel. The latest version of this PR looks good to me.

@dfuch
Copy link
Member Author

dfuch commented Oct 20, 2022

/integrate

@openjdk
Copy link

openjdk bot commented Oct 20, 2022

Going to push as commit dcd4650.
Since your change was applied there have been 14 commits pushed to the master branch:

  • 4f994c0: 8295709: Linux AArch64 builds broken after JDK-8294438
  • 545021b: 8294438: Fix misleading-indentation warnings in hotspot
  • c5e0464: 8291456: com/sun/jdi/ClassUnloadEventTest.java failed with: Wrong number of class unload events: expected 10 got 4
  • 8d4c077: 8295302: Do not use ArrayList when LambdaForm has a single ClassData
  • 017e798: 8293939: Move continuation_enter_setup and friends
  • f872467: 8255746: Make PrintCompilation available on a per method level
  • 388a56e: 8294467: Fix sequence-point warnings in Hotspot
  • ceb5b08: 8294468: Fix char-subscripts warnings in Hotspot
  • 7b1c676: 8295662: jdk/incubator/vector tests fail "assert(VM_Version::supports_avx512vlbw()) failed"
  • 5eaf568: 8295668: validate-source failure after JDK-8290011
  • ... and 4 more: https://git.openjdk.org/jdk/compare/21aeb9e7946fc7450ee48939944a69c8aa04bcce...master

Your commit was automatically rebased without conflicts.

@openjdk openjdk bot added the integrated Pull request has been integrated label Oct 20, 2022
@openjdk openjdk bot closed this Oct 20, 2022
@openjdk openjdk bot removed ready Pull request is ready to be integrated rfr Pull request is ready for review labels Oct 20, 2022
@openjdk
Copy link

openjdk bot commented Oct 20, 2022

@dfuch Pushed as commit dcd4650.

💡 You may see a message that your pull request was closed with unmerged commits. This can be safely ignored.

@dfuch
Copy link
Member Author

dfuch commented Oct 20, 2022

Integrated. Thanks for all the reviews!

@ashr123
Copy link

ashr123 commented Oct 20, 2022

Thank you @dfuch for the effort both here and on StackOverflow!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
integrated Pull request has been integrated net net-dev@openjdk.org
4 participants