diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java index 9c4968e492b7e..d2715678a2aca 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -209,24 +209,12 @@ static final class Http1ResponseBodySubscriber extends HttpBodySubscriberWrap } @Override - protected void onSubscribed() { + protected void register() { exchange.registerResponseSubscriber(this); } @Override - protected void complete(Throwable t) { - try { - exchange.unregisterResponseSubscriber(this); - } finally { - super.complete(t); - } - } - - @Override - protected void onCancel() { - // If the subscription is cancelled the - // subscriber may or may not get completed. - // Therefore we need to unregister it + protected void unregister() { exchange.unregisterResponseSubscriber(this); } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index dc659d5f9718f..ce750e48916d9 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -1606,21 +1606,12 @@ final class Http2StreamResponseSubscriber extends HttpBodySubscriberWrapper userSubscriber; - final AtomicBoolean completed = new AtomicBoolean(); - final AtomicBoolean subscribed = new AtomicBoolean(); + private volatile int state; final ReentrantLock subscriptionLock = new ReentrantLock(); volatile SubscriptionWrapper subscription; volatile Throwable withError; @@ -83,14 +88,55 @@ public void request(long n) { @Override public void cancel() { try { - subscription.cancel(); - onCancel(); + try { + subscription.cancel(); + } finally { + if (markCancelled()) { + onCancel(); + } + } } catch (Throwable t) { onError(t); } } } + private final boolean markState(final int flag) { + int state = this.state; + if ((state & flag) == flag) { + return false; + } + synchronized (this) { + state = this.state; + if ((state & flag) == flag) { + return false; + } + state = this.state = (state | flag); + } + assert (state & flag) == flag; + return true; + } + + private boolean markSubscribed() { + return markState(SUBSCRIBED); + } + + private boolean markCancelled() { + return markState(CANCELLED); + } + + private boolean markCompleted() { + return markState(COMPLETED); + } + + private boolean markRegistered() { + return markState(REGISTERED); + } + + private boolean markUnregistered() { + return markState(UNREGISTERED); + } + final long id() { return id; } @Override @@ -101,8 +147,9 @@ public boolean needsExecutor() { // propagate the error to the user subscriber, even if not // subscribed yet. private void propagateError(Throwable t) { + var state = this.state; assert t != null; - assert completed.get(); + assert (state & COMPLETED) != 0; try { // if unsubscribed at this point, it will not // get subscribed later - so do it now and @@ -111,7 +158,7 @@ private void propagateError(Throwable t) { // subscription is finished before calling onError; subscriptionLock.lock(); try { - if (subscribed.compareAndSet(false, true)) { + if (markSubscribed()) { userSubscriber.onSubscribe(NOP); } } finally { @@ -125,34 +172,139 @@ private void propagateError(Throwable t) { } } + /** + * This method attempts to mark the state of this + * object as registered, and then call the + * {@link #register()} method. + *
+ * The state will be marked as registered, and the + * {@code register()} method will be called only + * if not already registered or unregistered, + * or cancelled, or completed. + * + * @return {@code true} if {@link #register()} was called, + * false otherwise. + */ + protected final boolean tryRegister() { + subscriptionLock.lock(); + try { + int state = this.state; + if ((state & (REGISTERED | UNREGISTERED | CANCELLED | COMPLETED)) != 0) return false; + if (markRegistered()) { + register(); + return true; + } + } finally { + subscriptionLock.unlock(); + } + return false; + } + + /** + * This method attempts to mark the state of this + * object as unregistered, and then call the + * {@link #unregister()} method. + *
+ * The {@code unregister()} method will be called only
+ * if already registered and not yet unregistered.
+ * Whether {@code unregister()} is called or not,
+ * the state is marked as unregistered, to prevent
+ * {@link #tryRegister()} from calling {@link #register()}
+ * after {@link #tryUnregister()} has been called.
+ *
+ * @return {@code true} if {@link #unregister()} was called,
+ * false otherwise.
+ */
+ protected final boolean tryUnregister() {
+ subscriptionLock.lock();
+ try {
+ int state = this.state;
+ if ((state & REGISTERED) == 0) {
+ markUnregistered();
+ return false;
+ }
+ if (markUnregistered()) {
+ unregister();
+ return true;
+ }
+ } finally {
+ subscriptionLock.unlock();
+ }
+ return false;
+ }
+
+ /**
+ * This method can be implemented by subclasses
+ * to perform registration actions. It will not be
+ * called if already registered or unregistered.
+ * @apiNote
+ * This method is called while holding a subscription
+ * lock.
+ * @see #tryRegister()
+ */
+ protected void register() {
+ assert subscriptionLock.isHeldByCurrentThread();
+ }
+
+ /**
+ * This method can be implemented by subclasses
+ * to perform unregistration actions. It will not be
+ * called if not already registered, or already unregistered.
+ * @apiNote
+ * This method is called while holding a subscription
+ * lock.
+ * @see #tryUnregister()
+ */
+ protected void unregister() {
+ assert subscriptionLock.isHeldByCurrentThread();
+ }
+
/**
* Called when the subscriber cancels its subscription.
* @apiNote
* This method may be used by subclasses to perform cleanup
* actions after a subscription has been cancelled.
+ * @implSpec
+ * This method calls {@link #tryUnregister()}
*/
- protected void onCancel() { }
+ protected void onCancel() {
+ // If the subscription is cancelled the
+ // subscriber may or may not get completed.
+ // Therefore we need to unregister it
+ tryUnregister();
+ }
/**
* Called right before the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
- * related actions after a subscription has been succesfully
+ * related actions after a subscription has been successfully
* accepted.
+ * This method is called while holding a subscription
+ * lock.
+ * @implSpec
+ * This method calls {@link #tryRegister()}
*/
- protected void onSubscribed() { }
+ protected void onSubscribed() {
+ tryRegister();
+ }
/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
* @param t a throwable, or {@code null}
+ * @implSpec
+ * If not {@linkplain #completed()} yet, this method
+ * calls {@link #tryUnregister()}
*/
- protected void complete(Throwable t) {
- if (completed.compareAndSet(false, true)) {
+ public final void complete(Throwable t) {
+ if (markCompleted()) {
+ tryUnregister();
t = withError = Utils.getCompletionCause(t);
if (t == null) {
try {
- assert subscribed.get();
+ var state = this.state;
+ assert (state & SUBSCRIBED) != 0;
userSubscriber.onComplete();
} catch (Throwable x) {
// Simply propagate the error by calling
@@ -179,10 +331,45 @@ protected void complete(Throwable t) {
* {@return true if this subscriber has already completed, either normally
* or abnormally}
*/
- public boolean completed() {
- return completed.get();
+ public final boolean completed() {
+ int state = this.state;
+ return (state & COMPLETED) != 0;
}
+ /**
+ * {@return true if this subscriber has already subscribed}
+ */
+ public final boolean subscribed() {
+ int state = this.state;
+ return (state & SUBSCRIBED) != 0;
+ }
+
+ /**
+ * {@return true if this subscriber has already been registered}
+ */
+ public final boolean registered() {
+ int state = this.state;
+ return (state & REGISTERED) != 0;
+ }
+
+ /**
+ * {@return true if this subscriber has already been unregistered}
+ */
+ public final boolean unregistered() {
+ int state = this.state;
+ return (state & UNREGISTERED) != 0;
+ }
+
+ /**
+ * {@return true if this subscriber's subscription has already
+ * been cancelled}
+ */
+ public final boolean cancelled() {
+ int state = this.state;
+ return (state & CANCELLED) != 0;
+ }
+
+
@Override
public CompletionStage