From 1bd4968732e3d6fbe5f4f2acb9b32efc3d2b914d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Tue, 6 Mar 2018 10:44:34 +0100 Subject: [PATCH] fix #648 Make StepVerifier.withVirtualTime mutually exclusive This commit adds a lock to the StepVerifier that is only used when virtual time is activated, resulting in the VirtualTimeScheduler activation, publisher verification and VTS reset to be mutually exclusive with other StepVerifier.withVirtualTime verify() invocations. This prevent different StepVerifier from side-effecting each other, as the StepVerifier is installed using a global hook. --- .../test/DefaultStepVerifierBuilder.java | 7 ++++ .../java/reactor/test/StepVerifierTests.java | 37 +++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 1692e50462..87fcc00165 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -644,6 +646,9 @@ interface Event { final static class DefaultStepVerifier implements StepVerifier { + /** A global lock that is used to make withVirtualTime calls mutually exclusive */ + private static final Lock vtsLock = new ReentrantLock(true); + private final DefaultStepVerifierBuilder parent; private final int requestedFusionMode; private final int expectedFusionMode; @@ -701,6 +706,7 @@ public Duration verify(Duration duration) { if (parent.sourceSupplier != null) { VirtualTimeScheduler vts = null; if (parent.vtsLookup != null) { + vtsLock.lock(); //wait for other virtualtime verifies to finish vts = parent.vtsLookup.get(); //this works even for the default case where StepVerifier has created // a vts through enable(false), because the CURRENT will already be that vts @@ -730,6 +736,7 @@ public Duration verify(Duration duration) { //explicitly reset the factory, rather than rely on vts shutdown doing so // because it could have been eagerly shut down in a test. VirtualTimeScheduler.reset(); + vtsLock.unlock(); } } } else { diff --git a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java index 54f11858b6..cb913b575c 100644 --- a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java +++ b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java @@ -21,6 +21,9 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1893,4 +1896,38 @@ public void assertNextWithSubscribeOnJust() { .thenCancel() ::verify); } + + @Test + public void parallelVerifyWithVtsMutuallyExclusive() { + ExecutorService executorService = Executors.newFixedThreadPool(2); + for (int i = 0; i < 10; i++) { + Future ex1 = executorService.submit(() -> StepVerifier + .withVirtualTime(() -> Flux.just("A", "B", "C") + .delaySequence(Duration.ofMillis(100)) + ) + .then(() -> { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + }) + .thenAwait(Duration.ofMillis(100)) + .expectNextCount(3) + .verifyComplete()); + + Future ex2 = executorService.submit(() -> StepVerifier + .withVirtualTime(() -> Flux.just(1, 2, 3) + .delaySequence(Duration.ofMillis(100)) + ) + .thenAwait(Duration.ofMillis(100)) + .expectNextCount(3) + .expectComplete() + .verify()); + + assertThatCode(ex1::get).as("execution 1 in iteration #" + i).doesNotThrowAnyException(); + assertThatCode(ex2::get).as("execution 2 in iteration #" + i).doesNotThrowAnyException(); + } + } } \ No newline at end of file