Skip to content

Commit

Permalink
port #648 Make StepVerifier.withVirtualTime mutually exclusive
Browse files Browse the repository at this point in the history
This commit adds a lock to the StepVerifier that is only used when
virtual time is activated, resulting in the VirtualTimeScheduler
activation, publisher verification and VTS reset to be mutually
exclusive with other StepVerifier.withVirtualTime verify() invocations.

This prevent different StepVerifier from side-effecting each other, as
the StepVerifier is installed using a global hook.

This is a backport of #1104 (commit 1bd4968), as tracked in #1111
  • Loading branch information
simonbasle committed Mar 7, 2018
1 parent e8f7867 commit 65e7d62
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
Expand Down Expand Up @@ -644,6 +646,9 @@ interface Event<T> {

final static class DefaultStepVerifier<T> implements StepVerifier {

/** A global lock that is used to make withVirtualTime calls mutually exclusive */
private static final Lock vtsLock = new ReentrantLock(true);

private final DefaultStepVerifierBuilder<T> parent;
private final int requestedFusionMode;
private final int expectedFusionMode;
Expand Down Expand Up @@ -701,6 +706,7 @@ public Duration verify(Duration duration) {
if (parent.sourceSupplier != null) {
VirtualTimeScheduler vts = null;
if (parent.vtsLookup != null) {
vtsLock.lock(); //wait for other virtualtime verifies to finish
vts = parent.vtsLookup.get();
//this works even for the default case where StepVerifier has created
// a vts through enable(false), because the CURRENT will already be that vts
Expand Down Expand Up @@ -730,6 +736,7 @@ public Duration verify(Duration duration) {
//explicitly reset the factory, rather than rely on vts shutdown doing so
// because it could have been eagerly shut down in a test.
VirtualTimeScheduler.reset();
vtsLock.unlock();
}
}
} else {
Expand Down
37 changes: 37 additions & 0 deletions reactor-test/src/test/java/reactor/test/StepVerifierTests.java
Expand Up @@ -21,6 +21,9 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1893,4 +1896,38 @@ public void assertNextWithSubscribeOnJust() {
.thenCancel()
::verify);
}

@Test
public void parallelVerifyWithVtsMutuallyExclusive() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
Future<Duration> ex1 = executorService.submit(() -> StepVerifier
.withVirtualTime(() -> Flux.just("A", "B", "C")
.delaySequence(Duration.ofMillis(100))
)
.then(() -> {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
e.printStackTrace();
}
})
.thenAwait(Duration.ofMillis(100))
.expectNextCount(3)
.verifyComplete());

Future<Duration> ex2 = executorService.submit(() -> StepVerifier
.withVirtualTime(() -> Flux.just(1, 2, 3)
.delaySequence(Duration.ofMillis(100))
)
.thenAwait(Duration.ofMillis(100))
.expectNextCount(3)
.expectComplete()
.verify());

assertThatCode(ex1::get).as("execution 1 in iteration #" + i).doesNotThrowAnyException();
assertThatCode(ex2::get).as("execution 2 in iteration #" + i).doesNotThrowAnyException();
}
}
}

0 comments on commit 65e7d62

Please sign in to comment.