From 6ab518681f42b4e3b683d4d1200b048a5c450784 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Wed, 2 May 2018 10:22:29 -0700 Subject: [PATCH] port MonoProcessor simplification in preparation for #1114 --- .../java/reactor/core/publisher/Mono.java | 5 +- .../core/publisher/MonoCompletionStage.java | 8 +- .../reactor/core/publisher/MonoProcessor.java | 538 +++++++++--------- .../core/publisher/MonoProcessorTest.java | 14 +- 4 files changed, 287 insertions(+), 278 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index cd7656722a..6c34690c38 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -3507,9 +3507,8 @@ public final CompletableFuture toFuture() { /** * Wrap this {@link Mono} into a {@link MonoProcessor} (turning it hot and allowing to block, - * cancel, as well as many other operations). Note that the {@link MonoProcessor} is - * {@link MonoProcessor#connect() connected to} (which is equivalent to calling subscribe - * on it). + * cancel, as well as many other operations). Note that the {@link MonoProcessor} + * is subscribed to its parent source if any. * *

* diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java index 45c14f12fb..834c10e739 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,9 +59,11 @@ public void subscribe(CoreSubscriber actual) { try { if (e != null) { actual.onError(e); - } else if (v != null) { + } + else if (v != null) { sds.complete(v); - } else { + } + else { actual.onComplete(); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java index 5770722f6c..655220c113 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,9 @@ import java.time.Duration; import java.util.Objects; import java.util.concurrent.CancellationException; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.LongSupplier; +import java.util.stream.Stream; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; @@ -80,46 +80,88 @@ public static MonoProcessor create(WaitStrategy waitStrategy) { final WaitStrategy waitStrategy; - Publisher source; - Subscription subscription; - volatile FluxProcessor processor; //deliberately initially null, don't use NOOP_PROCESSOR - volatile O value; - volatile Throwable error; - volatile int state; - volatile int wip; - volatile int connected; + volatile NextInner[] subscribers; + + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater SUBSCRIBERS = + AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class, + NextInner[].class, + "subscribers"); + + @SuppressWarnings("rawtypes") + static final NextInner[] EMPTY = new NextInner[0]; + + @SuppressWarnings("rawtypes") + static final NextInner[] TERMINATED = new NextInner[0]; + + @SuppressWarnings("rawtypes") + static final NextInner[] EMPTY_WITH_SOURCE = new NextInner[0]; + + Publisher source; + + Throwable error; + O value; + + + volatile Subscription subscription; + static final AtomicReferenceFieldUpdater UPSTREAM = + AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class, Subscription + .class, "subscription"); MonoProcessor(@Nullable Publisher source) { this(source, WaitStrategy.sleeping()); } MonoProcessor(@Nullable Publisher source, WaitStrategy waitStrategy) { - this.source = source; this.waitStrategy = Objects.requireNonNull(waitStrategy, "waitStrategy"); + this.source = source; + SUBSCRIBERS.lazySet(this, source != null ? EMPTY_WITH_SOURCE : EMPTY); } @Override public final void cancel() { - int state = this.state; - for (; ; ) { - if (state != STATE_READY && state != STATE_SUBSCRIBED && state != STATE_POST_SUBSCRIBED) { - return; - } - if (STATE.compareAndSet(this, state, STATE_CANCELLED)) { - break; - } - state = this.state; + if (isTerminated()) { + return; } - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); + + Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); + if (s == Operators.cancelledSubscription()) { + return; + } + + source = null; + if (s != null) { + s.cancel(); } } @Override + @SuppressWarnings("unchecked") public void dispose() { - cancel(); + Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); + if (s == Operators.cancelledSubscription()) { + return; + } + + source = null; + if (s != null) { + s.cancel(); + } + + + NextInner[] a; + if ((a = SUBSCRIBERS.getAndSet(this, TERMINATED)) != TERMINATED) { + Exception e = new CancellationException("Disposed"); + error = e; + value = null; + for (NextInner as : a) { + as.onError(e); + } + } + + waitStrategy.signalAllWhenBlocking(); } /** @@ -161,22 +203,22 @@ O block(Runnable spinObserver) { if (!isPending()) { return peek(); } - else if(subscription == null) { - getOrStart(); - } + + connect(); try { - long endState = waitStrategy.waitFor(STATE_SUCCESS_VALUE, this, spinObserver); + //wait for a terminated state (minimum 3) + long endState = waitStrategy.waitFor(3L, this, spinObserver); switch ((int)endState) { - case STATE_SUCCESS_VALUE: + case 3: //terminated with value return value; - case STATE_ERROR: + case 4: //terminated without value + return null; + case 5: //terminated with error RuntimeException re = Exceptions.propagate(error); re = Exceptions.addSuppressed(re, new Exception("Mono#block terminated with an error")); throw re; - case STATE_COMPLETE_NO_VALUE: - return null; } throw new IllegalStateException("Mono has been cancelled"); } @@ -195,6 +237,46 @@ else if(subscription == null) { } } + /** + * Returns the internal state as a long fulfilled. + *

+ * + * @return the internal state from -1 Cancelled to 5 errored, beyond 3 included is + * fulfilled. + * @deprecated Should not use (to be removed in 3.2) + */ + @Override + public long getAsLong() { + //FIXME Should Be Removed alongside WaitStrategy + NextInner[] inners = subscribers; + if (inners == TERMINATED) { + if (error != null) { + return 5L; // terminated with error + } + if (value == null) { + return 4L; // terminated without value + } + return 3L; //terminated with value + } + + if (subscription == Operators.cancelledSubscription()) { + return -1L; //cancelled + } + + if (subscribers != EMPTY && subscribers != EMPTY_WITH_SOURCE) { + return 2L; // subscribed + } + + return 0L; + } + /** * Return the produced {@link Throwable} error if any or null * @@ -202,7 +284,7 @@ else if(subscription == null) { */ @Nullable public final Throwable getError() { - return error; + return isTerminated() ? error : null; } /** @@ -212,7 +294,7 @@ public final Throwable getError() { * otherwise. */ public boolean isCancelled() { - return state == STATE_CANCELLED; + return subscription == Operators.cancelledSubscription() && !isTerminated(); } /** @@ -221,7 +303,7 @@ public boolean isCancelled() { * @return {@code true} if this {@code MonoProcessor} was completed with an error, {@code false} otherwise. */ public final boolean isError() { - return state == STATE_ERROR; + return getError() != null; } /** @@ -230,7 +312,7 @@ public final boolean isError() { * @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise. */ public final boolean isSuccess() { - return state == STATE_COMPLETE_NO_VALUE || state == STATE_SUCCESS_VALUE; + return isTerminated() && error == null; } /** @@ -240,7 +322,7 @@ public final boolean isSuccess() { * @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise. */ public final boolean isTerminated() { - return state > STATE_POST_SUBSCRIBED; + return subscribers == TERMINATED; } @Override @@ -250,128 +332,87 @@ public boolean isDisposed() { @Override public final void onComplete() { - Subscription s = subscription; - int state = this.state; - if ((source != null && s == null) || state >= STATE_SUCCESS_VALUE) { - return; - } - subscription = null; - source = null; - - final int finalState = STATE_COMPLETE_NO_VALUE; - - for (; ; ) { - if (STATE.compareAndSet(this, state, finalState)) { - waitStrategy.signalAllWhenBlocking(); - break; - } - state = this.state; - } - - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); - } + onNext(null); } @Override + @SuppressWarnings("unchecked") public final void onError(Throwable cause) { - Subscription s = subscription; + Objects.requireNonNull(cause, "onError cannot be null"); - if ((source != null && s == null) || this.error != null) { + if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription()) + == Operators.cancelledSubscription()) { Operators.onErrorDroppedMulticast(cause); return; } - this.error = cause; - subscription = null; + error = cause; + value = null; source = null; - int state = this.state; - for (; ; ) { - if (state != STATE_READY && state != STATE_SUBSCRIBED && state != STATE_POST_SUBSCRIBED) { - Operators.onErrorDroppedMulticast(cause); - return; - } - if (STATE.compareAndSet(this, state, STATE_ERROR)) { - waitStrategy.signalAllWhenBlocking(); - break; - } - state = this.state; - } - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); + for (NextInner as : SUBSCRIBERS.getAndSet(this, TERMINATED)) { + as.onError(cause); } + + waitStrategy.signalAllWhenBlocking(); } @Override - public final void onNext(O value) { - Subscription s = subscription; - - if (value != null && ((source != null && s == null) || this.value != null)) { - Operators.onNextDroppedMulticast(value); - return; - } - subscription = null; - - final int finalState; - if(value != null) { - finalState = STATE_SUCCESS_VALUE; - this.value = value; - if (s != null && !(source instanceof Mono)) { - s.cancel(); + @SuppressWarnings("unchecked") + public final void onNext(@Nullable O value) { + Subscription s; + if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription())) + == Operators.cancelledSubscription()) { + if (value != null) { + Operators.onNextDroppedMulticast(value); } - } - else { //shouldn't happen - finalState = STATE_COMPLETE_NO_VALUE; + return; } + this.value = value; + Publisher parent = source; source = null; - int state = this.state; - for (; ; ) { - if (state != STATE_READY && state != STATE_SUBSCRIBED && state != STATE_POST_SUBSCRIBED) { - if(value != null) { - Operators.onNextDroppedMulticast(value); - } - return; - } - if (STATE.compareAndSet(this, state, finalState)) { - waitStrategy.signalAllWhenBlocking(); - break; + NextInner[] array = SUBSCRIBERS.getAndSet(this, TERMINATED); + + if (value == null) { + for (NextInner as : array) { + as.onComplete(); } - state = this.state; } + else { + if (s != null && !(parent instanceof Mono)) { + s.cancel(); + } - - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); + for (NextInner as : array) { + as.complete(value); + } } + + waitStrategy.signalAllWhenBlocking(); } @Override public final void onSubscribe(Subscription subscription) { - if (Operators.validate(this.subscription, subscription)) { - this.subscription = subscription; - if (STATE.compareAndSet(this, STATE_READY, STATE_SUBSCRIBED)){ - subscription.request(Long.MAX_VALUE); - } + if (Operators.setOnce(UPSTREAM, this, subscription)) { + subscription.request(Long.MAX_VALUE); + } + } - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); - } + @Override + public Context currentContext() { + //FIXME when #1114 merged + NextInner[] inners = this.subscribers; + if (inners.length > 0) { + return inners[0].actual.currentContext(); } + return Context.empty(); } - /** - * Returns the internal state from -1 Cancelled to 5 errored, beyond 3 included is - * fulfilled. - * - * @return the internal state from -1 Cancelled to 5 errored, beyond 3 included is - * fulfilled. - */ @Override - public long getAsLong() { - return state; + public Stream inners() { + return Stream.of(subscribers); } /** @@ -384,87 +425,77 @@ public long getAsLong() { */ @Nullable public O peek() { - int endState = this.state; + if (!isTerminated()) { + return null; + } - if (endState == STATE_SUCCESS_VALUE) { + if (value != null) { return value; } - else if (endState == STATE_ERROR) { + + if (error != null) { RuntimeException re = Exceptions.propagate(error); re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error")); throw re; } - else { - return null; - } + + return null; } @Override public final void request(long n) { - if (Operators.validate(n) && - WIP.getAndIncrement(this) == 0) { - drainLoop(); - } + Operators.validate(n); } @Override public void subscribe(final CoreSubscriber actual) { - for (; ; ) { - int endState = this.state; - if (endState == STATE_COMPLETE_NO_VALUE) { - Operators.complete(actual); - return; - } - else if (endState == STATE_SUCCESS_VALUE) { - actual.onSubscribe(Operators.scalarSubscription(actual, value)); - return; - } - else if (endState == STATE_ERROR) { - Operators.error(actual, error); - return; + NextInner as = new NextInner<>(actual, this); + actual.onSubscribe(as); + if (add(as)) { + if (as.isCancelled()) { + remove(as); } - else if (endState == STATE_CANCELLED) { - Operators.error(actual, new CancellationException("Mono has previously been cancelled")); - return; + } + else { + Throwable ex = error; + if (ex != null) { + actual.onError(ex); } - Processor out = getOrStart(); - if (out == NOOP_PROCESSOR) { - continue; + else { + O v = value; + if (v != null) { + as.complete(v); + } + else { + as.onComplete(); + } } - out.subscribe(actual); - break; } + } - if (WIP.getAndIncrement(this) == 0) { - drainLoop(); + void connect() { + Publisher parent = source; + if (parent != null && SUBSCRIBERS.compareAndSet(this, EMPTY_WITH_SOURCE, EMPTY)) { + parent.subscribe(this); } } @Override @Nullable public Object scanUnsafe(Attr key) { - if (key == Attr.ACTUAL) return processor; + //touch guard + boolean t = isTerminated(); + + if (key == Attr.TERMINATED) return t; if (key == Attr.PARENT) return subscription; if (key == Attr.ERROR) return error; if (key == Attr.PREFETCH) return Integer.MAX_VALUE; if (key == Attr.CANCELLED) return isCancelled(); - if (key == Attr.TERMINATED) return isTerminated(); return null; } final boolean isPending() { - return !isTerminated() && !isCancelled(); - } - - final void connect() { - if(CONNECTED.compareAndSet(this, 0, 1)){ - if(source == null){ - onSubscribe(Operators.emptySubscription()); - } - else{ - source.subscribe(this); - } - } + return !isTerminated(); } /** @@ -473,8 +504,7 @@ final void connect() { * @return the number of active {@link Subscriber} or {@literal -1} if untracked */ public final long downstreamCount() { - //noinspection ConstantConditions - return Scannable.from(processor).inners().count(); + return subscribers.length; } /** @@ -486,124 +516,104 @@ public final boolean hasDownstreams() { return downstreamCount() != 0; } - @SuppressWarnings("unchecked") - final void drainLoop() { - int missed = 1; - - int state; - for (; ; ) { - state = this.state; - if (state > STATE_POST_SUBSCRIBED) { - Processor p = PROCESSOR.getAndSet(this, NOOP_PROCESSOR); - if (p != NOOP_PROCESSOR && p != null) { - switch (state) { - case STATE_COMPLETE_NO_VALUE: - p.onComplete(); - break; - case STATE_SUCCESS_VALUE: - p.onNext(value); - p.onComplete(); - break; - case STATE_ERROR: - p.onError(error); - break; - } - return; - } + boolean add(NextInner ps) { + for (;;) { + NextInner[] a = subscribers; + + if (a == TERMINATED) { + return false; } - Subscription subscription = this.subscription; - - if (subscription != null && state == STATE_CANCELLED) { - FluxProcessor p = PROCESSOR.getAndSet(this, NOOP_PROCESSOR); - if (p != NOOP_PROCESSOR) { - this.subscription = null; - this.source = null; - subscription.cancel(); - //we need p = null as a 3rd possible value to detect the case were we should null out the source/subscription - if (p != null) { - p.dispose(); - } - return; + + int n = a.length; + @SuppressWarnings("unchecked") + NextInner[] b = new NextInner[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = ps; + + if (SUBSCRIBERS.compareAndSet(this, a, b)) { + Publisher parent = source; + if (parent != null && a == EMPTY_WITH_SOURCE) { + parent.subscribe(this); } + return true; } + } + } - if (state == STATE_SUBSCRIBED && STATE.compareAndSet(this, STATE_SUBSCRIBED, STATE_POST_SUBSCRIBED)) { - Processor p = PROCESSOR.get(this); - if (p != null && p != NOOP_PROCESSOR) { - p.onSubscribe(this); + @SuppressWarnings("unchecked") + void remove(NextInner ps) { + for (;;) { + NextInner[] a = subscribers; + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == ps) { + j = i; + break; } } - missed = WIP.addAndGet(this, -missed); - if (missed == 0) { - break; + if (j < 0) { + return; } - } - } - @SuppressWarnings("unchecked") - FluxProcessor getOrStart(){ - FluxProcessor out = processor; - if (out == null) { - out = ReplayProcessor.cacheLastOrDefault(value); - if (PROCESSOR.compareAndSet(this, null, out)) { - connect(); + NextInner[] b; + + if (n == 1) { + b = EMPTY; + } else { + b = new NextInner[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); } - else { - out = PROCESSOR.get(this); + if (SUBSCRIBERS.compareAndSet(this, a, b)) { + return; } } - return out; } - @SuppressWarnings("rawtypes") - final static class NoopProcessor extends FluxProcessor { - - @Override - public void onComplete() { + final static class NextInner extends Operators.MonoSubscriber { + final MonoProcessor parent; + NextInner(CoreSubscriber actual, MonoProcessor parent) { + super(actual); + this.parent = parent; } @Override - public void onError(Throwable t) { - + public void cancel() { + if (STATE.getAndSet(this, CANCELLED) != CANCELLED) { + parent.remove(this); + } } @Override - public void onNext(Object o) { - + public void onComplete() { + if (!isCancelled()) { + actual.onComplete(); + } } @Override - public void onSubscribe(Subscription s) { - + public void onError(Throwable t) { + if (isCancelled()) { + Operators.onOperatorError(t, currentContext()); + } else { + actual.onError(t); + } } @Override - public void subscribe(CoreSubscriber actual) { - + public Object scanUnsafe(Attr key) { + if (key == Attr.PARENT) { + return parent; + } + return super.scanUnsafe(key); } } - final static NoopProcessor NOOP_PROCESSOR = new NoopProcessor(); - @SuppressWarnings("rawtypes") - final static AtomicIntegerFieldUpdater STATE = - AtomicIntegerFieldUpdater.newUpdater(MonoProcessor.class, "state"); - @SuppressWarnings("rawtypes") - final static AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(MonoProcessor.class, "wip"); - @SuppressWarnings("rawtypes") - final static AtomicIntegerFieldUpdater CONNECTED = - AtomicIntegerFieldUpdater.newUpdater(MonoProcessor.class, "connected"); - @SuppressWarnings("rawtypes") - final static AtomicReferenceFieldUpdater PROCESSOR = - AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class, FluxProcessor.class, - "processor"); - final static int STATE_CANCELLED = -1; - final static int STATE_READY = 0; - final static int STATE_SUBSCRIBED = 1; - final static int STATE_POST_SUBSCRIBED = 2; - final static int STATE_SUCCESS_VALUE = 3; - final static int STATE_COMPLETE_NO_VALUE = 4; - final static int STATE_ERROR = 5; } \ No newline at end of file diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoProcessorTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoProcessorTest.java index 16df12c0c5..a6fa9ad96b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoProcessorTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoProcessorTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. + * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.reactivestreams.Subscription; import reactor.core.Scannable; import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; import reactor.util.function.Tuple2; import static org.assertj.core.api.Assertions.assertThat; @@ -46,7 +47,7 @@ public void noRetentionOnTermination() throws InterruptedException { WeakReference> refFuture = new WeakReference<>(future); Mono source = Mono.fromFuture(future); - Mono data = source.map(Date::toString).cache(); + Mono data = source.map(Date::toString).log().cache().log(); future.complete(date); assertThat(data.block()).isEqualTo(date.toString()); @@ -545,9 +546,6 @@ public void scanProcessorSubscription() { assertThat(test.scan(Scannable.Attr.ACTUAL)).isNull(); assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(subscription); - - test.getOrStart(); - assertThat(test.scan(Scannable.Attr.ACTUAL)).isNotNull(); } @Test @@ -574,10 +572,10 @@ public void monoToProcessorReusesInstance() { @Test public void monoToProcessorConnects() { - MonoProcessor connectedProcessor = Mono.just("foo") - .toProcessor(); + TestPublisher tp = TestPublisher.create(); + MonoProcessor connectedProcessor = tp.mono().toProcessor(); - assertThat(connectedProcessor.connected).isEqualTo(1); + assertThat(connectedProcessor.subscription).isNotNull(); } @Test