Skip to content

Commit

Permalink
ensures SignalListener#addToContext exceptions are handled (#3415)
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Apr 10, 2023
1 parent 766252c commit d31f1bc
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 32 deletions.
34 changes: 20 additions & 14 deletions reactor-core/src/main/java/reactor/core/publisher/FluxTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,24 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

// Invoked AFTER doFirst
Context ctx;
try {
ctx = signalListener.addToContext(actual.currentContext());
}
catch (Throwable e) {
IllegalStateException listenerError = new IllegalStateException(
"Unable to augment tap Context at subscription via addToContext", e);
signalListener.handleListenerError(listenerError);
Operators.error(actual, listenerError);
return null;
}

if (actual instanceof ConditionalSubscriber) {
//noinspection unchecked
return new TapConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, signalListener);
return new TapConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, signalListener, ctx);
}
return new TapSubscriber<>(actual, signalListener);
return new TapSubscriber<>(actual, signalListener, ctx);
}

@Nullable
Expand All @@ -94,18 +107,10 @@ static class TapSubscriber<T> implements InnerOperator<T, T> {
boolean done;
Subscription s;

TapSubscriber(CoreSubscriber<? super T> actual, SignalListener<T> signalListener) {
TapSubscriber(CoreSubscriber<? super T> actual,
SignalListener<T> signalListener, Context ctx) {
this.actual = actual;
this.listener = signalListener;
//note that since we're in the subscriber, this is technically invoked AFTER doFirst
Context ctx;
try {
ctx = signalListener.addToContext(actual.currentContext());
}
catch (Throwable e) {
signalListener.handleListenerError(new IllegalStateException("Unable to augment tap Context at construction via addToContext", e));
ctx = actual.currentContext();
}
this.context = ctx;
}

Expand Down Expand Up @@ -330,8 +335,9 @@ static final class TapConditionalSubscriber<T> extends TapSubscriber<T> implemen

final ConditionalSubscriber<? super T> actualConditional;

public TapConditionalSubscriber(ConditionalSubscriber<? super T> actual, SignalListener<T> signalListener) {
super(actual, signalListener);
public TapConditionalSubscriber(ConditionalSubscriber<? super T> actual,
SignalListener<T> signalListener, Context ctx) {
super(actual, signalListener, ctx);
this.actualConditional = actual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* A {@link reactor.core.Fuseable} generic per-Subscription side effect {@link Flux} that notifies a
Expand Down Expand Up @@ -69,11 +70,25 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

// Invoked AFTER doFirst
Context ctx;
try {
ctx = signalListener.addToContext(actual.currentContext());
}
catch (Throwable e) {
IllegalStateException listenerError = new IllegalStateException(
"Unable to augment tap Context at subscription via addToContext", e);
signalListener.handleListenerError(listenerError);
Operators.error(actual, listenerError);
return null;
}

if (actual instanceof ConditionalSubscriber) {
//noinspection unchecked
return new TapConditionalFuseableSubscriber<>((ConditionalSubscriber<? super T>) actual, signalListener);
return new TapConditionalFuseableSubscriber<>(
(ConditionalSubscriber<? super T>) actual, signalListener, ctx);
}
return new TapFuseableSubscriber<>(actual, signalListener);
return new TapFuseableSubscriber<>(actual, signalListener, ctx);
}

@Nullable
Expand All @@ -90,8 +105,9 @@ static class TapFuseableSubscriber<T> extends FluxTap.TapSubscriber<T> implement
int mode;
QueueSubscription<T> qs;

TapFuseableSubscriber(CoreSubscriber<? super T> actual, SignalListener<T> signalListener) {
super(actual, signalListener);
TapFuseableSubscriber(CoreSubscriber<? super T> actual,
SignalListener<T> signalListener, Context ctx) {
super(actual, signalListener, ctx);
}

/**
Expand Down Expand Up @@ -268,8 +284,9 @@ static final class TapConditionalFuseableSubscriber<T> extends TapFuseableSubscr

final ConditionalSubscriber<? super T> actualConditional;

public TapConditionalFuseableSubscriber(ConditionalSubscriber<? super T> actual, SignalListener<T> signalListener) {
super(actual, signalListener);
public TapConditionalFuseableSubscriber(ConditionalSubscriber<? super T> actual,
SignalListener<T> signalListener, Context ctx) {
super(actual, signalListener, ctx);
this.actualConditional = actual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ public void subscribe(CoreSubscriber<? super T> actual) {
alteredContext = signalListener.addToContext(actual.currentContext());
}
catch (Throwable e) {
signalListener.handleListenerError(new IllegalStateException("Unable to augment tap Context at construction via addToContext", e));
alteredContext = actual.currentContext();
IllegalStateException listenerError = new IllegalStateException(
"Unable to augment tap Context at subscription via addToContext", e);
signalListener.handleListenerError(listenerError);
Operators.error(actual, listenerError);
return;
}

try (ContextSnapshot.Scope ignored = ContextPropagation.setThreadLocals(alteredContext)) {
Expand Down
18 changes: 16 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/MonoTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.FluxTap.TapSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* A generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events.
Expand Down Expand Up @@ -66,11 +67,24 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

// Invoked AFTER doFirst
Context ctx;
try {
ctx = signalListener.addToContext(actual.currentContext());
}
catch (Throwable e) {
IllegalStateException listenerError = new IllegalStateException(
"Unable to augment tap Context at subscription via addToContext", e);
signalListener.handleListenerError(listenerError);
Operators.error(actual, listenerError);
return null;
}

if (actual instanceof Fuseable.ConditionalSubscriber) {
//noinspection unchecked
return new FluxTap.TapConditionalSubscriber<>((Fuseable.ConditionalSubscriber<? super T>) actual, signalListener);
return new FluxTap.TapConditionalSubscriber<>((Fuseable.ConditionalSubscriber<? super T>) actual, signalListener, ctx);
}
return new TapSubscriber<>(actual, signalListener);
return new TapSubscriber<>(actual, signalListener, ctx);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* A {@link Fuseable} generic per-Subscription side effect {@link Mono} that notifies a {@link SignalListener} of most events.
Expand Down Expand Up @@ -65,11 +66,25 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

// Invoked AFTER doFirst
Context ctx;
try {
ctx = signalListener.addToContext(actual.currentContext());
}
catch (Throwable e) {
IllegalStateException listenerError = new IllegalStateException(
"Unable to augment tap Context at subscription via addToContext", e);
signalListener.handleListenerError(listenerError);
Operators.error(actual, listenerError);
return null;
}

if (actual instanceof ConditionalSubscriber) {
//noinspection unchecked
return new FluxTapFuseable.TapConditionalFuseableSubscriber<>((ConditionalSubscriber<? super T>) actual, signalListener);
return new FluxTapFuseable.TapConditionalFuseableSubscriber<>(
(ConditionalSubscriber<? super T>) actual, signalListener, ctx);
}
return new FluxTapFuseable.TapFuseableSubscriber<>(actual, signalListener);
return new FluxTapFuseable.TapFuseableSubscriber<>(actual, signalListener, ctx);
}

@Nullable
Expand Down

0 comments on commit d31f1bc

Please sign in to comment.