From edf238b65e441a1d626f3a4ba06170badd05ca7c Mon Sep 17 00:00:00 2001 From: Daniel Fuchs Date: Fri, 17 Feb 2023 14:43:58 +0000 Subject: [PATCH] 8302635: Race condition in HttpBodySubscriberWrapper when cancelling request Reviewed-by: jpai --- .../jdk/internal/net/http/Http1Exchange.java | 18 +- .../classes/jdk/internal/net/http/Stream.java | 13 +- .../common/HttpBodySubscriberWrapper.java | 225 ++++++++++++++++-- .../net/httpclient/CancelRequestTest.java | 2 +- 4 files changed, 213 insertions(+), 45 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java index 9c4968e492b7e..d2715678a2aca 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -209,24 +209,12 @@ static final class Http1ResponseBodySubscriber extends HttpBodySubscriberWrap } @Override - protected void onSubscribed() { + protected void register() { exchange.registerResponseSubscriber(this); } @Override - protected void complete(Throwable t) { - try { - exchange.unregisterResponseSubscriber(this); - } finally { - super.complete(t); - } - } - - @Override - protected void onCancel() { - // If the subscription is cancelled the - // subscriber may or may not get completed. - // Therefore we need to unregister it + protected void unregister() { exchange.unregisterResponseSubscriber(this); } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index dc659d5f9718f..ce750e48916d9 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -1606,21 +1606,12 @@ final class Http2StreamResponseSubscriber extends HttpBodySubscriberWrapper userSubscriber; - final AtomicBoolean completed = new AtomicBoolean(); - final AtomicBoolean subscribed = new AtomicBoolean(); + private volatile int state; final ReentrantLock subscriptionLock = new ReentrantLock(); volatile SubscriptionWrapper subscription; volatile Throwable withError; @@ -83,14 +88,55 @@ public void request(long n) { @Override public void cancel() { try { - subscription.cancel(); - onCancel(); + try { + subscription.cancel(); + } finally { + if (markCancelled()) { + onCancel(); + } + } } catch (Throwable t) { onError(t); } } } + private final boolean markState(final int flag) { + int state = this.state; + if ((state & flag) == flag) { + return false; + } + synchronized (this) { + state = this.state; + if ((state & flag) == flag) { + return false; + } + state = this.state = (state | flag); + } + assert (state & flag) == flag; + return true; + } + + private boolean markSubscribed() { + return markState(SUBSCRIBED); + } + + private boolean markCancelled() { + return markState(CANCELLED); + } + + private boolean markCompleted() { + return markState(COMPLETED); + } + + private boolean markRegistered() { + return markState(REGISTERED); + } + + private boolean markUnregistered() { + return markState(UNREGISTERED); + } + final long id() { return id; } @Override @@ -101,8 +147,9 @@ public boolean needsExecutor() { // propagate the error to the user subscriber, even if not // subscribed yet. private void propagateError(Throwable t) { + var state = this.state; assert t != null; - assert completed.get(); + assert (state & COMPLETED) != 0; try { // if unsubscribed at this point, it will not // get subscribed later - so do it now and @@ -111,7 +158,7 @@ private void propagateError(Throwable t) { // subscription is finished before calling onError; subscriptionLock.lock(); try { - if (subscribed.compareAndSet(false, true)) { + if (markSubscribed()) { userSubscriber.onSubscribe(NOP); } } finally { @@ -125,34 +172,139 @@ private void propagateError(Throwable t) { } } + /** + * This method attempts to mark the state of this + * object as registered, and then call the + * {@link #register()} method. + *

+ * The state will be marked as registered, and the + * {@code register()} method will be called only + * if not already registered or unregistered, + * or cancelled, or completed. + * + * @return {@code true} if {@link #register()} was called, + * false otherwise. + */ + protected final boolean tryRegister() { + subscriptionLock.lock(); + try { + int state = this.state; + if ((state & (REGISTERED | UNREGISTERED | CANCELLED | COMPLETED)) != 0) return false; + if (markRegistered()) { + register(); + return true; + } + } finally { + subscriptionLock.unlock(); + } + return false; + } + + /** + * This method attempts to mark the state of this + * object as unregistered, and then call the + * {@link #unregister()} method. + *

+ * The {@code unregister()} method will be called only + * if already registered and not yet unregistered. + * Whether {@code unregister()} is called or not, + * the state is marked as unregistered, to prevent + * {@link #tryRegister()} from calling {@link #register()} + * after {@link #tryUnregister()} has been called. + * + * @return {@code true} if {@link #unregister()} was called, + * false otherwise. + */ + protected final boolean tryUnregister() { + subscriptionLock.lock(); + try { + int state = this.state; + if ((state & REGISTERED) == 0) { + markUnregistered(); + return false; + } + if (markUnregistered()) { + unregister(); + return true; + } + } finally { + subscriptionLock.unlock(); + } + return false; + } + + /** + * This method can be implemented by subclasses + * to perform registration actions. It will not be + * called if already registered or unregistered. + * @apiNote + * This method is called while holding a subscription + * lock. + * @see #tryRegister() + */ + protected void register() { + assert subscriptionLock.isHeldByCurrentThread(); + } + + /** + * This method can be implemented by subclasses + * to perform unregistration actions. It will not be + * called if not already registered, or already unregistered. + * @apiNote + * This method is called while holding a subscription + * lock. + * @see #tryUnregister() + */ + protected void unregister() { + assert subscriptionLock.isHeldByCurrentThread(); + } + /** * Called when the subscriber cancels its subscription. * @apiNote * This method may be used by subclasses to perform cleanup * actions after a subscription has been cancelled. + * @implSpec + * This method calls {@link #tryUnregister()} */ - protected void onCancel() { } + protected void onCancel() { + // If the subscription is cancelled the + // subscriber may or may not get completed. + // Therefore we need to unregister it + tryUnregister(); + } /** * Called right before the userSubscriber::onSubscribe is called. * @apiNote * This method may be used by subclasses to perform cleanup - * related actions after a subscription has been succesfully + * related actions after a subscription has been successfully * accepted. + * This method is called while holding a subscription + * lock. + * @implSpec + * This method calls {@link #tryRegister()} */ - protected void onSubscribed() { } + protected void onSubscribed() { + tryRegister(); + } /** * Complete the subscriber, either normally or exceptionally * ensure that the subscriber is completed only once. * @param t a throwable, or {@code null} + * @implSpec + * If not {@linkplain #completed()} yet, this method + * calls {@link #tryUnregister()} */ - protected void complete(Throwable t) { - if (completed.compareAndSet(false, true)) { + public final void complete(Throwable t) { + if (markCompleted()) { + tryUnregister(); t = withError = Utils.getCompletionCause(t); if (t == null) { try { - assert subscribed.get(); + var state = this.state; + assert (state & SUBSCRIBED) != 0; userSubscriber.onComplete(); } catch (Throwable x) { // Simply propagate the error by calling @@ -179,10 +331,45 @@ protected void complete(Throwable t) { * {@return true if this subscriber has already completed, either normally * or abnormally} */ - public boolean completed() { - return completed.get(); + public final boolean completed() { + int state = this.state; + return (state & COMPLETED) != 0; } + /** + * {@return true if this subscriber has already subscribed} + */ + public final boolean subscribed() { + int state = this.state; + return (state & SUBSCRIBED) != 0; + } + + /** + * {@return true if this subscriber has already been registered} + */ + public final boolean registered() { + int state = this.state; + return (state & REGISTERED) != 0; + } + + /** + * {@return true if this subscriber has already been unregistered} + */ + public final boolean unregistered() { + int state = this.state; + return (state & UNREGISTERED) != 0; + } + + /** + * {@return true if this subscriber's subscription has already + * been cancelled} + */ + public final boolean cancelled() { + int state = this.state; + return (state & CANCELLED) != 0; + } + + @Override public CompletionStage getBody() { return userSubscriber.getBody(); @@ -194,7 +381,7 @@ public void onSubscribe(Flow.Subscription subscription) { // subscription is finished before calling onError; subscriptionLock.lock(); try { - if (subscribed.compareAndSet(false, true)) { + if (markSubscribed()) { onSubscribed(); SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription); userSubscriber.onSubscribe(this.subscription = wrapped); @@ -208,8 +395,9 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(List item) { - assert subscribed.get(); - if (completed.get()) { + var state = this.state; + assert (state & SUBSCRIBED) != 0; + if ((state & COMPLETED) != 0) { SubscriptionWrapper subscription = this.subscription; if (subscription != null) { subscription.subscription.cancel(); @@ -222,6 +410,7 @@ public void onNext(List item) { public void onError(Throwable throwable) { complete(throwable); } + @Override public void onComplete() { complete(null); diff --git a/test/jdk/java/net/httpclient/CancelRequestTest.java b/test/jdk/java/net/httpclient/CancelRequestTest.java index ef817bf3ee618..3d531611a7f54 100644 --- a/test/jdk/java/net/httpclient/CancelRequestTest.java +++ b/test/jdk/java/net/httpclient/CancelRequestTest.java @@ -23,7 +23,7 @@ /* * @test - * @bug 8245462 8229822 8254786 8297075 8297149 8298340 + * @bug 8245462 8229822 8254786 8297075 8297149 8298340 8302635 * @summary Tests cancelling the request. * @library /test/lib /test/jdk/java/net/httpclient/lib * @key randomness