Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inconsistent behavior of onErrorContinue() if a Publisher emits a single element which then produces an exception in a subsequent flatMap() operator #1684

Closed
dkoval opened this issue May 2, 2019 · 3 comments
Labels
area/onErrorContinue This belongs to the onErrorContinue theme type/enhancement A general enhancement

Comments

@dkoval
Copy link

dkoval commented May 2, 2019

Expected behavior

According to the documentation, Flux.flatMap() operator supports resuming on errors in the mapper function. However, it does not seem to be the case if a Flux emits a single element which then produces an exception in a subsequent flatMap(). The same holds true for Mono.flatMap() operator, although the javadocs do not tell about Error Mode Support.

Actual behavior

onErrorContinue() does not drop an incriminating element from a sequence if a Publisher emits a single element which then produces an exception in a subsequent flatMap() operator.

Steps to reproduce

Let's consider a few test cases written in Kotlin

Scenario 1 with Flux

@Test
fun `should result in an empty Flux, but fails`() {
    val source = Flux.just(42)
            .flatMap<String> { throw IllegalStateException("Marvin: I think you ought to know I'm feeling very depressed") }
            .doOnNext { logger.info("$it received") }
            .onErrorContinue(
                    { ex: Throwable -> ex is IllegalStateException },
                    { ex, elem -> logger.warn("Element $elem skipped, reason: ${ex.message}") })

    source.test()
            .verifyComplete() // java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onError(java.lang.IllegalStateException: Marvin: I think you ought to know I'm feeling very depressed))
}

Scenario 2 with Mono

@Test
fun `should result in an empty Mono, but fails`() {
    val source = Mono.just(42)
            .flatMap<String> { throw IllegalStateException("Marvin: I think you ought to know I'm feeling very depressed") }
            .doOnNext { logger.info("$it received") }
            .onErrorContinue(
                    { ex: Throwable -> ex is IllegalStateException },
                    { ex, elem -> logger.warn("Element $elem skipped, reason: ${ex.message}") })

    source.test()
            .verifyComplete() // java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onError(java.lang.IllegalStateException: Marvin: I think you ought to know I'm feeling very depressed))
}

Interestingly though, both tests turn green as soon as flatMap {...} is replaced with a map {...} counterpart.

Reactor Core version

3.2.8.RELEASE

JVM version (e.g. java -version)

1.8

@simonbasle simonbasle added the status/need-investigation This needs more in-depth investigation label May 3, 2019
@simonbasle simonbasle added this to the 3.2.x Maintenance Backlog milestone May 3, 2019
@gindex
Copy link
Contributor

gindex commented Jul 30, 2019

Currently, FluxFlatMap checks if the source is just a supplier and executes scalar map in such cases. The thrown exceptions are mapped in Operators.onOperatorError() using Hooks.KEY_ON_OPERATOR_ERROR key to read the actual error mapper from Context.

However, the specified errorConsumer is stored in the context using OnNextFailureStrategy.KEY_ON_NEXT_ERROR_STRATEGY key and in the case of FluxArray
the error handling is executed in Operators.onNextError(). Then the error is consumed by the errorConsumer and the publisher is completed. Only in the case if the errorConsumer does not apply, it falls back to Operators.onOperatorError().

Because FluxJust and MonoJust are simple suppliers, the error consumption does not work for them. @simonbasle it seems to be a bug.

FluxMap is not affected by this, because it does not implement any special cases for scalars.

@gindex
Copy link
Contributor

gindex commented Jul 30, 2019

It can be fixed by replacing error handling Operators.error(s, Operators.onOperatorError(..)) in trySubscribeScalarMap() with

Throwable e_ = Operators.onNextError(... Operators.emptySubscription());
if(e_ != null){
	Operators.error(s, e);
}
else {
	Operators.complete(s);
}

It would retain the semantic, that if the error is consumed the subscriber gets onComplete and otherwise onError.

@simonbasle
Copy link
Member

simonbasle commented Aug 6, 2019

onErrorContinue was never implemented on Mono-specific code, because there would be nothing to recover from. A simple onErrorReturn/onErrorResume is equivalent there.

For the FluxFlatMap we could implement this. For the aforementioned reason we would need to limit that behavior to actual usage by a Flux, not a Mono. (at least in 3.2.x)

In a separate 3.3 issue, we might want to consider implementing onErrorContinue logic on Mono operators. But that would be a lot of work, especially this late in the 3.3.0 pre-release cycle, so we'd need very good arguments. Keep in mind onErrorContinue is a deviation from the specification, so it's not entirely a guaranteed free lunch.

@simonbasle simonbasle added for/team-attention This issue needs team attention or action status/need-decision This needs a decision from the team type/enhancement A general enhancement and removed status/need-investigation This needs more in-depth investigation labels Aug 6, 2019
simonbasle added a commit that referenced this issue Aug 6, 2019
Since the code for a scalar (aka Callable) source is shared among a few
operators other than Flux#flatMap (mergeSequential, switchMap and Mono
flatMap/flatMapMany), a new boolean is introduced as an opt-in to the
onErrorContinue support, to keep these operators consistent in how they
deal with scalar sources vs classic sources.
@simonbasle simonbasle added area/onErrorContinue This belongs to the onErrorContinue theme and removed for/team-attention This issue needs team attention or action status/need-decision This needs a decision from the team labels Aug 13, 2019
simonbasle added a commit that referenced this issue Aug 13, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/onErrorContinue This belongs to the onErrorContinue theme type/enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

3 participants