Skip to content

Commit

Permalink
fix #1342 Let collectList discard accumulated elements on error/cancel (
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle authored and smaldini committed Sep 12, 2018
1 parent 68e873d commit e337f0e
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void onError(Throwable t) {
return;
}
collection = null;
Operators.onDiscardMultiple(c, currentContext());
actual.onError(t);
}

Expand All @@ -119,6 +120,8 @@ public void onComplete() {

@Override
public void cancel() {
//specific discard of the collection
Operators.onDiscardMultiple(collection, currentContext());
super.cancel();
s.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package reactor.core.publisher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -128,4 +130,153 @@ public void scanBufferAllSubscriber() {
test.cancel();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

@Test
public void discardOnError() {
Mono<List<Integer>> test = Flux.range(1, 10)
.hide()
.map(i -> {
if (i == 5) throw new IllegalStateException("boom");
return i;
})
.collectList();

StepVerifier.create(test)
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 4);
}

@Test
public void discardOnCancel() {
Mono<List<Long>> test = Flux.interval(Duration.ofMillis(100))
.take(10)
.collectList();

StepVerifier.create(test)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(210))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(0L, 1L);
}

@Test
public void discardOnNextPredicateMiss() {
StepVerifier.create(Flux.range(1, 10)
.hide() //hide both avoid the fuseable AND tryOnNext usage
.filter(i -> i % 2 == 0)
)
.expectNextCount(5)
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 3, 5, 7, 9);
}

@Test
public void discardTryOnNextPredicateFail() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxFilter.FilterSubscriber<Integer> subscriber =
new FluxFilter.FilterSubscriber<>(actual, i -> { throw new IllegalStateException("boom"); });
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}

@Test
public void discardTryOnNextPredicateMiss() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, null, null, null);
FluxFilter.FilterSubscriber<Integer> subscriber =
new FluxFilter.FilterSubscriber<>(actual, i -> i % 2 == 0);
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
subscriber.tryOnNext(2);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}

@Test
public void discardConditionalOnNextPredicateFail() {
StepVerifier.create(Flux.range(1, 10)
.hide()
.filter(i -> { throw new IllegalStateException("boom"); })
.filter(i -> true)
)
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1);
}

@Test
public void discardConditionalOnNextPredicateMiss() {
StepVerifier.create(Flux.range(1, 10)
.hide()
.filter(i -> i % 2 == 0)
.filter(i -> true)
)
.expectNextCount(5)
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 3, 5, 7, 9);
}

@Test
public void discardConditionalTryOnNextPredicateFail() {
Fuseable.ConditionalSubscriber<Integer> actual = new FluxPeekFuseableTest.ConditionalAssertSubscriber<>();

FluxFilter.FilterConditionalSubscriber<Integer> subscriber =
new FluxFilter.FilterConditionalSubscriber<>(actual, i -> {
throw new IllegalStateException("boom");
});
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}

@Test
public void discardConditionalTryOnNextPredicateMiss() {
Fuseable.ConditionalSubscriber<Integer> actual = new FluxPeekFuseableTest.ConditionalAssertSubscriber<>();

FluxFilter.FilterConditionalSubscriber<Integer> subscriber =
new FluxFilter.FilterConditionalSubscriber<>(actual, i -> i % 2 == 0);
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
subscriber.tryOnNext(2);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}
}

0 comments on commit e337f0e

Please sign in to comment.