Skip to content

Commit

Permalink
finish migrating spec to java
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Jan 12, 2017
1 parent c948b6d commit 9eede0e
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 291 deletions.
288 changes: 134 additions & 154 deletions src/test/java/reactor/core/publisher/MonoProcessorTest.java
Expand Up @@ -15,15 +15,23 @@
*/
package reactor.core.publisher;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

import static org.assertj.core.api.Assertions.assertThat;

public class MonoProcessorTest {

@Test(expected = IllegalStateException.class)
public void MonoProcessorResultNotAvailable() {
MonoProcessor<String> mp = MonoProcessor.create();
mp.block(Duration.ofMillis(1));
}

@Test
public void MonoProcessorRejectedDoOnTerminate() {
MonoProcessor<String> mp = MonoProcessor.create();
Expand Down Expand Up @@ -185,158 +193,130 @@ public void MonoProcessorDoubleSignal() {
mp.onError(new Exception("test"));
}

//
// def "Multiple promises can be combined"() {
// given: "two fulfilled promises"
// def bc1 = EmitterProcessor.<Integer> create().connect()
// def promise1 = bc1.doOnNext { println 'hey' + it }.next()
// def bc2 = MonoProcessor.<Integer> create()
// def promise2 = bc2.flux().log().next()
//
// when: "a combined promise is first created"
// def combined = Mono.when(promise1, promise2).subscribe()
//
// then: "it is pending"
// !combined.pending
//
// when: "the first promise is fulfilled"
// bc1.onNext 1
//
// then: "the combined promise is still pending"
// !combined.pending
//
// when: "the second promise if fulfilled"
// bc2.onNext 2
//
// then: "the combined promise is fulfilled with both values"
// combined.block().t1 == 1
// combined.block().t2 == 2
// combined.success
// }
//
// def "A combined promise is rejected once any of its component promises are rejected"() {
// given: "two unfulfilled promises"
// def promise1 = MonoProcessor.<Integer> create()
// def promise2 = MonoProcessor.<Integer> create()
//
// when: "a combined promise is first created"
// def combined = Mono.when(promise1, promise2).subscribeWith(MonoProcessor.create())
//
// then: "it is pending"
// !combined.pending
//
// when: "a component promise is rejected"
// promise1.onError new Exception()
//
// then: "the combined promise is rejected"
// combined.error
// }
//
// def "A single promise can be 'combined'"() {
// given: "one unfulfilled promise"
// MonoProcessor<Integer> promise1 = MonoProcessor.create()
//
// when: "a combined promise is first created"
// def combined = MonoProcessor.create()
// Mono.when({d -> d } as Function, promise1).log().subscribe(combined)
//
// then: "it is pending"
// !combined.pending
//
// when: "the first promise is fulfilled"
// promise1.onNext 1
//
// then: "the combined promise is fulfilled"
// combined.block(Duration.ofSeconds(1)) == [1]
// combined.success
// }
// def "A filtered promise is not fulfilled if the filter does not allow the value to pass through"() {
// given: "a promise with a filter that only accepts even values"
// def promise = MonoProcessor.create()
// def filtered = promise.flux().filter { it % 2 == 0 }.next()
//
// when: "the promise is fulfilled with an odd value"
// promise.onNext 1
//
// then: "the filtered promise is not fulfilled"
// !filtered.block()
// }
//
// def "A filtered promise is fulfilled if the filter allows the value to pass through"() {
// given: "a promise with a filter that only accepts even values"
// def promise = MonoProcessor.create()
// promise.flux().filter { it % 2 == 0 }.next()
//
// when: "the promise is fulfilled with an even value"
// promise.onNext 2
//
// then: "the filtered promise is fulfilled"
// promise.success
// promise.peek() == 2
// }
//
// def "If a filter throws an exception the filtered promise is rejected"() {
// given: "a promise with a filter that throws an error"
// def promise = MonoProcessor.create()
// def e = new RuntimeException()
// def filteredMonoProcessor = promise.flux().filter { throw e }.next()
//
// when: "the promise is fulfilled"
// promise.onNext 2
// filteredMonoProcessor.block()
//
// then: "the filtered promise is rejected"
// thrown RuntimeException
// }
// def "Errors stop compositions"() {
// given: "a promise"
// def p1 = MonoProcessor.<String> create()
//
// final latch = new CountDownLatch(1)
//
// when: "p1 is consumed by p2"
// MonoProcessor p2 = p1.log().doOnSuccess({ Integer.parseInt it }).
// doOnError{ latch.countDown() }.
// log().
// map { println('not in log'); true }.subscribe()
//
// and: "setting a value"
// p1.onNext 'not a number'
// p2.blockMillis(1_000)
//
// then: 'No value'
// thrown(RuntimeException)
// latch.count == 0
// }
//
// def "Can poll instead of await to automatically handle InterruptedException"() {
// given: "a promise"
// def p1 = MonoProcessor.<String> create()
// def s = Schedulers.newSingle('test')
// when: "p1 is consumed by p2"
// def p2 = p1
// .publishOn(s)
// .map {
// println Thread.currentThread()
// sleep(3000)
// Integer.parseInt it
// }
//
// and: "setting a value"
// p1.onNext '1'
// println "emitted"
// p2.block(Duration.ofSeconds(1))
//
// then: 'No value'
// thrown IllegalStateException
//
// when: 'polling undefinitely'
// def v = p2.block()
//
// then: 'Value!'
// v
//
// cleanup:
// s.dispose()
// }
@Test
public void whenMonoProcessor() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp2 = MonoProcessor.create();
MonoProcessor<Tuple2<Integer, Integer>> mp3 = MonoProcessor.create();

StepVerifier.create(Mono.when(mp, mp2)
.subscribeWith(mp3))
.then(() -> assertThat(mp3.isPending()).isTrue())
.then(() -> mp.onNext(1))
.then(() -> assertThat(mp3.isPending()).isTrue())
.then(() -> mp2.onNext(2))
.then(() -> {
assertThat(mp3.isTerminated()).isTrue();
assertThat(mp3.isSuccess()).isTrue();
assertThat(mp3.isPending()).isFalse();
assertThat(mp3.peek()
.getT1()).isEqualTo(1);
assertThat(mp3.peek()
.getT2()).isEqualTo(2);
})
.expectNextMatches(t -> t.getT1() == 1 && t.getT2() == 2)
.verifyComplete();
}

@Test
public void whenMonoProcessor2() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp3 = MonoProcessor.create();

StepVerifier.create(Mono.when(d -> (Integer)d[0], mp)
.subscribeWith(mp3))
.then(() -> assertThat(mp3.isPending()).isTrue())
.then(() -> mp.onNext(1))
.then(() -> {
assertThat(mp3.isTerminated()).isTrue();
assertThat(mp3.isSuccess()).isTrue();
assertThat(mp3.isPending()).isFalse();
assertThat(mp3.peek()).isEqualTo(1);
})
.expectNext(1)
.verifyComplete();
}

@Test
public void whenMonoProcessorRejected() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp2 = MonoProcessor.create();
MonoProcessor<Tuple2<Integer, Integer>> mp3 = MonoProcessor.create();

StepVerifier.create(Mono.when(mp, mp2)
.subscribeWith(mp3))
.then(() -> assertThat(mp3.isPending()).isTrue())
.then(() -> mp.onError(new Exception("test")))
.then(() -> {
assertThat(mp3.isTerminated()).isTrue();
assertThat(mp3.isSuccess()).isFalse();
assertThat(mp3.isPending()).isFalse();
assertThat(mp3.getError()).hasMessage("test");
})
.verifyErrorMessage("test");
}


@Test
public void filterMonoProcessor() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp2 = MonoProcessor.create();
StepVerifier.create(mp.filter(s -> s % 2 == 0).subscribeWith(mp2))
.then(() -> mp.onNext(2))
.then(() -> assertThat(mp2.isError()).isFalse())
.then(() -> assertThat(mp2.isSuccess()).isTrue())
.then(() -> assertThat(mp2.peek()).isEqualTo(2))
.then(() -> assertThat(mp2.isTerminated()).isTrue())
.expectNext(2)
.verifyComplete();
}


@Test
public void filterMonoProcessorNot() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp2 = MonoProcessor.create();
StepVerifier.create(mp.filter(s -> s % 2 == 0).subscribeWith(mp2))
.then(() -> mp.onNext(1))
.then(() -> assertThat(mp2.isError()).isFalse())
.then(() -> assertThat(mp2.isSuccess()).isTrue())
.then(() -> assertThat(mp2.peek()).isNull())
.then(() -> assertThat(mp2.isTerminated()).isTrue())
.verifyComplete();
}

@Test
public void filterMonoProcessorError() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp2 = MonoProcessor.create();
StepVerifier.create(mp.filter(s -> {throw new RuntimeException("test"); })
.subscribeWith
(mp2))
.then(() -> mp.onNext(2))
.then(() -> assertThat(mp2.isError()).isTrue())
.then(() -> assertThat(mp2.isSuccess()).isFalse())
.then(() -> assertThat(mp2.getError()).hasMessage("test"))
.then(() -> assertThat(mp2.isTerminated()).isTrue())
.verifyErrorMessage("test");
}

@Test
public void doOnSuccessMonoProcessorError() {
MonoProcessor<Integer> mp = MonoProcessor.create();
MonoProcessor<Integer> mp2 = MonoProcessor.create();
AtomicReference<Throwable> ref = new AtomicReference<>();

StepVerifier.create(mp.doOnSuccess(s -> {throw new RuntimeException("test"); })
.doOnError(ref::set)
.subscribeWith
(mp2))
.then(() -> mp.onNext(2))
.then(() -> assertThat(mp2.isError()).isTrue())
.then(() -> assertThat(ref.get()).hasMessage("test"))
.then(() -> assertThat(mp2.isSuccess()).isFalse())
.then(() -> assertThat(mp2.getError()).hasMessage("test"))
.then(() -> assertThat(mp2.isTerminated()).isTrue())
.verifyErrorMessage("test");
}

}

0 comments on commit 9eede0e

Please sign in to comment.