Skip to content

Commit

Permalink
fix #2124 UnicastProcessor discards when onNext/cancel races
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Apr 21, 2020
1 parent 1a7f804 commit 2fc5796
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 19 deletions.
Expand Up @@ -154,6 +154,7 @@ public static <E> UnicastProcessor<E> create(Queue<E> queue,
volatile boolean done;
Throwable error;

boolean hasDownstream; //important to not loose the downstream too early and miss discard hook, while having relevant hasDownstreams()
volatile CoreSubscriber<? super T> actual;

volatile boolean cancelled;
Expand Down Expand Up @@ -203,6 +204,7 @@ public int getBufferSize() {
@Override
public Object scanUnsafe(Attr key) {
if (Attr.BUFFERED == key) return queue.size();
if (Attr.PREFETCH == key) return Integer.MAX_VALUE;
return super.scanUnsafe(key);
}

Expand All @@ -229,7 +231,7 @@ void drainRegular(CoreSubscriber<? super T> a) {
T t = q.poll();
boolean empty = t == null;

if (checkTerminated(d, empty, a, q)) {
if (checkTerminated(d, empty, a, q, t)) {
return;
}

Expand All @@ -243,7 +245,7 @@ void drainRegular(CoreSubscriber<? super T> a) {
}

if (r == e) {
if (checkTerminated(done, q.isEmpty(), a, q)) {
if (checkTerminated(done, q.isEmpty(), a, q, null)) {
return;
}
}
Expand All @@ -268,7 +270,7 @@ void drainFused(CoreSubscriber<? super T> a) {

if (cancelled) {
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
hasDownstream = false;
return;
}

Expand All @@ -277,7 +279,7 @@ void drainFused(CoreSubscriber<? super T> a) {
a.onNext(null);

if (d) {
actual = null;
hasDownstream = false;

Throwable ex = error;
if (ex != null) {
Expand All @@ -295,8 +297,11 @@ void drainFused(CoreSubscriber<? super T> a) {
}
}

void drain() {
void drain(@Nullable T dataSignalOfferedBeforeDrain) {
if (WIP.getAndIncrement(this) != 0) {
if (dataSignalOfferedBeforeDrain != null && cancelled) {
Operators.onDiscard(dataSignalOfferedBeforeDrain, actual.currentContext());
}
return;
}

Expand All @@ -321,15 +326,16 @@ void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, CoreSubscriber<? super T> a, Queue<T> q) {
boolean checkTerminated(boolean d, boolean empty, CoreSubscriber<? super T> a, Queue<T> q, @Nullable T t) {
if (cancelled) {
Operators.onDiscard(t, a.currentContext());
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
hasDownstream = false;
return true;
}
if (d && empty) {
Throwable e = error;
actual = null;
hasDownstream = false;
if (e != null) {
a.onError(e);
} else {
Expand Down Expand Up @@ -369,9 +375,10 @@ public void onNext(T t) {
}

if (!queue.offer(t)) {
Context ctx = actual.currentContext();
Throwable ex = Operators.onOperatorError(null,
Exceptions.failWithOverflow(), t, currentContext());
if(onOverflow != null) {
Exceptions.failWithOverflow(), t, ctx);
if (onOverflow != null) {
try {
onOverflow.accept(t);
}
Expand All @@ -380,10 +387,11 @@ public void onNext(T t) {
ex.initCause(e);
}
}
onError(Operators.onOperatorError(null, ex, t, currentContext()));
Operators.onDiscard(t, ctx);
onError(ex);
return;
}
drain();
drain(t);
}

@Override
Expand All @@ -398,7 +406,7 @@ public void onError(Throwable t) {

doTerminate();

drain();
drain(null);
}

@Override
Expand All @@ -411,7 +419,7 @@ public void onComplete() {

doTerminate();

drain();
drain(null);
}

@Override
Expand All @@ -422,9 +430,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(this);
this.actual = actual;
if (cancelled) {
this.actual = null;
this.hasDownstream = false;
} else {
drain();
drain(null);
}
} else {
Operators.error(actual, new IllegalStateException("UnicastProcessor " +
Expand All @@ -436,7 +444,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public void request(long n) {
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
drain();
drain(null);
}
}

Expand All @@ -451,7 +459,7 @@ public void cancel() {

if (WIP.getAndIncrement(this) == 0) {
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
actual = null;
hasDownstream = false;
}
}

Expand Down Expand Up @@ -513,6 +521,6 @@ public long downstreamCount() {

@Override
public boolean hasDownstreams() {
return actual != null;
return hasDownstream;
}
}
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2011-Present VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import org.reactivestreams.Publisher;

import reactor.test.publisher.TestPublisher;

public class UnicastProcessorOnDiscardShouldNotLeakTest extends AbstractOnDiscardShouldNotLeakTest {

public UnicastProcessorOnDiscardShouldNotLeakTest(boolean conditional, boolean fused) {
super(conditional, fused);
}

@Override
protected Publisher<Tracked<?>> transform(TestPublisher<Tracked<?>> main,
TestPublisher<Tracked<?>>... additional) {
return main
.flux()
.subscribeWith(UnicastProcessor.create());
}
}

0 comments on commit 2fc5796

Please sign in to comment.