Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allows continuation of observations #1076

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
version=1.1.3
version=1.1.4-SNAPSHOT
perfBaselineVersion=1.1.2
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

/**
Expand All @@ -43,13 +44,24 @@
*/
public class ObservationRequesterRSocketProxy extends RSocketProxy {

/** Aligned with ObservationThreadLocalAccessor#KEY */
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";

private final ObservationRegistry observationRegistry;

private RSocketRequesterObservationConvention observationConvention;
@Nullable private final RSocketRequesterObservationConvention observationConvention;

public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
this(source, observationRegistry, null);
}

public ObservationRequesterRSocketProxy(
RSocket source,
ObservationRegistry observationRegistry,
RSocketRequesterObservationConvention observationConvention) {
super(source);
this.observationRegistry = observationRegistry;
this.observationConvention = observationConvention;
}

@Override
Expand All @@ -76,15 +88,7 @@ <T> Mono<T> setObservation(
FrameType frameType,
ObservationDocumentation observation) {
return Mono.deferContextual(
contextView -> {
if (contextView.hasKey(Observation.class)) {
Observation parent = contextView.get(Observation.class);
try (Observation.Scope scope = parent.openScope()) {
return observe(input, payload, frameType, observation);
}
}
return observe(input, payload, frameType, observation);
});
contextView -> observe(input, payload, frameType, observation, contextView));
}

private String route(Payload payload) {
Expand All @@ -107,45 +111,40 @@ private <T> Mono<T> observe(
Function<Payload, Mono<T>> input,
Payload payload,
FrameType frameType,
ObservationDocumentation obs) {
ObservationDocumentation obs,
ContextView contextView) {
String route = route(payload);
RSocketContext rSocketContext =
new RSocketContext(
payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER);
Observation parentObservation = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
Observation observation =
obs.start(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
observationRegistry);
obs.observation(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
observationRegistry)
.parentObservation(parentObservation);
setContextualName(frameType, route, observation);
observation.start();
Payload newPayload = payload;
if (rSocketContext.modifiedPayload != null) {
newPayload = rSocketContext.modifiedPayload;
}
return input
.apply(newPayload)
.doOnError(observation::error)
.doFinally(signalType -> observation.stop());
}

private Observation observation(ContextView contextView) {
if (contextView.hasKey(Observation.class)) {
return contextView.get(Observation.class);
}
return null;
.doFinally(signalType -> observation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.deferContextual(
contextView ->
setObservation(
super::requestStream,
payload,
contextView,
FrameType.REQUEST_STREAM,
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM));
return observationFlux(
super::requestStream,
payload,
FrameType.REQUEST_STREAM,
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM);
}

@Override
Expand All @@ -155,32 +154,16 @@ public Flux<Payload> requestChannel(Publisher<Payload> inbound) {
(firstSignal, flux) -> {
final Payload firstPayload = firstSignal.get();
if (firstPayload != null) {
return setObservation(
return observationFlux(
p -> super.requestChannel(flux.skip(1).startWith(p)),
firstPayload,
firstSignal.getContextView(),
FrameType.REQUEST_CHANNEL,
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL);
}
return flux;
});
}

private Flux<Payload> setObservation(
Function<Payload, Flux<Payload>> input,
Payload payload,
ContextView contextView,
FrameType frameType,
ObservationDocumentation obs) {
Observation parentObservation = observation(contextView);
if (parentObservation == null) {
return observationFlux(input, payload, frameType, obs);
}
try (Observation.Scope scope = parentObservation.openScope()) {
return observationFlux(input, payload, frameType, obs);
}
}

private Flux<Payload> observationFlux(
Function<Payload, Flux<Payload>> input,
Payload payload,
Expand All @@ -196,17 +179,22 @@ private Flux<Payload> observationFlux(
frameType,
route,
RSocketContext.Side.REQUESTER);
Observation parentObservation =
contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
Observation newObservation =
obs.start(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
this.observationRegistry);
obs.observation(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
this.observationRegistry)
.parentObservation(parentObservation);
setContextualName(frameType, route, newObservation);
newObservation.start();
return input
.apply(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
});
}

Expand All @@ -217,8 +205,4 @@ private void setContextualName(FrameType frameType, String route, Observation ne
newObservation.contextualName(frameType.name());
}
}

public void setObservationConvention(RSocketRequesterObservationConvention convention) {
this.observationConvention = convention;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Tracing representation of a {@link RSocketProxy} for the responder.
Expand All @@ -39,14 +40,24 @@
* @since 1.1.4
*/
public class ObservationResponderRSocketProxy extends RSocketProxy {
/** Aligned with ObservationThreadLocalAccessor#KEY */
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";

private final ObservationRegistry observationRegistry;

private RSocketResponderObservationConvention observationConvention;
@Nullable private final RSocketResponderObservationConvention observationConvention;

public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
this(source, observationRegistry, null);
}

public ObservationResponderRSocketProxy(
RSocket source,
ObservationRegistry observationRegistry,
RSocketResponderObservationConvention observationConvention) {
super(source);
this.observationRegistry = observationRegistry;
this.observationConvention = observationConvention;
}

@Override
Expand All @@ -66,7 +77,8 @@ public Mono<Void> fireAndForget(Payload payload) {
startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext);
return super.fireAndForget(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}

private Observation startObservation(
Expand Down Expand Up @@ -94,7 +106,8 @@ public Mono<Payload> requestResponse(Payload payload) {
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext);
return super.requestResponse(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}

@Override
Expand All @@ -109,7 +122,8 @@ public Flux<Payload> requestStream(Payload payload) {
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext);
return super.requestStream(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}

@Override
Expand Down Expand Up @@ -137,7 +151,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
}
return super.requestChannel(flux.skip(1).startWith(rSocketContext.modifiedPayload))
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(
context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}
return flux;
});
Expand All @@ -160,8 +176,4 @@ private String route(Payload payload, ByteBuf headers) {
}
return null;
}

public void setObservationConvention(RSocketResponderObservationConvention convention) {
this.observationConvention = convention;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public Tracer getTracer() {
public void onStart(RSocketContext context) {
Payload payload = context.payload;
Span.Builder spanBuilder = this.tracer.spanBuilder();
Span parentSpan = getParentSpan(context);
if (parentSpan != null) {
spanBuilder.setParent(parentSpan.context());
}
Span span = spanBuilder.kind(Span.Kind.PRODUCER).start();
log.debug("Extracted result from context or thread local {}", span);
// TODO: newmetadata returns an empty composite byte buf
Expand Down