Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #1342 Let collectList discard accumulated elements on error/cancel #1345

Merged
merged 1 commit into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void onError(Throwable t) {
return;
}
collection = null;
Operators.onDiscardMultiple(c, currentContext());
actual.onError(t);
}

Expand All @@ -119,6 +120,8 @@ public void onComplete() {

@Override
public void cancel() {
//specific discard of the collection
Operators.onDiscardMultiple(collection, currentContext());
super.cancel();
s.cancel();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package reactor.core.publisher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -128,4 +130,153 @@ public void scanBufferAllSubscriber() {
test.cancel();
assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
}

@Test
public void discardOnError() {
Mono<List<Integer>> test = Flux.range(1, 10)
.hide()
.map(i -> {
if (i == 5) throw new IllegalStateException("boom");
return i;
})
.collectList();

StepVerifier.create(test)
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1, 2, 3, 4);
}

@Test
public void discardOnCancel() {
Mono<List<Long>> test = Flux.interval(Duration.ofMillis(100))
.take(10)
.collectList();

StepVerifier.create(test)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(210))
.thenCancel()
.verifyThenAssertThat()
.hasDiscardedExactly(0L, 1L);
}

@Test
public void discardOnNextPredicateMiss() {
StepVerifier.create(Flux.range(1, 10)
.hide() //hide both avoid the fuseable AND tryOnNext usage
.filter(i -> i % 2 == 0)
)
.expectNextCount(5)
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 3, 5, 7, 9);
}

@Test
public void discardTryOnNextPredicateFail() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, null);
FluxFilter.FilterSubscriber<Integer> subscriber =
new FluxFilter.FilterSubscriber<>(actual, i -> { throw new IllegalStateException("boom"); });
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}

@Test
public void discardTryOnNextPredicateMiss() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, null, null, null);
FluxFilter.FilterSubscriber<Integer> subscriber =
new FluxFilter.FilterSubscriber<>(actual, i -> i % 2 == 0);
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
subscriber.tryOnNext(2);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}

@Test
public void discardConditionalOnNextPredicateFail() {
StepVerifier.create(Flux.range(1, 10)
.hide()
.filter(i -> { throw new IllegalStateException("boom"); })
.filter(i -> true)
)
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscardedExactly(1);
}

@Test
public void discardConditionalOnNextPredicateMiss() {
StepVerifier.create(Flux.range(1, 10)
.hide()
.filter(i -> i % 2 == 0)
.filter(i -> true)
)
.expectNextCount(5)
.expectComplete()
.verifyThenAssertThat()
.hasDiscardedExactly(1, 3, 5, 7, 9);
}

@Test
public void discardConditionalTryOnNextPredicateFail() {
Fuseable.ConditionalSubscriber<Integer> actual = new FluxPeekFuseableTest.ConditionalAssertSubscriber<>();

FluxFilter.FilterConditionalSubscriber<Integer> subscriber =
new FluxFilter.FilterConditionalSubscriber<>(actual, i -> {
throw new IllegalStateException("boom");
});
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}

@Test
public void discardConditionalTryOnNextPredicateMiss() {
Fuseable.ConditionalSubscriber<Integer> actual = new FluxPeekFuseableTest.ConditionalAssertSubscriber<>();

FluxFilter.FilterConditionalSubscriber<Integer> subscriber =
new FluxFilter.FilterConditionalSubscriber<>(actual, i -> i % 2 == 0);
subscriber.onSubscribe(Operators.emptySubscription());

List<Object> discarded = new ArrayList<>();
Hooks.onDiscard(discarded::add);
try {
subscriber.tryOnNext(1);
subscriber.tryOnNext(2);
}
finally {
Hooks.resetOnDiscard();
}

assertThat(discarded).containsExactly(1);
}
}