diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 1692e50462..87fcc00165 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -644,6 +646,9 @@ interface Event { final static class DefaultStepVerifier implements StepVerifier { + /** A global lock that is used to make withVirtualTime calls mutually exclusive */ + private static final Lock vtsLock = new ReentrantLock(true); + private final DefaultStepVerifierBuilder parent; private final int requestedFusionMode; private final int expectedFusionMode; @@ -701,6 +706,7 @@ public Duration verify(Duration duration) { if (parent.sourceSupplier != null) { VirtualTimeScheduler vts = null; if (parent.vtsLookup != null) { + vtsLock.lock(); //wait for other virtualtime verifies to finish vts = parent.vtsLookup.get(); //this works even for the default case where StepVerifier has created // a vts through enable(false), because the CURRENT will already be that vts @@ -730,6 +736,7 @@ public Duration verify(Duration duration) { //explicitly reset the factory, rather than rely on vts shutdown doing so // because it could have been eagerly shut down in a test. VirtualTimeScheduler.reset(); + vtsLock.unlock(); } } } else { diff --git a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java index 54f11858b6..cb913b575c 100644 --- a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java +++ b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java @@ -21,6 +21,9 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1893,4 +1896,38 @@ public void assertNextWithSubscribeOnJust() { .thenCancel() ::verify); } + + @Test + public void parallelVerifyWithVtsMutuallyExclusive() { + ExecutorService executorService = Executors.newFixedThreadPool(2); + for (int i = 0; i < 10; i++) { + Future ex1 = executorService.submit(() -> StepVerifier + .withVirtualTime(() -> Flux.just("A", "B", "C") + .delaySequence(Duration.ofMillis(100)) + ) + .then(() -> { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + }) + .thenAwait(Duration.ofMillis(100)) + .expectNextCount(3) + .verifyComplete()); + + Future ex2 = executorService.submit(() -> StepVerifier + .withVirtualTime(() -> Flux.just(1, 2, 3) + .delaySequence(Duration.ofMillis(100)) + ) + .thenAwait(Duration.ofMillis(100)) + .expectNextCount(3) + .expectComplete() + .verify()); + + assertThatCode(ex1::get).as("execution 1 in iteration #" + i).doesNotThrowAnyException(); + assertThatCode(ex2::get).as("execution 2 in iteration #" + i).doesNotThrowAnyException(); + } + } } \ No newline at end of file