Skip to content

Commit

Permalink
Add OpenTelemetry integration (#161)
Browse files Browse the repository at this point in the history
* Ease OpenTelemetry Kafka Streams integration

* Add otel documentation

* Add test for otel

* add integration test for otel

* fix sonar
  • Loading branch information
loicgreffier committed Feb 14, 2024
1 parent fb999c3 commit 8fe84ee
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 20 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ need to do:
* [By Key and Value](#by-key-and-value)
* [By Predicate](#by-predicate)
* [Interactive Queries](#interactive-queries)
* [Open Telemetry](#open-telemetry)
* [Testing](#testing)
* [Motivation](#motivation)
* [Contribution](#contribution)
Expand Down Expand Up @@ -449,6 +450,26 @@ containers:
- If neither the variable environment nor the `MY_POD_IP` environment variable is set, Kstreamplify
sets `application.server` to the default value `localhost`.

### Open Telemetry

The Kstreamplify Spring Boot module simplifies the integration of [Open Telemetry](https://opentelemetry.io/) into your Kafka Streams application
by binding all the metrics of the Kafka Streams instance to the Spring Boot registry which is used by the Open Telemetry Java agent.

You can run your application with the Open Telemetry Java agent by including the following JVM options:

```shell
-javaagent:/opentelemetry-javaagent.jar -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.metrics.exporter=otlp
```

It also facilitates the addition of custom tags to the metrics, allowing you to use them to organize your metrics in your Grafana dashboard.

```shell
-Dotel.resource.attributes=environment=production,service.name=myNamespace,service.name=myKafkaStreams,category=orders
```

All the tags specified in the `otel.resource.attributes` property will be included in the metrics and can be observed in the logs
during the application startup.

### Testing

For testing, you can create a test class that extends `KafkaStreamsStarterTest` and override the `getKafkaStreamsStarter` method to return your `KafkaStreamsStarter` class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public void init(KafkaStreamsStarter streamsStarter) {

kafkaStreams = new KafkaStreams(topology, KafkaStreamsExecutionContext.getProperties());

registerMetrics(kafkaStreams);

kafkaStreamsStarter.onStart(kafkaStreams);

Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
Expand Down Expand Up @@ -128,8 +130,7 @@ private void initDlq() {
* Init the host information.
*/
private void initHostInfo() {
String ipEnvVarName =
(String) kafkaProperties.get(InitializerConstants.IP_SYSTEM_VARIABLE_PROPERTY);
String ipEnvVarName = (String) kafkaProperties.get(InitializerConstants.IP_SYSTEM_VARIABLE_PROPERTY);
if (StringUtils.isBlank(ipEnvVarName)) {
ipEnvVarName = InitializerConstants.IP_SYSTEM_VARIABLE_DEFAULT;
}
Expand Down Expand Up @@ -159,11 +160,8 @@ protected void initHttpServer() {
*/
protected void initProperties() {
properties = PropertiesUtils.loadProperties();

serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY);

kafkaProperties = PropertiesUtils.loadKafkaProperties(properties);

KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
}

Expand Down Expand Up @@ -193,4 +191,11 @@ protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State old
System.exit(3);
}
}

/**
* Register metrics.
*/
protected void registerMetrics(KafkaStreams kafkaStreams) {
// Nothing to do here
}
}
5 changes: 5 additions & 0 deletions kstreamplify-spring-boot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.properties.KafkaProperties;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
Expand All @@ -19,14 +21,19 @@
@Slf4j
@Component
@ConditionalOnBean(KafkaStreamsStarter.class)
public class SpringKafkaStreamsInitializer extends KafkaStreamsInitializer
implements ApplicationRunner {
public class SpringKafkaStreamsInitializer extends KafkaStreamsInitializer implements ApplicationRunner {
/**
* The application context.
*/
@Autowired
private ConfigurableApplicationContext applicationContext;

/**
* The meter registry.
*/
@Autowired
private MeterRegistry registry;

/**
* The server port.
*/
Expand Down Expand Up @@ -60,7 +67,7 @@ public void run(ApplicationArguments args) {
*/
@Override
protected void initHttpServer() {
// Nothing to do here, Spring Boot is running its own HTTP server
// Nothing to do here as the server is already started by Spring Boot
}

/**
Expand Down Expand Up @@ -93,14 +100,25 @@ protected void onStateChange(KafkaStreams.State newState, KafkaStreams.State old
}
}

/**
* {@inheritDoc}
*/
@Override
protected void registerMetrics(KafkaStreams kafkaStreams) {
// As the Kafka Streams metrics are not picked up by the OpenTelemetry Java agent automatically,
// register them manually to the Spring Boot registry as the agent will pick metrics up from there
KafkaStreamsMetrics kafkaStreamsMetrics = new KafkaStreamsMetrics(kafkaStreams);
kafkaStreamsMetrics.bindTo(registry);
}

/**
* Close the application context.
*/
private void closeApplicationContext() {
if (applicationContext != null) {
applicationContext.close();
} else {
log.warn("No Spring context set");
log.warn("Spring Boot context is not set, cannot close it");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.michelin.kstreamplify.opentelemetry;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

/**
* The OpenTelemetry configuration class.
*/
@Slf4j
@Getter
@Setter
@Configuration
public class OpenTelemetryConfig {
/**
* The OpenTelemetry resource attributes.
*/
@Value("${otel.resource.attributes:}")
private String otelResourceAttributes;

/**
* Register tags in Open Telemetry meter registry.
* It enables to add custom tags given in the property otel.resource.attributes
* to metrics.
*
* @return A meter registry customizer
*/
@Bean
@ConditionalOnProperty(value = "otel.resource.attributes")
public MeterRegistryCustomizer<MeterRegistry> addTagsOnMetrics() {
List<Tag> tags = StringUtils.hasText(otelResourceAttributes)
? Arrays.stream(otelResourceAttributes.split(","))
.map(resourceAttribute -> Tag.of(resourceAttribute.split("=")[0], resourceAttribute.split("=")[1]))
.toList() : Collections.emptyList();

return registry -> {
// Only add tags and Kafka metrics to Open Telemetry meter registry whose Java agent reads from it
if (registry.getClass().getName().contains("OpenTelemetryMeterRegistry")) {
log.info("Adding tags {} to registry {}",
tags.stream().map(tag -> tag.getKey() + "=" + tag.getValue()).toList(),
registry.getClass().getName());
registry.config().commonTags(tags);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
com.michelin.kstreamplify.initializer.SpringKafkaStreamsInitializer
com.michelin.kstreamplify.rest.SpringProbeController
com.michelin.kstreamplify.properties.KafkaProperties
com.michelin.kstreamplify.properties.KafkaProperties
com.michelin.kstreamplify.opentelemetry.OpenTelemetryConfig
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,21 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.properties.KafkaProperties;
import java.lang.reflect.Field;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.DefaultApplicationArguments;
import org.springframework.context.ConfigurableApplicationContext;

@ExtendWith(MockitoExtension.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.DEFINED_PORT;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -41,6 +43,9 @@ class SpringKafkaStreamsInitializerIntegrationTest {
@Autowired
private KafkaStreamsInitializer initializer;

@Autowired
private MeterRegistry registry;

@Autowired
private TestRestTemplate restTemplate;

Expand Down Expand Up @@ -89,7 +94,6 @@ void shouldInitAndRun() throws InterruptedException {
KafkaStreamsExecutionContext.getProperties().get("application.server"));

// Assert HTTP probes

ResponseEntity<Void> responseReady = restTemplate
.getForEntity("http://localhost:8081/ready", Void.class);

Expand All @@ -115,6 +119,32 @@ void shouldInitAndRun() throws InterruptedException {
""", responseTopology.getBody());
}

@Test
void shouldRegisterKafkaMetrics() throws InterruptedException {
waitingForKafkaStreamsToRun();

// Kafka Streams metrics are registered
assertFalse(registry.getMeters()
.stream()
.filter(metric -> metric.getId().getName().startsWith("kafka.stream"))
.toList()
.isEmpty());

// Kafka producer metrics are registered
assertFalse(registry.getMeters()
.stream()
.filter(metric -> metric.getId().getName().startsWith("kafka.producer"))
.toList()
.isEmpty());

// Kafka consumer metrics are registered
assertFalse(registry.getMeters()
.stream()
.filter(metric -> metric.getId().getName().startsWith("kafka.consumer"))
.toList()
.isEmpty());
}

private void waitingForKafkaStreamsToRun() throws InterruptedException {
while (!initializer.getKafkaStreams().state().equals(KafkaStreams.State.RUNNING)) {
log.info("Waiting for Kafka Streams to start...");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.michelin.kstreamplify.opentelemetry;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;

/**
* The OpenTelemetry configuration test class.
*/
@ExtendWith(MockitoExtension.class)
class OpenTelemetryConfigTest {
private final OpenTelemetryConfig openTelemetryConfig = new OpenTelemetryConfig();

@Test
void shouldAddTagsToMetricsWhenOpenTelemetryRegistry() {
openTelemetryConfig.setOtelResourceAttributes("tagName=tagValue,tagName2=tagValue2");
MeterRegistryCustomizer<MeterRegistry> customizer = openTelemetryConfig.addTagsOnMetrics();

MeterRegistry meterRegistry = new OpenTelemetryMeterRegistry();
customizer.customize(meterRegistry);
meterRegistry.counter("fakeCounterMetric");

assertEquals("fakeCounterMetric", meterRegistry.getMeters().get(0).getId().getName());
assertEquals(Tag.of("tagName", "tagValue"), meterRegistry.getMeters().get(0).getId().getTags().get(0));
assertEquals(Tag.of("tagName2", "tagValue2"), meterRegistry.getMeters().get(0).getId().getTags().get(1));
}

@Test
void shouldNotAddTagsToMetricsIfEmpty() {
MeterRegistryCustomizer<MeterRegistry> customizer = openTelemetryConfig.addTagsOnMetrics();

MeterRegistry meterRegistry = new OpenTelemetryMeterRegistry();
customizer.customize(meterRegistry);
meterRegistry.counter("fakeCounterMetric");

assertEquals("fakeCounterMetric", meterRegistry.getMeters().get(0).getId().getName());
assertTrue(meterRegistry.getMeters().get(0).getId().getTags().isEmpty());
}

@Test
void shouldNotAddTagsToMetricsWhenNotOpenTelemetryRegistry() {
openTelemetryConfig.setOtelResourceAttributes("tagName=tagValue,tagName2=tagValue2");
MeterRegistryCustomizer<MeterRegistry> customizer = openTelemetryConfig.addTagsOnMetrics();

MeterRegistry meterRegistry = new SimpleMeterRegistry();
customizer.customize(meterRegistry);
meterRegistry.counter("fakeCounterMetric");

assertEquals("fakeCounterMetric", meterRegistry.getMeters().get(0).getId().getName());
assertTrue(meterRegistry.getMeters().get(0).getId().getTags().isEmpty());
}

static class OpenTelemetryMeterRegistry extends SimpleMeterRegistry {
// Empty class to mock OpenTelemetryMeterRegistry
}
}

0 comments on commit 8fe84ee

Please sign in to comment.