diff --git a/benchmarks/src/main/java/reactor/core/publisher/FluxPublishBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/FluxPublishBenchmark.java new file mode 100644 index 0000000000..b40af13ed0 --- /dev/null +++ b/benchmarks/src/main/java/reactor/core/publisher/FluxPublishBenchmark.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; + +@BenchmarkMode({Mode.AverageTime}) +@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class FluxPublishBenchmark { + @Param({"0", "10", "1000", "100000"}) + int rangeSize; + + Flux source; + + @Setup(Level.Invocation) + public void setup() { + source = Flux.range(0, rangeSize) + .hide() + .publish() + .autoConnect(Runtime.getRuntime() + .availableProcessors()); + } + + + @State(Scope.Thread) + public static class JmhSubscriber extends CountDownLatch implements CoreSubscriber { + + Blackhole blackhole; + + Subscription s; + + public JmhSubscriber() { + super(1); + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + blackhole.consume(t); + } + + @Override + public void onError(Throwable t) { + blackhole.consume(t); + countDown(); + } + + @Override + public void onComplete() { + countDown(); + } + } + + @SuppressWarnings("unused") + @Benchmark + @Threads(Threads.MAX) + public Object measureThroughput(Blackhole blackhole, JmhSubscriber subscriber) throws InterruptedException { + subscriber.blackhole = blackhole; + source.subscribe(subscriber); + subscriber.await(); + return subscriber; + } +} diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxPublishStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxPublishStressTest.java index f1e77df700..d7e71a2235 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxPublishStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxPublishStressTest.java @@ -26,6 +26,8 @@ import org.openjdk.jcstress.infra.results.IIIIII_Result; import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; +import org.openjdk.jcstress.infra.results.III_Result; +import reactor.core.Disposable; import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE; @@ -445,4 +447,50 @@ public void arbiter(IIIIII_Result r) { r.r6 = subscriber2.onErrorCalls.get(); } } + +// TODO: uncomment me. Proper discard is not supported yet since we dont have stable +// downstream context available all the time. This should be uncommented once we have +// an explicitly passed onDiscard handler +// @JCStressTest +// @Outcome(id = {"10, 1, 0"}, expect = ACCEPTABLE, desc = "all values and completion delivered") +// @Outcome(id = {"10, 0, 1"}, expect = ACCEPTABLE, desc = "some values are delivered some dropped since overflow") +// @State +// public static class ConcurrentDisposeAndProduceStressTest { +// +// final Sinks.Many producer = Sinks.unsafe().many().multicast().directAllOrNothing(); +// +// final ConnectableFlux sharedSource = producer.asFlux().publish(5); +// +// final StressSubscriber subscriber = new StressSubscriber<>(); +// +// final Disposable disposable; +// +// { +// sharedSource.subscribe(subscriber); +// disposable = sharedSource.connect(); +// } +// +// @Actor +// public void dispose() { +// disposable.dispose(); +// } +// +// @Actor +// public void emitValues() { +// for (int i = 0; i < 10; i++) { +// if (producer.tryEmitNext(i) != Sinks.EmitResult.OK) { +// Operators.onDiscard(i, subscriber.context); +// } +// } +// +// producer.tryEmitComplete(); +// } +// +// @Arbiter +// public void arbiter(III_Result r) { +// r.r1 = subscriber.onNextCalls.get() + subscriber.onNextDiscarded.get(); +// r.r2 = subscriber.onCompleteCalls.get(); +// r.r3 = subscriber.onErrorCalls.get(); +// } +// } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java index 7932c57bb1..ef0d7587bf 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java @@ -19,7 +19,6 @@ import java.util.Objects; import java.util.Queue; import java.util.concurrent.CancellationException; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; @@ -98,7 +97,7 @@ public void connect(Consumer cancelSupport) { s = u; } - doConnect = s.tryConnect(); + doConnect = s.tryConnect(); break; } @@ -130,12 +129,15 @@ public void subscribe(CoreSubscriber actual) { if (c.add(inner)) { if (inner.isCancelled()) { c.remove(inner); - } else { + } + else { inner.parent = c; } - c.drain(); + + c.drainFromInner(); break; - } else if (!this.resetUponSourceTermination) { + } + else if (!this.resetUponSourceTermination) { if (c.error != null) { inner.actual.onError(c.error); } else { @@ -168,12 +170,7 @@ static final class PublishSubscriber final FluxPublish parent; - volatile Subscription s; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater S = - AtomicReferenceFieldUpdater.newUpdater(PublishSubscriber.class, - Subscription.class, - "s"); + Subscription s; volatile PubSubInner[] subscribers; @@ -183,16 +180,10 @@ static final class PublishSubscriber PubSubInner[].class, "subscribers"); - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(PublishSubscriber.class, "wip"); - - volatile int connected; + volatile long state; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater CONNECTED = - AtomicIntegerFieldUpdater.newUpdater(PublishSubscriber.class, - "connected"); + static final AtomicLongFieldUpdater STATE = + AtomicLongFieldUpdater.newUpdater(PublishSubscriber.class, "state"); //notes: FluxPublish needs to distinguish INIT from CANCELLED in order to correctly //drop values in case of an early connect() without any subscribers. @@ -203,11 +194,11 @@ static final class PublishSubscriber @SuppressWarnings("rawtypes") static final PubSubInner[] TERMINATED = new PublishInner[0]; - volatile Queue queue; + Queue queue; int sourceMode; - volatile boolean done; + boolean done; volatile Throwable error; @@ -217,7 +208,6 @@ static final class PublishSubscriber Throwable.class, "error"); - @SuppressWarnings("unchecked") PublishSubscriber(int prefetch, FluxPublish parent) { this.prefetch = prefetch; this.parent = parent; @@ -230,7 +220,9 @@ boolean isTerminated(){ @Override public void onSubscribe(Subscription s) { - if (Operators.setOnce(S, this, s)) { + if (Operators.validate(this.s, s)) { + this.s = s; + if (s instanceof Fuseable.QueueSubscription) { @SuppressWarnings("unchecked") Fuseable.QueueSubscription f = (Fuseable.QueueSubscription) s; @@ -239,46 +231,82 @@ public void onSubscribe(Subscription s) { if (m == Fuseable.SYNC) { sourceMode = m; queue = f; - drain(); + long previousState = markSubscriptionSetAndAddWork(this); + if (isCancelled(previousState)) { + s.cancel(); + return; + } + + if (hasWorkInProgress(previousState)) { + return; + } + + drain(previousState | SUBSCRIPTION_SET_FLAG | 1); return; } + if (m == Fuseable.ASYNC) { sourceMode = m; queue = f; - s.request(Operators.unboundedOrPrefetch(prefetch)); + long previousState = markSubscriptionSet(this); + if (isCancelled(previousState)) { + s.cancel(); + } + else { + s.request(Operators.unboundedOrPrefetch(prefetch)); + } return; } } queue = parent.queueSupplier.get(); - - s.request(Operators.unboundedOrPrefetch(prefetch)); + long previousState = markSubscriptionSet(this); + if (isCancelled(previousState)) { + s.cancel(); + } + else { + s.request(Operators.unboundedOrPrefetch(prefetch)); + } } } @Override - public void onNext(T t) { + public void onNext(@Nullable T t) { if (done) { if (t != null) { Operators.onNextDropped(t, currentContext()); } return; } - if (sourceMode == Fuseable.ASYNC) { - drain(); - return; - } - - if (!queue.offer(t)) { + boolean isAsyncMode = sourceMode == Fuseable.ASYNC; + if (!isAsyncMode && !queue.offer(t)) { Throwable ex = Operators.onOperatorError(s, - Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), t, currentContext()); + Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), + t, + currentContext()); if (!Exceptions.addThrowable(ERROR, this, ex)) { Operators.onErrorDroppedMulticast(ex, subscribers); return; } done = true; } - drain(); + + long previousState = addWork(this); + + if (isFinalized(previousState)) { + clear(); + return; + } + + if (isTerminated(previousState) || isCancelled(previousState)) { + return; + } + + if (hasWorkInProgress(previousState)) { + return; + } + + drain(previousState + 1); } @Override @@ -287,13 +315,22 @@ public void onError(Throwable t) { Operators.onErrorDroppedMulticast(t, subscribers); return; } - if (Exceptions.addThrowable(ERROR, this, t)) { - done = true; - drain(); - } - else { + + done = true; + if (!Exceptions.addThrowable(ERROR, this, t)) { Operators.onErrorDroppedMulticast(t, subscribers); } + + long previousState = markTerminated(this); + if (isTerminated(previousState) || isCancelled(previousState)) { + return; + } + + if (hasWorkInProgress(previousState)) { + return; + } + + drain((previousState | TERMINATED_FLAG) + 1); } @Override @@ -302,7 +339,17 @@ public void onComplete() { return; } done = true; - drain(); + + long previousState = markTerminated(this); + if (isTerminated(previousState) || isCancelled(previousState)) { + return; + } + + if (hasWorkInProgress(previousState)) { + return; + } + + drain((previousState | TERMINATED_FLAG) + 1); } @Override @@ -311,19 +358,40 @@ public void dispose() { return; } if (CONNECTION.compareAndSet(parent, this, null)) { - Operators.terminate(S, this); - if (WIP.getAndIncrement(this) != 0) { + long previousState = markCancelled(this); + if (isTerminated(previousState) || isCancelled(previousState)) { + return; + } + + if (hasWorkInProgress(previousState)) { return; } - disconnectAction(); + + disconnectAction(previousState); } } - void disconnectAction() { + void clear() { + if (sourceMode == Fuseable.NONE) { + T t; + while ((t = queue.poll()) != null) { + Operators.onDiscard(t, currentContext()); + } + } + else { + queue.clear(); + } + } + + void disconnectAction(long previousState) { + if (isSubscriptionSet(previousState)) { + this.s.cancel(); + clear(); + } + @SuppressWarnings("unchecked") PubSubInner[] inners = SUBSCRIBERS.getAndSet(this, CANCELLED); if (inners.length > 0) { - queue.clear(); CancellationException ex = new CancellationException("Disconnected"); for (PubSubInner inner : inners) { @@ -348,7 +416,6 @@ boolean add(PublishInner inner) { } } - @SuppressWarnings("unchecked") public void remove(PubSubInner inner) { for (; ; ) { PubSubInner[] a = subscribers; @@ -392,16 +459,26 @@ PubSubInner[] terminate() { } boolean tryConnect() { - return connected == 0 && CONNECTED.compareAndSet(this, 0, 1); + long previousState = markConnected(this); + + return !isConnected(previousState); } - final void drain() { - if (WIP.getAndIncrement(this) != 0) { + final void drainFromInner() { + long previousState = addWorkIfSubscribed(this); + + if (!isSubscriptionSet(previousState)) { return; } - int missed = 1; + if (hasWorkInProgress(previousState)) { + return; + } + drain(previousState + 1); + } + + final void drain(long expectedState) { for (; ; ) { boolean d = done; @@ -411,7 +488,7 @@ final void drain() { boolean empty = q == null || q.isEmpty(); - if (checkTerminated(d, empty)) { + if (checkTerminated(d, empty, null)) { return; } @@ -446,7 +523,7 @@ final void drain() { d = true; v = null; } - if (checkTerminated(d, v == null)) { + if (checkTerminated(d, v == null, v)) { return; } if (mode != Fuseable.SYNC) { @@ -474,7 +551,7 @@ final void drain() { empty = v == null; - if (checkTerminated(d, empty)) { + if (checkTerminated(d, empty, v)) { return; } @@ -482,7 +559,7 @@ final void drain() { //async mode only needs to break but SYNC mode needs to perform terminal cleanup here... if (mode == Fuseable.SYNC) { done = true; - checkTerminated(true, true); + checkTerminated(true, true, null); } break; } @@ -509,21 +586,28 @@ final void drain() { } else if (q != null && mode == Fuseable.SYNC) { done = true; - if (checkTerminated(true, empty)) { + if (checkTerminated(true, empty, null)) { break; } } - missed = WIP.addAndGet(this, -missed); - if (missed == 0) { - break; + expectedState = markWorkDone(this, expectedState); + if (isCancelled(expectedState)) { + clearAndFinalize(this); + return; + } + + if (!hasWorkInProgress(expectedState)) { + return; } } } - boolean checkTerminated(boolean d, boolean empty) { - if (s == Operators.cancelledSubscription()) { - disconnectAction(); + boolean checkTerminated(boolean d, boolean empty, @Nullable T t) { + long state = this.state; + if (isCancelled(state)) { + Operators.onDiscard(t, currentContext()); + disconnectAction(state); return true; } if (d) { @@ -578,9 +662,193 @@ public Object scanUnsafe(Attr key) { @Override public boolean isDisposed() { - return s == Operators.cancelledSubscription() || done; + long state = this.state; + return isTerminated(state) || isCancelled(state); } + static void clearAndFinalize(PublishSubscriber instance) { + for (; ; ) { + final long state = instance.state; + + if (isFinalized(state)) { + instance.clear(); + return; + } + + if (isSubscriptionSet(state)) { + instance.clear(); + } + + if (STATE.compareAndSet( + instance, state, + (state & ~WORK_IN_PROGRESS_MASK) | FINALIZED_FLAG)) { + break; + } + } + } + + static long addWork(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (STATE.compareAndSet(instance, state, addWork(state))) { + return state; + } + } + } + + static long addWorkIfSubscribed(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (!isSubscriptionSet(state)) { + return state; + } + + if (STATE.compareAndSet(instance, state, addWork(state))) { + return state; + } + } + } + + static long addWork(long state) { + if ((state & WORK_IN_PROGRESS_MASK) == WORK_IN_PROGRESS_MASK) { + return (state &~ WORK_IN_PROGRESS_MASK) | 1; + } + else { + return state + 1; + } + } + + static long markTerminated(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (isCancelled(state) || isTerminated(state)) { + return state; + } + + long nextState = addWork(state); + if (STATE.compareAndSet(instance, state, nextState | TERMINATED_FLAG)) { + return state; + } + } + } + + static long markConnected(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (isConnected(state)) { + return state; + } + + if (STATE.compareAndSet(instance, state, state | CONNECTED_FLAG)) { + return state; + } + } + } + + static long markSubscriptionSet(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (isCancelled(state)) { + return state; + } + + if (STATE.compareAndSet(instance, state, state | SUBSCRIPTION_SET_FLAG)) { + return state; + } + } + } + + static long markSubscriptionSetAndAddWork(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (isCancelled(state)) { + return state; + } + + long nextState = addWork(state); + if (STATE.compareAndSet(instance, state, nextState | SUBSCRIPTION_SET_FLAG)) { + return state; + } + } + } + + static long markCancelled(PublishSubscriber instance) { + for (;;) { + long state = instance.state; + + if (isCancelled(state)) { + return state; + } + + long nextState = addWork(state); + if (STATE.compareAndSet(instance, state, nextState | CANCELLED_FLAG)) { + return state; + } + } + } + + static long markWorkDone(PublishSubscriber instance, long expectedState) { + for (;;) { + long state = instance.state; + + if (expectedState != state) { + return state; + } + + long nextState = state & ~WORK_IN_PROGRESS_MASK; + if (STATE.compareAndSet(instance, state, nextState)) { + return nextState; + } + } + } + + static boolean isConnected(long state) { + return (state & CONNECTED_FLAG) == CONNECTED_FLAG; + } + + static boolean isFinalized(long state) { + return (state & FINALIZED_FLAG) == FINALIZED_FLAG; + } + + static boolean isCancelled(long state) { + return (state & CANCELLED_FLAG) == CANCELLED_FLAG; + } + + static boolean isTerminated(long state) { + return (state & TERMINATED_FLAG) == TERMINATED_FLAG; + } + + static boolean isSubscriptionSet(long state) { + return (state & SUBSCRIPTION_SET_FLAG) == SUBSCRIPTION_SET_FLAG; + } + + static boolean hasWorkInProgress(long state) { + return (state & WORK_IN_PROGRESS_MASK) > 0; + } + + static final long FINALIZED_FLAG = + 0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + + static final long CANCELLED_FLAG = + 0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + + static final long TERMINATED_FLAG = + 0b0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + + static final long SUBSCRIPTION_SET_FLAG = + 0b0000_1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + + static final long CONNECTED_FLAG = + 0b0000_0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + + static final long WORK_IN_PROGRESS_MASK = + 0b0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_1111_1111_1111L; } static abstract class PubSubInner implements InnerProducer { @@ -649,7 +917,7 @@ static final class PublishInner extends PubSubInner { void drainParent() { PublishSubscriber p = parent; if (p != null) { - p.drain(); + p.drainFromInner(); } } @@ -658,7 +926,7 @@ void removeAndDrainParent() { PublishSubscriber p = parent; if (p != null) { p.remove(this); - p.drain(); + p.drainFromInner(); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java b/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java index f1cc13d99d..dbcbebf449 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/OnDiscardShouldNotLeakTest.java @@ -169,6 +169,13 @@ public class OnDiscardShouldNotLeakTest { .map(Function.identity()) .map(Function.identity()) .publishOn(Schedulers.immediate())), + // TODO: uncomment me. Proper discard is not supported yet since we dont have stable + // downstream context available all the time. This should be uncommented once we have + // an explicitly passed onDiscard handler + /*DiscardScenario.fluxSource("publishOnAndPublish", main -> main + .publishOn(Schedulers.immediate()) + .publish() + .refCount()),*/ DiscardScenario.sinkSource("unicastSink", Sinks.unsafe().many().unicast()::onBackpressureBuffer, null), DiscardScenario.sinkSource("unicastSinkAndPublishOn", Sinks.unsafe().many().unicast()::onBackpressureBuffer, f -> f.publishOn(Schedulers.immediate())),