Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add b3 propagator for tracing #1546

Merged
merged 11 commits into from
Nov 26, 2021
Merged
3 changes: 2 additions & 1 deletion data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Lists of 157 third-party dependencies.
Lists of 158 third-party dependencies.
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.7 - http://logback.qos.ch/logback-classic)
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.7 - http://logback.qos.ch/logback-core)
(The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.11.4 - http://github.com/FasterXML/jackson)
Expand Down Expand Up @@ -88,6 +88,7 @@ Lists of 157 third-party dependencies.
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-context:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-exporter-logging:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-exporter-zipkin:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-extension-trace-propagators:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-sdk:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-sdk-common:1.9.1 - https://github.com/open-telemetry/opentelemetry-java)
(The Apache License, Version 2.0) OpenTelemetry Java (io.opentelemetry:opentelemetry-sdk-logs:1.9.1-alpha - https://github.com/open-telemetry/opentelemetry-java)
Expand Down
4 changes: 4 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-extension-trace-propagators</artifactId>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.resources.Resource;
Expand All @@ -30,14 +31,15 @@
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

Expand All @@ -57,8 +59,9 @@ public final class TracingConfig {
private final Backend backend;
private final String url;
private final float samplingRate;
private final HeadersFormat headersFormat;

TracingConfig(final Backend backend, final String url, final float samplingRate) {
TracingConfig(final Backend backend, final String url, final float samplingRate, final HeadersFormat headersFormat) {
if (!backend.equals(Backend.UNKNOWN) && !URI.create(url).isAbsolute()) {
throw new IllegalArgumentException(String.format(
"Backend is %s but the endpoint isn't an absolute URI: %s",
Expand All @@ -69,6 +72,7 @@ public final class TracingConfig {

this.backend = backend;
this.url = url;
this.headersFormat = headersFormat;
if (backend.equals(Backend.UNKNOWN)) {
this.samplingRate = 0F;
} else {
Expand All @@ -78,10 +82,11 @@ public final class TracingConfig {

public OpenTelemetrySdk setup() {
logger.info(
"Registering tracing configurations {} {} {}",
"Registering tracing configurations {} {} {} {}",
keyValue("backend", getBackend()),
keyValue("sampleRate", getSamplingRate()),
keyValue("loggingDebugEnabled", logger.isDebugEnabled())
keyValue("loggingDebugEnabled", logger.isDebugEnabled()),
keyValue("headersFormat", getHeadersFormat())
);

SdkTracerProviderBuilder tracerProviderBuilder = SdkTracerProvider.builder();
Expand Down Expand Up @@ -122,9 +127,13 @@ public OpenTelemetrySdk setup() {
sdkBuilder.setTracerProvider(
tracerProviderBuilder.build()
);
sdkBuilder.setPropagators(ContextPropagators.create(
W3CTraceContextPropagator.getInstance()
));

final var contextPropagators = switch (getHeadersFormat()) {
case B3_MULTI_HEADER -> ContextPropagators.create(B3Propagator.injectingMultiHeaders());
case B3_SINGLE_HEADER -> ContextPropagators.create(B3Propagator.injectingSingleHeader());
default -> ContextPropagators.create(W3CTraceContextPropagator.getInstance());
};
sdkBuilder.setPropagators(contextPropagators);

return sdkBuilder.buildAndRegisterGlobal();
}
Expand All @@ -141,12 +150,17 @@ float getSamplingRate() {
return samplingRate;
}

HeadersFormat getHeadersFormat() {
return headersFormat;
}

@Override
public String toString() {
return "TracingConfig{" +
"backend=" + backend +
", url='" + url + '\'' +
", samplingRate=" + samplingRate +
", headersFormat=" + headersFormat +
'}';
}

Expand All @@ -160,6 +174,10 @@ private static Path sampleRatePath(final String root) {
return pathOf(root, "sample-rate");
}

private static Path headersFormatPath(final String root) {
return pathOf(root, "headers-format");
}

private static SpanExporter zipkinExporter(TracingConfig tracingConfig) {
return ZipkinSpanExporter
.builder()
Expand Down Expand Up @@ -205,6 +223,10 @@ static Float SamplingRate(final InputStream in) throws IOException {
return Float.valueOf(s);
}

static HeadersFormat HeadersFormat(InputStream in) throws IOException {
return HeadersFormat.from(trim(in));
}

private static String trim(InputStream in) throws IOException {
return new String(in.readAllBytes()).trim();
}
Expand All @@ -213,19 +235,20 @@ private static String trim(InputStream in) throws IOException {
public static TracingConfig fromDir(final String path) throws IOException {
final var backendPath = backendPath(path);
if (!Files.exists(backendPath)) {
return new TracingConfig(Backend.UNKNOWN, null, 0);
return new TracingConfig(Backend.UNKNOWN, null, 0, HeadersFormat.W3C);
}

var sampleRate = 0F;
var backend = Backend.UNKNOWN;
var endpoint = "";
var headersFormat = HeadersFormat.W3C;

try (final var backendFile = new FileInputStream(backendPath.toString())) {
backend = Parser.backend(backendFile);
}

if (backend.equals(Backend.UNKNOWN)) {
return new TracingConfig(Backend.UNKNOWN, null, 0);
return new TracingConfig(Backend.UNKNOWN, null, 0, HeadersFormat.W3C);
}

final var sampleRatePath = sampleRatePath(path);
Expand All @@ -237,14 +260,25 @@ public static TracingConfig fromDir(final String path) throws IOException {

if (backend.equals(Backend.ZIPKIN)) {
final var zipkinPath = pathOf(path, "zipkin-endpoint");
final var headersFormatPath = headersFormatPath(path);
if (Files.exists(zipkinPath)) {
try (final var url = new FileInputStream(zipkinPath.toString())) {
endpoint = Parser.URL(url);
}
}
if (Files.exists(headersFormatPath)) {
try (final var headersFormatFile = new FileInputStream(headersFormatPath.toString())) {
final var parsed = Parser.HeadersFormat(headersFormatFile);
//B3 propagation is available when backend is zipkin only
if (parsed.equals(HeadersFormat.B3_MULTI_HEADER)
|| parsed.equals(HeadersFormat.B3_SINGLE_HEADER)) {
headersFormat = parsed;
}
}
}
}

return new TracingConfig(backend, endpoint, sampleRate);
return new TracingConfig(backend, endpoint, sampleRate, headersFormat);
}

// Backend definition
Expand All @@ -261,4 +295,18 @@ public static Backend from(final String s) {
}
}

enum HeadersFormat {
B3_MULTI_HEADER,
B3_SINGLE_HEADER,
W3C;

public static HeadersFormat from(final String s) {
return switch (s.trim().toLowerCase()) {
case "b3-multi-header" -> B3_MULTI_HEADER;
case "b3-single-header" -> B3_SINGLE_HEADER;
default -> W3C;
};
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
package dev.knative.eventing.kafka.broker.core.tracing;

import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig.Backend;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig.HeadersFormat;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import org.junit.jupiter.api.Test;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

public class TracingConfigTest {
Expand All @@ -45,8 +45,46 @@ public void shouldParseConfigGivenDirectory() throws IOException {
assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldParseConfigB3SingleHeaderGivenDirectory() throws IOException {

final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " zipkin ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-single-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.B3_SINGLE_HEADER);
}

@Test
public void shouldParseConfigB3MultiHeaderGivenDirectory() throws IOException {

final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " zipkin ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.B3_MULTI_HEADER);
}


@Test
public void shouldReturnUnknownBackendWhenBackendContainsUnknownName() throws IOException {

Expand All @@ -55,12 +93,36 @@ public void shouldReturnUnknownBackendWhenBackendContainsUnknownName() throws IO
write(dir, "/backend", " 1234 ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.UNKNOWN);
}

@Test
public void shouldReturnW3CHeadersFormatWhenBackendContainsUnknownName() throws IOException {
final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " 1234 ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/headers-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldReturnW3CHeadersFormatWhenNoFilesArePresent() throws IOException {
final var dir = Files.createTempDirectory("tracing");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldReturnUnknownBackendWhenNoFilesArePresent() throws IOException {

Expand All @@ -71,6 +133,14 @@ public void shouldReturnUnknownBackendWhenNoFilesArePresent() throws IOException
assertThat(config.getBackend()).isEqualTo(Backend.UNKNOWN);
}

@Test
public void shouldReturnW3CHeadersFormatWhenDirectoryIsNotPresent() throws IOException {

final var config = TracingConfig.fromDir("/tmp/" + UUID.randomUUID().toString());

assertThat(config.getHeadersFormat()).isEqualTo(HeadersFormat.W3C);
}

@Test
public void shouldReturnUnknownBackendWhenDirectoryIsNotPresent() throws IOException {

Expand Down Expand Up @@ -105,7 +175,7 @@ public void shouldNotThrowWhenURLIsNotAbsoluteAndBackendUnknown() throws IOExcep

@Test
public void setupShouldNotFailWhenBackendIsUnknown() {
assertThatNoException().isThrownBy(() -> new TracingConfig(Backend.UNKNOWN, null, 0F).setup());
assertThatNoException().isThrownBy(() -> new TracingConfig(Backend.UNKNOWN, null, 0F, HeadersFormat.W3C).setup());
}

private static void write(final Path root, final String name, final String s) throws IOException {
Expand Down
6 changes: 6 additions & 0 deletions data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-extension-trace-propagators</artifactId>
<version>${opentelemetry.version}</version>
</dependency>

<!-- Kubernetes -->
<dependency>
<groupId>io.fabric8</groupId>
Expand Down