Skip to content

Update Pulsar semantic #13578

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* that may be used in a messaging system.
*/
public enum MessageOperation {
CREATE,
PUBLISH,
RECEIVE,
SEND,
PROCESS;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil.internalSet;

import io.opentelemetry.api.common.AttributeKey;
Expand All @@ -28,6 +29,8 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
implements AttributesExtractor<REQUEST, RESPONSE>, SpanKeyProvider {

// copied from MessagingIncubatingAttributes
private static final AttributeKey<String> MESSAGING_OPERATION_NAME =
stringKey("messaging.operation.name");
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
AttributeKey.longKey("messaging.batch.message_count");
private static final AttributeKey<String> MESSAGING_CLIENT_ID =
Expand All @@ -50,8 +53,8 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
AttributeKey.longKey("messaging.message.envelope.size");
private static final AttributeKey<String> MESSAGING_MESSAGE_ID =
AttributeKey.stringKey("messaging.message.id");
private static final AttributeKey<String> MESSAGING_OPERATION =
AttributeKey.stringKey("messaging.operation");
private static final AttributeKey<String> MESSAGING_OPERATION_TYPE =
stringKey("messaging.operation.type");
private static final AttributeKey<String> MESSAGING_SYSTEM =
AttributeKey.stringKey("messaging.system");

Expand All @@ -72,24 +75,37 @@ public static <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> create(
*/
public static <REQUEST, RESPONSE> MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> builder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return new MessagingAttributesExtractorBuilder<>(getter, operation);
return new MessagingAttributesExtractorBuilder<>(getter, operation, "");
}

public static <REQUEST, RESPONSE> MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> builder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter,
MessageOperation operation,
String operationName) {
return new MessagingAttributesExtractorBuilder<>(getter, operation, operationName);
}

private final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
private final MessageOperation operation;
private final List<String> capturedHeaders;
private final String operationName;

MessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter,
MessageOperation operation,
List<String> capturedHeaders) {
List<String> capturedHeaders,
String operationName) {
this.getter = getter;
this.operation = operation;
this.capturedHeaders = CapturedMessageHeadersUtil.lowercase(capturedHeaders);
this.operationName = operationName;
}

@Override
public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
if (operationName != null) {
internalSet(attributes, MESSAGING_OPERATION_NAME, operationName);
}
internalSet(attributes, MESSAGING_SYSTEM, getter.getSystem(request));
boolean isTemporaryDestination = getter.isTemporaryDestination(request);
if (isTemporaryDestination) {
Expand All @@ -112,7 +128,7 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
attributes, MESSAGING_MESSAGE_ENVELOPE_SIZE, getter.getMessageEnvelopeSize(request));
internalSet(attributes, MESSAGING_CLIENT_ID, getter.getClientId(request));
if (operation != null) {
internalSet(attributes, MESSAGING_OPERATION, operation.operationName());
internalSet(attributes, MESSAGING_OPERATION_TYPE, operation.operationName());
}
}

Expand Down Expand Up @@ -147,6 +163,8 @@ public SpanKey internalGetSpanKey() {

switch (operation) {
case PUBLISH:
case CREATE:
case SEND:
return SpanKey.PRODUCER;
case RECEIVE:
return SpanKey.CONSUMER_RECEIVE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ public final class MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> {
final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
final MessageOperation operation;
List<String> capturedHeaders = emptyList();
final String operationName;

MessagingAttributesExtractorBuilder(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
MessagingAttributesGetter<REQUEST, RESPONSE> getter,
MessageOperation operation,
String operationName) {
this.getter = getter;
this.operation = operation;
this.operationName = operationName;
}

/**
Expand All @@ -47,6 +51,6 @@ public MessagingAttributesExtractorBuilder<REQUEST, RESPONSE> setCapturedHeaders
* MessagingAttributesExtractorBuilder}.
*/
public AttributesExtractor<REQUEST, RESPONSE> build() {
return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders);
return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders, operationName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ public final class MessagingConsumerMetrics implements OperationListener {
private MessagingConsumerMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
meter
.histogramBuilder("messaging.receive.duration")
.setDescription("Measures the duration of receive operation.")
.histogramBuilder("messaging.client.operation.duration")
.setDescription(
"Duration of messaging operation initiated by a producer or consumer client.")
.setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS)
.setUnit("s");
MessagingMetricsAdvice.applyReceiveDurationAdvice(durationBuilder);
receiveDurationHistogram = durationBuilder.build();

LongCounterBuilder longCounterBuilder =
meter
.counterBuilder("messaging.receive.messages")
.setDescription("Measures the number of received messages.")
.counterBuilder("messaging.client.consumed.messages")
.setDescription("Number of messages that were delivered to the application.")
.setUnit("{message}");
MessagingMetricsAdvice.applyReceiveMessagesAdvice(longCounterBuilder);
receiveMessageCount = longCounterBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;

Expand All @@ -23,22 +24,25 @@ final class MessagingMetricsAdvice {
asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0));

// copied from MessagingIncubatingAttributes
private static final AttributeKey<String> MESSAGING_OPERATION_NAME =
stringKey("messaging.operation.name");
private static final AttributeKey<String> MESSAGING_SYSTEM =
AttributeKey.stringKey("messaging.system");
private static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
AttributeKey.stringKey("messaging.destination.name");
private static final AttributeKey<String> MESSAGING_OPERATION =
AttributeKey.stringKey("messaging.operation");
private static final AttributeKey<String> MESSAGING_OPERATION_TYPE =
stringKey("messaging.operation.type");
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
AttributeKey.stringKey("messaging.destination.partition.id");
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
AttributeKey.stringKey("messaging.destination.template");

private static final List<AttributeKey<?>> MESSAGING_ATTRIBUTES =
asList(
MESSAGING_OPERATION_NAME,
MESSAGING_SYSTEM,
MESSAGING_DESTINATION_NAME,
MESSAGING_OPERATION,
MESSAGING_OPERATION_TYPE,
MESSAGING_DESTINATION_PARTITION_ID,
MESSAGING_DESTINATION_TEMPLATE,
ErrorAttributes.ERROR_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public final class MessagingProducerMetrics implements OperationListener {
private MessagingProducerMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
meter
.histogramBuilder("messaging.publish.duration")
.setDescription("Measures the duration of publish operation.")
.histogramBuilder("messaging.client.operation.duration")
.setDescription(
"Duration of messaging operation initiated by a producer or consumer client.")
.setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS)
.setUnit("s");
MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void collectsMetrics() {
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName("messaging.publish.duration")
.hasName("messaging.client.operation.duration")
.hasUnit("s")
.hasDescription("Measures the duration of publish operation.")
.hasHistogramSatisfying(
Expand Down Expand Up @@ -113,7 +113,7 @@ void collectsMetrics() {
.satisfiesExactlyInAnyOrder(
metric ->
assertThat(metric)
.hasName("messaging.publish.duration")
.hasName("messaging.client.operation.duration")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private static Instrumenter<PulsarRequest, Void> createConsumerReceiveInstrument
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE, "receive"))
.addOperationMetrics(MessagingConsumerMetrics.get())
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));
Expand All @@ -99,7 +99,7 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE, "receive"))
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
Expand All @@ -117,7 +117,7 @@ private static Instrumenter<PulsarRequest, Void> createConsumerProcessInstrument
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS));
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS, "process"));

if (receiveInstrumentationEnabled) {
SpanLinksExtractor<PulsarRequest> spanLinksExtractor =
Expand All @@ -136,9 +136,9 @@ private static Instrumenter<PulsarRequest, Void> createProducerInstrumenter() {
Instrumenter.<PulsarRequest, Void>builder(
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.PUBLISH))
MessagingSpanNameExtractor.create(getter, MessageOperation.SEND))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH))
createMessagingAttributesExtractor(getter, MessageOperation.SEND, "send"))
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.addOperationMetrics(MessagingProducerMetrics.get());
Expand All @@ -152,8 +152,8 @@ private static Instrumenter<PulsarRequest, Void> createProducerInstrumenter() {
}

private static <T> AttributesExtractor<T, Void> createMessagingAttributesExtractor(
MessagingAttributesGetter<T, Void> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
MessagingAttributesGetter<T, Void> getter, MessageOperation operation, String operationName) {
return MessagingAttributesExtractor.builder(getter, operation, operationName)
.setCapturedHeaders(capturedHeaders)
.build();
}
Expand Down
Loading
Loading