Skip to content

Commit

Permalink
Merge #3312 into 3.5.1
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
  • Loading branch information
Oleh Dokuka committed Dec 12, 2022
2 parents 558173d + 9ae9858 commit 94e7226
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ void drainAsync() {
sp = iterable.spliterator();
itFinite = FluxIterable.checkFinite(sp);

isEmpty = itFinite && sp.estimateSize() == 0;
isEmpty = itFinite ? sp.estimateSize() == 0 : !hasNext(sp);
}
catch (Throwable exc) {
sp = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,9 @@ static <T> boolean checkFinite(Spliterator<? extends T> spliterator) {
@Nullable
private final Runnable onClose;

FluxIterable(Iterable<? extends T> iterable, @Nullable Runnable onClose) {
this.iterable = Objects.requireNonNull(iterable, "iterable");
this.onClose = onClose;
}

FluxIterable(Iterable<? extends T> iterable) {
this(iterable, null);
this.iterable = Objects.requireNonNull(iterable, "iterable");
this.onClose = null;
}

@Override
Expand Down Expand Up @@ -158,11 +154,51 @@ static <T> void subscribe(CoreSubscriber<? super T> s, Spliterator<? extends T>
}

if (s instanceof ConditionalSubscriber) {
s.onSubscribe(new IterableSubscriptionConditional<>((ConditionalSubscriber<? super T>) s,
sp, knownToBeFinite, onClose));
IterableSubscriptionConditional<? extends T> isc =
new IterableSubscriptionConditional<>((ConditionalSubscriber<? super T>) s,
sp,
knownToBeFinite,
onClose);

boolean hasNext;
try {
hasNext = isc.hasNext();
}
catch (Throwable ex) {
Operators.error(s, ex);
isc.onCloseWithDropError();
return;
}

if (!hasNext) {
Operators.complete(s);
isc.onCloseWithDropError();
return;
}

s.onSubscribe(isc);
}
else {
s.onSubscribe(new IterableSubscription<>(s, sp, knownToBeFinite, onClose));
IterableSubscription<? extends T> is =
new IterableSubscription<>(s, sp, knownToBeFinite, onClose);

boolean hasNext;
try {
hasNext = is.hasNext();
}
catch (Throwable ex) {
Operators.error(s, ex);
is.onCloseWithDropError();
return;
}

if (!hasNext) {
Operators.complete(s);
is.onCloseWithDropError();
return;
}

s.onSubscribe(is);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

package reactor.core.publisher;

import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,34 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.Scannable.Attr;
import reactor.core.scheduler.Schedulers;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.publisher.FluxOperatorTest;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FluxFlattenIterableTest extends FluxOperatorTest<String, String> {

Expand Down Expand Up @@ -678,6 +686,47 @@ public Context currentContext() {
assertThat(test.currentKnownToBeFinite).as("knownFinite reset").isFalse();
}

@ParameterizedTestWithName
@MethodSource("reactor.core.publisher.FluxIterableTest#factory")
public void testFluxIterableEmptyCase(Function<Flux, Flux> fn) {
Iterable<String> iterable = mock(Iterable.class);
Mockito.when(iterable.spliterator())
.thenReturn(mock(Spliterator.class));

StepVerifier.create(
Flux.just(1)
.hide()
.flatMapIterable(__ -> iterable)
.as(fn)
.next()
)
.expectSubscription()
.expectComplete()
.verify();
}

@ParameterizedTestWithName
@MethodSource("reactor.core.publisher.FluxIterableTest#factory")
public void testFluxIterableErrorHasNext(Function<Flux, Flux> fn) {
Iterable<String> iterable = mock(Iterable.class);
Spliterator mock = mock(Spliterator.class);
Mockito.when(iterable.spliterator())
.thenReturn(mock);

when(mock.tryAdvance(any())).thenThrow();

StepVerifier.create(
Flux.just(1)
.hide()
.flatMapIterable(__ -> iterable)
.as(fn)
.next()
)
.expectSubscription()
.expectError()
.verify();
}

static class ReferenceCounted {

int refCount = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -36,6 +41,7 @@
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.test.MockUtils;
import reactor.test.ParameterizedTestWithName;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.NonNull;
Expand All @@ -44,11 +50,119 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FluxIterableTest {

final Iterable<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

static Stream<Function<Flux, Flux>> factory() {
return Stream.of(new Function<Flux, Flux>() {
@Override
public Flux apply(Flux flux) {
return flux;
}

@Override
public String toString() {
return "normal fast-path";
}
}, new Function<Flux, Flux>() {
@Override
public Flux apply(Flux flux) {
return flux.filter(__ -> true);
}

@Override
public String toString() {
return "conditional fast-path";
}
}, new Function<Flux, Flux>() {
@Override
public Flux apply(Flux flux) {
return flux.limitRate(1);
}

@Override
public String toString() {
return "fused";
}
}, new Function<Flux, Flux>() {
@Override
public Flux apply(Flux flux) {
return flux.hide()
.limitRate(1);
}

@Override
public String toString() {
return "normal slow-path";
}
}, new Function<Flux, Flux>() {
@Override
public Flux apply(Flux flux) {
return flux.filter(__ -> true)
.hide()
.limitRate(1);
}

@Override
public String toString() {
return "conditional slow-path";
}
}, new Function<Flux, Flux>() {
@Override
public Flux apply(Flux flux) {
return flux.filter(__ -> true)
.limitRate(1);
}

@Override
public String toString() {
return "conditional-fused";
}
});
}

@ParameterizedTestWithName
@MethodSource("factory")
public void testFluxIterableEmptyCase(Function<Flux, Flux> fn) {
Iterable<String> iterable = mock(Iterable.class);
Mockito.when(iterable.spliterator())
.thenReturn(mock(Spliterator.class));

StepVerifier.create(
Flux.fromIterable(iterable)
.as(fn)
.next()
)
.expectSubscription()
.expectComplete()
.verify();
}

@ParameterizedTestWithName
@MethodSource("factory")
public void testFluxIterableErrorHasNext(Function<Flux, Flux> fn) {
Iterable<String> iterable = mock(Iterable.class);
Spliterator mock = mock(Spliterator.class);
Mockito.when(iterable.spliterator())
.thenReturn(mock);

when(mock.tryAdvance(any())).thenThrow();

StepVerifier.create(
Flux.fromIterable(iterable)
.as(fn)
.next()
)
.expectSubscription()
.expectError()
.verify();
}

@Test
//https://github.com/reactor/reactor-core/issues/3295
public void useIterableOncePerSubscriber() {
Expand Down Expand Up @@ -253,7 +367,7 @@ public void scanSubscription() {
@Test
public void scanConditionalSubscription() {
@SuppressWarnings("unchecked")
Fuseable.ConditionalSubscriber<? super String> actual = Mockito.mock(MockUtils.TestScannableConditionalSubscriber.class);
Fuseable.ConditionalSubscriber<? super String> actual = mock(MockUtils.TestScannableConditionalSubscriber.class);
Mockito.when(actual.currentContext()).thenReturn(Context.empty());
FluxIterable.IterableSubscriptionConditional<String> test =
new FluxIterable.IterableSubscriptionConditional<>(actual, Collections.singleton("test").spliterator(), true);
Expand Down Expand Up @@ -328,7 +442,7 @@ void smokeTestIterableConditionalSubscriptionWithInfiniteIterable() {
Context discardingContext = Operators.enableOnDiscard(Context.empty(), v -> { });

@SuppressWarnings("unchecked")
Fuseable.ConditionalSubscriber<Integer> testSubscriber = Mockito.mock(Fuseable.ConditionalSubscriber.class);
Fuseable.ConditionalSubscriber<Integer> testSubscriber = mock(Fuseable.ConditionalSubscriber.class);
Mockito.when(testSubscriber.currentContext()).thenReturn(discardingContext);

Spliterator<Integer> iterator = Spliterators.spliteratorUnknownSize(new Iterator<Integer>() {
Expand Down

0 comments on commit 94e7226

Please sign in to comment.