From d31f1bc36210485b61de21fb2684a7b3d8801efc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Mon, 10 Apr 2023 16:41:50 +0200 Subject: [PATCH] ensures SignalListener#addToContext exceptions are handled (#3415) --- .../java/reactor/core/publisher/FluxTap.java | 34 ++-- .../core/publisher/FluxTapFuseable.java | 29 ++- .../FluxTapRestoringThreadLocals.java | 7 +- .../java/reactor/core/publisher/MonoTap.java | 18 +- .../core/publisher/MonoTapFuseable.java | 19 +- .../reactor/core/publisher/FluxTapTest.java | 186 +++++++++++++++++- 6 files changed, 261 insertions(+), 32 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxTap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxTap.java index 01ac70bc85..1da50b0cf5 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxTap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxTap.java @@ -69,11 +69,24 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act return null; } + // Invoked AFTER doFirst + Context ctx; + try { + ctx = signalListener.addToContext(actual.currentContext()); + } + catch (Throwable e) { + IllegalStateException listenerError = new IllegalStateException( + "Unable to augment tap Context at subscription via addToContext", e); + signalListener.handleListenerError(listenerError); + Operators.error(actual, listenerError); + return null; + } + if (actual instanceof ConditionalSubscriber) { //noinspection unchecked - return new TapConditionalSubscriber<>((ConditionalSubscriber) actual, signalListener); + return new TapConditionalSubscriber<>((ConditionalSubscriber) actual, signalListener, ctx); } - return new TapSubscriber<>(actual, signalListener); + return new TapSubscriber<>(actual, signalListener, ctx); } @Nullable @@ -94,18 +107,10 @@ static class TapSubscriber implements InnerOperator { boolean done; Subscription s; - TapSubscriber(CoreSubscriber actual, SignalListener signalListener) { + TapSubscriber(CoreSubscriber actual, + SignalListener signalListener, Context ctx) { this.actual = actual; this.listener = signalListener; - //note that since we're in the subscriber, this is technically invoked AFTER doFirst - Context ctx; - try { - ctx = signalListener.addToContext(actual.currentContext()); - } - catch (Throwable e) { - signalListener.handleListenerError(new IllegalStateException("Unable to augment tap Context at construction via addToContext", e)); - ctx = actual.currentContext(); - } this.context = ctx; } @@ -330,8 +335,9 @@ static final class TapConditionalSubscriber extends TapSubscriber implemen final ConditionalSubscriber actualConditional; - public TapConditionalSubscriber(ConditionalSubscriber actual, SignalListener signalListener) { - super(actual, signalListener); + public TapConditionalSubscriber(ConditionalSubscriber actual, + SignalListener signalListener, Context ctx) { + super(actual, signalListener, ctx); this.actualConditional = actual; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxTapFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxTapFuseable.java index 2d1aa17376..efe3821a97 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxTapFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxTapFuseable.java @@ -24,6 +24,7 @@ import reactor.core.observability.SignalListener; import reactor.core.observability.SignalListenerFactory; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; /** * A {@link reactor.core.Fuseable} generic per-Subscription side effect {@link Flux} that notifies a @@ -69,11 +70,25 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act return null; } + // Invoked AFTER doFirst + Context ctx; + try { + ctx = signalListener.addToContext(actual.currentContext()); + } + catch (Throwable e) { + IllegalStateException listenerError = new IllegalStateException( + "Unable to augment tap Context at subscription via addToContext", e); + signalListener.handleListenerError(listenerError); + Operators.error(actual, listenerError); + return null; + } + if (actual instanceof ConditionalSubscriber) { //noinspection unchecked - return new TapConditionalFuseableSubscriber<>((ConditionalSubscriber) actual, signalListener); + return new TapConditionalFuseableSubscriber<>( + (ConditionalSubscriber) actual, signalListener, ctx); } - return new TapFuseableSubscriber<>(actual, signalListener); + return new TapFuseableSubscriber<>(actual, signalListener, ctx); } @Nullable @@ -90,8 +105,9 @@ static class TapFuseableSubscriber extends FluxTap.TapSubscriber implement int mode; QueueSubscription qs; - TapFuseableSubscriber(CoreSubscriber actual, SignalListener signalListener) { - super(actual, signalListener); + TapFuseableSubscriber(CoreSubscriber actual, + SignalListener signalListener, Context ctx) { + super(actual, signalListener, ctx); } /** @@ -268,8 +284,9 @@ static final class TapConditionalFuseableSubscriber extends TapFuseableSubscr final ConditionalSubscriber actualConditional; - public TapConditionalFuseableSubscriber(ConditionalSubscriber actual, SignalListener signalListener) { - super(actual, signalListener); + public TapConditionalFuseableSubscriber(ConditionalSubscriber actual, + SignalListener signalListener, Context ctx) { + super(actual, signalListener, ctx); this.actualConditional = actual; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxTapRestoringThreadLocals.java b/reactor-core/src/main/java/reactor/core/publisher/FluxTapRestoringThreadLocals.java index 24c34f4596..21d90be019 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxTapRestoringThreadLocals.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxTapRestoringThreadLocals.java @@ -72,8 +72,11 @@ public void subscribe(CoreSubscriber actual) { alteredContext = signalListener.addToContext(actual.currentContext()); } catch (Throwable e) { - signalListener.handleListenerError(new IllegalStateException("Unable to augment tap Context at construction via addToContext", e)); - alteredContext = actual.currentContext(); + IllegalStateException listenerError = new IllegalStateException( + "Unable to augment tap Context at subscription via addToContext", e); + signalListener.handleListenerError(listenerError); + Operators.error(actual, listenerError); + return; } try (ContextSnapshot.Scope ignored = ContextPropagation.setThreadLocals(alteredContext)) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoTap.java b/reactor-core/src/main/java/reactor/core/publisher/MonoTap.java index c5ebb81978..ba72852296 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoTap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoTap.java @@ -22,6 +22,7 @@ import reactor.core.observability.SignalListenerFactory; import reactor.core.publisher.FluxTap.TapSubscriber; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; /** * A generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events. @@ -66,11 +67,24 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act return null; } + // Invoked AFTER doFirst + Context ctx; + try { + ctx = signalListener.addToContext(actual.currentContext()); + } + catch (Throwable e) { + IllegalStateException listenerError = new IllegalStateException( + "Unable to augment tap Context at subscription via addToContext", e); + signalListener.handleListenerError(listenerError); + Operators.error(actual, listenerError); + return null; + } + if (actual instanceof Fuseable.ConditionalSubscriber) { //noinspection unchecked - return new FluxTap.TapConditionalSubscriber<>((Fuseable.ConditionalSubscriber) actual, signalListener); + return new FluxTap.TapConditionalSubscriber<>((Fuseable.ConditionalSubscriber) actual, signalListener, ctx); } - return new TapSubscriber<>(actual, signalListener); + return new TapSubscriber<>(actual, signalListener, ctx); } @Nullable diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoTapFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoTapFuseable.java index 59c598f595..ce83cb4f25 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoTapFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoTapFuseable.java @@ -21,6 +21,7 @@ import reactor.core.observability.SignalListener; import reactor.core.observability.SignalListenerFactory; import reactor.util.annotation.Nullable; +import reactor.util.context.Context; /** * A {@link Fuseable} generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events. @@ -65,11 +66,25 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act return null; } + // Invoked AFTER doFirst + Context ctx; + try { + ctx = signalListener.addToContext(actual.currentContext()); + } + catch (Throwable e) { + IllegalStateException listenerError = new IllegalStateException( + "Unable to augment tap Context at subscription via addToContext", e); + signalListener.handleListenerError(listenerError); + Operators.error(actual, listenerError); + return null; + } + if (actual instanceof ConditionalSubscriber) { //noinspection unchecked - return new FluxTapFuseable.TapConditionalFuseableSubscriber<>((ConditionalSubscriber) actual, signalListener); + return new FluxTapFuseable.TapConditionalFuseableSubscriber<>( + (ConditionalSubscriber) actual, signalListener, ctx); } - return new FluxTapFuseable.TapFuseableSubscriber<>(actual, signalListener); + return new FluxTapFuseable.TapFuseableSubscriber<>(actual, signalListener, ctx); } @Nullable diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/FluxTapTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/FluxTapTest.java index 6b7f1fae37..034c5a6f75 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/FluxTapTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/FluxTapTest.java @@ -558,6 +558,57 @@ public SignalListener createListener(Publisher sourc .hasMessage("expected"); } + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + void throwingAlterContext(boolean automatic) { + if (automatic) { + Hooks.enableAutomaticContextPropagation(); + } + TestSubscriber testSubscriber = TestSubscriber.create(); + TestSignalListener testSignalListener = + new TestSignalListener() { + @Override + public Context addToContext(Context originalContext) { + throw new IllegalStateException("expected"); + } + }; + + if (automatic) { + FluxTapRestoringThreadLocals test = + new FluxTapRestoringThreadLocals<>(Flux.just(1), factoryOf(testSignalListener)); + assertThatCode(() -> test.subscribe(testSubscriber)) + .doesNotThrowAnyException(); + } else { + FluxTap test = new FluxTap<>(Flux.just(1), factoryOf(testSignalListener)); + assertThatCode(() -> test.subscribeOrReturn(testSubscriber)) + .doesNotThrowAnyException(); + } + + assertThat(testSubscriber.expectTerminalError()) + .as("downstream error") + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + + assertThat(testSignalListener.listenerErrors) + .as("listenerErrors") + .satisfies(errors -> { + assertThat(errors.size()).isEqualTo(1); + assertThat(errors.stream().findFirst().get()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + }); + assertThat(testSignalListener.events) + .containsExactly("doFirst"); + } + @ParameterizedTestWithName @ValueSource(booleans = {true, false}) void doFirstListenerError(boolean automatic) { @@ -661,6 +712,47 @@ void doFirst() { ); } + @Test + void throwingAlterContext() { + TestSubscriber testSubscriber = TestSubscriber.create(); + TestSignalListener testSignalListener = + new TestSignalListener() { + @Override + public Context addToContext(Context originalContext) { + throw new IllegalStateException("expected"); + } + }; + + FluxTapFuseable test = new FluxTapFuseable<>( + Flux.just(1), factoryOf(testSignalListener)); + assertThatCode(() -> test.subscribeOrReturn(testSubscriber)) + .doesNotThrowAnyException(); + + assertThat(testSubscriber.expectTerminalError()) + .as("downstream error") + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + + assertThat(testSignalListener.listenerErrors) + .as("listenerErrors") + .satisfies(errors -> { + assertThat(errors.size()).isEqualTo(1); + assertThat(errors.stream().findFirst().get()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + }); + assertThat(testSignalListener.events) + .containsExactly("doFirst"); + } + @Test void doFirstListenerError() { Throwable listenerError = new IllegalStateException("expected from doFirst"); @@ -816,6 +908,47 @@ void doFirst() { ); } + @Test + void throwingAlterContext() { + TestSubscriber testSubscriber = TestSubscriber.create(); + TestSignalListener testSignalListener = + new TestSignalListener() { + @Override + public Context addToContext(Context originalContext) { + throw new IllegalStateException("expected"); + } + }; + + MonoTap test = new MonoTap<>( + Mono.just(1), factoryOf(testSignalListener)); + assertThatCode(() -> test.subscribeOrReturn(testSubscriber)) + .doesNotThrowAnyException(); + + assertThat(testSubscriber.expectTerminalError()) + .as("downstream error") + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + + assertThat(testSignalListener.listenerErrors) + .as("listenerErrors") + .satisfies(errors -> { + assertThat(errors.size()).isEqualTo(1); + assertThat(errors.stream().findFirst().get()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + }); + assertThat(testSignalListener.events) + .containsExactly("doFirst"); + } + @Test void doFirstListenerError() { Throwable listenerError = new IllegalStateException("expected from doFirst"); @@ -917,6 +1050,47 @@ void doFirst() { ); } + @Test + void throwingAlterContext() { + TestSubscriber testSubscriber = TestSubscriber.create(); + TestSignalListener testSignalListener = + new TestSignalListener() { + @Override + public Context addToContext(Context originalContext) { + throw new IllegalStateException("expected"); + } + }; + + MonoTapFuseable test = new MonoTapFuseable<>( + Mono.just(1), factoryOf(testSignalListener)); + assertThatCode(() -> test.subscribeOrReturn(testSubscriber)) + .doesNotThrowAnyException(); + + assertThat(testSubscriber.expectTerminalError()) + .as("downstream error") + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + + assertThat(testSignalListener.listenerErrors) + .as("listenerErrors") + .satisfies(errors -> { + assertThat(errors.size()).isEqualTo(1); + assertThat(errors.stream().findFirst().get()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Unable to augment tap Context at subscription via addToContext") + .extracting(Throwable::getCause) + .satisfies(t -> assertThat(t) + .isInstanceOf(IllegalStateException.class) + .hasMessage("expected")); + }); + assertThat(testSignalListener.events) + .containsExactly("doFirst"); + } + @Test void doFirstListenerError() { Throwable listenerError = new IllegalStateException("expected from doFirst"); @@ -1039,7 +1213,7 @@ void scanListenSubscriber() { Subscription subscription = Operators.emptySubscription(); FluxTap.TapSubscriber subscriber = new FluxTap.TapSubscriber<>( - actual, new TestSignalListener<>()); + actual, new TestSignalListener<>(), actual.currentContext()); subscriber.onSubscribe(subscription); @@ -1084,7 +1258,7 @@ void scanListenConditionalSubscriber() { Subscription subscription = Operators.emptySubscription(); FluxTap.TapConditionalSubscriber subscriber = new FluxTap.TapConditionalSubscriber<>( - actual, new TestSignalListener<>()); + actual, new TestSignalListener<>(), actual.currentContext()); subscriber.onSubscribe(subscription); @@ -1106,7 +1280,7 @@ void scanListenFuseableSubscriber() { Subscription subscription = Operators.emptySubscription(); FluxTapFuseable.TapFuseableSubscriber subscriber = new FluxTapFuseable.TapFuseableSubscriber<>( - actual, new TestSignalListener<>()); + actual, new TestSignalListener<>(), actual.currentContext()); subscriber.onSubscribe(subscription); @@ -1127,9 +1301,9 @@ void scanListenConditionalFuseableSubscriber() { ConditionalSubscriber actual = Operators.toConditionalSubscriber(Operators.drainSubscriber()); Subscription subscription = Operators.emptySubscription(); - FluxTapFuseable.TapConditionalFuseableSubscriber - subscriber = new FluxTapFuseable.TapConditionalFuseableSubscriber<>( - actual, new TestSignalListener<>()); + FluxTapFuseable.TapConditionalFuseableSubscriber subscriber = + new FluxTapFuseable.TapConditionalFuseableSubscriber<>( + actual, new TestSignalListener<>(), actual.currentContext()); subscriber.onSubscribe(subscription);