Skip to content

Commit

Permalink
Provide onDiscard support in UnicastProcessor (#2102)
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Apr 6, 2020
1 parent e63dc71 commit 7b55b6e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void doTerminate() {
}
}

void drainRegular(Subscriber<? super T> a) {
void drainRegular(CoreSubscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;
Expand Down Expand Up @@ -259,15 +259,15 @@ void drainRegular(Subscriber<? super T> a) {
}
}

void drainFused(Subscriber<? super T> a) {
void drainFused(CoreSubscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;

for (;;) {

if (cancelled) {
q.clear();
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
return;
}
Expand Down Expand Up @@ -303,7 +303,7 @@ void drain() {
int missed = 1;

for (;;) {
Subscriber<? super T> a = actual;
CoreSubscriber<? super T> a = actual;
if (a != null) {

if (outputFused) {
Expand All @@ -321,9 +321,9 @@ void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, Queue<T> q) {
boolean checkTerminated(boolean d, boolean empty, CoreSubscriber<? super T> a, Queue<T> q) {
if (cancelled) {
q.clear();
Operators.onDiscardQueueWithClear(q, a.currentContext(), null);
actual = null;
return true;
}
Expand Down Expand Up @@ -449,11 +449,9 @@ public void cancel() {

doTerminate();

if (!outputFused) {
if (WIP.getAndIncrement(this) == 0) {
queue.clear();
actual = null;
}
if (WIP.getAndIncrement(this) == 0) {
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
actual = null;
}
}

Expand All @@ -475,7 +473,7 @@ public boolean isEmpty() {

@Override
public void clear() {
queue.clear();
Operators.onDiscardQueueWithClear(queue, currentContext(), null);
}

@Override
Expand Down
20 changes: 14 additions & 6 deletions reactor-core/src/test/java/reactor/core/publisher/FluxZipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -898,48 +898,56 @@ public void backpressuredAsyncFusedCancelled2() {
}

@Test
@SuppressWarnings("rawtypes")
public void backpressuredAsyncFusedError() {
Hooks.onErrorDropped(c -> {
assertThat(c).hasMessage("test2");
});
UnicastProcessor<Integer> up = UnicastProcessor.create();
FluxZip.ZipInner[] inner = new FluxZip.ZipInner[1];
StepVerifier.create(Flux.zip(obj -> (int) obj[0] + (int) obj[1],
1,
up,
Flux.just(2, 3, 5)), 0)
.expectSubscription()
.then(() -> inner[0] = ((FluxZip.ZipInner) up.actual))
.then(() -> up.onNext(1))
.thenRequest(1)
.expectNext(3)
.then(() -> up.onNext(2))
.thenRequest(1)
.expectNext(5)
.then(() -> up.actual.onError(new Exception("test")))
.then(() -> up.actual.onError(new Exception("test2")))
.then(() -> inner[0].onError(new Exception("test")))
.then(() -> inner[0].onError(new Exception("test2")))
.verifyErrorMessage("test");
}

@Test
@SuppressWarnings("rawtypes")
public void backpressuredAsyncFusedErrorHideAll() {
Hooks.onErrorDropped(c -> {
assertThat(c).hasMessage("test2");
});
UnicastProcessor<Integer> up = UnicastProcessor.create();
FluxZip.ZipInner[] inner = new FluxZip.ZipInner[1];
StepVerifier.create(Flux.zip(obj -> (int) obj[0] + (int) obj[1], 1, up, s -> {
assertThat(((FluxZip.ZipInner) up.actual).parent.subscribers[1].done).isFalse();
Flux.just(2, 3, 5)
.subscribe(s);
})
.hide(), 0)
.expectSubscription()
.then(() -> inner[0] = ((FluxZip.ZipInner) up.actual))
.then(() -> up.onNext(1))
.thenRequest(1)
.expectNext(3)
.then(() -> up.onNext(2))
.thenRequest(1)
.expectNext(5)
.then(() -> assertThat(((FluxZip.ZipInner) up.actual).done).isFalse())
.then(() -> up.actual.onError(new Exception("test")))
.then(() -> assertThat(((FluxZip.ZipInner) up.actual).done).isTrue())
.then(() -> up.actual.onError(new Exception("test2")))
.then(() -> assertThat(inner[0].done).isFalse())
.then(() -> inner[0].onError(new Exception("test")))
.then(() -> assertThat(inner[0].done).isTrue())
.then(() -> inner[0].onError(new Exception("test2")))
.verifyErrorMessage("test");
}

Expand Down

0 comments on commit 7b55b6e

Please sign in to comment.