Skip to content

Commit

Permalink
Introduce HTTP OTLP exporter
Browse files Browse the repository at this point in the history
The integration utilizes the HttpSender
interface introduced in OTel 1.28
and is based on the Vert.x HTTP Client
  • Loading branch information
geoand committed Aug 2, 2023
1 parent f78c8a9 commit 8465393
Show file tree
Hide file tree
Showing 25 changed files with 505 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public interface OtlpExporterTracesConfig {
/**
* OTLP defines the encoding of telemetry data and the protocol used to exchange data between the client and the
* server. Depending on the exporter, the available protocols will be different.
* <p>
* Currently, only {@code grpc} and {@code http} are allowed.
*/
@WithDefault(Protocol.HTTP_PROTOBUF)
@WithDefault(Protocol.GRPC)
Optional<String> protocol();

/**
Expand Down Expand Up @@ -89,9 +91,8 @@ interface TrustCert {
Optional<List<String>> certs();
}

public static class Protocol {
class Protocol {
public static final String GRPC = "grpc";
public static final String HTTP_PROTOBUF = "http/protobuf";
public static final String HTTP_JSON = "http/json";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.opentelemetry.runtime.exporter.otlp;

import java.io.ByteArrayOutputStream;

/**
* Used when we know that the stream will never be used again, therefore we can skip copying the data
* WARNING: This should only be used when we know that we will write at least this many bytes to the stream
*/
final class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
NonCopyingByteArrayOutputStream(int size) {
super(size);
}

@Override
public byte[] toByteArray() {
return buf;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.quarkus.opentelemetry.runtime.exporter.otlp;

import java.net.URI;
import java.util.Locale;

final class OtlpExporterUtil {

private OtlpExporterUtil() {
}

static int getPort(URI uri) {
int originalPort = uri.getPort();
if (originalPort > -1) {
return originalPort;
}

if (isHttps(uri)) {
return 443;
}
return 80;
}

static boolean isHttps(URI uri) {
return "https".equals(uri.getScheme().toLowerCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.opentelemetry.runtime.exporter.otlp;

import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig.DEFAULT_GRPC_BASE_URI;
import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig.Protocol.GRPC;
import static io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterTracesConfig.Protocol.HTTP_PROTOBUF;

import java.net.URI;
Expand All @@ -16,6 +17,8 @@

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessorBuilder;
Expand All @@ -41,12 +44,12 @@ public Function<SyntheticCreationalContext<LateBoundBatchSpanProcessor>, LateBou
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
TlsConfig tlsConfig, Supplier<Vertx> vertx) {
URI grpcBaseUri = getGrpcBaseUri(exporterRuntimeConfig); // do the creation and validation here in order to preserve backward compatibility
URI baseUri = getBaseUri(exporterRuntimeConfig); // do the creation and validation here in order to preserve backward compatibility
return new Function<>() {
@Override
public LateBoundBatchSpanProcessor apply(
SyntheticCreationalContext<LateBoundBatchSpanProcessor> context) {
if (otelRuntimeConfig.sdkDisabled() || grpcBaseUri == null) {
if (otelRuntimeConfig.sdkDisabled() || baseUri == null) {
return RemoveableLateBoundBatchSpanProcessor.INSTANCE;
}
// Only create the OtlpGrpcSpanExporter if an endpoint was set in runtime config and was properly validated at startup
Expand All @@ -57,7 +60,7 @@ public LateBoundBatchSpanProcessor apply(
}

try {
var spanExporter = createOtlpGrpcSpanExporter(exporterRuntimeConfig, vertx.get(), grpcBaseUri);
var spanExporter = createSpanExporter(exporterRuntimeConfig, vertx.get(), baseUri);

BatchSpanProcessorBuilder processorBuilder = BatchSpanProcessor.builder(spanExporter);

Expand All @@ -73,102 +76,95 @@ public LateBoundBatchSpanProcessor apply(
}
}

private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig,
Vertx vertx, final URI grpcBaseUri) {

private SpanExporter createSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig,
Vertx vertx, final URI baseUri) {
OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces();
if (tracesConfig.protocol().isPresent()) {
if (!tracesConfig.protocol().get().equals(HTTP_PROTOBUF)) {
throw new IllegalStateException("Only the GRPC Exporter is currently supported. " +
"Please check `quarkus.otel.exporter.otlp.traces.protocol` property");
}
if (tracesConfig.protocol().isEmpty()) {
throw new IllegalStateException("No OTLP protocol specified. " +
"Please check `quarkus.otel.exporter.otlp.traces.protocol` property");
}

boolean compressionEnabled = false;
if (tracesConfig.compression().isPresent()) {
compressionEnabled = (tracesConfig.compression().get() == CompressionType.GZIP);
String protocol = tracesConfig.protocol().get();
if (GRPC.equals(protocol)) {
return createOtlpGrpcSpanExporter(exporterRuntimeConfig, vertx, baseUri);
} else if (HTTP_PROTOBUF.equals(protocol)) {
return createHttpSpanExporter(exporterRuntimeConfig, vertx, baseUri, protocol);
}

Map<String, String> headersMap = new HashMap<>();
OtlpUserAgent.addUserAgentHeader(headersMap::put);
if (tracesConfig.headers().isPresent()) {
List<String> headers = tracesConfig.headers().get();
if (!headers.isEmpty()) {
for (String header : headers) {
if (header.isEmpty()) {
continue;
}
String[] parts = header.split("=", 2);
String key = parts[0].trim();
String value = parts[1].trim();
headersMap.put(key, value);
}
}
}
throw new IllegalArgumentException(String.format("Unsupported OTLP protocol %s specified. " +
"Please check `quarkus.otel.exporter.otlp.traces.protocol` property", protocol));
}

private SpanExporter createOtlpGrpcSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig,
Vertx vertx, final URI baseUri) {

OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces();

return new VertxGrpcExporter(
"otlp", // use the same as OTel does
"span", // use the same as OTel does
MeterProvider::noop,
grpcBaseUri,
compressionEnabled,
baseUri,
determineCompression(tracesConfig),
tracesConfig.timeout(),
headersMap,
new Consumer<>() {
@Override
public void accept(HttpClientOptions options) {
configureTLS(options);
}

private void configureTLS(HttpClientOptions options) {
// TODO: this can reuse existing stuff when https://github.com/quarkusio/quarkus/pull/33228 is in
options.setKeyCertOptions(toPemKeyCertOptions(tracesConfig));
options.setPemTrustOptions(toPemTrustOptions(tracesConfig));

if (VertxGrpcExporter.isHttps(grpcBaseUri)) {
options.setSsl(true);
options.setUseAlpn(true);
}
if (tlsConfig.trustAll) {
options.setTrustAll(true);
options.setVerifyHost(false);
}
}

private KeyCertOptions toPemKeyCertOptions(OtlpExporterTracesConfig configuration) {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions();
OtlpExporterTracesConfig.KeyCert keyCert = configuration.keyCert();
if (keyCert.certs().isPresent()) {
for (String cert : keyCert.certs().get()) {
pemKeyCertOptions.addCertPath(cert);
}
}
if (keyCert.keys().isPresent()) {
for (String cert : keyCert.keys().get()) {
pemKeyCertOptions.addKeyPath(cert);
}
}
return pemKeyCertOptions;
}

private PemTrustOptions toPemTrustOptions(OtlpExporterTracesConfig configuration) {
PemTrustOptions pemTrustOptions = new PemTrustOptions();
OtlpExporterTracesConfig.TrustCert trustCert = configuration.trustCert();
if (trustCert.certs().isPresent()) {
for (String cert : trustCert.certs().get()) {
pemTrustOptions.addCertPath(cert);
}
}
return pemTrustOptions;
}
},
populateTracingExportHttpHeaders(tracesConfig),
new HttpClientOptionsConsumer(tracesConfig, baseUri, tlsConfig),
vertx);

}

private SpanExporter createHttpSpanExporter(OtlpExporterRuntimeConfig exporterRuntimeConfig, Vertx vertx,
URI baseUri, String protocol) {

OtlpExporterTracesConfig tracesConfig = exporterRuntimeConfig.traces();

boolean exportAsJson = false; //TODO: this will be enhanced in the future

return new VertxHttpExporter(new HttpExporter<TraceRequestMarshaler>(
"otlp", // use the same as OTel does
"span", // use the same as OTel does
new VertxHttpExporter.VertxHttpSender(
baseUri,
determineCompression(tracesConfig),
tracesConfig.timeout(),
populateTracingExportHttpHeaders(tracesConfig),
exportAsJson ? "application/json" : "application/x-protobuf",
new HttpClientOptionsConsumer(tracesConfig, baseUri, tlsConfig),
vertx),
MeterProvider::noop,
exportAsJson));
}
};
}

private URI getGrpcBaseUri(OtlpExporterRuntimeConfig exporterRuntimeConfig) {
private static boolean determineCompression(OtlpExporterTracesConfig tracesConfig) {
if (tracesConfig.compression().isPresent()) {
return (tracesConfig.compression().get() == CompressionType.GZIP);
}
return false;
}

private static Map<String, String> populateTracingExportHttpHeaders(OtlpExporterTracesConfig tracesConfig) {
Map<String, String> headersMap = new HashMap<>();
OtlpUserAgent.addUserAgentHeader(headersMap::put);
if (tracesConfig.headers().isPresent()) {
List<String> headers = tracesConfig.headers().get();
if (!headers.isEmpty()) {
for (String header : headers) {
if (header.isEmpty()) {
continue;
}
String[] parts = header.split("=", 2);
String key = parts[0].trim();
String value = parts[1].trim();
headersMap.put(key, value);
}
}
}
return headersMap;
}

private URI getBaseUri(OtlpExporterRuntimeConfig exporterRuntimeConfig) {
String endpoint = resolveEndpoint(exporterRuntimeConfig).trim();
if (endpoint.isEmpty()) {
return null;
Expand All @@ -191,4 +187,62 @@ private static boolean excludeDefaultEndpoint(String endpoint) {
return !DEFAULT_GRPC_BASE_URI.equals(endpoint);
}

private static class HttpClientOptionsConsumer implements Consumer<HttpClientOptions> {
private final OtlpExporterTracesConfig tracesConfig;
private final URI baseUri;
private final TlsConfig tlsConfig;

public HttpClientOptionsConsumer(OtlpExporterTracesConfig tracesConfig, URI baseUri, TlsConfig tlsConfig) {
this.tracesConfig = tracesConfig;
this.baseUri = baseUri;
this.tlsConfig = tlsConfig;
}

@Override
public void accept(HttpClientOptions options) {
configureTLS(options);
}

private void configureTLS(HttpClientOptions options) {
// TODO: this can reuse existing stuff when https://github.com/quarkusio/quarkus/pull/33228 is in
options.setKeyCertOptions(toPemKeyCertOptions());
options.setPemTrustOptions(toPemTrustOptions());

if (OtlpExporterUtil.isHttps(baseUri)) {
options.setSsl(true);
options.setUseAlpn(true);
}
if (tlsConfig.trustAll) {
options.setTrustAll(true);
options.setVerifyHost(false);
}
}

private KeyCertOptions toPemKeyCertOptions() {
PemKeyCertOptions pemKeyCertOptions = new PemKeyCertOptions();
OtlpExporterTracesConfig.KeyCert keyCert = tracesConfig.keyCert();
if (keyCert.certs().isPresent()) {
for (String cert : keyCert.certs().get()) {
pemKeyCertOptions.addCertPath(cert);
}
}
if (keyCert.keys().isPresent()) {
for (String cert : keyCert.keys().get()) {
pemKeyCertOptions.addKeyPath(cert);
}
}
return pemKeyCertOptions;
}

private PemTrustOptions toPemTrustOptions() {
PemTrustOptions pemTrustOptions = new PemTrustOptions();
OtlpExporterTracesConfig.TrustCert trustCert = tracesConfig.trustCert();
if (trustCert.certs().isPresent()) {
for (String cert : trustCert.certs().get()) {
pemTrustOptions.addCertPath(cert);
}
}
return pemTrustOptions;
}
}
}

0 comments on commit 8465393

Please sign in to comment.