From fd2166e6409fc8080afe3bd2f65617bf29284670 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 16 Aug 2019 14:28:26 -0400 Subject: [PATCH] GH-1196: Use close(Duration) instead of close() Resolves https://github.com/spring-projects/spring-kafka/issues/1196 Add `closeTimeout` to `KafkaTemplate` and `KafkaTransactionManager` (default 5s). Use a zero timeout if a transaction operation failed with a timeout. Deprecate 1.3.x public APIs --- .../core/DefaultKafkaProducerFactory.java | 78 +++++++++++++------ .../kafka/core/KafkaResourceHolder.java | 35 ++++++++- .../kafka/core/KafkaTemplate.java | 32 +++++++- .../kafka/core/ProducerFactoryUtils.java | 46 ++++++++++- .../transaction/KafkaTransactionManager.java | 34 +++++++- .../DefaultKafkaProducerFactoryTests.java | 7 +- .../core/KafkaTemplateTransactionTests.java | 76 +++++++++++++++++- .../listener/TransactionalContainerTests.java | 18 ++--- 8 files changed, 277 insertions(+), 49 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index a927f51623..a197b0e2bb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -16,6 +16,9 @@ package org.springframework.kafka.core; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -43,7 +46,9 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.AppInfoParser; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; @@ -147,8 +152,10 @@ public void setValueSerializer(@Nullable Serializer valueSerializer) { } /** - * The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked). - * Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}. + * The time to wait when physically closing the producer via the factory rather than + * closing the producer itself (when {@link #reset()}, {@link #destroy() or + * #closeProducerFor(String)} are invoked). Specified in seconds; default + * {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}. * @param physicalCloseTimeout the timeout in seconds. * @since 1.0.7 */ @@ -216,7 +223,7 @@ public boolean transactionCapable() { @SuppressWarnings("resource") @Override - public void destroy() throws Exception { //NOSONAR + public void destroy() { CloseSafeProducer producerToClose = this.producer; this.producer = null; if (producerToClose != null) { @@ -400,6 +407,25 @@ public void closeProducerFor(String transactionIdSuffix) { */ protected static class CloseSafeProducer implements Producer { + private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0); + + private static final Method CLOSE_WITH_DURATION; + + static { + Method method = null; + String clientVersion = AppInfoParser.getVersion(); + try { + if (!clientVersion.startsWith("1.") && !clientVersion.startsWith("2.0.") + && !clientVersion.startsWith("2.1.")) { + method = KafkaProducer.class.getDeclaredMethod("close", Duration.class); + } + } + catch (NoSuchMethodException e) { + logger.error("Failed to get close(Duration) method for version: " + clientVersion, e); + } + CLOSE_WITH_DURATION = method; + } + private final Producer delegate; private final BlockingQueue> cache; @@ -408,7 +434,7 @@ protected static class CloseSafeProducer implements Producer { private final String txId; - private volatile boolean txFailed; + private volatile Exception txFailed; CloseSafeProducer(Producer delegate) { this(delegate, null, null); @@ -476,7 +502,7 @@ public void beginTransaction() throws ProducerFencedException { if (logger.isErrorEnabled()) { logger.error("beginTransaction failed: " + this, e); } - this.txFailed = true; + this.txFailed = e; throw e; } } @@ -500,7 +526,7 @@ public void commitTransaction() throws ProducerFencedException { if (logger.isErrorEnabled()) { logger.error("commitTransaction failed: " + this, e); } - this.txFailed = true; + this.txFailed = e; throw e; } } @@ -517,7 +543,7 @@ public void abortTransaction() throws ProducerFencedException { if (logger.isErrorEnabled()) { logger.error("Abort failed: " + this, e); } - this.txFailed = true; + this.txFailed = e; throw e; } } @@ -530,17 +556,16 @@ public void close() { @Override public void close(long timeout, @Nullable TimeUnit unit) { if (this.cache != null) { - if (this.txFailed) { + Duration closeTimeout = this.txFailed instanceof TimeoutException || unit == null + ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT + : Duration.ofMillis(unit.toMillis(timeout)); + if (this.txFailed != null) { if (logger.isWarnEnabled()) { - logger.warn("Error during transactional operation; producer removed from cache; possible cause: " - + "broker restarted during transaction: " + this); - } - if (unit == null) { - this.delegate.close(); - } - else { - this.delegate.close(timeout, unit); + logger.warn("Error during transactional operation; producer removed from cache; " + + "possible cause: " + + "broker restarted during transaction: " + this); } + closeDelegate(closeTimeout); if (this.removeConsumerProducer != null) { this.removeConsumerProducer.accept(this); } @@ -550,12 +575,7 @@ public void close(long timeout, @Nullable TimeUnit unit) { synchronized (this) { if (!this.cache.contains(this) && !this.cache.offer(this)) { - if (unit == null) { - this.delegate.close(); - } - else { - this.delegate.close(timeout, unit); - } + closeDelegate(closeTimeout); } } } @@ -563,6 +583,20 @@ public void close(long timeout, @Nullable TimeUnit unit) { } } + private void closeDelegate(Duration closeTimeout) { + if (CLOSE_WITH_DURATION != null) { + try { + CLOSE_WITH_DURATION.invoke(this.delegate, closeTimeout); + } + catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + logger.error("Failed to invoke close(Duration) with reflection", e); + } + } + else { + this.delegate.close(closeTimeout.toMillis(), TimeUnit.MILLISECONDS); + } + } + @Override public String toString() { return "CloseSafeProducer [delegate=" + this.delegate + "" diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java index 523055cd00..42792dfba8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java @@ -16,9 +16,13 @@ package org.springframework.kafka.core; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + import org.apache.kafka.clients.producer.Producer; import org.springframework.transaction.support.ResourceHolderSupport; +import org.springframework.util.Assert; /** * Kafka resource holder, wrapping a Kafka producer. KafkaTransactionManager binds instances of this @@ -33,12 +37,41 @@ public class KafkaResourceHolder extends ResourceHolderSupport { private final Producer producer; + private final Duration closeTimeout; + /** * Construct an instance for the producer. * @param producer the producer. */ public KafkaResourceHolder(Producer producer) { + this(producer, ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); + } + + /** + * Construct an instance for the producer. + * @param producer the producer. + * @param closeTimeout the close timeout. + * @deprecated in favor of {@link #KafkaResourceHolder(Producer, Duration)} + * @since 1.3.11 + */ + @Deprecated + public KafkaResourceHolder(Producer producer, long closeTimeout) { + Assert.notNull(producer, "'producer' cannot be null"); + Assert.notNull(closeTimeout, "'closeTimeout' cannot be null"); + this.producer = producer; + this.closeTimeout = Duration.ofMillis(closeTimeout); + } + + /** + * Construct an instance for the producer. + * @param producer the producer. + * @param closeTimeout the close timeout. + */ + public KafkaResourceHolder(Producer producer, Duration closeTimeout) { + Assert.notNull(producer, "'producer' cannot be null"); + Assert.notNull(closeTimeout, "'closeTimeout' cannot be null"); this.producer = producer; + this.closeTimeout = closeTimeout; } public Producer getProducer() { @@ -50,7 +83,7 @@ public void commit() { } public void close() { - this.producer.close(); + this.producer.close(this.closeTimeout.toMillis(), TimeUnit.MILLISECONDS); } public void rollback() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 5af2aab358..18d5219dca 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -16,8 +16,10 @@ package org.springframework.kafka.core; +import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -77,6 +79,7 @@ public class KafkaTemplate implements KafkaOperations { private volatile ProducerListener producerListener = new LoggingProducerListener(); + private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT; /** * Create an instance using the supplied producer factory and autoFlush false. @@ -158,6 +161,27 @@ public boolean isTransactional() { return this.transactional; } + /** + * Set the maximum time to wait when closing a producer; default 5 seconds. + * @param closeTimeout the close timeout. + * @deprecated in favor of {@link #setCloseTimeout(Duration)}. + * @since 1.3.11 + */ + @Deprecated + public void setCloseTimeout(long closeTimeout) { + setCloseTimeout(Duration.ofMillis(closeTimeout)); + } + + /** + * Set the maximum time to wait when closing a producer; default 5 seconds. + * @param closeTimeout the close timeout. + * @since 2.1.14 + */ + public void setCloseTimeout(Duration closeTimeout) { + Assert.notNull(closeTimeout, "'closeTimeout' cannot be null"); + this.closeTimeout = closeTimeout; + } + /** * Return the producer factory used by this template. * @return the factory. @@ -354,9 +378,9 @@ public void sendOffsetsToTransaction(Map offs producer.sendOffsetsToTransaction(offsets, consumerGroupId); } - protected void closeProducer(Producer producer, boolean inLocalTx) { - if (!inLocalTx) { - producer.close(); + protected void closeProducer(Producer producer, boolean inTx) { + if (!inTx) { + producer.close(this.closeTimeout.toMillis(), TimeUnit.MILLISECONDS); } } @@ -441,7 +465,7 @@ private Producer getTheProducer() { return producer; } KafkaResourceHolder holder = ProducerFactoryUtils - .getTransactionalResourceHolder(this.producerFactory); + .getTransactionalResourceHolder(this.producerFactory, this.closeTimeout); return holder.getProducer(); } else { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java index 97d699966a..a64c728419 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java @@ -16,6 +16,9 @@ package org.springframework.kafka.core; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + import org.apache.kafka.clients.producer.Producer; import org.springframework.lang.Nullable; @@ -35,6 +38,11 @@ */ public final class ProducerFactoryUtils { + /** + * The default close timeout (5 seconds). + */ + public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(5); + private static ThreadLocal groupIds = new ThreadLocal<>(); private ProducerFactoryUtils() { @@ -51,6 +59,38 @@ private ProducerFactoryUtils() { public static KafkaResourceHolder getTransactionalResourceHolder( final ProducerFactory producerFactory) { + return getTransactionalResourceHolder(producerFactory, DEFAULT_CLOSE_TIMEOUT); + } + + /** + * Obtain a Producer that is synchronized with the current transaction, if any. + * @param producerFactory the ProducerFactory to obtain a Channel for + * @param closeTimeout the producer close timeout. + * @param the key type. + * @param the value type. + * @return the resource holder. + * @deprecated in favor of {@link #getTransactionalResourceHolder(ProducerFactory, Duration)} + * @since 1.3.11 + */ + @Deprecated + public static KafkaResourceHolder getTransactionalResourceHolder( + final ProducerFactory producerFactory, long closeTimeout) { + + return getTransactionalResourceHolder(producerFactory, Duration.ofMillis(closeTimeout)); + } + + /** + * Obtain a Producer that is synchronized with the current transaction, if any. + * @param producerFactory the ProducerFactory to obtain a Channel for + * @param closeTimeout the producer close timeout. + * @param the key type. + * @param the value type. + * @return the resource holder. + * @since 2.1.14 + */ + public static KafkaResourceHolder getTransactionalResourceHolder( + final ProducerFactory producerFactory, Duration closeTimeout) { + Assert.notNull(producerFactory, "ProducerFactory must not be null"); @SuppressWarnings("unchecked") @@ -63,11 +103,11 @@ public static KafkaResourceHolder getTransactionalResourceHolder( producer.beginTransaction(); } catch (RuntimeException e) { - producer.close(); + producer.close(closeTimeout.toMillis(), TimeUnit.MILLISECONDS); throw e; } - resourceHolder = new KafkaResourceHolder(producer); + resourceHolder = new KafkaResourceHolder(producer, closeTimeout); bindResourceToTransaction(resourceHolder, producerFactory); } return resourceHolder; @@ -75,7 +115,7 @@ public static KafkaResourceHolder getTransactionalResourceHolder( public static void releaseResources(@Nullable KafkaResourceHolder resourceHolder) { if (resourceHolder != null) { - resourceHolder.getProducer().close(); + resourceHolder.close(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java b/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java index c8ead5d840..be3cb2f617 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java @@ -16,6 +16,8 @@ package org.springframework.kafka.transaction; +import java.time.Duration; + import org.springframework.kafka.core.KafkaResourceHolder; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ProducerFactoryUtils; @@ -42,9 +44,9 @@ * *

* Application code is required to retrieve the transactional Kafka resources via - * {@link ProducerFactoryUtils#getTransactionalResourceHolder(ProducerFactory)}. Spring's - * {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto detect a - * thread-bound Producer and automatically participate in it. + * {@link ProducerFactoryUtils#getTransactionalResourceHolder(ProducerFactory, java.time.Duration)}. + * Spring's {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto + * detect a thread-bound Producer and automatically participate in it. * *

* The use of {@link org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -70,6 +72,8 @@ public class KafkaTransactionManager extends AbstractPlatformTransactionMa private final ProducerFactory producerFactory; + private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT; + /** * Create a new KafkaTransactionManager, given a ProducerFactory. * Transaction synchronization is turned off by default, as this manager might be used alongside a datastore-based @@ -93,6 +97,27 @@ public ProducerFactory getProducerFactory() { return this.producerFactory; } + /** + * Set the maximum time to wait when closing a producer; default 5 seconds. + * @param closeTimeout the close timeout. + * @deprecated in favor of {@link #setCloseTimeout(Duration)}. + * @since 1.3.11 + */ + @Deprecated + public void setCloseTimeout(long closeTimeout) { + setCloseTimeout(Duration.ofMillis(closeTimeout)); + } + + /** + * Set the maximum time to wait when closing a producer; default 5 seconds. + * @param closeTimeout the close timeout. + * @since 2.1.14 + */ + public void setCloseTimeout(Duration closeTimeout) { + Assert.notNull(closeTimeout, "'closeTimeout' cannot be null"); + this.closeTimeout = closeTimeout; + } + /** * Return the producer factory. * @return the producer factory. @@ -132,7 +157,8 @@ protected void doBegin(Object transaction, TransactionDefinition definition) { KafkaTransactionObject txObject = (KafkaTransactionObject) transaction; KafkaResourceHolder resourceHolder = null; try { - resourceHolder = ProducerFactoryUtils.getTransactionalResourceHolder(getProducerFactory()); + resourceHolder = ProducerFactoryUtils.getTransactionalResourceHolder(getProducerFactory(), + this.closeTimeout); if (logger.isDebugEnabled()) { logger.debug("Created Kafka transaction on producer [" + resourceHolder.getProducer() + "]"); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index b158c6a88e..2ca131feb3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Producer; @@ -96,14 +97,14 @@ protected Producer createTransactionalProducer() { inOrder.verify(producer).send(any(), any()); inOrder.verify(producer).commitTransaction(); inOrder.verify(producer).beginTransaction(); - inOrder.verify(producer).close(); + inOrder.verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); inOrder.verifyNoMoreInteractions(); pf.destroy(); } @Test @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testResetSingle() throws Exception { + public void testResetSingle() { final Producer producer = mock(Producer.class); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { @@ -115,7 +116,7 @@ protected Producer createKafkaProducer() { }; Producer aProducer = pf.createProducer(); assertThat(aProducer).isNotNull(); - aProducer.close(); + aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNotNull(); Queue cache = KafkaTestUtils.getPropertyValue(pf, "cache", Queue.class); assertThat(cache.size()).isEqualTo(0); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 2135c693d1..ceda62da78 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -17,10 +17,13 @@ package org.springframework.kafka.core; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -30,10 +33,13 @@ import static org.springframework.kafka.test.assertj.KafkaConditions.key; import static org.springframework.kafka.test.assertj.KafkaConditions.value; +import java.time.Duration; import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.clients.consumer.Consumer; @@ -48,6 +54,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.Assertions; @@ -316,7 +323,7 @@ public void testDeadLetterPublisherWhileTransactionActive() { verify(producer1).beginTransaction(); verify(producer1).commitTransaction(); - verify(producer1).close(); + verify(producer1).close(anyLong(), any()); verify(producer2, never()).beginTransaction(); verify(template, never()).executeInTransaction(any()); } @@ -343,6 +350,69 @@ public void testNoAbortAfterCommitFailure() { assertThat(producer.transactionAborted()).isFalse(); assertThat(producer.closed()).isTrue(); verify(producer, never()).abortTransaction(); + verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + } + + @Test + public void testQuickCloseAfterCommitTimeout() { + @SuppressWarnings("unchecked") + Producer producer = mock(Producer.class); + + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(Collections.emptyMap()) { + + @Override + public Producer createProducer() { + CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, getCache()); + return closeSafeProducer; + } + + }; + pf.setTransactionIdPrefix("foo"); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + willThrow(new TimeoutException()).given(producer).commitTransaction(); + assertThatExceptionOfType(TimeoutException.class) + .isThrownBy(() -> + template.executeInTransaction(t -> { + return null; + })); + verify(producer, never()).abortTransaction(); + verify(producer).close(Duration.ofMillis(0).toMillis(), TimeUnit.MILLISECONDS); + } + + @Test + public void testNormalCloseAfterCommitCacheFull() { + @SuppressWarnings("unchecked") + Producer producer = mock(Producer.class); + + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(Collections.emptyMap()) { + + @SuppressWarnings("unchecked") + @Override + public Producer createProducer() { + BlockingQueue> cache = new LinkedBlockingQueue<>(1); + try { + cache.put(new CloseSafeProducer<>(mock(Producer.class))); + } + catch (@SuppressWarnings("unused") InterruptedException e) { + Thread.currentThread().interrupt(); + } + CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, cache); + return closeSafeProducer; + } + + }; + pf.setTransactionIdPrefix("foo"); + + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(STRING_KEY_TOPIC); + + template.executeInTransaction(t -> { + return null; + }); + verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); } @Test @@ -435,9 +505,9 @@ Producer createTransactionalProducerForPartition() { inOrder.verify(producer1).beginTransaction(); inOrder.verify(producer2).beginTransaction(); inOrder.verify(producer2).commitTransaction(); - inOrder.verify(producer2).close(); + inOrder.verify(producer2).close(anyLong(), any()); inOrder.verify(producer1).commitTransaction(); - inOrder.verify(producer1).close(); + inOrder.verify(producer1).close(anyLong(), any()); } finally { TransactionSupport.clearTransactionIdSuffix(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index e085b6d41f..1b47a23a6f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -161,7 +161,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl willAnswer(i -> { closeLatch.countDown(); return null; - }).given(producer).close(); + }).given(producer).close(anyLong(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.transactionCapable()).willReturn(true); final List transactionalIds = new ArrayList<>(); @@ -196,7 +196,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(0)), "group"); inOrder.verify(producer).commitTransaction(); - inOrder.verify(producer).close(); + inOrder.verify(producer).close(anyLong(), any()); inOrder.verify(producer).beginTransaction(); ArgumentCaptor captor = ArgumentCaptor.forClass(ProducerRecord.class); inOrder.verify(producer).send(captor.capture(), any(Callback.class)); @@ -204,7 +204,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1)), "group"); inOrder.verify(producer).commitTransaction(); - inOrder.verify(producer).close(); + inOrder.verify(producer).close(anyLong(), any()); container.stop(); verify(pf, times(2)).createProducer(); verifyNoMoreInteractions(producer); @@ -244,7 +244,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception { willAnswer(i -> { closeLatch.countDown(); return null; - }).given(producer).close(); + }).given(producer).close(anyLong(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.transactionCapable()).willReturn(true); given(pf.createProducer()).willReturn(producer); @@ -271,7 +271,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception { inOrder.verify(producer, never()).sendOffsetsToTransaction(anyMap(), anyString()); inOrder.verify(producer, never()).commitTransaction(); inOrder.verify(producer).abortTransaction(); - inOrder.verify(producer).close(); + inOrder.verify(producer).close(anyLong(), any()); verify(consumer).seek(topicPartition0, 0); verify(consumer).seek(topicPartition1, 0); verify(consumer, never()).commitSync(anyMap()); @@ -311,7 +311,7 @@ public void testConsumeAndProduceTransactionRollbackBatch() throws Exception { willAnswer(i -> { closeLatch.countDown(); return null; - }).given(producer).close(); + }).given(producer).close(anyLong(), any()); ProducerFactory pf = mock(ProducerFactory.class); given(pf.transactionCapable()).willReturn(true); given(pf.createProducer()).willReturn(producer); @@ -338,7 +338,7 @@ public void testConsumeAndProduceTransactionRollbackBatch() throws Exception { inOrder.verify(producer, never()).sendOffsetsToTransaction(anyMap(), anyString()); inOrder.verify(producer, never()).commitTransaction(); inOrder.verify(producer).abortTransaction(); - inOrder.verify(producer).close(); + inOrder.verify(producer).close(anyLong(), any()); verify(consumer).seek(topicPartition0, 0); verify(consumer).seek(topicPartition1, 0); verify(consumer, never()).commitSync(anyMap()); @@ -377,7 +377,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { willAnswer(i -> { closeLatch.countDown(); return null; - }).given(producer).close(); + }).given(producer).close(anyLong(), any()); final ProducerFactory pf = mock(ProducerFactory.class); given(pf.transactionCapable()).willReturn(true); @@ -405,7 +405,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception { inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(1)), "group"); inOrder.verify(producer).commitTransaction(); - inOrder.verify(producer).close(); + inOrder.verify(producer).close(anyLong(), any()); container.stop(); verify(pf).createProducer(); }