Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.contrib.kafka;

import static java.util.Objects.requireNonNull;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand All @@ -26,7 +28,7 @@

@ThreadSafe
@SuppressWarnings("FutureReturnValueIgnored")
public class KafkaSpanExporter implements SpanExporter {
public final class KafkaSpanExporter implements SpanExporter {
private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class);
private final String topicName;
private final Producer<String, Collection<SpanData>> producer;
Expand All @@ -43,9 +45,9 @@ public static KafkaSpanExporterBuilder newBuilder() {
Producer<String, Collection<SpanData>> producer,
ExecutorService executorService,
long timeoutInSeconds) {
this.topicName = topicName;
this.producer = producer;
this.executorService = executorService;
this.topicName = requireNonNull(topicName, "topicName");
this.producer = requireNonNull(producer, "producer");
this.executorService = requireNonNull(executorService, "executorService");
this.timeoutInSeconds = timeoutInSeconds;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

Expand All @@ -16,35 +17,36 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;

public class KafkaSpanExporterBuilder {
public final class KafkaSpanExporterBuilder {
private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L;
private String topicName;
private Producer<String, Collection<SpanData>> producer;
private ExecutorService executorService;
@Nullable private String topicName;
@Nullable private Producer<String, Collection<SpanData>> producer;
@Nullable private ExecutorService executorService;
private long timeoutInSeconds = DEFAULT_TIMEOUT_IN_SECONDS;

@SuppressWarnings(value = {"NullAway"})
public KafkaSpanExporterBuilder() {}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setTopicName(String topicName) {
this.topicName = topicName;
this.topicName = requireNonNull(topicName, "topicName");
return this;
}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setProducer(Producer<String, Collection<SpanData>> producer) {
this.producer = producer;
this.producer = requireNonNull(producer, "producer");
return this;
}

@CanIgnoreReturnValue
public KafkaSpanExporterBuilder setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
this.executorService = requireNonNull(executorService, "executorService");
return this;
}

Expand All @@ -67,10 +69,10 @@ public KafkaSpanExporter build() {
return new KafkaSpanExporter(topicName, producer, executorService, timeoutInSeconds);
}

public static class ProducerBuilder {
private Map<String, Object> config;
private Serializer<String> keySerializer;
private Serializer<Collection<SpanData>> valueSerializer;
public static final class ProducerBuilder {
@Nullable private Map<String, Object> config;
@Nullable private Serializer<String> keySerializer;
@Nullable private Serializer<Collection<SpanData>> valueSerializer;

public static ProducerBuilder newInstance() {
return new ProducerBuilder();
Expand All @@ -81,19 +83,19 @@ public ProducerBuilder() {}

@CanIgnoreReturnValue
public ProducerBuilder setConfig(Map<String, Object> config) {
this.config = config;
this.config = requireNonNull(config, "config");
return this;
}

@CanIgnoreReturnValue
public ProducerBuilder setKeySerializer(Serializer<String> keySerializer) {
this.keySerializer = keySerializer;
this.keySerializer = requireNonNull(keySerializer, "keySerializer");
return this;
}

@CanIgnoreReturnValue
public ProducerBuilder setValueSerializer(Serializer<Collection<SpanData>> valueSerializer) {
this.valueSerializer = valueSerializer;
this.valueSerializer = requireNonNull(valueSerializer, "valueSerializer");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class SpanDataDeserializer implements Deserializer<ExportTraceServiceRequest> {
public final class SpanDataDeserializer implements Deserializer<ExportTraceServiceRequest> {
@SuppressWarnings("NullAway")
@Override
public ExportTraceServiceRequest deserialize(String topic, byte[] data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.contrib.kafka;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
Expand All @@ -16,16 +17,13 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class SpanDataSerializer implements Serializer<Collection<SpanData>> {
public final class SpanDataSerializer implements Serializer<Collection<SpanData>> {
@Override
public byte[] serialize(String topic, Collection<SpanData> data) {
if (Objects.isNull(data)) {
throw new SerializationException("Cannot serialize null");
}
requireNonNull(data, "data");
return convertSpansToRequest(data).toByteArray();
}

Expand Down