From a53a6443ff5ccfaee6c261a6bf8686dd36ecea9a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 15 Apr 2016 14:52:47 +0200 Subject: [PATCH] Pushed MonoWhen operator. --- .../java/reactor/core/publisher/Mono.java | 14 +- .../java/reactor/core/publisher/MonoWhen.java | 209 ++++++++++++++++++ 2 files changed, 216 insertions(+), 7 deletions(-) create mode 100644 src/main/java/reactor/core/publisher/MonoWhen.java diff --git a/src/main/java/reactor/core/publisher/Mono.java b/src/main/java/reactor/core/publisher/Mono.java index c647e7c9de..042c01b216 100644 --- a/src/main/java/reactor/core/publisher/Mono.java +++ b/src/main/java/reactor/core/publisher/Mono.java @@ -457,7 +457,7 @@ public static Mono never() { * @return a {@link Mono}. */ public static Mono> when(Mono p1, Mono p2) { - return zip(Tuple.fn2(), p1, p2); + return new MonoWhen(p1, p2); } /** @@ -477,7 +477,7 @@ public static Mono> when(Mono p1, Mono Mono> when(Mono p1, Mono p2, Mono p3) { - return zip(Tuple.fn3(), p1, p2, p3); + return new MonoWhen(p1, p2, p3); } /** @@ -502,7 +502,7 @@ public static Mono> when(Mono p2, Mono p3, Mono p4) { - return zip(Tuple.fn4(), p1, p2, p3, p4); + return new MonoWhen(p1, p2, p3, p4); } /** @@ -530,7 +530,7 @@ public static Mono> when(Mono p3, Mono p4, Mono p5) { - return zip(Tuple.fn5(), p1, p2, p3, p4, p5); + return new MonoWhen(p1, p2, p3, p4, p5); } /** @@ -561,7 +561,7 @@ public static Mono> when Mono p4, Mono p5, Mono p6) { - return zip(Tuple.fn6(), p1, p2, p3, p4, p5, p6); + return new MonoWhen(p1, p2, p3, p4, p5, p6); } /** @@ -579,7 +579,7 @@ public static Mono> when @SafeVarargs @SuppressWarnings({"unchecked","varargs"}) private static Mono when(Mono... monos) { - return zip(array -> (T[])array, monos); + return new MonoWhen<>(monos).map(v -> (T[])v.toArray()); } /** @@ -617,7 +617,7 @@ private static Mono zip(Function combin */ @SuppressWarnings("unchecked") public static Mono when(final Iterable> monos) { - return zip(array -> (T[])array, monos); + return new MonoWhen(monos).map(t -> (T[])t.toArray()); } /** diff --git a/src/main/java/reactor/core/publisher/MonoWhen.java b/src/main/java/reactor/core/publisher/MonoWhen.java new file mode 100644 index 0000000000..b17e13125d --- /dev/null +++ b/src/main/java/reactor/core/publisher/MonoWhen.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.Objects; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +import reactor.core.subscriber.DeferredScalarSubscriber; +import reactor.core.tuple.*; +import reactor.core.util.BackpressureUtils; + +/** + * Waits for all Mono sources to produce a value or terminate, and if + * all of them produced a value, emit a Tuple of those values; otherwise + * terminate. + * + * @param the source value types + */ +public final class MonoWhen extends Mono { + + final Mono[] sources; + + final Iterable> sourcesIterable; + + @SafeVarargs + public MonoWhen(Mono... sources) { + this.sources = Objects.requireNonNull(sources, "sources"); + this.sourcesIterable = null; + } + + public MonoWhen(Iterable> sourcesIterable) { + this.sources = null; + this.sourcesIterable = Objects.requireNonNull(sourcesIterable, "sourcesIterable"); + } + + @SuppressWarnings("unchecked") + @Override + public void subscribe(Subscriber s) { + Mono[] a; + int n = 0; + if (sources != null) { + a = sources; + n = a.length; + } else { + a = new Mono[8]; + for (Mono m : sourcesIterable) { + if (n == a.length) { + Mono[] b = new Mono[n + (n >> 2)]; + System.arraycopy(a, 0, b, 0, n); + a = b; + } + a[n++] = m; + } + } + + MonoWhenCoordinator parent = new MonoWhenCoordinator<>(s, n); + s.onSubscribe(parent); + parent.subscribe(a); + } + + static final class MonoWhenCoordinator + extends DeferredScalarSubscriber + implements Subscription { + final MonoWhenSubscriber[] subscribers; + + volatile int done; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater DONE = + AtomicIntegerFieldUpdater.newUpdater(MonoWhenCoordinator.class, "done"); + + @SuppressWarnings("unchecked") + public MonoWhenCoordinator(Subscriber subscriber, int n) { + super(subscriber); + subscribers = new MonoWhenSubscriber[n]; + for (int i = 0; i < n; i++) { + subscribers[i] = new MonoWhenSubscriber<>(this); + } + } + + void subscribe(Mono[] sources) { + MonoWhenSubscriber[] a = subscribers; + for (int i = 0; i < a.length; i++) { + sources[i].subscribe(a[i]); + } + } + + void signal() { + MonoWhenSubscriber[] a = subscribers; + int n = a.length; + if (DONE.incrementAndGet(this) != n) { + return; + } + + Object[] o = new Object[n]; + Throwable error = null; + Throwable compositeError = null; + boolean hasEmpty = false; + + for (int i = 0; i < a.length; i++) { + MonoWhenSubscriber m = a[i]; + T v = m.value; + if (v != null) { + o[i] = v; + } else { + Throwable e = m.error; + if (e != null) { + if (compositeError != null) { + compositeError.addSuppressed(e); + } else + if (error != null) { + compositeError = new Throwable("Multiple errors"); + compositeError.addSuppressed(error); + compositeError.addSuppressed(e); + } else { + error = e; + } + } else { + hasEmpty = true; + } + } + } + + if (compositeError != null) { + subscriber.onError(compositeError); + } else + if (error != null) { + subscriber.onError(compositeError); + } else + if (hasEmpty) { + subscriber.onComplete(); + } else { + complete(Tuple.of(o)); + } + } + + @Override + public void cancel() { + if (!isCancelled()) { + super.cancel(); + for (MonoWhenSubscriber ms : subscribers) { + ms.cancel(); + } + } + } + } + + static final class MonoWhenSubscriber implements Subscriber { + + final MonoWhenCoordinator parent; + + volatile Subscription s; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(MonoWhenSubscriber.class, Subscription.class, "s"); + + T value; + Throwable error; + + public MonoWhenSubscriber(MonoWhenCoordinator parent) { + this.parent = parent; + } + + @Override + public void onSubscribe(Subscription s) { + if (BackpressureUtils.setOnce(S, this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (value == null) { + value = t; + parent.signal(); + } + } + + @Override + public void onError(Throwable t) { + error = t; + parent.signal(); + } + + @Override + public void onComplete() { + parent.signal(); + } + + void cancel() { + BackpressureUtils.terminate(S, this); + } + } +}