Skip to content

Commit

Permalink
fix #334 Add doOnEach on Mono and ParallelFlux
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jan 9, 2017
1 parent ee73625 commit e1d9549
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,31 @@ public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess) {
return onAssembly(new MonoPeekTerminal<>(this, onSuccess, null, null));
}

/**
* Triggers side-effects when the {@link Mono} emits an item, fails with an error
* or completes successfully. All these events are represented as a {@link Signal}
* that is passed to the side-effect callback. Note that this is an advanced operator,
* typically used for monitoring of a Mono.
*
* @param signalConsumer the mandatory callback to call on
* {@link Subscriber#onNext(Object)}, {@link Subscriber#onError(Throwable)} and
* {@link Subscriber#onComplete()}
* @return an observed {@link Mono}
* @see #doOnNext(Consumer)
* @see #doOnError(Consumer)
* @see #materialize()
* @see Signal
*/
public final Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
Objects.requireNonNull(signalConsumer, "signalConsumer");
return doOnSignal(this,
null,
v -> signalConsumer.accept(Signal.next(v)),
e -> signalConsumer.accept(Signal.<T>error(e)),
() -> signalConsumer.accept(Signal.<T>complete()),
null, null, null);
}

/**
* Triggered when the {@link Mono} completes with an error.
*
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/reactor/core/publisher/ParallelFlux.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,39 @@ public final ParallelFlux<T> doOnComplete(Runnable onComplete) {
return doOnSignal(this, null, null, null, onComplete, null, null, null, null);
}

/**
* Triggers side-effects when the {@link ParallelFlux} emits an item, fails with an error
* or completes successfully. All these events are represented as a {@link Signal}
* that is passed to the side-effect callback. Note that with {@link ParallelFlux} and
* the {@link #subscribe(Consumer) lambda-based subscribes} or the
* {@link #subscribe(Subscriber[]) array-based one}, onError and onComplete will be
* invoked as many times as there are rails, resulting in as many corresponding
* {@link Signal} being seen in the callback.
* <p>
* Use of {@link #subscribe(Subscriber)}, which calls {@link #sequential()}, might
* cancel some rails, resulting in less signals being observed. This is an advanced
* operator, typically used for monitoring of a ParallelFlux.
*
* @param signalConsumer the mandatory callback to call on
* {@link Subscriber#onNext(Object)}, {@link Subscriber#onError(Throwable)} and
* {@link Subscriber#onComplete()}
* @return an observed {@link ParallelFlux}
* @see #doOnNext(Consumer)
* @see #doOnError(Consumer)
* @see #doOnComplete(Runnable)
* @see #subscribe(Subscriber[])
* @see Signal
*/
public final ParallelFlux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
Objects.requireNonNull(signalConsumer, "signalConsumer");
return doOnSignal(this,
v -> signalConsumer.accept(Signal.next(v)),
null,
e -> signalConsumer.accept(Signal.<T>error(e)),
() -> signalConsumer.accept(Signal.<T>complete()),
null, null, null, null);
}

/**
* Call the specified consumer with the exception passing through any 'rail'.
*
Expand Down
85 changes: 85 additions & 0 deletions src/test/java/reactor/core/publisher/ParallelFluxTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,28 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;

public class ParallelFluxTest {

@Test
Expand Down Expand Up @@ -451,4 +463,77 @@ public void collectAsync3Take() {
.assertNoError()
.assertComplete();
}

@Test
public void testDoOnEachSignal() throws InterruptedException {
List<Signal<Integer>> signals = Collections.synchronizedList(new ArrayList<>(4));
List<Integer> values = Collections.synchronizedList(new ArrayList<>(2));
ParallelFlux<Integer> flux = Flux.just(1, 2)
.parallel(3)
.doOnEach(signals::add)
.doOnEach(s -> {
if (s.isOnNext())
values.add(s.get());
});

//we use a lambda subscriber and latch to avoid using `sequential`
CountDownLatch latch = new CountDownLatch(2);
flux.subscribe(v -> {}, e -> latch.countDown(), latch::countDown);

assertTrue(latch.await(2, TimeUnit.SECONDS));

assertThat(signals.size(), is(5));
assertThat("first onNext signal isn't first value", signals.get(0).get(), is(1));
assertThat("second onNext signal isn't last value", signals.get(1).get(), is(2));
assertTrue("onComplete for rail 1 expected", signals.get(2).isOnComplete());
assertTrue("onComplete for rail 2 expected", signals.get(3).isOnComplete());
assertTrue("onComplete for rail 3 expected", signals.get(4).isOnComplete());
assertThat("1st onNext value unexpected", values.get(0), is(1));
assertThat("2nd onNext value unexpected", values.get(1), is(2));
}

@Test
public void testDoOnEachSignalWithError() throws InterruptedException {
List<Signal<Integer>> signals = Collections.synchronizedList(new ArrayList<>(4));
ParallelFlux<Integer> flux = Flux.<Integer>error(new IllegalArgumentException("boom"))
.parallel(2)
.runOn(Schedulers.parallel())
.doOnEach(signals::add);

//we use a lambda subscriber and latch to avoid using `sequential`
CountDownLatch latch = new CountDownLatch(2);
flux.subscribe(v -> {}, e -> latch.countDown(), latch::countDown);

assertTrue(latch.await(2, TimeUnit.SECONDS));

assertThat(signals.toString(), signals.size(), is(2));
assertTrue("rail 1 onError expected", signals.get(0).isOnError());
assertTrue("rail 2 onError expected", signals.get(1).isOnError());
assertThat("plain exception rail 1 expected", signals.get(0).getThrowable().getMessage(), is("boom"));
assertThat("plain exception rail 2 expected", signals.get(1).getThrowable().getMessage(), is("boom"));
}

@Test(expected = NullPointerException.class)
public void testDoOnEachSignalNullConsumer() {
Flux.just(1).parallel().doOnEach(null);
}

@Test
public void testDoOnEachSignalToSubscriber() {
AssertSubscriber<Integer> peekSubscriber = AssertSubscriber.create();
ParallelFlux<Integer> flux = Flux.just(1, 2)
.parallel(3)
.doOnEach(s -> s.accept(peekSubscriber));

//we use a lambda subscriber and latch to avoid using `sequential`
flux.subscribe(v -> {});

peekSubscriber.assertNotSubscribed();
peekSubscriber.assertValues(1, 2);

Assertions.assertThatExceptionOfType(AssertionError.class)
.isThrownBy(peekSubscriber::assertComplete)
.withMessage("Multiple completions: 3");
}

}
71 changes: 71 additions & 0 deletions src/test/java/reactor/core/publisher/scenarios/MonoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,97 @@
*/
package reactor.core.publisher.scenarios;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.function.Tuple2;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* @author Stephane Maldini
*/
public class MonoTests {

@Test
public void testDoOnEachSignal() {
List<Signal<Integer>> signals = new ArrayList<>(4);
Mono<Integer> mono = Mono.just(1)
.doOnEach(signals::add);
StepVerifier.create(mono)
.expectSubscription()
.expectNext(1)
.expectComplete()
.verify();

assertThat(signals.size(), is(2));
assertThat("onNext", signals.get(0).get(), is(1));
assertTrue("onComplete expected", signals.get(1).isOnComplete());
}

@Test
public void testDoOnEachEmpty() {
List<Signal<Integer>> signals = new ArrayList<>(4);
Mono<Integer> mono = Mono.<Integer>empty()
.doOnEach(signals::add);
StepVerifier.create(mono)
.expectSubscription()
.expectComplete()
.verify();

assertThat(signals.size(), is(1));
assertTrue("onComplete expected", signals.get(0).isOnComplete());

}

@Test
public void testDoOnEachSignalWithError() {
List<Signal<Integer>> signals = new ArrayList<>(4);
Mono<Integer> mono = Mono.<Integer>error(new IllegalArgumentException("foo"))
.doOnEach(signals::add);
StepVerifier.create(mono)
.expectSubscription()
.expectErrorMessage("foo")
.verify();

assertThat(signals.size(), is(1));
assertTrue("onError expected", signals.get(0).isOnError());
assertThat("plain exception expected", signals.get(0).getThrowable().getMessage(),
is("foo"));
}

@Test(expected = NullPointerException.class)
public void testDoOnEachSignalNullConsumer() {
Mono.just(1).doOnEach(null);
}

@Test
public void testDoOnEachSignalToSubscriber() {
AssertSubscriber<Integer> peekSubscriber = AssertSubscriber.create();
Mono<Integer> mono = Mono.just(1)
.doOnEach(s -> s.accept(peekSubscriber));
StepVerifier.create(mono)
.expectSubscription()
.expectNext(1)
.expectComplete()
.verify();

peekSubscriber.assertNotSubscribed();
peekSubscriber.assertValues(1);
peekSubscriber.assertComplete();
}

@Test
public void testMonoThenManySupplier() {
Expand Down

0 comments on commit e1d9549

Please sign in to comment.