From 5870a471b57d16eaba99fbd3d2546701b90da43a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 24 Aug 2025 20:10:37 +0000 Subject: [PATCH 1/2] Initial plan From efdf186f2ca322cc4df7881d3e716e8fed9280f4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 24 Aug 2025 20:18:25 +0000 Subject: [PATCH 2/2] Changes before error encountered Co-authored-by: trask <218610+trask@users.noreply.github.com> --- .../contrib/kafka/KafkaSpanExporter.java | 10 ++++--- .../kafka/KafkaSpanExporterBuilder.java | 30 ++++++++++--------- .../contrib/kafka/SpanDataDeserializer.java | 2 +- .../contrib/kafka/SpanDataSerializer.java | 8 ++--- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java index b5559832e..24a37e85b 100644 --- a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporter.java @@ -5,6 +5,8 @@ package io.opentelemetry.contrib.kafka; +import static java.util.Objects.requireNonNull; + import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -26,7 +28,7 @@ @ThreadSafe @SuppressWarnings("FutureReturnValueIgnored") -public class KafkaSpanExporter implements SpanExporter { +public final class KafkaSpanExporter implements SpanExporter { private static final Logger logger = LoggerFactory.getLogger(KafkaSpanExporter.class); private final String topicName; private final Producer> producer; @@ -43,9 +45,9 @@ public static KafkaSpanExporterBuilder newBuilder() { Producer> producer, ExecutorService executorService, long timeoutInSeconds) { - this.topicName = topicName; - this.producer = producer; - this.executorService = executorService; + this.topicName = requireNonNull(topicName, "topicName"); + this.producer = requireNonNull(producer, "producer"); + this.executorService = requireNonNull(executorService, "executorService"); this.timeoutInSeconds = timeoutInSeconds; } diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java index e6b49d6dc..303dd6aa1 100644 --- a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/KafkaSpanExporterBuilder.java @@ -7,6 +7,7 @@ import static java.util.Objects.isNull; import static java.util.Objects.nonNull; +import static java.util.Objects.requireNonNull; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; @@ -16,15 +17,16 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import javax.annotation.Nullable; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.Serializer; -public class KafkaSpanExporterBuilder { +public final class KafkaSpanExporterBuilder { private static final long DEFAULT_TIMEOUT_IN_SECONDS = 5L; - private String topicName; - private Producer> producer; - private ExecutorService executorService; + @Nullable private String topicName; + @Nullable private Producer> producer; + @Nullable private ExecutorService executorService; private long timeoutInSeconds = DEFAULT_TIMEOUT_IN_SECONDS; @SuppressWarnings(value = {"NullAway"}) @@ -32,19 +34,19 @@ public KafkaSpanExporterBuilder() {} @CanIgnoreReturnValue public KafkaSpanExporterBuilder setTopicName(String topicName) { - this.topicName = topicName; + this.topicName = requireNonNull(topicName, "topicName"); return this; } @CanIgnoreReturnValue public KafkaSpanExporterBuilder setProducer(Producer> producer) { - this.producer = producer; + this.producer = requireNonNull(producer, "producer"); return this; } @CanIgnoreReturnValue public KafkaSpanExporterBuilder setExecutorService(ExecutorService executorService) { - this.executorService = executorService; + this.executorService = requireNonNull(executorService, "executorService"); return this; } @@ -67,10 +69,10 @@ public KafkaSpanExporter build() { return new KafkaSpanExporter(topicName, producer, executorService, timeoutInSeconds); } - public static class ProducerBuilder { - private Map config; - private Serializer keySerializer; - private Serializer> valueSerializer; + public static final class ProducerBuilder { + @Nullable private Map config; + @Nullable private Serializer keySerializer; + @Nullable private Serializer> valueSerializer; public static ProducerBuilder newInstance() { return new ProducerBuilder(); @@ -81,19 +83,19 @@ public ProducerBuilder() {} @CanIgnoreReturnValue public ProducerBuilder setConfig(Map config) { - this.config = config; + this.config = requireNonNull(config, "config"); return this; } @CanIgnoreReturnValue public ProducerBuilder setKeySerializer(Serializer keySerializer) { - this.keySerializer = keySerializer; + this.keySerializer = requireNonNull(keySerializer, "keySerializer"); return this; } @CanIgnoreReturnValue public ProducerBuilder setValueSerializer(Serializer> valueSerializer) { - this.valueSerializer = valueSerializer; + this.valueSerializer = requireNonNull(valueSerializer, "valueSerializer"); return this; } diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataDeserializer.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataDeserializer.java index 4c5ff7112..9da3f266f 100644 --- a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataDeserializer.java +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataDeserializer.java @@ -11,7 +11,7 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; -public class SpanDataDeserializer implements Deserializer { +public final class SpanDataDeserializer implements Deserializer { @SuppressWarnings("NullAway") @Override public ExportTraceServiceRequest deserialize(String topic, byte[] data) { diff --git a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java index 4c689f16a..fb54f9ad0 100644 --- a/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java +++ b/kafka-exporter/src/main/java/io/opentelemetry/contrib/kafka/SpanDataSerializer.java @@ -5,6 +5,7 @@ package io.opentelemetry.contrib.kafka; +import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler; @@ -16,16 +17,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Objects; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; -public class SpanDataSerializer implements Serializer> { +public final class SpanDataSerializer implements Serializer> { @Override public byte[] serialize(String topic, Collection data) { - if (Objects.isNull(data)) { - throw new SerializationException("Cannot serialize null"); - } + requireNonNull(data, "data"); return convertSpansToRequest(data).toByteArray(); }