Skip to content

Commit

Permalink
Reactive Messaging OTel tracing decorator update for 4.11.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 8, 2023
1 parent adf8f7d commit 6a3982b
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.InstrumentationRecorder;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.grpc.GrpcTracingClientInterceptor;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.grpc.GrpcTracingServerInterceptor;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingEmitterDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingIncomingDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.restclient.OpenTelemetryClientFilter;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.resteasy.AttachExceptionHandler;
import io.quarkus.opentelemetry.runtime.tracing.intrumentation.resteasy.OpenTelemetryClassicServerFilter;
Expand Down Expand Up @@ -90,7 +92,9 @@ void registerReactiveMessagingMessageDecorator(
Capabilities capabilities,
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {
if (capabilities.isPresent(Capability.SMALLRYE_REACTIVE_MESSAGING)) {
additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingDecorator.class));
additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingOutgoingDecorator.class));
additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingIncomingDecorator.class));
additionalBeans.produce(new AdditionalBeanBuildItem(ReactiveMessagingTracingEmitterDecorator.class));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging;

import static io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging.ReactiveMessagingTracingOutgoingDecorator.decorateOutgoing;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PublisherDecorator;

/**
* Intercepts outgoing messages from emitters from Reactive Messaging connectors.
* <p>
* For outgoing messages from emitters, if the message doesn't already contain a tracing metadata, it attaches one with the
* current
* OpenTelemetry context.
* Reactive messaging outbound connectors, if tracing is supported, will use that context as parent span to trace outbound
* message transmission.
*/
@ApplicationScoped
public class ReactiveMessagingTracingEmitterDecorator implements PublisherDecorator {

@Override
public int getPriority() {
// Place the decorator before all others including the ContextDecorator which is priority 0
// This is only important for the emitter case
return -1000;
}

@Inject
ChannelRegistry registry;

/**
* Incoming messages
*/
@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher,
String channelName, boolean isConnector) {
Multi<? extends Message<?>> multi = publisher;
if (!isConnector && registry.getEmitterNames().contains(channelName)) {
// Emitter is a special case for the emitter publisher
multi = decorateOutgoing(multi);
}
return multi;
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging;

import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
Expand All @@ -10,30 +8,19 @@
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;

/**
* Intercepts incoming and outgoing messages from Reactive Messaging connectors.
* Intercepts incoming messages from Reactive Messaging connectors.
* <p>
* For incoming messages, it fetches OpenTelemetry context from the message and attaches to the duplicated context of the
* message.
* Consumer methods will be called on this duplicated context, so the OpenTelemetry context associated with the incoming message
* will be propagated.
* <p>
* For outgoing messages, if the message doesn't already contain a tracing metadata, it attaches one with the current
* OpenTelemetry context.
* Reactive messaging outbound connectors, if tracing is supported, will use that context as parent span to trace outbound
* message transmission.
*/
@ApplicationScoped
public class ReactiveMessagingTracingDecorator implements PublisherDecorator, SubscriberDecorator {

@Override
public int getPriority() {
return 1000;
}
public class ReactiveMessagingTracingIncomingDecorator implements PublisherDecorator {

/**
* Incoming messages
Expand All @@ -59,25 +46,4 @@ public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publishe
return multi;
}

/**
* Outgoing messages
*/
@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> toBeSubscribed,
List<String> channelName, boolean isConnector) {
Multi<? extends Message<?>> multi = toBeSubscribed;
if (isConnector) {
// add TracingMetadata to the outgoing message if it doesn't exist already
multi = multi.map(m -> {
Message<?> message = m;
if (m.getMetadata(TracingMetadata.class).isEmpty()) {
var otelContext = QuarkusContextStorage.INSTANCE.current();
message = m.addMetadata(TracingMetadata.withCurrent(otelContext));
}
return message;
});
}
return multi;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.opentelemetry.runtime.tracing.intrumentation.reactivemessaging;

import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.TracingMetadata;

/**
* Intercepts outgoing messages from Reactive Messaging connectors.
* <p>
* For outgoing messages, if the message doesn't already contain a tracing metadata, it attaches one with the current
* OpenTelemetry context.
* Reactive messaging outbound connectors, if tracing is supported, will use that context as parent span to trace outbound
* message transmission.
*/
@ApplicationScoped
public class ReactiveMessagingTracingOutgoingDecorator implements SubscriberDecorator {

/**
* Outgoing messages
*/
@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> toBeSubscribed,
List<String> channelName, boolean isConnector) {
Multi<? extends Message<?>> multi = toBeSubscribed;
if (isConnector) {
// add TracingMetadata to the outgoing message if it doesn't exist already
multi = decorateOutgoing(multi);
}
return multi;
}

static Multi<? extends Message<?>> decorateOutgoing(Multi<? extends Message<?>> multi) {
return multi.map(m -> {
Message<?> message = m;
if (m.getMetadata(TracingMetadata.class).isEmpty()) {
var otelContext = QuarkusContextStorage.INSTANCE.current();
message = m.addMetadata(TracingMetadata.withCurrent(otelContext));
}
return message;
});
}

}

0 comments on commit 6a3982b

Please sign in to comment.