Skip to content

Commit

Permalink
Test fixes and doc tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
smaldini committed Aug 1, 2020
1 parent 3543e88 commit 34b2cc1
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
32 changes: 20 additions & 12 deletions reactor-core/src/main/java/reactor/core/publisher/Sinks.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ private Sinks() {
}

/**
* A {@link Sink} with the following characteristics:
* A {@link Sink} which exclusively produces one terminal signal: error or complete.
* It has the following characteristics:
* <ul>
* <li>Multicast</li>
* <li>Backpressure : this sink does not need any demand since it can only signal error or completion</li>
Expand All @@ -49,16 +50,15 @@ public static Sink<Void> empty() {
}

/**
* A {@link Sink} builder that works like a conceptual Promise: it can produce a single signal
* A {@link Sink} that works like a conceptual Promise: it can produce a single signal
* with or without a value at any time. A single signal is replayed to late subscribers.
*/
public static <T> Sink<T> singleOrEmpty() {
public static <T> Sink<T> latestOrEmpty() {
return new NextProcessor<>(null);
}

/**
* Help building {@link Sink} that will broadcast multiple signals to one or more {@link Subscriber}.
* This kind of sink is suited to produce {@link Sink#toFlux()}.
* Builds a {@link Sink} which produces multiple signals to one or more {@link Subscriber}.
*
* @return {@link ManySpec}
*/
Expand All @@ -67,7 +67,15 @@ public static ManySpec many() {
}

/**
* Provide unicast: 1 sink, 1 {@link Subscriber}
* A {@link Sink} that works like a conceptual Promise: it can produce a single signal
* with or without a value at any time. A single signal is replayed to late subscribers.
*/
public static <T> Sink<T> singleOrEmpty() {
return new NextProcessor<>(null);
}

/**
* Provides unicast: 1 sink, 1 {@link Subscriber}
*/
public interface UnicastSpec {
/**
Expand Down Expand Up @@ -116,7 +124,7 @@ public interface UnicastSpec {
}

/**
* Provide multicast : 1 sink, N {@link Subscriber}
* Provides multicast : 1 sink, N {@link Subscriber}
*/
public interface MulticastSpec {

Expand Down Expand Up @@ -191,25 +199,25 @@ public interface MulticastSpec {
}

/**
* Provide a sink with which can emit multiple elements
* Provides a sink with which can emit multiple elements
*/
public interface ManySpec {
/**
* Help building {@link Sink} that will broadcast signals to a single {@link Subscriber}
* Builds a {@link Sink} that will broadcast signals to a single {@link Subscriber}
*
* @return {@link UnicastSpec}
*/
UnicastSpec unicast();

/**
* Help building {@link Sink} that will broadcast signals to multiple {@link Subscriber}
* Builds a {@link Sink} that will broadcast signals to multiple {@link Subscriber}
*
* @return {@link MulticastSpec}
*/
MulticastSpec multicast();

/**
* Help building {@link Sink} that will broadcast signals to multiple {@link Subscriber} with the ability to retain
* Builds a {@link Sink} that will broadcast signals to multiple {@link Subscriber} with the ability to retain
* and replay all or an arbitrary number of elements.
*
* @return {@link MulticastReplaySpec}
Expand All @@ -227,7 +235,7 @@ public interface ManySpec {
}

/**
* Provide multicast with history/replay capacity : 1 sink, N {@link Subscriber}
* Provides multicast with history/replay capacity : 1 sink, N {@link Subscriber}
*/
public interface MulticastReplaySpec {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,20 @@ public void VoidProcessorNullFulfill() {
assertThat(mp.peek()).isNull();
}

@Test(expected = Exception.class)
@Test
public void VoidProcessorDoubleError() {
MonoProcessor<Void> mp = new VoidProcessor();

mp.onError(new Exception("test"));
mp.onError(new Exception("test"));
assertThat(mp.emitError(new Exception("test")).hasFailed()).isTrue();
}

@Test(expected = Exception.class)
@Test
public void VoidProcessorDoubleSignal() {
MonoProcessor<Void> mp = new VoidProcessor();

mp.onComplete();
mp.onError(new Exception("test"));
assertThat(mp.emitError(new Exception("test")).hasFailed()).isTrue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void fluxCanFilterTerminalStates() {
Mono<Void> tap = stream.then();

// then: "it is available"
assertThat(tap).isNull();
assertThat(tap.block()).isNull();

// when: "the error signal is observed and flux is retrieved"
stream = Flux.error(new Exception());
Expand Down

0 comments on commit 34b2cc1

Please sign in to comment.