Skip to content

Commit

Permalink
add b3 propagator for tracing (#1546)
Browse files Browse the repository at this point in the history
* add b3 support.

* add unit test

* Update data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/TracingConfig.java

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* modify headsFormat to headersFormat, and add 2 more tests for b3 headers-format.

* typo of dir name in some tests.

* Update data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/tracing/TracingConfigTest.java

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Update data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/tracing/TracingConfigTest.java

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Update data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/tracing/TracingConfigTest.java

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* rename more variable names from 'heads' to 'headers'

* regenerate the file with third-party licenses

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
snowwolf007cn and pierDipi committed Nov 26, 2021
1 parent 1ae3943 commit b1a1878
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 17 deletions.
3 changes: 2 additions & 1 deletion data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Lists of 157 third-party dependencies.
Lists of 158 third-party dependencies.
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.7 - http://logback.qos.ch/logback-classic)
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.7 - http://logback.qos.ch/logback-core)
(The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.11.4 - http://github.com/FasterXML/jackson)
Expand Down Expand Up @@ -88,6 +88,7 @@ Lists of 157 third-party dependencies.
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-context:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-exporter-logging:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-exporter-zipkin:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-extension-trace-propagators:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-sdk:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-sdk-common:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-sdk-logs:1.9.1-alpha - https://github.com/open-telemetry/opentelemetry-java)
Expand Down
4 changes: 4 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-extension-trace-propagators</artifactId>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.resources.Resource;
Expand All @@ -30,14 +31,15 @@
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

Expand All @@ -57,8 +59,9 @@ public final class TracingConfig {
private final Backend backend;
private final String url;
private final float samplingRate;
private final HeadersFormat headersFormat;

TracingConfig(final Backend backend, final String url, final float samplingRate) {
TracingConfig(final Backend backend, final String url, final float samplingRate, final HeadersFormat headersFormat) {
if (!backend.equals(Backend.UNKNOWN) && !URI.create(url).isAbsolute()) {
throw new IllegalArgumentException(String.format(
"Backend is %s but the endpoint isn't an absolute URI: %s",
Expand All @@ -69,6 +72,7 @@ public final class TracingConfig {

this.backend = backend;
this.url = url;
this.headersFormat = headersFormat;
if (backend.equals(Backend.UNKNOWN)) {
this.samplingRate = 0F;
} else {
Expand All @@ -78,10 +82,11 @@ public final class TracingConfig {

public OpenTelemetrySdk setup() {
logger.info(
"Registering tracing configurations {} {} {}",
"Registering tracing configurations {} {} {} {}",
keyValue("backend", getBackend()),
keyValue("sampleRate", getSamplingRate()),
keyValue("loggingDebugEnabled", logger.isDebugEnabled())
keyValue("loggingDebugEnabled", logger.isDebugEnabled()),
keyValue("headersFormat", getHeadersFormat())
);

SdkTracerProviderBuilder tracerProviderBuilder = SdkTracerProvider.builder();
Expand Down Expand Up @@ -122,9 +127,13 @@ public OpenTelemetrySdk setup() {
sdkBuilder.setTracerProvider(
tracerProviderBuilder.build()
);
sdkBuilder.setPropagators(ContextPropagators.create(
W3CTraceContextPropagator.getInstance()
));

final var contextPropagators = switch (getHeadersFormat()) {
case B3_MULTI_HEADER -> ContextPropagators.create(B3Propagator.injectingMultiHeaders());
case B3_SINGLE_HEADER -> ContextPropagators.create(B3Propagator.injectingSingleHeader());
default -> ContextPropagators.create(W3CTraceContextPropagator.getInstance());
};
sdkBuilder.setPropagators(contextPropagators);

return sdkBuilder.buildAndRegisterGlobal();
}
Expand All @@ -141,12 +150,17 @@ float getSamplingRate() {
return samplingRate;
}

HeadersFormat getHeadersFormat() {
return headersFormat;
}

@Override
public String toString() {
return "TracingConfig{" +
"backend=" + backend +
", url='" + url + '\'' +
", samplingRate=" + samplingRate +
", headersFormat=" + headersFormat +
'}';
}

Expand All @@ -160,6 +174,10 @@ private static Path sampleRatePath(final String root) {
return pathOf(root, "sample-rate");
}

private static Path headersFormatPath(final String root) {
return pathOf(root, "headers-format");
}

private static SpanExporter zipkinExporter(TracingConfig tracingConfig) {
return ZipkinSpanExporter
.builder()
Expand Down Expand Up @@ -205,6 +223,10 @@ static Float SamplingRate(final InputStream in) throws IOException {
return Float.valueOf(s);
}

static HeadersFormat HeadersFormat(InputStream in) throws IOException {
return HeadersFormat.from(trim(in));
}

private static String trim(InputStream in) throws IOException {
return new String(in.readAllBytes()).trim();
}
Expand All @@ -213,19 +235,20 @@ private static String trim(InputStream in) throws IOException {
public static TracingConfig fromDir(final String path) throws IOException {
final var backendPath = backendPath(path);
if (!Files.exists(backendPath)) {
return new TracingConfig(Backend.UNKNOWN, null, 0);
return new TracingConfig(Backend.UNKNOWN, null, 0, HeadersFormat.W3C);
}

var sampleRate = 0F;
var backend = Backend.UNKNOWN;
var endpoint = "";
var headersFormat = HeadersFormat.W3C;

try (final var backendFile = new FileInputStream(backendPath.toString())) {
backend = Parser.backend(backendFile);
}

if (backend.equals(Backend.UNKNOWN)) {
return new TracingConfig(Backend.UNKNOWN, null, 0);
return new TracingConfig(Backend.UNKNOWN, null, 0, HeadersFormat.W3C);
}

final var sampleRatePath = sampleRatePath(path);
Expand All @@ -237,14 +260,25 @@ public static TracingConfig fromDir(final String path) throws IOException {

if (backend.equals(Backend.ZIPKIN)) {
final var zipkinPath = pathOf(path, "zipkin-endpoint");
final var headersFormatPath = headersFormatPath(path);
if (Files.exists(zipkinPath)) {
try (final var url = new FileInputStream(zipkinPath.toString())) {
endpoint = Parser.URL(url);
}
}
if (Files.exists(headersFormatPath)) {
try (final var headersFormatFile = new FileInputStream(headersFormatPath.toString())) {
final var parsed = Parser.HeadersFormat(headersFormatFile);
//B3 propagation is available when backend is zipkin only
if (parsed.equals(HeadersFormat.B3_MULTI_HEADER)
|| parsed.equals(HeadersFormat.B3_SINGLE_HEADER)) {
headersFormat = parsed;
}
}
}
}

return new TracingConfig(backend, endpoint, sampleRate);
return new TracingConfig(backend, endpoint, sampleRate, headersFormat);
}

// Backend definition
Expand All @@ -261,4 +295,18 @@ public static Backend from(final String s) {
}
}

enum HeadersFormat {
B3_MULTI_HEADER,
B3_SINGLE_HEADER,
W3C;

public static HeadersFormat from(final String s) {
return switch (s.trim().toLowerCase()) {
case "b3-multi-header" -> B3_MULTI_HEADER;
case "b3-single-header" -> B3_SINGLE_HEADER;
default -> W3C;
};
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
package dev.knative.eventing.kafka.broker.core.tracing;

import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig.Backend;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig.HeadersFormat;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import org.junit.jupiter.api.Test;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

public class TracingConfigTest {
Expand All @@ -45,8 +45,46 @@ public void shouldParseConfigGivenDirectory() throws IOException {
assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldParseConfigB3SingleHeaderGivenDirectory() throws IOException {

final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " zipkin ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-single-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.B3_SINGLE_HEADER);
}

@Test
public void shouldParseConfigB3MultiHeaderGivenDirectory() throws IOException {

final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " zipkin ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.B3_MULTI_HEADER);
}


@Test
public void shouldReturnUnknownBackendWhenBackendContainsUnknownName() throws IOException {

Expand All @@ -55,12 +93,36 @@ public void shouldReturnUnknownBackendWhenBackendContainsUnknownName() throws IO
write(dir, "/backend", " 1234 ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.UNKNOWN);
}

@Test
public void shouldReturnW3CHeadersFormatWhenBackendContainsUnknownName() throws IOException {
final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " 1234 ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldReturnW3CHeadersFormatWhenNoFilesArePresent() throws IOException {
final var dir = Files.createTempDirectory("tracing");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldReturnUnknownBackendWhenNoFilesArePresent() throws IOException {

Expand All @@ -71,6 +133,14 @@ public void shouldReturnUnknownBackendWhenNoFilesArePresent() throws IOException
assertThat(config.getBackend()).isEqualTo(Backend.UNKNOWN);
}

@Test
public void shouldReturnW3CHeadersFormatWhenDirectoryIsNotPresent() throws IOException {

final var config = TracingConfig.fromDir("/tmp/" + UUID.randomUUID().toString());

assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldReturnUnknownBackendWhenDirectoryIsNotPresent() throws IOException {

Expand Down Expand Up @@ -105,7 +175,7 @@ public void shouldNotThrowWhenURLIsNotAbsoluteAndBackendUnknown() throws IOExcep

@Test
public void setupShouldNotFailWhenBackendIsUnknown() {
assertThatNoException().isThrownBy(() -> new TracingConfig(Backend.UNKNOWN, null, 0F).setup());
assertThatNoException().isThrownBy(() -> new TracingConfig(Backend.UNKNOWN, null, 0F, HeadersFormat.W3C).setup());
}

private static void write(final Path root, final String name, final String s) throws IOException {
Expand Down
6 changes: 6 additions & 0 deletions data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-extension-trace-propagators</artifactId>
<version>${opentelemetry.version}</version>
</dependency>

<!-- Kubernetes -->
<dependency>
<groupId>io.fabric8</groupId>
Expand Down

0 comments on commit b1a1878

Please sign in to comment.