Skip to content

Commit

Permalink
fix #1486 Prevent usingWhen terminal & cancel callbacks to both apply (
Browse files Browse the repository at this point in the history
…#1487)

This commit prevents asyncComplete/asyncError terminal callbacks to
apply in addition to asyncCancel, in case one signal is still received
after the other (eg. a cancellation is performed asynchronously,
resulting in the signal arriving AFTER the onError/onComplete signals).
  • Loading branch information
simonbasle authored and bsideup committed Jan 28, 2019
1 parent b973466 commit 5ae36a0
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

Expand Down Expand Up @@ -284,6 +285,9 @@ static class UsingWhenSubscriber<T, S> implements UsingWhenParent<T> {
@Nullable
final DeferredSubscription arbiter;

volatile int callbackApplied;
static final AtomicIntegerFieldUpdater<UsingWhenSubscriber> CALLBACK_APPLIED = AtomicIntegerFieldUpdater.newUpdater(UsingWhenSubscriber.class, "callbackApplied");

/**
* Also stores the onComplete terminal state as {@link Exceptions#TERMINATED}
*/
Expand Down Expand Up @@ -327,20 +331,22 @@ public void request(long l) {

@Override
public void cancel() {
if (Operators.terminate(SUBSCRIPTION, this)) {
try {
if (asyncCancel != null) {
Flux.from(asyncCancel.apply(resource))
.subscribe(new CancelInner(this));
if (CALLBACK_APPLIED.compareAndSet(this, 0, 1)) {
if (Operators.terminate(SUBSCRIPTION, this)) {
try {
if (asyncCancel != null) {
Flux.from(asyncCancel.apply(resource))
.subscribe(new CancelInner(this));
}
else {
Flux.from(asyncComplete.apply(resource))
.subscribe(new CancelInner(this));
}
}
else {
Flux.from(asyncComplete.apply(resource))
.subscribe(new CancelInner(this));
catch (Throwable error) {
Loggers.getLogger(FluxUsingWhen.class).warn("Error generating async resource cleanup during onCancel", error);
}
}
catch (Throwable error) {
Loggers.getLogger(FluxUsingWhen.class).warn("Error generating async resource cleanup during onCancel", error);
}
}
}

Expand All @@ -351,37 +357,41 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
Publisher<?> p;
if (CALLBACK_APPLIED.compareAndSet(this, 0, 1)) {
Publisher<?> p;

try {
p = Objects.requireNonNull(asyncError.apply(resource),
"The asyncError returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
_e = Exceptions.addSuppressed(_e, t);
actual.onError(_e);
return;
}
try {
p = Objects.requireNonNull(asyncError.apply(resource),
"The asyncError returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
_e = Exceptions.addSuppressed(_e, t);
actual.onError(_e);
return;
}

p.subscribe(new RollbackInner(this, t));
p.subscribe(new RollbackInner(this, t));
}
}

@Override
public void onComplete() {
Publisher<?> p;
if (CALLBACK_APPLIED.compareAndSet(this, 0, 1)) {
Publisher<?> p;

try {
p = Objects.requireNonNull(asyncComplete.apply(resource),
"The asyncComplete returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
actual.onError(_e);
return;
}
try {
p = Objects.requireNonNull(asyncComplete.apply(resource),
"The asyncComplete returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
actual.onError(_e);
return;
}

p.subscribe(new CommitInner(this));
p.subscribe(new CommitInner(this));
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;

import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Fuseable;
Expand Down Expand Up @@ -852,6 +855,180 @@ public void contextPropagationOnCancelWithNoHandler(Flux<String> source) {
assertThat(errorRef).hasValue(null);
}

// == tests checking callbacks don't pile up ==

@Test
public void noCancelCallbackAfterComplete() {
LongAdder cleanupCount = new LongAdder();
Flux<String> flux = Flux.usingWhen(Mono.defer(() -> Mono.just("foo")), Mono::just,
s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion
s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error
s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel
);

flux.subscribe(new CoreSubscriber<Object>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
s.request(1);
subscription = s;
}

@Override
public void onNext(Object o) {}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {
subscription.cancel();
}
});

assertThat(cleanupCount.sum()).isEqualTo(10);
}

@Test
public void noCancelCallbackAfterError() {
LongAdder cleanupCount = new LongAdder();
Flux<String> flux = Flux.usingWhen(Mono.just("foo"), v -> Mono.error(new IllegalStateException("boom")),
s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion
s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error
s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel
);

flux.subscribe(new CoreSubscriber<Object>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
s.request(1);
subscription = s;
}

@Override
public void onNext(Object o) {}

@Override
public void onError(Throwable t) {
subscription.cancel();
}

@Override
public void onComplete() {}
});

assertThat(cleanupCount.sum()).isEqualTo(100);
}

@Test
public void noCompleteCallbackAfterCancel() throws InterruptedException {
AtomicBoolean cancelled = new AtomicBoolean();
LongAdder cleanupCount = new LongAdder();

Publisher<String> badPublisher = s -> s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
new Thread(() -> {
s.onNext("foo1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
s.onComplete();
}).start();
}

@Override
public void cancel() {
cancelled.set(true);
}
});

Flux<String> flux = Flux.usingWhen(Mono.just("foo"), v -> badPublisher,
s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion
s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error
s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel
);

flux.subscribe(new CoreSubscriber<String>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
s.request(1);
subscription = s;
}

@Override
public void onNext(String o) {
subscription.cancel();
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

Thread.sleep(300);
assertThat(cleanupCount.sum()).isEqualTo(1000);
assertThat(cancelled).as("source cancelled").isTrue();
}

@Test
public void noErrorCallbackAfterCancel() throws InterruptedException {
AtomicBoolean cancelled = new AtomicBoolean();
LongAdder cleanupCount = new LongAdder();

Publisher<String> badPublisher = s -> s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
new Thread(() -> {
s.onNext("foo1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
s.onError(new IllegalStateException("boom"));
}).start();
}

@Override
public void cancel() {
cancelled.set(true);
}
});

Flux<String> flux = Flux.usingWhen(Mono.just("foo"), v -> badPublisher,
s -> Mono.fromRunnable(() -> cleanupCount.add(10)), //10 for completion
s -> Mono.fromRunnable(() -> cleanupCount.add(100)), //100 for error
s -> Mono.fromRunnable(() -> cleanupCount.add(1000)) //1000 for cancel
);

flux.subscribe(new CoreSubscriber<String>() {
Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
s.request(1);
subscription = s;
}

@Override
public void onNext(String o) {
subscription.cancel();
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
});

Thread.sleep(300);
assertThat(cleanupCount.sum()).isEqualTo(1000);
assertThat(cancelled).as("source cancelled").isTrue();
}


// == scanUnsafe tests ==

Expand Down Expand Up @@ -1114,4 +1291,4 @@ private Object[] sourcesContextError() {
}


}
}

0 comments on commit 5ae36a0

Please sign in to comment.