From 790f80638208b44916fd110650ca20987cab0c8e Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 7 Apr 2020 11:51:00 -0400 Subject: [PATCH 1/2] GH-1437: Check for immediate failure on send Resolves https://github.com/spring-projects/spring-kafka/issues/1437 The future returned by `Producer.send()` may have been immediately completed with an exception; check if `future.isDone()` and call `get()` so that any such exception is propagated to the caller. **I will perform backports after review/merge.** --- .../kafka/core/KafkaTemplate.java | 20 ++++++++++++++++++- .../kafka/core/KafkaTemplateTests.java | 16 +++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index f5f0726c2f..098d4f4227 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -20,12 +20,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; @@ -38,6 +41,7 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextStoppedEvent; import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LoggingProducerListener; @@ -535,7 +539,21 @@ protected ListenableFuture> doSend(final ProducerRecord p if (this.micrometerHolder != null) { sample = this.micrometerHolder.start(); } - producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)); + Future sendFuture = + producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)); + // May be an immediate failure + if (sendFuture.isDone()) { + try { + sendFuture.get(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new KafkaException("Interrupted", e); + } + catch (ExecutionException e) { + throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace + } + } if (this.autoFlush) { flush(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index b59caec7cc..05bcc57e31 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.allOf; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.Mockito.mock; import static org.springframework.kafka.test.assertj.KafkaConditions.key; import static org.springframework.kafka.test.assertj.KafkaConditions.keyValue; @@ -46,6 +47,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -55,6 +57,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.CompositeProducerListener; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; @@ -386,4 +389,17 @@ void testConfigOverridesWithSerializers() { assertThat(template.getProducerFactory().getValueSerializerSupplier()).isSameAs(valueSerializer); } + @Test + void testFutureFailureOnSend() { + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + senderProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); + KafkaTemplate template = new KafkaTemplate<>(pf, true); + + assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + template.send("missing.topic", "foo")) + .withCauseExactlyInstanceOf(TimeoutException.class); + pf.destroy(); + } + } From 34514125fc3ef66bd97092043874faeef16da3b0 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 7 Apr 2020 13:15:30 -0400 Subject: [PATCH 2/2] Fix mock tests. --- .../DefaultKafkaProducerFactoryTests.java | 5 ++- .../core/KafkaTemplateTransactionTests.java | 34 ++++++++++++------- .../kafka/core/RoutingKafkaTemplateTests.java | 3 ++ .../listener/TransactionalContainerTests.java | 3 ++ .../ReplyingKafkaTemplateTests.java | 3 +- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index ab401cd410..bd1b09170c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.inOrder; @@ -45,6 +46,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.transaction.CannotCreateTransactionException; import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -57,6 +59,7 @@ public class DefaultKafkaProducerFactoryTests { @Test void testProducerClosedAfterBadTransition() throws Exception { final Producer producer = mock(Producer.class); + given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 129879f31e..f9db7a0355 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -77,6 +77,7 @@ import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.TransactionTemplate; +import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -312,8 +313,10 @@ public void testTransactionSynchronizationExceptionOnCommit() { public void testDeadLetterPublisherWhileTransactionActive() { @SuppressWarnings("unchecked") Producer producer1 = mock(Producer.class); + given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>()); @SuppressWarnings("unchecked") Producer producer2 = mock(Producer.class); + given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>()); producer1.initTransactions(); @SuppressWarnings("unchecked") @@ -335,6 +338,7 @@ public void testDeadLetterPublisherWhileTransactionActive() { }); verify(producer1).beginTransaction(); + verify(producer1).commitTransaction(); verify(producer1).close(any()); verify(producer2, never()).beginTransaction(); @@ -483,8 +487,10 @@ public void testAbort() { public void testExecuteInTransactionNewInnerTx() { @SuppressWarnings("unchecked") Producer producer1 = mock(Producer.class); + given(producer1.send(any(), any())).willReturn(new SettableListenableFuture<>()); @SuppressWarnings("unchecked") Producer producer2 = mock(Producer.class); + given(producer2.send(any(), any())).willReturn(new SettableListenableFuture<>()); producer1.initTransactions(); AtomicBoolean first = new AtomicBoolean(true); @@ -560,26 +566,30 @@ void testNonTxWithTx() { @EnableTransactionManagement public static class DeclarativeConfig { - @SuppressWarnings("rawtypes") + @SuppressWarnings({ "rawtypes", "unchecked" }) @Bean - public ProducerFactory pf() { - ProducerFactory pf = mock(ProducerFactory.class); - given(pf.transactionCapable()).willReturn(true); - given(pf.createProducer(isNull())).willReturn(producer1()); - given(pf.createProducer(anyString())).willReturn(producer2()); - return pf; + public Producer producer1() { + Producer mock = mock(Producer.class); + given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>()); + return mock; } - @SuppressWarnings("rawtypes") + @SuppressWarnings({ "rawtypes", "unchecked" }) @Bean - public Producer producer1() { - return mock(Producer.class); + public Producer producer2() { + Producer mock = mock(Producer.class); + given(mock.send(any(), any())).willReturn(new SettableListenableFuture<>()); + return mock; } @SuppressWarnings("rawtypes") @Bean - public Producer producer2() { - return mock(Producer.class); + public ProducerFactory pf() { + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.transactionCapable()).willReturn(true); + given(pf.createProducer(isNull())).willReturn(producer1()); + given(pf.createProducer(anyString())).willReturn(producer2()); + return pf; } @SuppressWarnings({ "rawtypes", "unchecked" }) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java index 9b6e2189c6..dfd89a0c0a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/RoutingKafkaTemplateTests.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -43,7 +44,9 @@ public class RoutingKafkaTemplateTests { @Test public void routing() { Producer p1 = mock(Producer.class); + given(p1.send(any(), any())).willReturn(new SettableListenableFuture<>()); Producer p2 = mock(Producer.class); + given(p2.send(any(), any())).willReturn(new SettableListenableFuture<>()); ProducerFactory pf1 = mock(ProducerFactory.class); ProducerFactory pf2 = mock(ProducerFactory.class); given(pf1.createProducer()).willReturn(p1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 411fbf3a72..aa90796aaa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -99,6 +99,7 @@ import org.springframework.transaction.support.AbstractPlatformTransactionManager; import org.springframework.transaction.support.DefaultTransactionStatus; import org.springframework.util.backoff.FixedBackOff; +import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -178,6 +179,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl ConsumerFactory cf = mock(ConsumerFactory.class); willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides()); Producer producer = mock(Producer.class); + given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); final CountDownLatch closeLatch = new CountDownLatch(2); willAnswer(i -> { closeLatch.countDown(); @@ -421,6 +423,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides()); Producer producer = mock(Producer.class); + given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>()); final CountDownLatch closeLatch = new CountDownLatch(1); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 9375353e1f..39fd441688 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -87,6 +87,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -427,7 +428,7 @@ public void testAggregateOrphansNotStored() throws Exception { willAnswer(invocation -> { ProducerRecord rec = invocation.getArgument(0); correlation.set(rec.headers().lastHeader(KafkaHeaders.CORRELATION_ID).value()); - return null; + return new SettableListenableFuture<>(); }).given(producer).send(any(), any()); AggregatingReplyingKafkaTemplate template = new AggregatingReplyingKafkaTemplate(pf, container, (list, timeout) -> true);