From 3d3e3b900d487ff61f766b3b8506b026399d1532 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Mon, 20 Apr 2020 19:08:03 +0200 Subject: [PATCH] Backport #2102 Provide onDiscard support in UnicastProcessor Reviewed-in: #2126 --- .../core/publisher/UnicastProcessor.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java b/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java index 063b7cd30f..4eee5cc18a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java +++ b/reactor-core/src/main/java/reactor/core/publisher/UnicastProcessor.java @@ -213,7 +213,7 @@ void doTerminate() { } } - void drainRegular(Subscriber a) { + void drainRegular(CoreSubscriber a) { int missed = 1; final Queue q = queue; @@ -259,7 +259,7 @@ void drainRegular(Subscriber a) { } } - void drainFused(Subscriber a) { + void drainFused(CoreSubscriber a) { int missed = 1; final Queue q = queue; @@ -267,7 +267,7 @@ void drainFused(Subscriber a) { for (;;) { if (cancelled) { - q.clear(); + Operators.onDiscardQueueWithClear(q, a.currentContext(), null); actual = null; return; } @@ -303,7 +303,7 @@ void drain() { int missed = 1; for (;;) { - Subscriber a = actual; + CoreSubscriber a = actual; if (a != null) { if (outputFused) { @@ -321,9 +321,9 @@ void drain() { } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a, Queue q) { + boolean checkTerminated(boolean d, boolean empty, CoreSubscriber a, Queue q) { if (cancelled) { - q.clear(); + Operators.onDiscardQueueWithClear(q, a.currentContext(), null); actual = null; return true; } @@ -449,11 +449,9 @@ public void cancel() { doTerminate(); - if (!outputFused) { - if (WIP.getAndIncrement(this) == 0) { - queue.clear(); - actual = null; - } + if (WIP.getAndIncrement(this) == 0) { + Operators.onDiscardQueueWithClear(queue, currentContext(), null); + actual = null; } } @@ -475,7 +473,7 @@ public boolean isEmpty() { @Override public void clear() { - queue.clear(); + Operators.onDiscardQueueWithClear(queue, currentContext(), null); } @Override