From af6628c7823952a91f3b7b3d37054373be267482 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 29 Oct 2018 23:45:02 +0200 Subject: [PATCH 1/2] included a few fixes for SwitchTransform Signed-off-by: Oleh Dokuka --- .../rsocket/internal/SwitchTransformFlux.java | 507 ++++++++++++++---- .../internal/SwitchTransformFluxTest.java | 370 +++++++++++-- 2 files changed, 734 insertions(+), 143 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java index 6b4626f9b..6a36cf090 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -16,18 +16,21 @@ package io.rsocket.internal; -import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; + +import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Operators; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; public final class SwitchTransformFlux extends Flux { @@ -35,7 +38,7 @@ public final class SwitchTransformFlux extends Flux { final BiFunction, Publisher> transformer; public SwitchTransformFlux( - Publisher source, BiFunction, Publisher> transformer) { + Publisher source, BiFunction, Publisher> transformer) { this.source = Objects.requireNonNull(source, "source"); this.transformer = Objects.requireNonNull(transformer, "transformer"); } @@ -46,30 +49,47 @@ public int getPrefetch() { } @Override + @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber actual) { - source.subscribe(new SwitchTransformMain<>(actual, transformer)); + if (actual instanceof Fuseable.ConditionalSubscriber) { + source.subscribe(new SwitchTransformConditionalOperator<>((Fuseable.ConditionalSubscriber) actual, transformer)); + return; + } + source.subscribe(new SwitchTransformOperator<>(actual, transformer)); } - static final class SwitchTransformMain implements CoreSubscriber, Scannable { + static final class SwitchTransformOperator extends Flux + implements CoreSubscriber, Subscription, Scannable { - final CoreSubscriber actual; + final CoreSubscriber outer; final BiFunction, Publisher> transformer; - final SwitchTransformInner inner; Subscription s; + Throwable throwable; - volatile int once; + volatile boolean done; + volatile T first; + volatile CoreSubscriber inner; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformMain.class, "once"); + static final AtomicReferenceFieldUpdater INNER = + AtomicReferenceFieldUpdater.newUpdater(SwitchTransformOperator.class, CoreSubscriber.class, "inner"); - SwitchTransformMain( - CoreSubscriber actual, - BiFunction, Publisher> transformer) { - this.actual = actual; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip"); + + volatile int once; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once"); + + SwitchTransformOperator( + CoreSubscriber outer, + BiFunction, Publisher> transformer) { + this.outer = outer; this.transformer = transformer; - this.inner = new SwitchTransformInner<>(this); } @Override @@ -81,6 +101,44 @@ public Object scanUnsafe(Attr key) { return null; } + @Override + public Context currentContext() { + CoreSubscriber actual = inner; + + if (actual != null) { + return actual.currentContext(); + } + + return outer.currentContext(); + } + + @Override + public void cancel() { + if (s != Operators.cancelledSubscription()) { + Subscription s = this.s; + this.s = Operators.cancelledSubscription(); + ReferenceCountUtil.safeRelease(first); + + if (WIP.getAndIncrement(this) == 0) { + INNER.lazySet(this, null); + first = null; + } + + s.cancel(); + } + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + INNER.lazySet(this, actual); + actual.onSubscribe(this); + } + else { + Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + } + @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { @@ -91,161 +149,428 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - if (isCanceled()) { + if (done) { + Operators.onNextDropped(t, currentContext()); return; } - if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { + CoreSubscriber i = inner; + + if (i == null) { try { - inner.first = t; + first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, inner), "The transformer returned a null value"); - result.subscribe(actual); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); + result.subscribe(outer); return; - } catch (Throwable e) { - onError(Operators.onOperatorError(s, e, t, actual.currentContext())); + } + catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return; } } - inner.onNext(t); + i.onNext(t); } @Override public void onError(Throwable t) { - if (isCanceled()) { + if (done) { + Operators.onErrorDropped(t, currentContext()); return; } - if (once != 0) { - inner.onError(t); - } else { - actual.onSubscribe(Operators.emptySubscription()); - actual.onError(t); + throwable = t; + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.error(outer, t); } } @Override public void onComplete() { - if (isCanceled()) { + if (done) { return; } - if (once != 0) { - inner.onComplete(); - } else { - actual.onSubscribe(Operators.emptySubscription()); - actual.onComplete(); + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.complete(outer); } } - boolean isCanceled() { - return s == Operators.cancelledSubscription(); + @Override + public void request(long n) { + if (first != null && drainRegular() && n != Long.MAX_VALUE) { + n = Operators.addCap(n, -1); + if (n > 0) { + s.request(n); + } + } + else { + s.request(n); + } } - void cancel() { - s.cancel(); - s = Operators.cancelledSubscription(); + boolean drainRegular() { + if (WIP.getAndIncrement(this) != 0) { + return false; + } + + T f = first; + int m = 1; + boolean sent = false; + Subscription s = this.s; + CoreSubscriber a = inner; + + for (;;) { + if (f != null) { + first = null; + ReferenceCountUtil.safeRelease(f); + + if (s == Operators.cancelledSubscription()) { + Operators.onNextDropped(f, a.currentContext()); + return true; + } + + a.onNext(f); + f = null; + sent = true; + } + + if (s == Operators.cancelledSubscription()) { + return sent; + } + + if (done) { + Throwable t = throwable; + if (t != null) { + a.onError(t); + } + else { + a.onComplete(); + } + return sent; + } + + + m = WIP.addAndGet(this, -m); + + if (m == 0) { + return sent; + } + } } } - static final class SwitchTransformInner extends Flux implements Scannable, Subscription { - final SwitchTransformMain parent; + static final class SwitchTransformConditionalOperator extends Flux + implements Fuseable.ConditionalSubscriber, Subscription, Scannable { - volatile CoreSubscriber actual; + final Fuseable.ConditionalSubscriber outer; + final BiFunction, Publisher> transformer; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ACTUAL = - AtomicReferenceFieldUpdater.newUpdater( - SwitchTransformInner.class, CoreSubscriber.class, "actual"); + Subscription s; + Throwable throwable; - volatile V first; + volatile boolean done; + volatile T first; + volatile Fuseable.ConditionalSubscriber inner; @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater FIRST = - AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, Object.class, "first"); + static final AtomicReferenceFieldUpdater INNER = + AtomicReferenceFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, Fuseable.ConditionalSubscriber.class, "inner"); - volatile int once; + volatile int wip; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip"); + volatile int once; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformInner.class, "once"); + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once"); - SwitchTransformInner(SwitchTransformMain parent) { - this.parent = parent; + SwitchTransformConditionalOperator( + Fuseable.ConditionalSubscriber outer, + BiFunction, Publisher> transformer) { + this.outer = outer; + this.transformer = transformer; } - public void onNext(V t) { - CoreSubscriber a = actual; + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription(); + if (key == Attr.PREFETCH) return 1; - if (a != null) { - a.onNext(t); - } + return null; } - public void onError(Throwable t) { - CoreSubscriber a = actual; + @Override + public Context currentContext() { + CoreSubscriber actual = inner; - if (a != null) { - a.onError(t); + if (actual != null) { + return actual.currentContext(); } + + return outer.currentContext(); } - public void onComplete() { - CoreSubscriber a = actual; + @Override + public void cancel() { + if (s != Operators.cancelledSubscription()) { + Subscription s = this.s; + this.s = Operators.cancelledSubscription(); + ReferenceCountUtil.safeRelease(first); + + if (WIP.getAndIncrement(this) == 0) { + INNER.lazySet(this, null); + first = null; + } - if (a != null) { - a.onComplete(); + s.cancel(); } } @Override - public void subscribe(CoreSubscriber actual) { + @SuppressWarnings("unchecked") + public void subscribe(CoreSubscriber actual) { if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { - ACTUAL.lazySet(this, actual); + if (actual instanceof Fuseable.ConditionalSubscriber) { + INNER.lazySet(this, (Fuseable.ConditionalSubscriber) actual); + } + else { + INNER.lazySet(this, new ConditionalSubscriberAdapter<>(actual)); + } actual.onSubscribe(this); - } else { - actual.onError(new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + else { + Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + s.request(1); + } + } + + @Override + public void onNext(T t) { + if (done) { + Operators.onNextDropped(t, currentContext()); + return; + } + + CoreSubscriber i = inner; + + if (i == null) { + try { + first = t; + Publisher result = + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); + result.subscribe(outer); + return; + } + catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, currentContext())); + ReferenceCountUtil.safeRelease(t); + return; + } + } + + i.onNext(t); + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + Operators.onNextDropped(t, currentContext()); + return false; + } + + Fuseable.ConditionalSubscriber i = inner; + + if (i == null) { + try { + first = t; + Publisher result = + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); + result.subscribe(outer); + return true; + } + catch (Throwable e) { + onError(Operators.onOperatorError(s, e, t, currentContext())); + ReferenceCountUtil.safeRelease(t); + return false; + } + } + + return i.tryOnNext(t); + } + + @Override + public void onError(Throwable t) { + if (done) { + Operators.onErrorDropped(t, currentContext()); + return; + } + + throwable = t; + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.error(outer, t); + } + } + + @Override + public void onComplete() { + if (done) { + return; + } + + done = true; + CoreSubscriber i = inner; + + if (i != null) { + if (first == null) { + drainRegular(); + } + } + else { + Operators.complete(outer); } } @Override public void request(long n) { - V f = first; + if (first != null && drainRegular() && n != Long.MAX_VALUE) { + if (--n > 0) { + s.request(n); + } + } + else { + s.request(n); + } + } + + boolean drainRegular() { + if (WIP.getAndIncrement(this) != 0) { + return false; + } - if (f != null && FIRST.compareAndSet(this, f, null)) { - actual.onNext(f); + T f = first; + int m = 1; + boolean sent = false; + Subscription s = this.s; + CoreSubscriber a = inner; + + for (;;) { + if (f != null) { + first = null; + ReferenceCountUtil.safeRelease(f); + + if (s == Operators.cancelledSubscription()) { + Operators.onNextDropped(f, a.currentContext()); + return true; + } + + a.onNext(f); + f = null; + sent = true; + } + + if (s == Operators.cancelledSubscription()) { + return sent; + } - long r = Operators.addCap(n, -1); - if (r > 0) { - parent.s.request(r); + if (done) { + Throwable t = throwable; + if (t != null) { + a.onError(t); + } + else { + a.onComplete(); + } + return sent; + } + + + m = WIP.addAndGet(this, -m); + + if (m == 0) { + return sent; } - } else { - parent.s.request(n); } } + } + + static final class ConditionalSubscriberAdapter implements Fuseable.ConditionalSubscriber { + + final CoreSubscriber delegate; + + ConditionalSubscriberAdapter(CoreSubscriber delegate) { + this.delegate = delegate; + } @Override - public void cancel() { - actual = null; - first = null; - parent.cancel(); + public Context currentContext() { + return delegate.currentContext(); } @Override - @Nullable - public Object scanUnsafe(Attr key) { - if (key == Attr.PARENT) return parent; - if (key == Attr.ACTUAL) return actual(); + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } - return null; + @Override + public void onNext(T t) { + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); } - public CoreSubscriber actual() { - return actual; + @Override + public void onComplete() { + delegate.onComplete(); + } + + @Override + public boolean tryOnNext(T t) { + delegate.onNext(t); + return true; } } } diff --git a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java index 9159641e3..09240e724 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java @@ -1,61 +1,327 @@ package io.rsocket.internal; import java.time.Duration; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; +import reactor.test.util.RaceTestUtils; +import reactor.util.context.Context; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; public class SwitchTransformFluxTest { @Test - public void backpressureTest() { + public void shouldBeAbleToCancelSubscription() throws InterruptedException { + for (int j = 0; j < 10; j++) { + ArrayList capturedElements = new ArrayList<>(); + ArrayList capturedCompletions = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + TestPublisher publisher = TestPublisher.createCold(); + AtomicLong captureElement = new AtomicLong(0L); + AtomicBoolean captureCompletion = new AtomicBoolean(false); + AtomicLong requested = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(1); + Flux switchTransformed = publisher.flux() + .doOnRequest(requested::addAndGet) + .doOnCancel(latch::countDown) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux)); + + publisher.next(1L); + + switchTransformed.subscribe(captureElement::set, + __ -> { + }, + () -> captureCompletion.set(true), + s -> new Thread(() -> RaceTestUtils.race(publisher::complete, + () -> RaceTestUtils.race(s::cancel, + () -> s.request(1), + Schedulers.parallel()), + Schedulers.parallel())).start()); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(requested.get(), 1L); + capturedElements.add(captureElement.get()); + capturedCompletions.add(captureCompletion.get()); + } + + Assume.assumeThat(capturedElements, hasItem(equalTo(0L))); + Assume.assumeThat(capturedCompletions, hasItem(equalTo(false))); + } + } + + @Test + public void shouldRequestExpectedAmountOfElements() throws InterruptedException { TestPublisher publisher = TestPublisher.createCold(); + AtomicLong capture = new AtomicLong(); + AtomicLong requested = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(1); + Flux switchTransformed = publisher.flux() + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux)); + + publisher.next(1L); + + switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, + s -> { + for (int i = 0; i < 10000; i++) { + RaceTestUtils.race(() -> s.request(1), () -> s.request(1)); + } + RaceTestUtils.race(publisher::complete, publisher::complete); + }); + + latch.await(5, TimeUnit.SECONDS); - Flux switchTransformed = - publisher + Assert.assertEquals(capture.get(), 1L); + Assert.assertEquals(requested.get(), 20000L); + } + + @Test + public void shouldReturnCorrectContextOnEmptySource() { + Flux switchTransformed = Flux + .empty() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux + )) + .subscriberContext(Context.of("a", "c")) + .subscriberContext(Context.of("c", "d")); + + StepVerifier.create(switchTransformed, 0) + .expectSubscription() + .thenRequest(1) + .expectAccessibleContext() + .contains("a", "c") + .contains("c", "d") + .then() + .expectComplete() + .verify(); + } + + @Test + public void shouldNotFailOnIncorrectPublisherBehavior() { + TestPublisher publisher = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE); + Flux switchTransformed = publisher .flux() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux + .subscriberContext(Context.of("a", "b")) + )); + + StepVerifier.create(new Flux() { + @Override + public void subscribe(CoreSubscriber actual) { + switchTransformed.subscribe(actual); + publisher.next(1L); + } + }, 0) + .thenRequest(1) + .expectNext(1L) + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext(2L) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .expectError() + .verifyThenAssertThat() + .hasDroppedErrors(3) + .tookLessThan(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + } + +// @Test +// public void shouldNotFailOnIncorrePu + + @Test + public void shouldBeAbleToAccessUpstreamContext() { + TestPublisher publisher = TestPublisher.createCold(); + + Flux switchTransformed = publisher + .flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .subscriberContext(Context.of("a", "b")) + )) + .subscriberContext(Context.of("a", "c")) + .subscriberContext(Context.of("c", "d")); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext("2") - .then(publisher::complete) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .expectAccessibleContext() + .contains("a", "b") + .contains("c", "d") + .then() + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); } @Test - public void shouldErrorOnOverflowTest() { + public void shouldNotHangWhenOneElementUpstream() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = - publisher + Flux switchTransformed = publisher .flux() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .subscriberContext(Context.of("a", "b")) + )) + .subscriberContext(Context.of("a", "c")) + .subscriberContext(Context.of("c", "d")); + + publisher.next(1L); + publisher.complete(); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectNext("1") + .expectComplete() + .verify(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + } + + @Test + public void backpressureTest() { + TestPublisher publisher = TestPublisher.createCold(); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher.flux() + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .then(() -> publisher.next(2L)) - .expectError() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + publisher.assertWasRequested(); + publisher.assertNoRequestOverflow(); + + Assert.assertEquals(2L, requested.get()); + } + + @Test + public void backpressureConditionalTest() { + Flux publisher = Flux.range(0, 10000); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf))) + .filter(e -> false); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + Assert.assertEquals(2L, requested.get()); + } + + @Test + public void backpressureHiddenConditionalTest() { + Flux publisher = Flux.range(0, 10000); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .hide())) + .filter(e -> false); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + Assert.assertEquals(10001L, requested.get()); + } + + @Test + public void backpressureDrawbackOnConditionalInTransformTest() { + Flux publisher = Flux.range(0, 10000); + AtomicLong requested = new AtomicLong(); + + Flux switchTransformed = publisher + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf) + .filter(e -> false))); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); + + Assert.assertEquals(10001L, requested.get()); + } + + @Test + public void shouldErrorOnOverflowTest() { + TestPublisher publisher = TestPublisher.createCold(); + + Flux switchTransformed = publisher.flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); + + publisher.next(1L); + + StepVerifier.create(switchTransformed, 0) + .thenRequest(1) + .expectNext("1") + .then(() -> publisher.next(2L)) + .expectError() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -63,44 +329,44 @@ public void shouldErrorOnOverflowTest() { @Test public void shouldPropagateonCompleteCorrectly() { - Flux switchTransformed = - Flux.empty() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); - - StepVerifier.create(switchTransformed).expectComplete().verify(Duration.ofSeconds(10)); + Flux switchTransformed = Flux.empty() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); + + StepVerifier.create(switchTransformed) + .expectComplete() + .verify(Duration.ofSeconds(10)); } @Test public void shouldPropagateErrorCorrectly() { - Flux switchTransformed = - Flux.error(new RuntimeException("hello")) - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + Flux switchTransformed = Flux.error(new RuntimeException("hello")) + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); StepVerifier.create(switchTransformed) - .expectErrorMessage("hello") - .verify(Duration.ofSeconds(10)); + .expectErrorMessage("hello") + .verify(Duration.ofSeconds(10)); } @Test public void shouldBeAbleToBeCancelledProperly() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = - publisher - .flux() - .transform( - flux -> - new SwitchTransformFlux<>( - flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + Flux switchTransformed = publisher.flux() + .transform(flux -> new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map( + String::valueOf))); - publisher.emit(1, 2, 3, 4, 5); + publisher.next(1); - StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10)); + StepVerifier.create(switchTransformed, 0) + .thenCancel() + .verify(Duration.ofSeconds(10)); publisher.assertCancelled(); publisher.assertWasRequested(); From 48459fa075f6efd44eed0d010d54519ba7f860cc Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 29 Oct 2018 23:48:16 +0200 Subject: [PATCH 2/2] fix for switchTransform Signed-off-by: Oleh Dokuka --- .../rsocket/internal/SwitchTransformFlux.java | 122 +++--- .../internal/SwitchTransformFluxTest.java | 369 ++++++++++-------- 2 files changed, 256 insertions(+), 235 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java index 6a36cf090..b4710e541 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java @@ -16,12 +16,11 @@ package io.rsocket.internal; +import io.netty.util.ReferenceCountUtil; import java.util.Objects; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiFunction; - -import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -38,7 +37,7 @@ public final class SwitchTransformFlux extends Flux { final BiFunction, Publisher> transformer; public SwitchTransformFlux( - Publisher source, BiFunction, Publisher> transformer) { + Publisher source, BiFunction, Publisher> transformer) { this.source = Objects.requireNonNull(source, "source"); this.transformer = Objects.requireNonNull(transformer, "transformer"); } @@ -52,42 +51,48 @@ public int getPrefetch() { @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber actual) { if (actual instanceof Fuseable.ConditionalSubscriber) { - source.subscribe(new SwitchTransformConditionalOperator<>((Fuseable.ConditionalSubscriber) actual, transformer)); + source.subscribe( + new SwitchTransformConditionalOperator<>( + (Fuseable.ConditionalSubscriber) actual, transformer)); return; } source.subscribe(new SwitchTransformOperator<>(actual, transformer)); } static final class SwitchTransformOperator extends Flux - implements CoreSubscriber, Subscription, Scannable { + implements CoreSubscriber, Subscription, Scannable { final CoreSubscriber outer; final BiFunction, Publisher> transformer; Subscription s; - Throwable throwable; + Throwable throwable; volatile boolean done; - volatile T first; + volatile T first; volatile CoreSubscriber inner; + @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater INNER = - AtomicReferenceFieldUpdater.newUpdater(SwitchTransformOperator.class, CoreSubscriber.class, "inner"); + AtomicReferenceFieldUpdater.newUpdater( + SwitchTransformOperator.class, CoreSubscriber.class, "inner"); volatile int wip; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "wip"); volatile int once; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformOperator.class, "once"); SwitchTransformOperator( - CoreSubscriber outer, - BiFunction, Publisher> transformer) { + CoreSubscriber outer, + BiFunction, Publisher> transformer) { this.outer = outer; this.transformer = transformer; } @@ -133,9 +138,9 @@ public void subscribe(CoreSubscriber actual) { if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { INNER.lazySet(this, actual); actual.onSubscribe(this); - } - else { - Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } else { + Operators.error( + actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); } } @@ -160,12 +165,11 @@ public void onNext(T t) { try { first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, this), "The transformer returned a null value"); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); result.subscribe(outer); return; - } - catch (Throwable e) { + } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return; @@ -190,8 +194,7 @@ public void onError(Throwable t) { if (first == null) { drainRegular(); } - } - else { + } else { Operators.error(outer, t); } } @@ -209,8 +212,7 @@ public void onComplete() { if (first == null) { drainRegular(); } - } - else { + } else { Operators.complete(outer); } } @@ -222,8 +224,7 @@ public void request(long n) { if (n > 0) { s.request(n); } - } - else { + } else { s.request(n); } } @@ -239,7 +240,7 @@ boolean drainRegular() { Subscription s = this.s; CoreSubscriber a = inner; - for (;;) { + for (; ; ) { if (f != null) { first = null; ReferenceCountUtil.safeRelease(f); @@ -262,14 +263,12 @@ boolean drainRegular() { Throwable t = throwable; if (t != null) { a.onError(t); - } - else { + } else { a.onComplete(); } return sent; } - m = WIP.addAndGet(this, -m); if (m == 0) { @@ -279,37 +278,44 @@ boolean drainRegular() { } } - static final class SwitchTransformConditionalOperator extends Flux - implements Fuseable.ConditionalSubscriber, Subscription, Scannable { + implements Fuseable.ConditionalSubscriber, Subscription, Scannable { final Fuseable.ConditionalSubscriber outer; final BiFunction, Publisher> transformer; Subscription s; - Throwable throwable; + Throwable throwable; volatile boolean done; - volatile T first; + volatile T first; volatile Fuseable.ConditionalSubscriber inner; + @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater INNER = - AtomicReferenceFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, Fuseable.ConditionalSubscriber.class, "inner"); + static final AtomicReferenceFieldUpdater< + SwitchTransformConditionalOperator, Fuseable.ConditionalSubscriber> + INNER = + AtomicReferenceFieldUpdater.newUpdater( + SwitchTransformConditionalOperator.class, + Fuseable.ConditionalSubscriber.class, + "inner"); volatile int wip; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "wip"); volatile int once; + @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once"); + AtomicIntegerFieldUpdater.newUpdater(SwitchTransformConditionalOperator.class, "once"); SwitchTransformConditionalOperator( - Fuseable.ConditionalSubscriber outer, - BiFunction, Publisher> transformer) { + Fuseable.ConditionalSubscriber outer, + BiFunction, Publisher> transformer) { this.outer = outer; this.transformer = transformer; } @@ -356,14 +362,13 @@ public void subscribe(CoreSubscriber actual) { if (once == 0 && ONCE.compareAndSet(this, 0, 1)) { if (actual instanceof Fuseable.ConditionalSubscriber) { INNER.lazySet(this, (Fuseable.ConditionalSubscriber) actual); - } - else { + } else { INNER.lazySet(this, new ConditionalSubscriberAdapter<>(actual)); } actual.onSubscribe(this); - } - else { - Operators.error(actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); + } else { + Operators.error( + actual, new IllegalStateException("SwitchTransform allows only one Subscriber")); } } @@ -388,12 +393,11 @@ public void onNext(T t) { try { first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, this), "The transformer returned a null value"); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); result.subscribe(outer); return; - } - catch (Throwable e) { + } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return; @@ -416,12 +420,11 @@ public boolean tryOnNext(T t) { try { first = t; Publisher result = - Objects.requireNonNull( - transformer.apply(t, this), "The transformer returned a null value"); + Objects.requireNonNull( + transformer.apply(t, this), "The transformer returned a null value"); result.subscribe(outer); return true; - } - catch (Throwable e) { + } catch (Throwable e) { onError(Operators.onOperatorError(s, e, t, currentContext())); ReferenceCountUtil.safeRelease(t); return false; @@ -446,8 +449,7 @@ public void onError(Throwable t) { if (first == null) { drainRegular(); } - } - else { + } else { Operators.error(outer, t); } } @@ -465,8 +467,7 @@ public void onComplete() { if (first == null) { drainRegular(); } - } - else { + } else { Operators.complete(outer); } } @@ -477,8 +478,7 @@ public void request(long n) { if (--n > 0) { s.request(n); } - } - else { + } else { s.request(n); } } @@ -494,7 +494,7 @@ boolean drainRegular() { Subscription s = this.s; CoreSubscriber a = inner; - for (;;) { + for (; ; ) { if (f != null) { first = null; ReferenceCountUtil.safeRelease(f); @@ -517,14 +517,12 @@ boolean drainRegular() { Throwable t = throwable; if (t != null) { a.onError(t); - } - else { + } else { a.onComplete(); } return sent; } - m = WIP.addAndGet(this, -m); if (m == 0) { diff --git a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java index 09240e724..e4b897409 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/SwitchTransformFluxTest.java @@ -1,12 +1,14 @@ package io.rsocket.internal; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; + import java.time.Duration; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -18,9 +20,6 @@ import reactor.test.util.RaceTestUtils; import reactor.util.context.Context; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; - public class SwitchTransformFluxTest { @Test @@ -34,24 +33,30 @@ public void shouldBeAbleToCancelSubscription() throws InterruptedException { AtomicBoolean captureCompletion = new AtomicBoolean(false); AtomicLong requested = new AtomicLong(); CountDownLatch latch = new CountDownLatch(1); - Flux switchTransformed = publisher.flux() - .doOnRequest(requested::addAndGet) - .doOnCancel(latch::countDown) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux)); + Flux switchTransformed = + publisher + .flux() + .doOnRequest(requested::addAndGet) + .doOnCancel(latch::countDown) + .transform( + flux -> new SwitchTransformFlux<>(flux, (first, innerFlux) -> innerFlux)); publisher.next(1L); - switchTransformed.subscribe(captureElement::set, - __ -> { - }, - () -> captureCompletion.set(true), - s -> new Thread(() -> RaceTestUtils.race(publisher::complete, - () -> RaceTestUtils.race(s::cancel, - () -> s.request(1), - Schedulers.parallel()), - Schedulers.parallel())).start()); + switchTransformed.subscribe( + captureElement::set, + __ -> {}, + () -> captureCompletion.set(true), + s -> + new Thread( + () -> + RaceTestUtils.race( + publisher::complete, + () -> + RaceTestUtils.race( + s::cancel, () -> s.request(1), Schedulers.parallel()), + Schedulers.parallel())) + .start()); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertEquals(requested.get(), 1L); @@ -70,21 +75,24 @@ public void shouldRequestExpectedAmountOfElements() throws InterruptedException AtomicLong capture = new AtomicLong(); AtomicLong requested = new AtomicLong(); CountDownLatch latch = new CountDownLatch(1); - Flux switchTransformed = publisher.flux() - .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux)); + Flux switchTransformed = + publisher + .flux() + .doOnRequest(requested::addAndGet) + .transform(flux -> new SwitchTransformFlux<>(flux, (first, innerFlux) -> innerFlux)); publisher.next(1L); - switchTransformed.subscribe(capture::set, __ -> {}, latch::countDown, - s -> { - for (int i = 0; i < 10000; i++) { - RaceTestUtils.race(() -> s.request(1), () -> s.request(1)); - } - RaceTestUtils.race(publisher::complete, publisher::complete); - }); + switchTransformed.subscribe( + capture::set, + __ -> {}, + latch::countDown, + s -> { + for (int i = 0; i < 10000; i++) { + RaceTestUtils.race(() -> s.request(1), () -> s.request(1)); + } + RaceTestUtils.race(publisher::complete, publisher::complete); + }); latch.await(5, TimeUnit.SECONDS); @@ -94,94 +102,97 @@ public void shouldRequestExpectedAmountOfElements() throws InterruptedException @Test public void shouldReturnCorrectContextOnEmptySource() { - Flux switchTransformed = Flux - .empty() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux - )) + Flux switchTransformed = + Flux.empty() + .transform(flux -> new SwitchTransformFlux<>(flux, (first, innerFlux) -> innerFlux)) .subscriberContext(Context.of("a", "c")) .subscriberContext(Context.of("c", "d")); StepVerifier.create(switchTransformed, 0) - .expectSubscription() - .thenRequest(1) - .expectAccessibleContext() - .contains("a", "c") - .contains("c", "d") - .then() - .expectComplete() - .verify(); + .expectSubscription() + .thenRequest(1) + .expectAccessibleContext() + .contains("a", "c") + .contains("c", "d") + .then() + .expectComplete() + .verify(); } @Test public void shouldNotFailOnIncorrectPublisherBehavior() { - TestPublisher publisher = TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE); - Flux switchTransformed = publisher + TestPublisher publisher = + TestPublisher.createNoncompliant(TestPublisher.Violation.CLEANUP_ON_TERMINATE); + Flux switchTransformed = + publisher .flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux - .subscriberContext(Context.of("a", "b")) - )); - - StepVerifier.create(new Flux() { - @Override - public void subscribe(CoreSubscriber actual) { - switchTransformed.subscribe(actual); - publisher.next(1L); - } - }, 0) - .thenRequest(1) - .expectNext(1L) - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext(2L) - .then(() -> publisher.error(new RuntimeException())) - .then(() -> publisher.error(new RuntimeException())) - .then(() -> publisher.error(new RuntimeException())) - .then(() -> publisher.error(new RuntimeException())) - .expectError() - .verifyThenAssertThat() - .hasDroppedErrors(3) - .tookLessThan(Duration.ofSeconds(10)); + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.subscriberContext(Context.of("a", "b")))); + + StepVerifier.create( + new Flux() { + @Override + public void subscribe(CoreSubscriber actual) { + switchTransformed.subscribe(actual); + publisher.next(1L); + } + }, + 0) + .thenRequest(1) + .expectNext(1L) + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext(2L) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .then(() -> publisher.error(new RuntimeException())) + .expectError() + .verifyThenAssertThat() + .hasDroppedErrors(3) + .tookLessThan(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); } -// @Test -// public void shouldNotFailOnIncorrePu + // @Test + // public void shouldNotFailOnIncorrePu @Test public void shouldBeAbleToAccessUpstreamContext() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .subscriberContext(Context.of("a", "b")) - )) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> + innerFlux.map(String::valueOf).subscriberContext(Context.of("a", "b")))) .subscriberContext(Context.of("a", "c")) .subscriberContext(Context.of("c", "d")); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext("2") - .expectAccessibleContext() - .contains("a", "b") - .contains("c", "d") - .then() - .then(publisher::complete) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .expectAccessibleContext() + .contains("a", "b") + .contains("c", "d") + .then() + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -191,13 +202,15 @@ public void shouldBeAbleToAccessUpstreamContext() { public void shouldNotHangWhenOneElementUpstream() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .subscriberContext(Context.of("a", "b")) - )) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> + innerFlux.map(String::valueOf).subscriberContext(Context.of("a", "b")))) .subscriberContext(Context.of("a", "c")) .subscriberContext(Context.of("c", "d")); @@ -205,10 +218,10 @@ public void shouldNotHangWhenOneElementUpstream() { publisher.complete(); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -219,23 +232,26 @@ public void backpressureTest() { TestPublisher publisher = TestPublisher.createCold(); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher.flux() - .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf))); + Flux switchTransformed = + publisher + .flux() + .doOnRequest(requested::addAndGet) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .thenRequest(1) - .then(() -> publisher.next(2L)) - .expectNext("2") - .then(publisher::complete) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .thenRequest(1) + .then(() -> publisher.next(2L)) + .expectNext("2") + .then(publisher::complete) + .expectComplete() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -248,17 +264,19 @@ public void backpressureConditionalTest() { Flux publisher = Flux.range(0, 10000); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf))) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))) .filter(e -> false); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); Assert.assertEquals(2L, requested.get()); } @@ -268,18 +286,19 @@ public void backpressureHiddenConditionalTest() { Flux publisher = Flux.range(0, 10000); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .hide())) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf).hide())) .filter(e -> false); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); Assert.assertEquals(10001L, requested.get()); } @@ -289,17 +308,19 @@ public void backpressureDrawbackOnConditionalInTransformTest() { Flux publisher = Flux.range(0, 10000); AtomicLong requested = new AtomicLong(); - Flux switchTransformed = publisher + Flux switchTransformed = + publisher .doOnRequest(requested::addAndGet) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map(String::valueOf) - .filter(e -> false))); + .transform( + flux -> + new SwitchTransformFlux<>( + flux, + (first, innerFlux) -> innerFlux.map(String::valueOf).filter(e -> false))); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectComplete() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectComplete() + .verify(Duration.ofSeconds(10)); Assert.assertEquals(10001L, requested.get()); } @@ -308,20 +329,22 @@ public void backpressureDrawbackOnConditionalInTransformTest() { public void shouldErrorOnOverflowTest() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher.flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); + Flux switchTransformed = + publisher + .flux() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1L); StepVerifier.create(switchTransformed, 0) - .thenRequest(1) - .expectNext("1") - .then(() -> publisher.next(2L)) - .expectError() - .verify(Duration.ofSeconds(10)); + .thenRequest(1) + .expectNext("1") + .then(() -> publisher.next(2L)) + .expectError() + .verify(Duration.ofSeconds(10)); publisher.assertWasRequested(); publisher.assertNoRequestOverflow(); @@ -329,44 +352,44 @@ public void shouldErrorOnOverflowTest() { @Test public void shouldPropagateonCompleteCorrectly() { - Flux switchTransformed = Flux.empty() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); - - StepVerifier.create(switchTransformed) - .expectComplete() - .verify(Duration.ofSeconds(10)); + Flux switchTransformed = + Flux.empty() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); + + StepVerifier.create(switchTransformed).expectComplete().verify(Duration.ofSeconds(10)); } @Test public void shouldPropagateErrorCorrectly() { - Flux switchTransformed = Flux.error(new RuntimeException("hello")) - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); + Flux switchTransformed = + Flux.error(new RuntimeException("hello")) + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); StepVerifier.create(switchTransformed) - .expectErrorMessage("hello") - .verify(Duration.ofSeconds(10)); + .expectErrorMessage("hello") + .verify(Duration.ofSeconds(10)); } @Test public void shouldBeAbleToBeCancelledProperly() { TestPublisher publisher = TestPublisher.createCold(); - Flux switchTransformed = publisher.flux() - .transform(flux -> new SwitchTransformFlux<>( - flux, - (first, innerFlux) -> innerFlux.map( - String::valueOf))); + Flux switchTransformed = + publisher + .flux() + .transform( + flux -> + new SwitchTransformFlux<>( + flux, (first, innerFlux) -> innerFlux.map(String::valueOf))); publisher.next(1); - StepVerifier.create(switchTransformed, 0) - .thenCancel() - .verify(Duration.ofSeconds(10)); + StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10)); publisher.assertCancelled(); publisher.assertWasRequested();