diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 3963efce75..80abb03113 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -756,8 +756,11 @@ private CompletableFuture> observeSend(final ProducerRecord> doSend(final ProducerRecord } Future sendFuture = producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample, observation)); - // May be an immediate failure + // Maybe an immediate failure if (sendFuture.isDone()) { try { sendFuture.get(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 738b988289..d67396aa8b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.support.micrometer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; @@ -33,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.header.Headers; import org.junit.jupiter.api.Test; @@ -41,6 +43,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; @@ -81,8 +84,9 @@ /** * @author Gary Russell - * @since 3.0 + * @author Artem Bilan * + * @since 3.0 */ @SpringJUnitConfig @EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" }) @@ -216,6 +220,14 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) .getPropertyValue(endpointRegistry.getListenerContainer("obs3"), "containers", List.class).get(0); cAdmin = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class); assertThat(cAdmin).isSameAs(config.mockAdmin); + + assertThatExceptionOfType(KafkaException.class) + .isThrownBy(() -> template.send("wrong%Topic", "data")) + .withCauseExactlyInstanceOf(InvalidTopicException.class); + + MeterRegistryAssert.assertThat(meterRegistry) + .hasTimerWithNameAndTags("spring.kafka.template", KeyValues.of("error", "InvalidTopicException")) + .doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException")); } @Configuration @@ -235,7 +247,7 @@ KafkaAdmin admin(EmbeddedKafkaBroker broker) { @Bean ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + broker.getBrokersAsString()); return new DefaultKafkaProducerFactory<>(producerProps); } @@ -243,7 +255,7 @@ ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { @Bean ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) { Map consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); - consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," + broker.getBrokersAsString() + "," + broker.getBrokersAsString()); return new DefaultKafkaConsumerFactory<>(consumerProps); }