From 7b492f5cd30500d50000c16c09d2b34efded5ae6 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 13 Apr 2020 13:11:35 -0400 Subject: [PATCH] GH-1441: Close producer after fatal send error Resolves https://github.com/spring-projects/spring-kafka/issues/1441 Backport. --- .../core/DefaultKafkaProducerFactory.java | 181 +++++++++++++----- .../DefaultKafkaProducerFactoryTests.java | 38 +++- .../core/KafkaTemplateTransactionTests.java | 10 +- 3 files changed, 172 insertions(+), 57 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 944ef6b50d..fe2615b874 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serializer; @@ -190,15 +191,18 @@ public boolean transactionCapable() { @SuppressWarnings("resource") @Override public void destroy() { - CloseSafeProducer producer = this.producer; - this.producer = null; - if (producer != null) { - producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS); + CloseSafeProducer producerToClose; + synchronized (this) { + producerToClose = this.producer; + this.producer = null; + } + if (producerToClose != null) { + producerToClose.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS); } producer = this.cache.poll(); while (producer != null) { try { - producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS); + producerToClose.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS); } catch (Exception e) { logger.error("Exception while closing producer", e); @@ -247,14 +251,13 @@ public Producer createProducer() { return createTransactionalProducer(); } } - if (this.producer == null) { - synchronized (this) { - if (this.producer == null) { - this.producer = new CloseSafeProducer(createKafkaProducer()); - } + synchronized (this) { + if (this.producer == null) { + this.producer = new CloseSafeProducer(createKafkaProducer(), standardProducerRemover(), + this.physicalCloseTimeout); } + return this.producer; } - return this.producer; } /** @@ -293,6 +296,19 @@ Producer createTransactionalProducerForPartition() { } } + /** + * Remove the single shared producer if present. + * @param producerToRemove the producer; + * @since 1.3.11 + */ + protected final synchronized void removeProducer( + @SuppressWarnings("unused") CloseSafeProducer producerToRemove) { + + if (producerToRemove.equals(this.producer)) { + this.producer = null; + } + } + /** * Subclasses must return a producer from the {@link #getCache()} or a * new raw producer wrapped in a {@link CloseSafeProducer}. @@ -319,9 +335,46 @@ private CloseSafeProducer doCreateTxProducer(String suffix, boolean isCons } producer = new KafkaProducer(configs, this.keySerializer, this.valueSerializer); producer.initTransactions(); - return new CloseSafeProducer(producer, this.cache, - isConsumerProducer ? this.consumerProducers : null, - (String) configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + Remover remover = isConsumerProducer + ? consumerProducerRemover() + : null; + return new CloseSafeProducer(producer, this.cache, remover, + (String) configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout); + } + + private Remover standardProducerRemover() { + return new Remover() { + + @Override + public void remove(CloseSafeProducer producer) { + removeProducer(producer); + } + + }; + } + + private Remover consumerProducerRemover() { + return new Remover() { + + @Override + public void remove(CloseSafeProducer producer) { + removeConsumerProducer(producer); + } + + }; + } + + private void removeConsumerProducer(CloseSafeProducer producer) { + synchronized (this.consumerProducers) { + Iterator>> iterator = this.consumerProducers.entrySet() + .iterator(); + while (iterator.hasNext()) { + if (iterator.next().getValue().equals(producer)) { + iterator.remove(); + break; + } + } + } } protected BlockingQueue> getCache() { @@ -339,6 +392,19 @@ public void closeProducerFor(String transactionIdSuffix) { } } + /** + * Internal interface to remove a failed producer. + * + * @param the key type. + * @param the value type. + * + */ + interface Remover { + + void remove(CloseSafeProducer producer); + + } + /** * A wrapper class for the delegate. * @@ -352,34 +418,46 @@ protected static class CloseSafeProducer implements Producer { private final BlockingQueue> cache; - private final Map> consumerProducers; - private final String txId; - private volatile Exception txFailed; + private final Remover remover; + + private final int closeTimeout; + + private volatile Exception producerFailed; + + private volatile boolean closed; + + CloseSafeProducer(Producer delegate, Remover remover, int closeTimeout) { - CloseSafeProducer(Producer delegate) { - this(delegate, null, null); + this(delegate, null, remover, null, closeTimeout); Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer"); } - CloseSafeProducer(Producer delegate, BlockingQueue> cache) { - this(delegate, cache, null); + CloseSafeProducer(Producer delegate, BlockingQueue> cache, + int closeTimeout) { + + this(delegate, cache, null, closeTimeout); } CloseSafeProducer(Producer delegate, BlockingQueue> cache, - Map> consumerProducers) { + Remover remover, int closeTimeout) { - this(delegate, cache, consumerProducers, null); + this(delegate, cache, remover, null, closeTimeout); } CloseSafeProducer(Producer delegate, BlockingQueue> cache, - Map> consumerProducers, String txId) { + Remover remover, String txId, int closeTimeout) { this.delegate = delegate; this.cache = cache; - this.consumerProducers = consumerProducers; + this.remover = remover; this.txId = txId; + this.closeTimeout = closeTimeout; + } + + Producer getDelegate() { + return this.delegate; } @Override @@ -388,8 +466,19 @@ public Future send(ProducerRecord record) { } @Override - public Future send(ProducerRecord record, Callback callback) { - return this.delegate.send(record, callback); + public Future send(ProducerRecord record, final Callback callback) { + return this.delegate.send(record, new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception instanceof OutOfOrderSequenceException) { + CloseSafeProducer.this.producerFailed = exception; + close(CloseSafeProducer.this.closeTimeout, TimeUnit.MILLISECONDS); + } + callback.onCompletion(metadata, exception); + } + + }); } @Override @@ -424,7 +513,7 @@ public void beginTransaction() throws ProducerFencedException { if (logger.isErrorEnabled()) { logger.error("beginTransaction failed: " + this, e); } - this.txFailed = e; + this.producerFailed = e; throw e; } } @@ -448,7 +537,7 @@ public void commitTransaction() throws ProducerFencedException { if (logger.isErrorEnabled()) { logger.error("commitTransaction failed: " + this, e); } - this.txFailed = e; + this.producerFailed = e; throw e; } } @@ -465,7 +554,7 @@ public void abortTransaction() throws ProducerFencedException { if (logger.isErrorEnabled()) { logger.error("Abort failed: " + this, e); } - this.txFailed = e; + this.producerFailed = e; throw e; } } @@ -477,26 +566,27 @@ public void close() { @Override public void close(long timeout, TimeUnit unit) { - if (this.cache != null) { - long closeTimeout = this.txFailed instanceof TimeoutException || unit == null - ? 0L - : timeout; - if (this.txFailed != null) { + if (!this.closed) { + if (this.producerFailed != null) { if (logger.isWarnEnabled()) { logger.warn("Error during transactional operation; producer removed from cache; " + "possible cause: " + "broker restarted during transaction: " + this); } - this.delegate.close(closeTimeout, unit); - if (this.consumerProducers != null) { - removeConsumerProducer(); + this.closed = true; + this.delegate.close(this.producerFailed instanceof TimeoutException || unit == null + ? 0L + : timeout, unit); + if (this.remover != null) { + this.remover.remove(this); } } else { - if (this.consumerProducers == null) { // dedicated consumer producers are not cached + if (this.cache != null && this.remover == null) { // dedicated consumer producers are not cached synchronized (this) { if (!this.cache.contains(this) && !this.cache.offer(this)) { + this.closed = true; this.delegate.close(closeTimeout, unit); } } @@ -505,19 +595,6 @@ public void close(long timeout, TimeUnit unit) { } } - private void removeConsumerProducer() { - synchronized (this.consumerProducers) { - Iterator>> iterator = this.consumerProducers.entrySet() - .iterator(); - while (iterator.hasNext()) { - if (iterator.next().getValue().equals(this)) { - iterator.remove(); - break; - } - } - } - } - @Override public String toString() { return "CloseSafeProducer [delegate=" + this.delegate + "" diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java index 0936285b07..5de0c71a56 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java @@ -22,14 +22,18 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.junit.Test; import org.mockito.InOrder; @@ -58,9 +62,9 @@ protected Producer createTransactionalProducer() { producer.initTransactions(); BlockingQueue cache = getCache(); Producer cached = cache.poll(); - return cached == null ? new CloseSafeProducer(producer, cache) : cached; + return cached == null ? new CloseSafeProducer(producer, cache, null, + (int) ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT) : cached; } - }; pf.setTransactionIdPrefix("foo"); @@ -102,4 +106,34 @@ protected Producer createTransactionalProducer() { pf.destroy(); } + @Test + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testUnknownProducerIdException() { + final Producer producer1 = mock(Producer.class); + willAnswer(inv -> { + ((Callback) inv.getArguments()[1]).onCompletion(null, new OutOfOrderSequenceException("test")); + return null; + }).given(producer1).send(any(), any()); + final Producer producer2 = mock(Producer.class); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) { + + private final AtomicBoolean first = new AtomicBoolean(true); + + @Override + protected Producer createKafkaProducer() { + return this.first.getAndSet(false) ? producer1 : producer2; + } + + }; + pf.setPhysicalCloseTimeout((int) ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); + final Producer aProducer = pf.createProducer(); + assertThat(aProducer).isNotNull(); + aProducer.send(null, (meta, ex) -> { }); + aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS); + assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull(); + verify(producer1).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS); + Producer bProducer = pf.createProducer(); + assertThat(bProducer).isNotSameAs(aProducer); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 6c2c5a69a3..a910692570 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -283,7 +283,8 @@ public void testQuickCloseAfterCommitTimeout() { @Override public Producer createProducer() { - CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, getCache()); + CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, getCache(), + null, (int) ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); return closeSafeProducer; } @@ -320,12 +321,15 @@ public void testNormalCloseAfterCommitCacheFull() { public Producer createProducer() { BlockingQueue> cache = new LinkedBlockingQueue<>(1); try { - cache.put(new CloseSafeProducer<>(mock(Producer.class))); + cache.put(new CloseSafeProducer<>(mock(Producer.class), null, + null, (int) ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT)); } catch (@SuppressWarnings("unused") InterruptedException e) { Thread.currentThread().interrupt(); } - CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, cache); + CloseSafeProducer closeSafeProducer = + new CloseSafeProducer<>(producer, cache, null, + (int) ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT); return closeSafeProducer; }