Skip to content

Commit

Permalink
Backport #2102 Provide onDiscard support in UnicastProcessor
Browse files Browse the repository at this point in the history
Reviewed-in: #2126
  • Loading branch information
simonbasle committed Apr 22, 2020
1 parent c0ef739 commit 9b7bd24
Showing 1 changed file with 10 additions and 12 deletions.
Expand Up @@ -213,7 +213,7 @@ void doTerminate() {
}
}

void drainRegular(Subscriber<? super T> a) {
void drainRegular(CoreSubscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;
Expand Down Expand Up @@ -259,15 +259,15 @@ void drainRegular(Subscriber<? super T> a) {
}
}

void drainFused(Subscriber<? super T> a) {
void drainFused(CoreSubscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;

for (;;) {

if (cancelled) {
q.clear();
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
return;
}
Expand Down Expand Up @@ -303,7 +303,7 @@ void drain() {
int missed = 1;

for (;;) {
Subscriber<? super T> a = actual;
CoreSubscriber<? super T> a = actual;
if (a != null) {

if (outputFused) {
Expand All @@ -321,9 +321,9 @@ void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, Queue<T> q) {
boolean checkTerminated(boolean d, boolean empty, CoreSubscriber<? super T> a, Queue<T> q) {
if (cancelled) {
q.clear();
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
return true;
}
Expand Down Expand Up @@ -449,11 +449,9 @@ public void cancel() {

doTerminate();

if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
queue.clear();
actual = null;
}
if (WIP.getAndIncrement(this) == 0) {
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
actual = null;
}
}

Expand All @@ -475,7 +473,7 @@ public boolean isEmpty() {

@Override
public void clear() {
queue.clear();
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
}

@Override
Expand Down

0 comments on commit 9b7bd24

Please sign in to comment.