From 38a17d13bfc02139fb2591274db5c5d003e150be Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 22 Sep 2023 09:50:59 -0400 Subject: [PATCH 1/2] GH-2817: KafkaTemplate: No double error timers Fixes https://github.com/spring-projects/spring-kafka/issues/2817 An observation in `KafkaTemplate` can be marked with error from a `Callback`. Then `Future` is evaluated and its exception is thrown back to the `observeSend()`. Here this exception is caught and reported to the observation again. This creates a second timer in the Micrometer, but with different error tag * Check for error presence in the `observeSend()` `catch` block and skip second report **Cherry-pick to `3.0.x`** --- .../kafka/core/KafkaTemplate.java | 9 ++-- .../support/micrometer/ObservationTests.java | 53 +++++++++++-------- 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 3963efce75..80abb03113 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -756,8 +756,11 @@ private CompletableFuture> observeSend(final ProducerRecord> doSend(final ProducerRecord } Future sendFuture = producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation)); - // May be an immediate failure + // Maybe an immediate failure if (sendFuture.isDone()) { try { sendFuture.get(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 738b988289..6f5450e43e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.support.micrometer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; @@ -29,10 +30,28 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.TraceContext; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.simple.SimpleSpan; +import io.micrometer.tracing.test.simple.SimpleTracer; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.header.Headers; import org.junit.jupiter.api.Test; @@ -41,6 +60,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -61,28 +81,11 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import io.micrometer.common.KeyValues; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.micrometer.core.tck.MeterRegistryAssert; -import io.micrometer.observation.ObservationHandler; -import io.micrometer.observation.ObservationRegistry; -import io.micrometer.observation.tck.TestObservationRegistry; -import io.micrometer.tracing.Span; -import io.micrometer.tracing.TraceContext; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.handler.DefaultTracingObservationHandler; -import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; -import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; -import io.micrometer.tracing.propagation.Propagator; -import io.micrometer.tracing.test.simple.SimpleSpan; -import io.micrometer.tracing.test.simple.SimpleTracer; - /** * @author Gary Russell - * @since 3.0 + * @author Artem Bilan * + * @since 3.0 */ @SpringJUnitConfig @EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" }) @@ -216,6 +219,14 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) .getPropertyValue(endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0); cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class); assertThat(cAdmin).isSameAs(config.mockAdmin); + + assertThatExceptionOfType(KafkaException.class) + .isThrownBy(() -> template.send("wrong%Topic", "data")) + .withCauseExactlyInstanceOf(InvalidTopicException.class); + + MeterRegistryAssert.assertThat(meterRegistry) + .hasTimerWithNameAndTags("spring.kafka.template", KeyValues.of("error", "InvalidTopicException")) + .doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException")); } @Configuration @@ -235,7 +246,7 @@ KafkaAdmin admin(EmbeddedKafkaBroker broker) { @Bean ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + broker.getBrokersAsString()); return new DefaultKafkaProducerFactory<>(producerProps); } @@ -243,7 +254,7 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); - consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + broker.getBrokersAsString() + "," + broker.getBrokersAsString()); return new DefaultKafkaConsumerFactory<>(consumerProps); } From e5bd5a64e7d9ac02758a77f5b86cc53bdff61c70 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 22 Sep 2023 09:57:05 -0400 Subject: [PATCH 2/2] * Fix import order for Checkstyle rule --- .../support/micrometer/ObservationTests.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 6f5450e43e..d67396aa8b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -30,23 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import io.micrometer.common.KeyValues; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.micrometer.core.tck.MeterRegistryAssert; -import io.micrometer.observation.ObservationHandler; -import io.micrometer.observation.ObservationRegistry; -import io.micrometer.observation.tck.TestObservationRegistry; -import io.micrometer.tracing.Span; -import io.micrometer.tracing.TraceContext; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.handler.DefaultTracingObservationHandler; -import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; -import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; -import io.micrometer.tracing.propagation.Propagator; -import io.micrometer.tracing.test.simple.SimpleSpan; -import io.micrometer.tracing.test.simple.SimpleTracer; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -81,6 +64,24 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import io.micrometer.common.KeyValues; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.TraceContext; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.simple.SimpleSpan; +import io.micrometer.tracing.test.simple.SimpleTracer; + /** * @author Gary Russell * @author Artem Bilan