Skip to content

Add messaging wrappers to support lightweight manual instrumentation #1855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions messaging-wrappers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# OpenTelemetry Messaging Wrappers

This is a lightweight messaging wrappers API designed to help you quickly add instrumentation to any
type of messaging system client. To further ease the burden of instrumentation, we will also provide
predefined implementations for certain messaging systems, helping you seamlessly address the issue
of broken traces.

<details>
<summary>Table of Contents</summary>

- [Overview](#overview)
- [Predefined Implementations](#predefined-implementations)
- [Quickstart For Given Implementations](#quickstart-for-given-implementations)
- [\[Given\] Step 1 Add dependencies](#given-step-1-add-dependencies)
- [\[Given\] Step 2 Initializing MessagingWrappers](#given-step-2-initializing-messagingwrappers)
- [\[Given\] Step 3 Wrapping the Process](#given-step-3-wrapping-the-process)
- [Manual Implementation](#manual-implementation)
- [\[Manual\] Step 1 Add dependencies](#manual-step-1-add-dependencies)
- [\[Manual\] Step 2 Initializing MessagingWrappers](#manual-step-2-initializing-messagingwrappers)
- [\[Manual\] Step 3 Wrapping the Process](#manual-step-3-wrapping-the-process)
- [Component Owners](#component-owners)

</details>

## Overview

The primary goal of this API is to simplify the process of adding instrumentation to your messaging
systems, thereby enhancing observability without introducing significant overhead. Inspired by
[#13340](https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/13340) and
[opentelemetry-java-instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java),
this tool aims to streamline the tracing and monitoring process.

## Predefined Implementations

| Messaging system | Version Scope | Wrapper type |
|-------------------|---------------|--------------|
| kafka-clients | `[0.11.0.0,)` | process |
| aliyun mns-client | `[1.3.0,)` | process |

## Quickstart For Given Implementations

This example will demonstrate how to add automatic instrumentation to your Kafka consumer with process wrapper. For
detailed example, please check out [KafkaClientTest](./kafka-clients/src/test/java/io/opentelemetry/contrib/messaging/wrappers/kafka/KafkaClientTest.java).

### [Given] Step 1 Add dependencies

To use OpenTelemetry in your project, you need to add the necessary dependencies. Below are the configurations for both
Gradle and Maven.

### [Given] Gradle

```kotlin
dependencies {
implementation("io.opentelemetry.contrib:opentelemetry-messaging-wrappers-kafka-clients:${latest_version}")
}
```

### [Given] Maven

```xml
<dependency>
<groupId>io.opentelemetry.contrib</groupId>
<artifactId>opentelemetry-messaging-wrappers-kafka-clients</artifactId>
<version>${latest_version}</version>
</dependency>
```

### [Given] Step 2 Initializing MessagingWrappers

For `kafka-clients`, we provide pre-implemented wrappers that allow for out-of-the-box integration. We provide
an implementation based on the OpenTelemetry semantic convention by default.

```java
public class KafkaDemo {

public static MessagingProcessWrapper<KafkaProcessRequest> createWrapper() {
return KafkaHelper.processWrapperBuilder().build();
}
}
```

### [Given] Step 3 Wrapping the Process

Once the MessagingWrapper are initialized, you can wrap your message processing logic to ensure that tracing spans are
properly created and propagated.

**P.S.** Some instrumentations may also [generate process spans](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/docs/supported-libraries.md).
If both are enabled, it might result in duplicate nested process spans. It is recommended to disable one of them.

```java
public class Demo {

private static final MessagingProcessWrapper<KafkaProcessRequest> WRAPPER = createWrapper();

// please initialize consumer
private Consumer<Integer, String> consumer;

public String consume() {
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5));
ConsumerRecord<?, ?> record = records.iterator().next();

return WRAPPER.doProcess(
KafkaProcessRequest.of(record, groupId, clientId), () -> {
// your processing logic
return "success";
});
}

public void consumeWithoutResult() {
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5));
ConsumerRecord<?, ?> record = records.iterator().next();

WRAPPER.doProcess(
KafkaProcessRequest.of(record, groupId, clientId), () -> {
// your processing logic
});
}
}
```

## Manual Implementation

You can also build implementations based on the `messaging-wrappers-api` for any messaging system to accommodate your
custom message protocol. For detailed example, please check out [UserDefinedMessageSystemTest](./api/src/test/java/io/opentelemetry/contrib/messaging/wrappers/UserDefinedMessageSystemTest.java).

### [Manual] Step 1 Add dependencies

#### [Manual] Gradle

```kotlin
dependencies {
implementation("io.opentelemetry.contrib:opentelemetry-messaging-wrappers-api:${latest_version}")
}
```

#### [Manual] Maven

```xml
<dependency>
<groupId>io.opentelemetry.contrib</groupId>
<artifactId>opentelemetry-messaging-wrappers-api</artifactId>
<version>${latest_version}</version>
</dependency>
```

### [Manual] Step 2 Initializing MessagingWrappers

Below is an example of how to initialize a messaging wrapper.

```java
public class Demo {

public static MessagingProcessWrapper<MessagingProcessRequest> createWrapper(OpenTelemetry openTelemetry) {

return MessagingProcessWrapper.<MessagingProcessRequest>defaultBuilder()
.openTelemetry(openTelemetry)
.textMapGetter(DefaultMessageTextMapGetter.create())
.attributesExtractors(
Collections.singletonList(
MessagingAttributesExtractor.create(
DefaultMessagingAttributesGetter.create(), MessageOperation.PROCESS)))
.build();
}
}

public class MyMessagingProcessRequest implements MessagingProcessRequest {
// your implementation here

@Override
public List<String> getMessageHeader(String name) {
// impl: how to get specific header from your message
return Collections.singletonList(message.getHeaders().get(name));
}

@Override
public Collection<String> getAllMessageHeadersKey() {
// impl: how to get all headers key set from your message
return message.getHeaders().keySet();
}
}
```

For arbitrary messaging systems, you need to manually define `MessagingProcessRequest` and the corresponding `TextMapGetter`.
You can also customize your messaging spans by adding an `AttributesExtractor`.

### [Manual] Step 3 Wrapping the Process

Once the MessagingWrapper are initialized, you can wrap your message processing logic to ensure that tracing spans are
properly created and propagated.

**P.S.** Some instrumentations may also [generate process spans](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/docs/supported-libraries.md).
If both are enabled, it might result in duplicate nested process spans. It is recommended to disable one of them.

```java
public class Demo {

private static final MessagingProcessWrapper<MyMessagingProcessRequest> WRAPPER = createWrapper();

public String consume(Message message) {
return WRAPPER.doProcess(new MyMessagingProcessRequest(message), () -> {
// your processing logic
return "success";
});
}

public void consumeWithoutReturn(Message message) {
WRAPPER.doProcess(new MyMessagingProcessRequest(message), () -> {
// your processing logic
});
}
}
```

## Component Owners

- [Minghui Zhang](https://github.com/Cirilla-zmh), Alibaba
- [Steve Rao](https://github.com/steverao), Alibaba

Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).
40 changes: 40 additions & 0 deletions messaging-wrappers/aliyun-mns-sdk/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
plugins {
id("otel.java-conventions")

id("otel.publish-conventions")
}

description = "OpenTelemetry Messaging Wrappers - aliyun-mns-sdk implementation"
otelJava.moduleName.set("io.opentelemetry.contrib.messaging.wrappers.aliyun-mns-sdk")

dependencies {
api(project(":messaging-wrappers:api"))

compileOnly("com.aliyun.mns:aliyun-sdk-mns:1.3.0")

testImplementation("com.aliyun.mns:aliyun-sdk-mns:1.3.0")
testImplementation(project(":messaging-wrappers:testing"))

testImplementation("org.springframework.boot:spring-boot-starter-web:2.7.18")
testImplementation("org.springframework.boot:spring-boot-starter-test:2.7.18")
}

tasks {
withType<Test>().configureEach {
jvmArgs("-Dotel.java.global-autoconfigure.enabled=true")
// TODO: According to https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#message-creation-context-as-parent-of-process-span,
// process span should be the child of receive span. However, we couldn't access the trace context with receive span
// in wrappers, unless we add a generic accessor for that.
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
jvmArgs("-Dotel.traces.exporter=logging")
jvmArgs("-Dotel.metrics.exporter=logging")
jvmArgs("-Dotel.logs.exporter=logging")
}
}

configurations.testRuntimeClasspath {
resolutionStrategy {
force("ch.qos.logback:logback-classic:1.2.12")
force("org.slf4j:slf4j-api:1.7.35")
}
}
2 changes: 2 additions & 0 deletions messaging-wrappers/aliyun-mns-sdk/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# TODO: uncomment when ready to mark as stable
# otel.stable=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.messaging.wrappers.mns;

import com.aliyun.mns.model.BaseMessage;
import com.aliyun.mns.model.MessagePropertyValue;
import com.aliyun.mns.model.MessageSystemPropertyName;
import com.aliyun.mns.model.MessageSystemPropertyValue;
import javax.annotation.Nullable;

public final class MnsHelper {

public static MnsProcessWrapperBuilder processWrapperBuilder() {
return new MnsProcessWrapperBuilder();
}

@Nullable
public static String getMessageHeader(BaseMessage message, String name) {
MessageSystemPropertyName key = convert2SystemPropertyName(name);
if (key != null) {
MessageSystemPropertyValue systemProperty = message.getSystemProperty(key);
if (systemProperty != null) {
return systemProperty.getStringValueByType();
}
}
MessagePropertyValue messagePropertyValue = message.getUserProperties().get(name);
if (messagePropertyValue != null) {
return messagePropertyValue.getStringValueByType();
}
return null;
}

/** see {@link MessageSystemPropertyName} */
@Nullable
public static MessageSystemPropertyName convert2SystemPropertyName(String name) {
if (name == null) {
return null;
} else if (name.equals(MessageSystemPropertyName.TRACE_PARENT.getValue())) {
return MessageSystemPropertyName.TRACE_PARENT;
} else if (name.equals(MessageSystemPropertyName.BAGGAGE.getValue())) {
return MessageSystemPropertyName.BAGGAGE;
} else if (name.equals(MessageSystemPropertyName.TRACE_STATE.getValue())) {
return MessageSystemPropertyName.TRACE_STATE;
}
return null;
}

private MnsHelper() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.messaging.wrappers.mns;

import io.opentelemetry.contrib.messaging.wrappers.MessagingProcessWrapperBuilder;
import io.opentelemetry.contrib.messaging.wrappers.mns.semconv.MnsConsumerAttributesGetter;
import io.opentelemetry.contrib.messaging.wrappers.mns.semconv.MnsProcessRequest;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import java.util.ArrayList;

public class MnsProcessWrapperBuilder extends MessagingProcessWrapperBuilder<MnsProcessRequest> {

MnsProcessWrapperBuilder() {
super();
super.textMapGetter = MnsTextMapGetter.create();
super.spanNameExtractor =
MessagingSpanNameExtractor.create(
MnsConsumerAttributesGetter.INSTANCE, MessageOperation.PROCESS);
super.attributesExtractors = new ArrayList<>();
super.attributesExtractors.add(
MessagingAttributesExtractor.create(
MnsConsumerAttributesGetter.INSTANCE, MessageOperation.PROCESS));
}
}
Loading
Loading