From 5ae36a0a336666ee07f9c726a900a5eabcdd3bc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Mon, 14 Jan 2019 19:21:09 +0100 Subject: [PATCH] fix #1486 Prevent usingWhen terminal & cancel callbacks to both apply (#1487) This commit prevents asyncComplete/asyncError terminal callbacks to apply in addition to asyncCancel, in case one signal is still received after the other (eg. a cancellation is performed asynchronously, resulting in the signal arriving AFTER the onError/onComplete signals). --- .../reactor/core/publisher/FluxUsingWhen.java | 78 ++++---- .../core/publisher/FluxUsingWhenTest.java | 179 +++++++++++++++++- 2 files changed, 222 insertions(+), 35 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxUsingWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxUsingWhen.java index 45d8a017f4..41f04ee1c2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxUsingWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxUsingWhen.java @@ -18,6 +18,7 @@ import java.util.Objects; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -284,6 +285,9 @@ static class UsingWhenSubscriber implements UsingWhenParent { @Nullable final DeferredSubscription arbiter; + volatile int callbackApplied; + static final AtomicIntegerFieldUpdater CALLBACK_APPLIED = AtomicIntegerFieldUpdater.newUpdater(UsingWhenSubscriber.class, "callbackApplied"); + /** * Also stores the onComplete terminal state as {@link Exceptions#TERMINATED} */ @@ -327,20 +331,22 @@ public void request(long l) { @Override public void cancel() { - if (Operators.terminate(SUBSCRIPTION, this)) { - try { - if (asyncCancel != null) { - Flux.from(asyncCancel.apply(resource)) - .subscribe(new CancelInner(this)); + if (CALLBACK_APPLIED.compareAndSet(this, 0, 1)) { + if (Operators.terminate(SUBSCRIPTION, this)) { + try { + if (asyncCancel != null) { + Flux.from(asyncCancel.apply(resource)) + .subscribe(new CancelInner(this)); + } + else { + Flux.from(asyncComplete.apply(resource)) + .subscribe(new CancelInner(this)); + } } - else { - Flux.from(asyncComplete.apply(resource)) - .subscribe(new CancelInner(this)); + catch (Throwable error) { + Loggers.getLogger(FluxUsingWhen.class).warn("Error generating async resource cleanup during onCancel", error); } } - catch (Throwable error) { - Loggers.getLogger(FluxUsingWhen.class).warn("Error generating async resource cleanup during onCancel", error); - } } } @@ -351,37 +357,41 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - Publisher p; + if (CALLBACK_APPLIED.compareAndSet(this, 0, 1)) { + Publisher p; - try { - p = Objects.requireNonNull(asyncError.apply(resource), - "The asyncError returned a null Publisher"); - } - catch (Throwable e) { - Throwable _e = Operators.onOperatorError(e, actual.currentContext()); - _e = Exceptions.addSuppressed(_e, t); - actual.onError(_e); - return; - } + try { + p = Objects.requireNonNull(asyncError.apply(resource), + "The asyncError returned a null Publisher"); + } + catch (Throwable e) { + Throwable _e = Operators.onOperatorError(e, actual.currentContext()); + _e = Exceptions.addSuppressed(_e, t); + actual.onError(_e); + return; + } - p.subscribe(new RollbackInner(this, t)); + p.subscribe(new RollbackInner(this, t)); + } } @Override public void onComplete() { - Publisher p; + if (CALLBACK_APPLIED.compareAndSet(this, 0, 1)) { + Publisher p; - try { - p = Objects.requireNonNull(asyncComplete.apply(resource), - "The asyncComplete returned a null Publisher"); - } - catch (Throwable e) { - Throwable _e = Operators.onOperatorError(e, actual.currentContext()); - actual.onError(_e); - return; - } + try { + p = Objects.requireNonNull(asyncComplete.apply(resource), + "The asyncComplete returned a null Publisher"); + } + catch (Throwable e) { + Throwable _e = Operators.onOperatorError(e, actual.currentContext()); + actual.onError(_e); + return; + } - p.subscribe(new CommitInner(this)); + p.subscribe(new CommitInner(this)); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java index 9ed1c5065d..99d67eb94c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; import junitparams.JUnitParamsRunner; @@ -28,7 +29,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; + import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.Fuseable; @@ -852,6 +855,180 @@ public void contextPropagationOnCancelWithNoHandler(Flux source) { assertThat(errorRef).hasValue(null); } + // == tests checking callbacks don't pile up == + + @Test + public void noCancelCallbackAfterComplete() { + LongAdder cleanupCount = new LongAdder(); + Flux flux = Flux.usingWhen(Mono.defer(() -> Mono.just("foo")), Mono::just, + s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion + s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error + s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel + ); + + flux.subscribe(new CoreSubscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + subscription = s; + } + + @Override + public void onNext(Object o) {} + + @Override + public void onError(Throwable t) {} + + @Override + public void onComplete() { + subscription.cancel(); + } + }); + + assertThat(cleanupCount.sum()).isEqualTo(10); + } + + @Test + public void noCancelCallbackAfterError() { + LongAdder cleanupCount = new LongAdder(); + Flux flux = Flux.usingWhen(Mono.just("foo"), v -> Mono.error(new IllegalStateException("boom")), + s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion + s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error + s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel + ); + + flux.subscribe(new CoreSubscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + subscription = s; + } + + @Override + public void onNext(Object o) {} + + @Override + public void onError(Throwable t) { + subscription.cancel(); + } + + @Override + public void onComplete() {} + }); + + assertThat(cleanupCount.sum()).isEqualTo(100); + } + + @Test + public void noCompleteCallbackAfterCancel() throws InterruptedException { + AtomicBoolean cancelled = new AtomicBoolean(); + LongAdder cleanupCount = new LongAdder(); + + Publisher badPublisher = s -> s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + new Thread(() -> { + s.onNext("foo1"); + try { Thread.sleep(100); } catch (InterruptedException e) {} + s.onComplete(); + }).start(); + } + + @Override + public void cancel() { + cancelled.set(true); + } + }); + + Flux flux = Flux.usingWhen(Mono.just("foo"), v -> badPublisher, + s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion + s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error + s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel + ); + + flux.subscribe(new CoreSubscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + subscription = s; + } + + @Override + public void onNext(String o) { + subscription.cancel(); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onComplete() {} + }); + + Thread.sleep(300); + assertThat(cleanupCount.sum()).isEqualTo(1000); + assertThat(cancelled).as("source cancelled").isTrue(); + } + + @Test + public void noErrorCallbackAfterCancel() throws InterruptedException { + AtomicBoolean cancelled = new AtomicBoolean(); + LongAdder cleanupCount = new LongAdder(); + + Publisher badPublisher = s -> s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + new Thread(() -> { + s.onNext("foo1"); + try { Thread.sleep(100); } catch (InterruptedException e) {} + s.onError(new IllegalStateException("boom")); + }).start(); + } + + @Override + public void cancel() { + cancelled.set(true); + } + }); + + Flux flux = Flux.usingWhen(Mono.just("foo"), v -> badPublisher, + s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion + s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error + s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel + ); + + flux.subscribe(new CoreSubscriber() { + Subscription subscription; + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + subscription = s; + } + + @Override + public void onNext(String o) { + subscription.cancel(); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onComplete() {} + }); + + Thread.sleep(300); + assertThat(cleanupCount.sum()).isEqualTo(1000); + assertThat(cancelled).as("source cancelled").isTrue(); + } + // == scanUnsafe tests == @@ -1114,4 +1291,4 @@ private Object[] sourcesContextError() { } -} \ No newline at end of file +}