From aa051cdf20a976f049450b40e301d1bae1d755e1 Mon Sep 17 00:00:00 2001 From: eralmansouri Date: Mon, 10 Apr 2023 06:23:50 -0700 Subject: [PATCH] makes error handling for fmSequential consistent with flatMap (#3397) fix #3396 --- .../core/publisher/FluxMergeSequential.java | 21 ++++++++++++------- .../publisher/FluxMergeSequentialTest.java | 21 ++++++++++++++++++- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java b/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java index eb27398bd6..da92db9417 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -311,15 +311,22 @@ void innerNext(MergeSequentialInner inner, R value) { } void innerError(MergeSequentialInner inner, Throwable e) { - if (Exceptions.addThrowable(ERROR, this, e)) { - inner.setDone(); - if (errorMode != ErrorMode.END) { - s.cancel(); + inner.setDone(); + + e = Operators.onNextInnerError(e, currentContext(), s); + if (e != null) { + if (Exceptions.addThrowable(ERROR, this, e)) { + if (errorMode != ErrorMode.END) { + s.cancel(); + } + drain(); + } + else { + Operators.onErrorDropped(e, actual.currentContext()); } - drain(); } else { - Operators.onErrorDropped(e, actual.currentContext()); + drain(); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxMergeSequentialTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxMergeSequentialTest.java index 97928c07cb..9c5312dd4b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxMergeSequentialTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxMergeSequentialTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import java.util.function.Function; import java.util.function.Supplier; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscriber; @@ -375,6 +376,24 @@ public void testMainError() { ts.assertNotComplete(); } + @Test + public void testInnerErrorWithDroppedError() { + final AtomicInteger count = new AtomicInteger(); + + Flux.range(0, 3) + .flatMapSequential(i -> Mono.defer(() -> { + throw new RuntimeException("forced failure"); + })) + .onErrorContinue((t, v) -> { + if (t.getMessage().contains("forced failure")) { + count.incrementAndGet(); + } + }) + .subscribe(); + + Assertions.assertEquals(3, count.get()); + } + @SuppressWarnings("unchecked") @Test public void testInnerError() {