diff --git a/README.md b/README.md
index 19b33665..5b9b093c 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,7 @@ need to do:
* [By Key and Value](#by-key-and-value)
* [By Predicate](#by-predicate)
* [Interactive Queries](#interactive-queries)
+ * [Open Telemetry](#open-telemetry)
* [Testing](#testing)
* [Motivation](#motivation)
* [Contribution](#contribution)
@@ -449,6 +450,26 @@ containers:
- If neither the variable environment nor the `MY_POD_IP` environment variable is set, Kstreamplify
sets `application.server` to the default value `localhost`.
+### Open Telemetry
+
+The Kstreamplify Spring Boot module simplifies the integration of [Open Telemetry](https://opentelemetry.io/) into your Kafka Streams application
+by binding all the metrics of the Kafka Streams instance to the Spring Boot registry which is used by the Open Telemetry Java agent.
+
+You can run your application with the Open Telemetry Java agent by including the following JVM options:
+
+```shell
+-javaagent:/opentelemetry-javaagent.jar -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.metrics.exporter=otlp
+```
+
+It also facilitates the addition of custom tags to the metrics, allowing you to use them to organize your metrics in your Grafana dashboard.
+
+```shell
+-Dotel.resource.attributes=environment=production,service.name=myNamespace,service.name=myKafkaStreams,category=orders
+```
+
+All the tags specified in the `otel.resource.attributes` property will be included in the metrics and can be observed in the logs
+during the application startup.
+
### Testing
For testing, you can create a test class that extends `KafkaStreamsStarterTest` and override the `getKafkaStreamsStarter` method to return your `KafkaStreamsStarter` class.
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java
index be4f238b..856c386b 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java
@@ -90,6 +90,8 @@ public void init(KafkaStreamsStarter streamsStarter) {
kafkaStreams = new KafkaStreams(topology, KafkaStreamsExecutionContext.getProperties());
+ registerMetrics(kafkaStreams);
+
kafkaStreamsStarter.onStart(kafkaStreams);
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
@@ -128,8 +130,7 @@ private void initDlq() {
* Init the host information.
*/
private void initHostInfo() {
- String ipEnvVarName =
- (String) kafkaProperties.get(InitializerConstants.IP_SYSTEM_VARIABLE_PROPERTY);
+ String ipEnvVarName = (String) kafkaProperties.get(InitializerConstants.IP_SYSTEM_VARIABLE_PROPERTY);
if (StringUtils.isBlank(ipEnvVarName)) {
ipEnvVarName = InitializerConstants.IP_SYSTEM_VARIABLE_DEFAULT;
}
@@ -159,11 +160,8 @@ protected void initHttpServer() {
*/
protected void initProperties() {
properties = PropertiesUtils.loadProperties();
-
serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY);
-
kafkaProperties = PropertiesUtils.loadKafkaProperties(properties);
-
KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
}
@@ -193,4 +191,11 @@ protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State old
System.exit(3);
}
}
+
+ /**
+ * Register metrics.
+ */
+ protected void registerMetrics(KafkaStreams kafkaStreams) {
+ // Nothing to do here
+ }
}
diff --git a/kstreamplify-spring-boot/pom.xml b/kstreamplify-spring-boot/pom.xml
index f6be7392..2fdd4e9e 100644
--- a/kstreamplify-spring-boot/pom.xml
+++ b/kstreamplify-spring-boot/pom.xml
@@ -39,6 +39,11 @@
spring-boot-starter-logging
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java
index b3cb5367..0cf5050c 100644
--- a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java
+++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializer.java
@@ -2,6 +2,8 @@
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.properties.KafkaProperties;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
@@ -19,14 +21,19 @@
@Slf4j
@Component
@ConditionalOnBean(KafkaStreamsStarter.class)
-public class SpringKafkaStreamsInitializer extends KafkaStreamsInitializer
- implements ApplicationRunner {
+public class SpringKafkaStreamsInitializer extends KafkaStreamsInitializer implements ApplicationRunner {
/**
* The application context.
*/
@Autowired
private ConfigurableApplicationContext applicationContext;
+ /**
+ * The meter registry.
+ */
+ @Autowired
+ private MeterRegistry registry;
+
/**
* The server port.
*/
@@ -60,7 +67,7 @@ public void run(ApplicationArguments args) {
*/
@Override
protected void initHttpServer() {
- // Nothing to do here, Spring Boot is running its own HTTP server
+ // Nothing to do here as the server is already started by Spring Boot
}
/**
@@ -93,6 +100,17 @@ protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State old
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void registerMetrics(KafkaStreams kafkaStreams) {
+ // As the Kafka Streams metrics are not picked up by the OpenTelemetry Java agent automatically,
+ // register them manually to the Spring Boot registry as the agent will pick metrics up from there
+ KafkaStreamsMetrics kafkaStreamsMetrics = new KafkaStreamsMetrics(kafkaStreams);
+ kafkaStreamsMetrics.bindTo(registry);
+ }
+
/**
* Close the application context.
*/
@@ -100,7 +118,7 @@ private void closeApplicationContext() {
if (applicationContext != null) {
applicationContext.close();
} else {
- log.warn("No Spring context set");
+ log.warn("Spring Boot context is not set, cannot close it");
}
}
}
diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/opentelemetry/OpenTelemetryConfig.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/opentelemetry/OpenTelemetryConfig.java
new file mode 100644
index 00000000..4b5624dc
--- /dev/null
+++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/opentelemetry/OpenTelemetryConfig.java
@@ -0,0 +1,60 @@
+package com.michelin.kstreamplify.opentelemetry;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.checkerframework.checker.units.qual.A;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.util.StringUtils;
+
+/**
+ * The OpenTelemetry configuration class.
+ */
+@Slf4j
+@Getter
+@Setter
+@Configuration
+public class OpenTelemetryConfig {
+ /**
+ * The OpenTelemetry resource attributes.
+ */
+ @Value("${otel.resource.attributes:}")
+ private String otelResourceAttributes;
+
+ /**
+ * Register tags in Open Telemetry meter registry.
+ * It enables to add custom tags given in the property otel.resource.attributes
+ * to metrics.
+ *
+ * @return A meter registry customizer
+ */
+ @Bean
+ @ConditionalOnProperty(value = "otel.resource.attributes")
+ public MeterRegistryCustomizer addTagsOnMetrics() {
+ List tags = StringUtils.hasText(otelResourceAttributes)
+ ? Arrays.stream(otelResourceAttributes.split(","))
+ .map(resourceAttribute -> Tag.of(resourceAttribute.split("=")[0], resourceAttribute.split("=")[1]))
+ .toList() : Collections.emptyList();
+
+ return registry -> {
+ // Only add tags and Kafka metrics to Open Telemetry meter registry whose Java agent reads from it
+ if (registry.getClass().getName().contains("OpenTelemetryMeterRegistry")) {
+ log.info("Adding tags {} to registry {}",
+ tags.stream().map(tag -> tag.getKey() + "=" + tag.getValue()).toList(),
+ registry.getClass().getName());
+ registry.config().commonTags(tags);
+ }
+ };
+ }
+}
diff --git a/kstreamplify-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/kstreamplify-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 343c9f2b..0b52d9c9 100644
--- a/kstreamplify-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/kstreamplify-spring-boot/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1,3 +1,4 @@
com.michelin.kstreamplify.initializer.SpringKafkaStreamsInitializer
com.michelin.kstreamplify.rest.SpringProbeController
-com.michelin.kstreamplify.properties.KafkaProperties
\ No newline at end of file
+com.michelin.kstreamplify.properties.KafkaProperties
+com.michelin.kstreamplify.opentelemetry.OpenTelemetryConfig
\ No newline at end of file
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java
index 5112f68f..87f3af16 100644
--- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java
@@ -2,27 +2,21 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.properties.KafkaProperties;
-import java.lang.reflect.Field;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.DefaultApplicationArguments;
import org.springframework.context.ConfigurableApplicationContext;
@ExtendWith(MockitoExtension.class)
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java
index b429870e..0812fe40 100644
--- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java
@@ -2,12 +2,14 @@
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.DEFINED_PORT;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
+import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -41,6 +43,9 @@ class SpringKafkaStreamsInitializerIntegrationTest {
@Autowired
private KafkaStreamsInitializer initializer;
+ @Autowired
+ private MeterRegistry registry;
+
@Autowired
private TestRestTemplate restTemplate;
@@ -89,7 +94,6 @@ void shouldInitAndRun() throws InterruptedException {
KafkaStreamsExecutionContext.getProperties().get("application.server"));
// Assert HTTP probes
-
ResponseEntity responseReady = restTemplate
.getForEntity("http://localhost:8081/ready", Void.class);
@@ -115,6 +119,32 @@ void shouldInitAndRun() throws InterruptedException {
""", responseTopology.getBody());
}
+ @Test
+ void shouldRegisterKafkaMetrics() throws InterruptedException {
+ waitingForKafkaStreamsToRun();
+
+ // Kafka Streams metrics are registered
+ assertFalse(registry.getMeters()
+ .stream()
+ .filter(metric -> metric.getId().getName().startsWith("kafka.stream"))
+ .toList()
+ .isEmpty());
+
+ // Kafka producer metrics are registered
+ assertFalse(registry.getMeters()
+ .stream()
+ .filter(metric -> metric.getId().getName().startsWith("kafka.producer"))
+ .toList()
+ .isEmpty());
+
+ // Kafka consumer metrics are registered
+ assertFalse(registry.getMeters()
+ .stream()
+ .filter(metric -> metric.getId().getName().startsWith("kafka.consumer"))
+ .toList()
+ .isEmpty());
+ }
+
private void waitingForKafkaStreamsToRun() throws InterruptedException {
while (!initializer.getKafkaStreams().state().equals(KafkaStreams.State.RUNNING)) {
log.info("Waiting for Kafka Streams to start...");
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/opentelemetry/OpenTelemetryConfigTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/opentelemetry/OpenTelemetryConfigTest.java
new file mode 100644
index 00000000..ed45a0a4
--- /dev/null
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/opentelemetry/OpenTelemetryConfigTest.java
@@ -0,0 +1,63 @@
+package com.michelin.kstreamplify.opentelemetry;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
+
+/**
+ * The OpenTelemetry configuration test class.
+ */
+@ExtendWith(MockitoExtension.class)
+class OpenTelemetryConfigTest {
+ private final OpenTelemetryConfig openTelemetryConfig = new OpenTelemetryConfig();
+
+ @Test
+ void shouldAddTagsToMetricsWhenOpenTelemetryRegistry() {
+ openTelemetryConfig.setOtelResourceAttributes("tagName=tagValue,tagName2=tagValue2");
+ MeterRegistryCustomizer customizer = openTelemetryConfig.addTagsOnMetrics();
+
+ MeterRegistry meterRegistry = new OpenTelemetryMeterRegistry();
+ customizer.customize(meterRegistry);
+ meterRegistry.counter("fakeCounterMetric");
+
+ assertEquals("fakeCounterMetric", meterRegistry.getMeters().get(0).getId().getName());
+ assertEquals(Tag.of("tagName", "tagValue"), meterRegistry.getMeters().get(0).getId().getTags().get(0));
+ assertEquals(Tag.of("tagName2", "tagValue2"), meterRegistry.getMeters().get(0).getId().getTags().get(1));
+ }
+
+ @Test
+ void shouldNotAddTagsToMetricsIfEmpty() {
+ MeterRegistryCustomizer customizer = openTelemetryConfig.addTagsOnMetrics();
+
+ MeterRegistry meterRegistry = new OpenTelemetryMeterRegistry();
+ customizer.customize(meterRegistry);
+ meterRegistry.counter("fakeCounterMetric");
+
+ assertEquals("fakeCounterMetric", meterRegistry.getMeters().get(0).getId().getName());
+ assertTrue(meterRegistry.getMeters().get(0).getId().getTags().isEmpty());
+ }
+
+ @Test
+ void shouldNotAddTagsToMetricsWhenNotOpenTelemetryRegistry() {
+ openTelemetryConfig.setOtelResourceAttributes("tagName=tagValue,tagName2=tagValue2");
+ MeterRegistryCustomizer customizer = openTelemetryConfig.addTagsOnMetrics();
+
+ MeterRegistry meterRegistry = new SimpleMeterRegistry();
+ customizer.customize(meterRegistry);
+ meterRegistry.counter("fakeCounterMetric");
+
+ assertEquals("fakeCounterMetric", meterRegistry.getMeters().get(0).getId().getName());
+ assertTrue(meterRegistry.getMeters().get(0).getId().getTags().isEmpty());
+ }
+
+ static class OpenTelemetryMeterRegistry extends SimpleMeterRegistry {
+ // Empty class to mock OpenTelemetryMeterRegistry
+ }
+}
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java
index c4c0e5b0..c93ef23d 100644
--- a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java
@@ -3,6 +3,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Map;
+import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test;
class KafkaPropertiesTest {
@@ -11,9 +12,7 @@ class KafkaPropertiesTest {
@Test
void shouldLoadProperties() {
- Map props = Map.of(
- "application.id", "appId"
- );
+ Map props = Map.of(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
kafkaProperties.setProperties(props);