diff --git a/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java index 4eee5cc18a..bde5af3023 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java @@ -154,6 +154,7 @@ public static UnicastProcessor create(Queue queue, volatile boolean done; Throwable error; + boolean hasDownstream; //important to not loose the downstream too early and miss discard hook, while having relevant hasDownstreams() volatile CoreSubscriber actual; volatile boolean cancelled; @@ -203,6 +204,7 @@ public int getBufferSize() { @Override public Object scanUnsafe(Attr key) { if (Attr.BUFFERED == key) return queue.size(); + if (Attr.PREFETCH == key) return Integer.MAX_VALUE; return super.scanUnsafe(key); } @@ -229,7 +231,7 @@ void drainRegular(CoreSubscriber a) { T t = q.poll(); boolean empty = t == null; - if (checkTerminated(d, empty, a, q)) { + if (checkTerminated(d, empty, a, q, t)) { return; } @@ -243,7 +245,7 @@ void drainRegular(CoreSubscriber a) { } if (r == e) { - if (checkTerminated(done, q.isEmpty(), a, q)) { + if (checkTerminated(done, q.isEmpty(), a, q, null)) { return; } } @@ -268,7 +270,7 @@ void drainFused(CoreSubscriber a) { if (cancelled) { Operators.onDiscardQueueWithClear(q, a.currentContext(), null); - actual = null; + hasDownstream = false; return; } @@ -277,7 +279,7 @@ void drainFused(CoreSubscriber a) { a.onNext(null); if (d) { - actual = null; + hasDownstream = false; Throwable ex = error; if (ex != null) { @@ -295,8 +297,11 @@ void drainFused(CoreSubscriber a) { } } - void drain() { + void drain(@Nullable T dataSignalOfferedBeforeDrain) { if (WIP.getAndIncrement(this) != 0) { + if (dataSignalOfferedBeforeDrain != null && cancelled) { + Operators.onDiscard(dataSignalOfferedBeforeDrain, actual.currentContext()); + } return; } @@ -321,15 +326,16 @@ void drain() { } } - boolean checkTerminated(boolean d, boolean empty, CoreSubscriber a, Queue q) { + boolean checkTerminated(boolean d, boolean empty, CoreSubscriber a, Queue q, @Nullable T t) { if (cancelled) { + Operators.onDiscard(t, a.currentContext()); Operators.onDiscardQueueWithClear(q, a.currentContext(), null); - actual = null; + hasDownstream = false; return true; } if (d && empty) { Throwable e = error; - actual = null; + hasDownstream = false; if (e != null) { a.onError(e); } else { @@ -369,9 +375,10 @@ public void onNext(T t) { } if (!queue.offer(t)) { + Context ctx = actual.currentContext(); Throwable ex = Operators.onOperatorError(null, - Exceptions.failWithOverflow(), t, currentContext()); - if(onOverflow != null) { + Exceptions.failWithOverflow(), t, ctx); + if (onOverflow != null) { try { onOverflow.accept(t); } @@ -380,10 +387,11 @@ public void onNext(T t) { ex.initCause(e); } } - onError(Operators.onOperatorError(null, ex, t, currentContext())); + Operators.onDiscard(t, ctx); + onError(ex); return; } - drain(); + drain(t); } @Override @@ -398,7 +406,7 @@ public void onError(Throwable t) { doTerminate(); - drain(); + drain(null); } @Override @@ -411,7 +419,7 @@ public void onComplete() { doTerminate(); - drain(); + drain(null); } @Override @@ -422,9 +430,9 @@ public void subscribe(CoreSubscriber actual) { actual.onSubscribe(this); this.actual = actual; if (cancelled) { - this.actual = null; + this.hasDownstream = false; } else { - drain(); + drain(null); } } else { Operators.error(actual, new IllegalStateException("UnicastProcessor " + @@ -436,7 +444,7 @@ public void subscribe(CoreSubscriber actual) { public void request(long n) { if (Operators.validate(n)) { Operators.addCap(REQUESTED, this, n); - drain(); + drain(null); } } @@ -451,7 +459,7 @@ public void cancel() { if (WIP.getAndIncrement(this) == 0) { Operators.onDiscardQueueWithClear(queue, currentContext(), null); - actual = null; + hasDownstream = false; } } @@ -513,6 +521,6 @@ public long downstreamCount() { @Override public boolean hasDownstreams() { - return actual != null; + return hasDownstream; } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorOnDiscardShouldNotLeakTest.java b/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorOnDiscardShouldNotLeakTest.java new file mode 100644 index 0000000000..009e24abd9 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/UnicastProcessorOnDiscardShouldNotLeakTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2011-Present VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import org.reactivestreams.Publisher; + +import reactor.test.publisher.TestPublisher; + +public class UnicastProcessorOnDiscardShouldNotLeakTest extends AbstractOnDiscardShouldNotLeakTest { + + public UnicastProcessorOnDiscardShouldNotLeakTest(boolean conditional, boolean fused) { + super(conditional, fused); + } + + @Override + protected Publisher> transform(TestPublisher> main, + TestPublisher>... additional) { + return main + .flux() + .subscribeWith(UnicastProcessor.create()); + } +} \ No newline at end of file