Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add b3 propagator for tracing #1546

Merged
merged 11 commits into from
Nov 26, 2021
Merged
4 changes: 4 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-extension-trace-propagators</artifactId>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.extension.trace.propagation.B3Propagator;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.resources.Resource;
Expand All @@ -30,14 +31,15 @@
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

Expand All @@ -57,8 +59,9 @@ public final class TracingConfig {
private final Backend backend;
private final String url;
private final float samplingRate;
private final HeadsFormat headsFormat;

TracingConfig(final Backend backend, final String url, final float samplingRate) {
TracingConfig(final Backend backend, final String url, final float samplingRate, final HeadsFormat headsFormat) {
if (!backend.equals(Backend.UNKNOWN) && !URI.create(url).isAbsolute()) {
throw new IllegalArgumentException(String.format(
"Backend is %s but the endpoint isn't an absolute URI: %s",
Expand All @@ -69,6 +72,7 @@ public final class TracingConfig {

this.backend = backend;
this.url = url;
this.headsFormat = headsFormat;
if (backend.equals(Backend.UNKNOWN)) {
this.samplingRate = 0F;
} else {
Expand All @@ -78,10 +82,11 @@ public final class TracingConfig {

public OpenTelemetrySdk setup() {
logger.info(
"Registering tracing configurations {} {} {}",
"Registering tracing configurations {} {} {} {}",
keyValue("backend", getBackend()),
keyValue("sampleRate", getSamplingRate()),
keyValue("loggingDebugEnabled", logger.isDebugEnabled())
keyValue("loggingDebugEnabled", logger.isDebugEnabled()),
keyValue("headsFormat", getHeadsFormat())
);

SdkTracerProviderBuilder tracerProviderBuilder = SdkTracerProvider.builder();
Expand Down Expand Up @@ -122,9 +127,14 @@ public OpenTelemetrySdk setup() {
sdkBuilder.setTracerProvider(
tracerProviderBuilder.build()
);
sdkBuilder.setPropagators(ContextPropagators.create(
W3CTraceContextPropagator.getInstance()
));

ContextPropagators contextPropagators;
switch (getHeadsFormat()) {
case B3_MULTI_HEADER -> contextPropagators = ContextPropagators.create(B3Propagator.injectingMultiHeaders());
case B3_SINGLE_HEADER -> contextPropagators = ContextPropagators.create(B3Propagator.injectingSingleHeader());
default -> contextPropagators = ContextPropagators.create(W3CTraceContextPropagator.getInstance());
}
snowwolf007cn marked this conversation as resolved.
Show resolved Hide resolved
sdkBuilder.setPropagators(contextPropagators);

return sdkBuilder.buildAndRegisterGlobal();
}
Expand All @@ -141,12 +151,17 @@ float getSamplingRate() {
return samplingRate;
}

HeadsFormat getHeadsFormat() {
return headsFormat;
}

@Override
public String toString() {
return "TracingConfig{" +
"backend=" + backend +
", url='" + url + '\'' +
", samplingRate=" + samplingRate +
", headsFormat=" + headsFormat +
'}';
}

Expand All @@ -160,6 +175,10 @@ private static Path sampleRatePath(final String root) {
return pathOf(root, "sample-rate");
}

private static Path headsFormatPath(final String root) {
return pathOf(root, "heads-format");
}

private static SpanExporter zipkinExporter(TracingConfig tracingConfig) {
return ZipkinSpanExporter
.builder()
Expand Down Expand Up @@ -205,6 +224,10 @@ static Float SamplingRate(final InputStream in) throws IOException {
return Float.valueOf(s);
}

static HeadsFormat HeadsFormat(InputStream in) throws IOException {
return HeadsFormat.from(trim(in));
}

private static String trim(InputStream in) throws IOException {
return new String(in.readAllBytes()).trim();
}
Expand All @@ -213,19 +236,20 @@ private static String trim(InputStream in) throws IOException {
public static TracingConfig fromDir(final String path) throws IOException {
final var backendPath = backendPath(path);
if (!Files.exists(backendPath)) {
return new TracingConfig(Backend.UNKNOWN, null, 0);
return new TracingConfig(Backend.UNKNOWN, null, 0, HeadsFormat.W3C);
}

var sampleRate = 0F;
var backend = Backend.UNKNOWN;
var endpoint = "";
var headsFormat = HeadsFormat.W3C;

try (final var backendFile = new FileInputStream(backendPath.toString())) {
backend = Parser.backend(backendFile);
}

if (backend.equals(Backend.UNKNOWN)) {
return new TracingConfig(Backend.UNKNOWN, null, 0);
return new TracingConfig(Backend.UNKNOWN, null, 0, HeadsFormat.W3C);
}

final var sampleRatePath = sampleRatePath(path);
Expand All @@ -237,14 +261,25 @@ public static TracingConfig fromDir(final String path) throws IOException {

if (backend.equals(Backend.ZIPKIN)) {
final var zipkinPath = pathOf(path, "zipkin-endpoint");
final var headsFormatPath = headsFormatPath(path);
if (Files.exists(zipkinPath)) {
try (final var url = new FileInputStream(zipkinPath.toString())) {
endpoint = Parser.URL(url);
}
}
if (Files.exists(headsFormatPath)) {
try (final var headsFormatFile = new FileInputStream(headsFormatPath.toString())) {
pierDipi marked this conversation as resolved.
Show resolved Hide resolved
final var parsed = Parser.HeadsFormat(headsFormatFile);
//B3 propagation is available when backend is zipkin only
if (parsed.equals(HeadsFormat.B3_MULTI_HEADER)
|| parsed.equals(HeadsFormat.B3_SINGLE_HEADER)) {
headsFormat = parsed;
}
}
}
}

return new TracingConfig(backend, endpoint, sampleRate);
return new TracingConfig(backend, endpoint, sampleRate, headsFormat);
}

// Backend definition
Expand All @@ -261,4 +296,18 @@ public static Backend from(final String s) {
}
}

enum HeadsFormat {
B3_MULTI_HEADER,
B3_SINGLE_HEADER,
W3C;

public static HeadsFormat from(final String s) {
return switch (s.trim().toLowerCase()) {
case "b3-multi-header" -> B3_MULTI_HEADER;
case "b3-single-header" -> B3_SINGLE_HEADER;
default -> W3C;
};
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
package dev.knative.eventing.kafka.broker.core.tracing;

import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig.Backend;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig.HeadsFormat;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import org.junit.jupiter.api.Test;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

public class TracingConfigTest {
Expand All @@ -39,12 +39,14 @@ public void shouldParseConfigGivenDirectory() throws IOException {
write(dir, "/backend", " zipkin ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/heads-format", " b3-multi-header ");
snowwolf007cn marked this conversation as resolved.
Show resolved Hide resolved

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.ZIPKIN);
assertThat(config.getSamplingRate()).isEqualTo(0.1F);
assertThat(config.getUrl()).isEqualTo("http://localhost:9241/v2/api/spans/");
assertThat(config.getHeadsFormat()).isEqualTo(HeadsFormat.B3_MULTI_HEADER);
}

@Test
Expand All @@ -55,12 +57,36 @@ public void shouldReturnUnknownBackendWhenBackendContainsUnknownName() throws IO
write(dir, "/backend", " 1234 ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/heads-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getBackend()).isEqualTo(Backend.UNKNOWN);
}

@Test
public void shouldReturnW3CHeadsFormatWhenBackendContainsUnknownName() throws IOException {
snowwolf007cn marked this conversation as resolved.
Show resolved Hide resolved
final var dir = Files.createTempDirectory("tracing");

write(dir, "/backend", " 1234 ");
write(dir, "/sample-rate", " 0.1 ");
write(dir, "/zipkin-endpoint", " http://localhost:9241/v2/api/spans/ ");
write(dir, "/heads-format", " b3-multi-header ");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getHeadsFormat()).isEqualTo(HeadsFormat.W3C);
}

@Test
public void shouldReturnW3CHeadsFormatWhenNoFilesArePresent() throws IOException {
snowwolf007cn marked this conversation as resolved.
Show resolved Hide resolved
final var dir = Files.createTempDirectory("tracing");

final var config = TracingConfig.fromDir(dir.toAbsolutePath().toString());

assertThat(config.getHeadsFormat()).isEqualTo(HeadsFormat.W3C);
}

@Test
public void shouldReturnUnknownBackendWhenNoFilesArePresent() throws IOException {

Expand All @@ -71,6 +97,14 @@ public void shouldReturnUnknownBackendWhenNoFilesArePresent() throws IOException
assertThat(config.getBackend()).isEqualTo(Backend.UNKNOWN);
}

@Test
public void shouldReturnW3CHeadsFormatWhenDirectoryIsNotPresent() throws IOException {
snowwolf007cn marked this conversation as resolved.
Show resolved Hide resolved

final var config = TracingConfig.fromDir("/tmp/" + UUID.randomUUID().toString());

assertThat(config.getHeadsFormat()).isEqualTo(HeadsFormat.W3C);
}

@Test
public void shouldReturnUnknownBackendWhenDirectoryIsNotPresent() throws IOException {

Expand Down Expand Up @@ -105,7 +139,7 @@ public void shouldNotThrowWhenURLIsNotAbsoluteAndBackendUnknown() throws IOExcep

@Test
public void setupShouldNotFailWhenBackendIsUnknown() {
assertThatNoException().isThrownBy(() -> new TracingConfig(Backend.UNKNOWN, null, 0F).setup());
assertThatNoException().isThrownBy(() -> new TracingConfig(Backend.UNKNOWN, null, 0F, HeadsFormat.W3C).setup());
}

private static void write(final Path root, final String name, final String s) throws IOException {
Expand Down
6 changes: 6 additions & 0 deletions data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-extension-trace-propagators</artifactId>
<version>${opentelemetry.version}</version>
</dependency>

<!-- Kubernetes -->
<dependency>
<groupId>io.fabric8</groupId>
Expand Down