Skip to content

Commit

Permalink
fix #2124 UnicastProcessor discards when onNext/cancel races
Browse files Browse the repository at this point in the history
This is a follow-up to #2125, this time targeting the same class of
discarding race but on `UnicastProcessor`.

As there is a chance that the race references `actual` (to resolve the
discard hook) AFTER `cancel()`, we need to keep that field populated.
It was nulled out for the benefit of `downstreamCount()` (see #980) so
we replace that by a boolean, set to true when subscribed.

Reviewed-in: #2126
  • Loading branch information
simonbasle committed Apr 22, 2020
1 parent 3d3e3b9 commit 9a76624
Show file tree
Hide file tree
Showing 3 changed files with 512 additions and 489 deletions.
Expand Up @@ -154,6 +154,7 @@ public static <E> UnicastProcessor<E> create(Queue<E> queue,
volatile boolean done;
Throwable error;

boolean hasDownstream; //important to not loose the downstream too early and miss discard hook, while having relevant hasDownstreams()
volatile CoreSubscriber<? super T> actual;

volatile boolean cancelled;
Expand Down Expand Up @@ -203,6 +204,7 @@ public int getBufferSize() {
@Override
public Object scanUnsafe(Attr key) {
if (Attr.BUFFERED == key) return queue.size();
if (Attr.PREFETCH == key) return Integer.MAX_VALUE;
return super.scanUnsafe(key);
}

Expand All @@ -229,7 +231,7 @@ void drainRegular(CoreSubscriber<? super T> a) {
T t = q.poll();
boolean empty = t == null;

if (checkTerminated(d, empty, a, q)) {
if (checkTerminated(d, empty, a, q, t)) {
return;
}

Expand All @@ -243,7 +245,7 @@ void drainRegular(CoreSubscriber<? super T> a) {
}

if (r == e) {
if (checkTerminated(done, q.isEmpty(), a, q)) {
if (checkTerminated(done, q.isEmpty(), a, q, null)) {
return;
}
}
Expand All @@ -268,7 +270,7 @@ void drainFused(CoreSubscriber<? super T> a) {

if (cancelled) {
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
hasDownstream = false;
return;
}

Expand All @@ -277,7 +279,7 @@ void drainFused(CoreSubscriber<? super T> a) {
a.onNext(null);

if (d) {
actual = null;
hasDownstream = false;

Throwable ex = error;
if (ex != null) {
Expand All @@ -295,8 +297,11 @@ void drainFused(CoreSubscriber<? super T> a) {
}
}

void drain() {
void drain(@Nullable T dataSignalOfferedBeforeDrain) {
if (WIP.getAndIncrement(this) != 0) {
if (dataSignalOfferedBeforeDrain != null && cancelled) {
Operators.onDiscard(dataSignalOfferedBeforeDrain, actual.currentContext());
}
return;
}

Expand All @@ -321,15 +326,16 @@ void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, CoreSubscriber<? super T> a, Queue<T> q) {
boolean checkTerminated(boolean d, boolean empty, CoreSubscriber<? super T> a, Queue<T> q, @Nullable T t) {
if (cancelled) {
Operators.onDiscard(t, a.currentContext());
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
hasDownstream = false;
return true;
}
if (d && empty) {
Throwable e = error;
actual = null;
hasDownstream = false;
if (e != null) {
a.onError(e);
} else {
Expand Down Expand Up @@ -369,9 +375,10 @@ public void onNext(T t) {
}

if (!queue.offer(t)) {
Context ctx = actual.currentContext();
Throwable ex = Operators.onOperatorError(null,
Exceptions.failWithOverflow(), t, currentContext());
if(onOverflow != null) {
Exceptions.failWithOverflow(), t, ctx);
if (onOverflow != null) {
try {
onOverflow.accept(t);
}
Expand All @@ -380,10 +387,11 @@ public void onNext(T t) {
ex.initCause(e);
}
}
onError(Operators.onOperatorError(null, ex, t, currentContext()));
Operators.onDiscard(t, ctx);
onError(ex);
return;
}
drain();
drain(t);
}

@Override
Expand All @@ -398,7 +406,7 @@ public void onError(Throwable t) {

doTerminate();

drain();
drain(null);
}

@Override
Expand All @@ -411,20 +419,21 @@ public void onComplete() {

doTerminate();

drain();
drain(null);
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Objects.requireNonNull(actual, "subscribe");
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {

this.hasDownstream = true;
actual.onSubscribe(this);
this.actual = actual;
if (cancelled) {
this.actual = null;
this.hasDownstream = false;
} else {
drain();
drain(null);
}
} else {
Operators.error(actual, new IllegalStateException("UnicastProcessor " +
Expand All @@ -436,7 +445,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public void request(long n) {
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
drain();
drain(null);
}
}

Expand All @@ -451,7 +460,7 @@ public void cancel() {

if (WIP.getAndIncrement(this) == 0) {
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
actual = null;
hasDownstream = false;
}
}

Expand Down Expand Up @@ -513,6 +522,6 @@ public long downstreamCount() {

@Override
public boolean hasDownstreams() {
return actual != null;
return hasDownstream;
}
}

0 comments on commit 9a76624

Please sign in to comment.