Skip to content

Commit

Permalink
fix #638 Add toProcessor variant that also connects inline
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed May 30, 2017
1 parent e6470e9 commit f7f2dce
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/main/java/reactor/core/publisher/Mono.java
Expand Up @@ -3218,6 +3218,25 @@ public final MonoProcessor<T> toProcessor() {
}
}

public final MonoProcessor<T> toProcessor(boolean connect) {
if (!connect)
return toProcessor();

MonoProcessor<T> result;
if (this instanceof MonoProcessor) {
result = (MonoProcessor<T>)this;
}
else {
result = new MonoProcessor<>(this);
}
result.connect();
return result;
}

public final MonoProcessor<T> toConnectedProcessor() {
return toProcessor(true);
}

/**
* Transform this {@link Mono} in order to generate a target {@link Mono}. Unlike {@link #compose(Function)}, the
* provided function is executed as part of assembly.
Expand Down
84 changes: 84 additions & 0 deletions src/test/java/reactor/core/publisher/MonoProcessorTest.java
Expand Up @@ -16,6 +16,7 @@
package reactor.core.publisher;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -435,4 +436,87 @@ public void scanProcessorError() {
assertThat(test.scan(Scannable.BooleanAttr.TERMINATED)).isTrue();
}

@Test
public void monoToProcessorDoesntConnect() {
MonoProcessor<String> monoProcessor = Mono.just("foo").toProcessor();

assertThat(monoProcessor.connected).isZero();
}

@Test
public void monoToProcessorAndSubscribeDoesConnect() {
MonoProcessor<String> monoProcessor = Mono.just("foo").toProcessor();

assertThat(monoProcessor.subscribe()).isSameAs(monoProcessor);
assertThat(monoProcessor.connected).isEqualTo(1);
}

@Test
public void monoToProcessorReusesInstance() {
MonoProcessor<String> monoProcessor = Mono.just("foo").toProcessor();

assertThat(monoProcessor)
.isSameAs(monoProcessor.toProcessor())
.isSameAs(monoProcessor.subscribe());
}

@Test
public void monoToProcessorConnect() {
MonoProcessor<String> connectedProcessor = Mono.just("foo")
.toProcessor(true);

assertThat(connectedProcessor.connected).isEqualTo(1);
}

@Test
public void monoToProcessorConnectReusesInstance() {
MonoProcessor<String> connectedProcessor = Mono.just("foo")
.toProcessor(true);

assertThat(connectedProcessor)
.isSameAs(connectedProcessor.toProcessor(true))
.isSameAs(connectedProcessor.toProcessor())
.isSameAs(connectedProcessor.subscribe());
}

@Test
public void monoToProcessorChain() {
StepVerifier.withVirtualTime(() -> Mono.just("foo")
.toProcessor(true)
.delayElement(Duration.ofMillis(500)))
.expectSubscription()
.expectNoEvent(Duration.ofMillis(500))
.expectNext("foo")
.verifyComplete();
}

@Test
public void monoToProcessorChain2() {
StepVerifier.withVirtualTime(() -> Mono.just("foo")
.toConnectedProcessor()
.delayElement(Duration.ofMillis(500)))
.expectSubscription()
.expectNoEvent(Duration.ofMillis(500))
.expectNext("foo")
.verifyComplete();
}

@Test
public void monoToProcessorChainColdToHot() {
AtomicInteger subscriptionCount = new AtomicInteger();
Mono<String> coldToHot = Mono.just("foo")
.doOnSubscribe(sub -> subscriptionCount.incrementAndGet())
.cache()
.toConnectedProcessor() //this actually subscribes
.filter(s -> s.length() < 4);

assertThat(subscriptionCount.get()).isEqualTo(1);

coldToHot.block();
coldToHot.block();
coldToHot.block();

assertThat(subscriptionCount.get()).isEqualTo(1);
}

}

0 comments on commit f7f2dce

Please sign in to comment.