Skip to content

Commit

Permalink
Allows continuation of observations
Browse files Browse the repository at this point in the history
without this change we're not taking into account a parent observation into account. Nor do we set in the reactor context the current observation for the users to use

with this change whenever an observation is being created we put it into the reacto context under a well-known micrometer.observation key (for more information look at the ObservationThreadLocalAccessor class from micrometer-core)

Signed-off-by: Marcin Grzejszczak <mgrzejszczak@vmware.com>
  • Loading branch information
marcingrzejszczak committed Oct 26, 2022
1 parent 32da131 commit 95b5eb3
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 69 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
version=1.1.3
version=1.1.4-SNAPSHOT
perfBaselineVersion=1.1.2
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.ContextView;

/**
Expand All @@ -43,13 +44,24 @@
*/
public class ObservationRequesterRSocketProxy extends RSocketProxy {

/** Aligned with ObservationThreadLocalAccessor#KEY */
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";

private final ObservationRegistry observationRegistry;

private RSocketRequesterObservationConvention observationConvention;
@Nullable private final RSocketRequesterObservationConvention observationConvention;

public ObservationRequesterRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
this(source, observationRegistry, null);
}

public ObservationRequesterRSocketProxy(
RSocket source,
ObservationRegistry observationRegistry,
RSocketRequesterObservationConvention observationConvention) {
super(source);
this.observationRegistry = observationRegistry;
this.observationConvention = observationConvention;
}

@Override
Expand All @@ -76,15 +88,7 @@ <T> Mono<T> setObservation(
FrameType frameType,
ObservationDocumentation observation) {
return Mono.deferContextual(
contextView -> {
if (contextView.hasKey(Observation.class)) {
Observation parent = contextView.get(Observation.class);
try (Observation.Scope scope = parent.openScope()) {
return observe(input, payload, frameType, observation);
}
}
return observe(input, payload, frameType, observation);
});
contextView -> observe(input, payload, frameType, observation, contextView));
}

private String route(Payload payload) {
Expand All @@ -107,45 +111,40 @@ private <T> Mono<T> observe(
Function<Payload, Mono<T>> input,
Payload payload,
FrameType frameType,
ObservationDocumentation obs) {
ObservationDocumentation obs,
ContextView contextView) {
String route = route(payload);
RSocketContext rSocketContext =
new RSocketContext(
payload, payload.sliceMetadata(), frameType, route, RSocketContext.Side.REQUESTER);
Observation parentObservation = contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
Observation observation =
obs.start(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
observationRegistry);
obs.observation(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
observationRegistry)
.parentObservation(parentObservation);
setContextualName(frameType, route, observation);
observation.start();
Payload newPayload = payload;
if (rSocketContext.modifiedPayload != null) {
newPayload = rSocketContext.modifiedPayload;
}
return input
.apply(newPayload)
.doOnError(observation::error)
.doFinally(signalType -> observation.stop());
}

private Observation observation(ContextView contextView) {
if (contextView.hasKey(Observation.class)) {
return contextView.get(Observation.class);
}
return null;
.doFinally(signalType -> observation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.deferContextual(
contextView ->
setObservation(
super::requestStream,
payload,
contextView,
FrameType.REQUEST_STREAM,
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM));
return observationFlux(
super::requestStream,
payload,
FrameType.REQUEST_STREAM,
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_STREAM);
}

@Override
Expand All @@ -155,32 +154,16 @@ public Flux<Payload> requestChannel(Publisher<Payload> inbound) {
(firstSignal, flux) -> {
final Payload firstPayload = firstSignal.get();
if (firstPayload != null) {
return setObservation(
return observationFlux(
p -> super.requestChannel(flux.skip(1).startWith(p)),
firstPayload,
firstSignal.getContextView(),
FrameType.REQUEST_CHANNEL,
RSocketObservationDocumentation.RSOCKET_REQUESTER_REQUEST_CHANNEL);
}
return flux;
});
}

private Flux<Payload> setObservation(
Function<Payload, Flux<Payload>> input,
Payload payload,
ContextView contextView,
FrameType frameType,
ObservationDocumentation obs) {
Observation parentObservation = observation(contextView);
if (parentObservation == null) {
return observationFlux(input, payload, frameType, obs);
}
try (Observation.Scope scope = parentObservation.openScope()) {
return observationFlux(input, payload, frameType, obs);
}
}

private Flux<Payload> observationFlux(
Function<Payload, Flux<Payload>> input,
Payload payload,
Expand All @@ -196,17 +179,22 @@ private Flux<Payload> observationFlux(
frameType,
route,
RSocketContext.Side.REQUESTER);
Observation parentObservation =
contextView.getOrDefault(MICROMETER_OBSERVATION_KEY, null);
Observation newObservation =
obs.start(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
this.observationRegistry);
obs.observation(
this.observationConvention,
new DefaultRSocketRequesterObservationConvention(rSocketContext),
() -> rSocketContext,
this.observationRegistry)
.parentObservation(parentObservation);
setContextualName(frameType, route, newObservation);
newObservation.start();
return input
.apply(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
});
}

Expand All @@ -217,8 +205,4 @@ private void setContextualName(FrameType frameType, String route, Observation ne
newObservation.contextualName(frameType.name());
}
}

public void setObservationConvention(RSocketRequesterObservationConvention convention) {
this.observationConvention = convention;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Tracing representation of a {@link RSocketProxy} for the responder.
Expand All @@ -39,14 +40,24 @@
* @since 1.1.4
*/
public class ObservationResponderRSocketProxy extends RSocketProxy {
/** Aligned with ObservationThreadLocalAccessor#KEY */
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";

private final ObservationRegistry observationRegistry;

private RSocketResponderObservationConvention observationConvention;
@Nullable private final RSocketResponderObservationConvention observationConvention;

public ObservationResponderRSocketProxy(RSocket source, ObservationRegistry observationRegistry) {
this(source, observationRegistry, null);
}

public ObservationResponderRSocketProxy(
RSocket source,
ObservationRegistry observationRegistry,
RSocketResponderObservationConvention observationConvention) {
super(source);
this.observationRegistry = observationRegistry;
this.observationConvention = observationConvention;
}

@Override
Expand All @@ -66,7 +77,8 @@ public Mono<Void> fireAndForget(Payload payload) {
startObservation(RSocketObservationDocumentation.RSOCKET_RESPONDER_FNF, rSocketContext);
return super.fireAndForget(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}

private Observation startObservation(
Expand Down Expand Up @@ -94,7 +106,8 @@ public Mono<Payload> requestResponse(Payload payload) {
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_RESPONSE, rSocketContext);
return super.requestResponse(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}

@Override
Expand All @@ -109,7 +122,8 @@ public Flux<Payload> requestStream(Payload payload) {
RSocketObservationDocumentation.RSOCKET_RESPONDER_REQUEST_STREAM, rSocketContext);
return super.requestStream(rSocketContext.modifiedPayload)
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}

@Override
Expand Down Expand Up @@ -137,7 +151,9 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
}
return super.requestChannel(flux.skip(1).startWith(rSocketContext.modifiedPayload))
.doOnError(newObservation::error)
.doFinally(signalType -> newObservation.stop());
.doFinally(signalType -> newObservation.stop())
.contextWrite(
context -> context.put(MICROMETER_OBSERVATION_KEY, newObservation));
}
return flux;
});
Expand All @@ -160,8 +176,4 @@ private String route(Payload payload, ByteBuf headers) {
}
return null;
}

public void setObservationConvention(RSocketResponderObservationConvention convention) {
this.observationConvention = convention;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public Tracer getTracer() {
public void onStart(RSocketContext context) {
Payload payload = context.payload;
Span.Builder spanBuilder = this.tracer.spanBuilder();
Span parentSpan = getParentSpan(context);
if (parentSpan != null) {
spanBuilder.setParent(parentSpan.context());
}
Span span = spanBuilder.kind(Span.Kind.PRODUCER).start();
log.debug("Extracted result from context or thread local {}", span);
// TODO: newmetadata returns an empty composite byte buf
Expand Down

0 comments on commit 95b5eb3

Please sign in to comment.