Skip to content

Commit

Permalink
polish test for #342
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Jan 9, 2017
1 parent 74abd03 commit ee73625
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/test/java/reactor/core/publisher/FluxSubscribeOnValueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
*/
package reactor.core.publisher;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.junit.Test;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import static org.junit.Assert.assertTrue;

public class FluxSubscribeOnValueTest {

ConcurrentMap<Integer, Integer> execs = new ConcurrentHashMap<>();

@Test
public void testSubscribeOnValueFusion() {

Expand All @@ -30,13 +38,25 @@ public void testSubscribeOnValueFusion() {
.subscribeOn(Schedulers.parallel())
.log()
.map(this::slow)))
.expectFusion(Fuseable.ASYNC, Fuseable.NONE)
.expectNextCount(100)
.verifyComplete();

int minExec = 2;

for (Integer counted : execs.values()) {
assertTrue("Thread used less than " + minExec + " " + "times",
counted >= minExec);
}

}

int slow(int slow){
try {
execs.computeIfAbsent(Thread.currentThread()
.hashCode(), i -> 0);
execs.compute(Thread.currentThread()
.hashCode(), (k, v) -> v + 1);
Thread.sleep(10);
return slow;
}
Expand Down

0 comments on commit ee73625

Please sign in to comment.