From 07a957ee006a376acf0b4e3206d3d6b1fe8fc17e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Thu, 20 Jun 2024 16:10:43 +0200 Subject: [PATCH] Added missing test cases, reformatted, refactored --- .../core/publisher/FluxReplayStressTest.java | 371 ++++++++++++------ 1 file changed, 253 insertions(+), 118 deletions(-) diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxReplayStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxReplayStressTest.java index e7890e75a3..6506511a4f 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxReplayStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxReplayStressTest.java @@ -43,6 +43,10 @@ public abstract static class RefCntConcurrentSubscriptionBaseStressTest { final StressSubscriber subscriber1 = new StressSubscriber<>(); final StressSubscriber subscriber2 = new StressSubscriber<>(); + public RefCntConcurrentSubscriptionBaseStressTest(Flux sourceToShare) { + this(sourceToShare, null); + } + public RefCntConcurrentSubscriptionBaseStressTest(Flux sourceToShare, @Nullable Duration duration) { this(sourceToShare, 2, duration); @@ -64,25 +68,25 @@ public RefCntConcurrentSubscriptionBaseStressTest(Flux sourceToShare, @JCStressTest @Outcome(id = {"[0,1], 1, [0,1], 1, 0, 0, [1,2]"}, expect = ACCEPTABLE, desc = - "Normal scenario when cancellation of first subscriber has not effect on " + - "the second. Second subscription may still appear since last " + - "subscriber can join the first subscription but due to natural " + - "concurrency when synchronize block is entered, the connection is " + - "nulled so this will cause a connect() to be called. Current " + - "ConnectableFlux api does not allow any improvements on that since " + - "it lacks coordination.") - @Outcome(id = {"[0,1], 1, [0,1], 0, 0, 1, [1,2]"}, expect = ACCEPTABLE_INTERESTING,desc = - "Expected corner case when second subscriber still join the first " + - "subscription but due to natural concurrency cancellation happens " + - "before onComplete is called so the second subscriber gets value " + - "and error instead of onComplete. second connect call may still " + - "happen since ConnectableFlux.subscribe happens before the check of" + - " current connection value in FluxRefCnt, thus different ") - @Outcome(id = {"0, 0, 0, 0, 0, 1, 2"}, expect = ACCEPTABLE_INTERESTING,desc = - "Expected corner case when second subscriber still join the first " + - "subscription but due to natural concurrency cancellation of the " + - "first subscriber happens before value is delivered so in that case" + - " error is delivered prior any values") + "Normal scenario when cancellation of the first subscriber has no effect on" + + " the second. The second subscription may still take place since" + + " the last subscriber can join the first subscription. However, due" + + " to natural concurrency, when the synchronization block is entered," + + " the connection is nulled. This will cause connect() to be called." + + " Current ConnectableFlux api does not allow any improvements on that" + + " front since it lacks coordination.") + @Outcome(id = {"[0,1], 1, [0,1], 0, 0, 1, [1,2]"}, expect = ACCEPTABLE_INTERESTING, desc = + "Expected corner case when the second subscriber still joins the first" + + " subscription, but due to natural concurrency, cancellation" + + " happens before onComplete is called. So the second subscriber gets the value" + + " and onError instead of onComplete. The second connect call may still" + + " happen, since ConnectableFlux.subscribe happens before the check of" + + " the current connection value in FluxRefCnt") + @Outcome(id = {"0, 0, 0, 0, 0, 1, 2"}, expect = ACCEPTABLE_INTERESTING, desc = + "Expected corner case when the second subscriber still joins the first" + + " subscription, but due to natural concurrency, cancellation of the" + + " first subscriber happens before the value is delivered. In that case" + + " onError is delivered instead of any values") @State public static class RefCntRaceSubscribeAndCancelNoTimeoutStressTest { @@ -127,8 +131,7 @@ public void arbiter(IIIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "100000, 100000"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"100000, 100000"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntRaceManySubscribersSubscribeAndCancelNoTimeoutStressTest extends RefCntConcurrentSubscriptionBaseStressTest { @@ -141,57 +144,50 @@ public RefCntRaceManySubscribersSubscribeAndCancelNoTimeoutStressTest() { } /* - Thread 1 Thread 2 - subscribe(s11) subscribe(s21) - | | - source.subscribe source.subscribe - new FR connection 1 is created we are added to existing FR connection 1 - FRCG counter + 1 = 1 | - connect() | - value is delivered | - MonoNext calls cancel | - enter sync monitor | - FRCG count - 1 = 0 | - FRCG.connection = null | - exit sync monitor | - | the subscriber21 is added to the list of FR - disposable.dispose() | - | enters sync monitor - | FRCG connection is null - | create new FRCG connection - | FRCG counter + 1 = 1 - | connect() - | new FR connection 2 is created - | subscription is started + Thread 1 Thread 2 + subscribe(s11) subscribe(s21) + | | + source.subscribe source.subscribe + new FR connection 1 is created we are added to existing FR connection 1 + FRCG counter + 1 = 1 | + connect() | + value is delivered | + MonoNext calls cancel | + enter sync monitor | + FRCG count - 1 = 0 | + FRCG.connection = null | + exit sync monitor | + | the subscriber21 is added to the list of FR + disposable.dispose() | + | enters sync monitor + | FRCG connection is null + | create new FRCG connection + | FRCG counter + 1 = 1 + | connect() + | new FR connection 2 is created + | subscription is started we see subscriber21 in the list | we deliver value to subscriber21 | | MonoNext calls cancel | - enter sync monitor | - | | - FRCG count - 1 = 0 | - FRCG.connection2 = null | - connection2.dispose() | - cancel subscription | - | | - | we see subscription is canceled - | so we return without any value - | being delivered - | subscribe(s22) + enter sync monitor | + | | + FRCG count - 1 = 0 | + FRCG.connection2 = null | + connection2.dispose() | + cancel subscription | + | we see subscription is canceled + | so we return without any value + | being delivered + | subscribe(s22) | | - | FRCG.source.subscribe(s22) - | subscriber s22 is added to FR.connection2 - | - | - CONNECTION.lazySet(null) - terminate() t - we see subscriber22 in the list - we send error to subscriber22 - - - - - */ + | FRCG.source.subscribe(s22) + | subscriber s22 is added to FR.connection2 + CONNECTION.lazySet(null) + terminate() + we see subscriber22 in the list + we send error to subscriber22 + */ @Actor public void subscribe1() { @@ -251,14 +247,13 @@ public void arbiter(JJ_Result r) { } @JCStressTest - @Outcome(id = { - "10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionRangeSyncFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntConcurrentSubscriptionRangeSyncFusionStressTest() { - super(Flux.range(0, 10), null); + super(Flux.range(0, 10)); } @Actor @@ -283,15 +278,13 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionRangeAsyncFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntConcurrentSubscriptionRangeAsyncFusionStressTest() { - super(Flux.range(0, 10) - .publishOn(Schedulers.immediate()), null); + super(Flux.range(0, 10).publishOn(Schedulers.immediate())); } @Actor @@ -316,15 +309,13 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionRangeNoneFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntConcurrentSubscriptionRangeNoneFusionStressTest() { - super(Flux.range(0, 10) - .hide(), null); + super(Flux.range(0, 10).hide()); } @Actor @@ -349,14 +340,13 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionEmptySyncStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntConcurrentSubscriptionEmptySyncStressTest() { - super(Flux.empty(), null); + super(Flux.empty()); } @Actor @@ -381,15 +371,13 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionEmptyAsyncStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntConcurrentSubscriptionEmptyAsyncStressTest() { - super(Flux.empty() - .publishOn(Schedulers.immediate()), null); + super(Flux.empty().publishOn(Schedulers.immediate())); } @Actor @@ -414,15 +402,13 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionEmptyNoneStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntConcurrentSubscriptionEmptyNoneStressTest() { - super(Flux.empty() - .hide(), null); + super(Flux.empty().hide()); } @Actor @@ -447,8 +433,7 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionErrorSyncStressTest extends RefCntConcurrentSubscriptionBaseStressTest { @@ -456,7 +441,7 @@ public static class RefCntConcurrentSubscriptionErrorSyncStressTest static final Throwable testError = new RuntimeException("boom"); public RefCntConcurrentSubscriptionErrorSyncStressTest() { - super(Flux.error(testError), null); + super(Flux.error(testError)); } @Actor @@ -481,8 +466,7 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionErrorAsyncStressTest extends RefCntConcurrentSubscriptionBaseStressTest { @@ -490,8 +474,7 @@ public static class RefCntConcurrentSubscriptionErrorAsyncStressTest static final Throwable testError = new RuntimeException("boom"); public RefCntConcurrentSubscriptionErrorAsyncStressTest() { - super(Flux.error(testError) - .publishOn(Schedulers.immediate()), null); + super(Flux.error(testError).publishOn(Schedulers.immediate())); } @Actor @@ -516,8 +499,7 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntConcurrentSubscriptionErrorNoneStressTest extends RefCntConcurrentSubscriptionBaseStressTest { @@ -525,8 +507,7 @@ public static class RefCntConcurrentSubscriptionErrorNoneStressTest static final Throwable testError = new RuntimeException("boom"); public RefCntConcurrentSubscriptionErrorNoneStressTest() { - super(Flux.error(testError) - .hide(), null); + super(Flux.error(testError).hide()); } @Actor @@ -551,15 +532,13 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntGraceConcurrentSubscriptionRangeAsyncFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntGraceConcurrentSubscriptionRangeAsyncFusionStressTest() { - super(Flux.range(0, 10) - .publishOn(Schedulers.immediate()), Duration.ofSeconds(1)); + super(Flux.range(0, 10).publishOn(Schedulers.immediate()), Duration.ofSeconds(1)); } @Actor @@ -584,15 +563,44 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @State + public static class RefCntGraceConcurrentSubscriptionRangeSyncFusionStressTest + extends RefCntConcurrentSubscriptionBaseStressTest { + + public RefCntGraceConcurrentSubscriptionRangeSyncFusionStressTest() { + super(Flux.range(0, 10), Duration.ofSeconds(1)); + } + + @Actor + public void subscribe1() { + sharedSource.subscribe(subscriber1); + } + + @Actor + public void subscribe2() { + sharedSource.subscribe(subscriber2); + } + + @Arbiter + public void arbiter(IIIIII_Result r) { + r.r1 = subscriber1.onNextCalls.get(); + r.r2 = subscriber2.onNextCalls.get(); + r.r3 = subscriber1.onCompleteCalls.get(); + r.r4 = subscriber2.onCompleteCalls.get(); + r.r5 = subscriber1.onErrorCalls.get(); + r.r6 = subscriber2.onErrorCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"10, 10, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State public static class RefCntGraceConcurrentSubscriptionRangeNoneFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { public RefCntGraceConcurrentSubscriptionRangeNoneFusionStressTest() { - super(Flux.range(0, 10) - .hide(), Duration.ofSeconds(1)); + super(Flux.range(0, 10).hide(), Duration.ofSeconds(1)); } @Actor @@ -620,12 +628,11 @@ public void arbiter(IIIIII_Result r) { @Outcome(id = { "0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State - public static class RefCntGraceConcurrentSubscriptionEmptyAsyncStressTest + public static class RefCntGraceConcurrentSubscriptionEmptyAsyncFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { - public RefCntGraceConcurrentSubscriptionEmptyAsyncStressTest() { - super(Flux.empty() - .publishOn(Schedulers.immediate()), Duration.ofSeconds(1)); + public RefCntGraceConcurrentSubscriptionEmptyAsyncFusionStressTest() { + super(Flux.empty().publishOn(Schedulers.immediate()), Duration.ofSeconds(1)); } @Actor @@ -650,15 +657,143 @@ public void arbiter(IIIIII_Result r) { } @JCStressTest - @Outcome(id = { - "0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @Outcome(id = {"0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @State + public static class RefCntGraceConcurrentSubscriptionEmptyNoneFusionStressTest + extends RefCntConcurrentSubscriptionBaseStressTest { + + public RefCntGraceConcurrentSubscriptionEmptyNoneFusionStressTest() { + super(Flux.empty().hide(), Duration.ofSeconds(1)); + } + + @Actor + public void subscribe1() { + sharedSource.subscribe(subscriber1); + } + + @Actor + public void subscribe2() { + sharedSource.subscribe(subscriber2); + } + + @Arbiter + public void arbiter(IIIIII_Result r) { + r.r1 = subscriber1.onNextCalls.get(); + r.r2 = subscriber2.onNextCalls.get(); + r.r3 = subscriber1.onCompleteCalls.get(); + r.r4 = subscriber2.onCompleteCalls.get(); + r.r5 = subscriber1.onErrorCalls.get(); + r.r6 = subscriber2.onErrorCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"0, 0, 1, 1, 0, 0"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") @State - public static class RefCntGraceConcurrentSubscriptionEmptyNoneStressTest + public static class RefCntGraceConcurrentSubscriptionEmptySyncFusionStressTest extends RefCntConcurrentSubscriptionBaseStressTest { - public RefCntGraceConcurrentSubscriptionEmptyNoneStressTest() { - super(Flux.empty() - .hide(), Duration.ofSeconds(1)); + public RefCntGraceConcurrentSubscriptionEmptySyncFusionStressTest() { + super(Flux.empty(), Duration.ofSeconds(1)); + } + + @Actor + public void subscribe1() { + sharedSource.subscribe(subscriber1); + } + + @Actor + public void subscribe2() { + sharedSource.subscribe(subscriber2); + } + + @Arbiter + public void arbiter(IIIIII_Result r) { + r.r1 = subscriber1.onNextCalls.get(); + r.r2 = subscriber2.onNextCalls.get(); + r.r3 = subscriber1.onCompleteCalls.get(); + r.r4 = subscriber2.onCompleteCalls.get(); + r.r5 = subscriber1.onErrorCalls.get(); + r.r6 = subscriber2.onErrorCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @State + public static class RefCntGraceConcurrentSubscriptionErrorAsyncFusionStressTest + extends RefCntConcurrentSubscriptionBaseStressTest { + + static final Throwable testError = new RuntimeException("boom"); + + public RefCntGraceConcurrentSubscriptionErrorAsyncFusionStressTest() { + super(Flux.error(testError).publishOn(Schedulers.immediate()), Duration.ofSeconds(1)); + } + + @Actor + public void subscribe1() { + sharedSource.subscribe(subscriber1); + } + + @Actor + public void subscribe2() { + sharedSource.subscribe(subscriber2); + } + + @Arbiter + public void arbiter(IIIIII_Result r) { + r.r1 = subscriber1.onNextCalls.get(); + r.r2 = subscriber2.onNextCalls.get(); + r.r3 = subscriber1.onCompleteCalls.get(); + r.r4 = subscriber2.onCompleteCalls.get(); + r.r5 = subscriber1.onErrorCalls.get(); + r.r6 = subscriber2.onErrorCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @State + public static class RefCntGraceConcurrentSubscriptionErrorSyncFusionStressTest + extends RefCntConcurrentSubscriptionBaseStressTest { + + static final Throwable testError = new RuntimeException("boom"); + + public RefCntGraceConcurrentSubscriptionErrorSyncFusionStressTest() { + super(Flux.error(testError), Duration.ofSeconds(1)); + } + + @Actor + public void subscribe1() { + sharedSource.subscribe(subscriber1); + } + + @Actor + public void subscribe2() { + sharedSource.subscribe(subscriber2); + } + + @Arbiter + public void arbiter(IIIIII_Result r) { + r.r1 = subscriber1.onNextCalls.get(); + r.r2 = subscriber2.onNextCalls.get(); + r.r3 = subscriber1.onCompleteCalls.get(); + r.r4 = subscriber2.onCompleteCalls.get(); + r.r5 = subscriber1.onErrorCalls.get(); + r.r6 = subscriber2.onErrorCalls.get(); + } + } + + @JCStressTest + @Outcome(id = {"0, 0, 0, 0, 1, 1"}, expect = ACCEPTABLE, desc = "concurrent subscription succeeded") + @State + public static class RefCntGraceConcurrentSubscriptionErrorNoneFusionStressTest + extends RefCntConcurrentSubscriptionBaseStressTest { + + static final Throwable testError = new RuntimeException("boom"); + + public RefCntGraceConcurrentSubscriptionErrorNoneFusionStressTest() { + super(Flux.error(testError).hide(), Duration.ofSeconds(1)); } @Actor