Skip to content

Commit

Permalink
Merge #3397 into 3.5.5
Browse files Browse the repository at this point in the history
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka committed Apr 10, 2023
2 parents 4476aef + aa051cd commit 7c5cc07
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -311,15 +311,22 @@ void innerNext(MergeSequentialInner<R> inner, R value) {
}

void innerError(MergeSequentialInner<R> inner, Throwable e) {
if (Exceptions.addThrowable(ERROR, this, e)) {
inner.setDone();
if (errorMode != ErrorMode.END) {
s.cancel();
inner.setDone();

e = Operators.onNextInnerError(e, currentContext(), s);
if (e != null) {
if (Exceptions.addThrowable(ERROR, this, e)) {
if (errorMode != ErrorMode.END) {
s.cancel();
}
drain();
}
else {
Operators.onErrorDropped(e, actual.currentContext());
}
drain();
}
else {
Operators.onErrorDropped(e, actual.currentContext());
drain();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -375,6 +376,24 @@ public void testMainError() {
ts.assertNotComplete();
}

@Test
public void testInnerErrorWithDroppedError() {
final AtomicInteger count = new AtomicInteger();

Flux.range(0, 3)
.flatMapSequential(i -> Mono.defer(() -> {
throw new RuntimeException("forced failure");
}))
.onErrorContinue((t, v) -> {
if (t.getMessage().contains("forced failure")) {
count.incrementAndGet();
}
})
.subscribe();

Assertions.assertEquals(3, count.get());
}

@SuppressWarnings("unchecked")
@Test
public void testInnerError() {
Expand Down

0 comments on commit 7c5cc07

Please sign in to comment.