Skip to content

Commit

Permalink
GH-1196: Use close(Duration) instead of close()
Browse files Browse the repository at this point in the history
Resolves #1196

Add `closeTimeout` to `KafkaTemplate` and `KafkaTransactionManager` (default 5s).
Use a zero timeout if a transaction operation failed with a timeout.

Deprecate 1.3.x public APIs
  • Loading branch information
garyrussell authored and artembilan committed Aug 16, 2019
1 parent 09a805b commit fd2166e
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 49 deletions.
Expand Up @@ -16,6 +16,9 @@

package org.springframework.kafka.core;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -43,7 +46,9 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
Expand Down Expand Up @@ -147,8 +152,10 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
}

/**
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked).
* Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
* The time to wait when physically closing the producer via the factory rather than
* closing the producer itself (when {@link #reset()}, {@link #destroy() or
* #closeProducerFor(String)} are invoked). Specified in seconds; default
* {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
* @param physicalCloseTimeout the timeout in seconds.
* @since 1.0.7
*/
Expand Down Expand Up @@ -216,7 +223,7 @@ public boolean transactionCapable() {

@SuppressWarnings("resource")
@Override
public void destroy() throws Exception { //NOSONAR
public void destroy() {
CloseSafeProducer<K, V> producerToClose = this.producer;
this.producer = null;
if (producerToClose != null) {
Expand Down Expand Up @@ -400,6 +407,25 @@ public void closeProducerFor(String transactionIdSuffix) {
*/
protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0);

private static final Method CLOSE_WITH_DURATION;

static {
Method method = null;
String clientVersion = AppInfoParser.getVersion();
try {
if (!clientVersion.startsWith("1.") && !clientVersion.startsWith("2.0.")
&& !clientVersion.startsWith("2.1.")) {
method = KafkaProducer.class.getDeclaredMethod("close", Duration.class);
}
}
catch (NoSuchMethodException e) {
logger.error("Failed to get close(Duration) method for version: " + clientVersion, e);
}
CLOSE_WITH_DURATION = method;
}

private final Producer<K, V> delegate;

private final BlockingQueue<CloseSafeProducer<K, V>> cache;
Expand All @@ -408,7 +434,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

private final String txId;

private volatile boolean txFailed;
private volatile Exception txFailed;

CloseSafeProducer(Producer<K, V> delegate) {
this(delegate, null, null);
Expand Down Expand Up @@ -476,7 +502,7 @@ public void beginTransaction() throws ProducerFencedException {
if (logger.isErrorEnabled()) {
logger.error("beginTransaction failed: " + this, e);
}
this.txFailed = true;
this.txFailed = e;
throw e;
}
}
Expand All @@ -500,7 +526,7 @@ public void commitTransaction() throws ProducerFencedException {
if (logger.isErrorEnabled()) {
logger.error("commitTransaction failed: " + this, e);
}
this.txFailed = true;
this.txFailed = e;
throw e;
}
}
Expand All @@ -517,7 +543,7 @@ public void abortTransaction() throws ProducerFencedException {
if (logger.isErrorEnabled()) {
logger.error("Abort failed: " + this, e);
}
this.txFailed = true;
this.txFailed = e;
throw e;
}
}
Expand All @@ -530,17 +556,16 @@ public void close() {
@Override
public void close(long timeout, @Nullable TimeUnit unit) {
if (this.cache != null) {
if (this.txFailed) {
Duration closeTimeout = this.txFailed instanceof TimeoutException || unit == null
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
: Duration.ofMillis(unit.toMillis(timeout));
if (this.txFailed != null) {
if (logger.isWarnEnabled()) {
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
+ "broker restarted during transaction: " + this);
}
if (unit == null) {
this.delegate.close();
}
else {
this.delegate.close(timeout, unit);
logger.warn("Error during transactional operation; producer removed from cache; "
+ "possible cause: "
+ "broker restarted during transaction: " + this);
}
closeDelegate(closeTimeout);
if (this.removeConsumerProducer != null) {
this.removeConsumerProducer.accept(this);
}
Expand All @@ -550,19 +575,28 @@ public void close(long timeout, @Nullable TimeUnit unit) {
synchronized (this) {
if (!this.cache.contains(this)
&& !this.cache.offer(this)) {
if (unit == null) {
this.delegate.close();
}
else {
this.delegate.close(timeout, unit);
}
closeDelegate(closeTimeout);
}
}
}
}
}
}

private void closeDelegate(Duration closeTimeout) {
if (CLOSE_WITH_DURATION != null) {
try {
CLOSE_WITH_DURATION.invoke(this.delegate, closeTimeout);
}
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
logger.error("Failed to invoke close(Duration) with reflection", e);
}
}
else {
this.delegate.close(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
}

@Override
public String toString() {
return "CloseSafeProducer [delegate=" + this.delegate + ""
Expand Down
Expand Up @@ -16,9 +16,13 @@

package org.springframework.kafka.core;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Producer;

import org.springframework.transaction.support.ResourceHolderSupport;
import org.springframework.util.Assert;

/**
* Kafka resource holder, wrapping a Kafka producer. KafkaTransactionManager binds instances of this
Expand All @@ -33,12 +37,41 @@ public class KafkaResourceHolder<K, V> extends ResourceHolderSupport {

private final Producer<K, V> producer;

private final Duration closeTimeout;

/**
* Construct an instance for the producer.
* @param producer the producer.
*/
public KafkaResourceHolder(Producer<K, V> producer) {
this(producer, ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
}

/**
* Construct an instance for the producer.
* @param producer the producer.
* @param closeTimeout the close timeout.
* @deprecated in favor of {@link #KafkaResourceHolder(Producer, Duration)}
* @since 1.3.11
*/
@Deprecated
public KafkaResourceHolder(Producer<K, V> producer, long closeTimeout) {
Assert.notNull(producer, "'producer' cannot be null");
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
this.producer = producer;
this.closeTimeout = Duration.ofMillis(closeTimeout);
}

/**
* Construct an instance for the producer.
* @param producer the producer.
* @param closeTimeout the close timeout.
*/
public KafkaResourceHolder(Producer<K, V> producer, Duration closeTimeout) {
Assert.notNull(producer, "'producer' cannot be null");
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
this.producer = producer;
this.closeTimeout = closeTimeout;
}

public Producer<K, V> getProducer() {
Expand All @@ -50,7 +83,7 @@ public void commit() {
}

public void close() {
this.producer.close();
this.producer.close(this.closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
}

public void rollback() {
Expand Down
Expand Up @@ -16,8 +16,10 @@

package org.springframework.kafka.core;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -77,6 +79,7 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {

private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<K, V>();

private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;

/**
* Create an instance using the supplied producer factory and autoFlush false.
Expand Down Expand Up @@ -158,6 +161,27 @@ public boolean isTransactional() {
return this.transactional;
}

/**
* Set the maximum time to wait when closing a producer; default 5 seconds.
* @param closeTimeout the close timeout.
* @deprecated in favor of {@link #setCloseTimeout(Duration)}.
* @since 1.3.11
*/
@Deprecated
public void setCloseTimeout(long closeTimeout) {
setCloseTimeout(Duration.ofMillis(closeTimeout));
}

/**
* Set the maximum time to wait when closing a producer; default 5 seconds.
* @param closeTimeout the close timeout.
* @since 2.1.14
*/
public void setCloseTimeout(Duration closeTimeout) {
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
this.closeTimeout = closeTimeout;
}

/**
* Return the producer factory used by this template.
* @return the factory.
Expand Down Expand Up @@ -354,9 +378,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
}

protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
if (!inLocalTx) {
producer.close();
protected void closeProducer(Producer<K, V> producer, boolean inTx) {
if (!inTx) {
producer.close(this.closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -441,7 +465,7 @@ private Producer<K, V> getTheProducer() {
return producer;
}
KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
.getTransactionalResourceHolder(this.producerFactory);
.getTransactionalResourceHolder(this.producerFactory, this.closeTimeout);
return holder.getProducer();
}
else {
Expand Down
Expand Up @@ -16,6 +16,9 @@

package org.springframework.kafka.core;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Producer;

import org.springframework.lang.Nullable;
Expand All @@ -35,6 +38,11 @@
*/
public final class ProducerFactoryUtils {

/**
* The default close timeout (5 seconds).
*/
public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(5);

private static ThreadLocal<String> groupIds = new ThreadLocal<>();

private ProducerFactoryUtils() {
Expand All @@ -51,6 +59,38 @@ private ProducerFactoryUtils() {
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
final ProducerFactory<K, V> producerFactory) {

return getTransactionalResourceHolder(producerFactory, DEFAULT_CLOSE_TIMEOUT);
}

/**
* Obtain a Producer that is synchronized with the current transaction, if any.
* @param producerFactory the ProducerFactory to obtain a Channel for
* @param closeTimeout the producer close timeout.
* @param <K> the key type.
* @param <V> the value type.
* @return the resource holder.
* @deprecated in favor of {@link #getTransactionalResourceHolder(ProducerFactory, Duration)}
* @since 1.3.11
*/
@Deprecated
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
final ProducerFactory<K, V> producerFactory, long closeTimeout) {

return getTransactionalResourceHolder(producerFactory, Duration.ofMillis(closeTimeout));
}

/**
* Obtain a Producer that is synchronized with the current transaction, if any.
* @param producerFactory the ProducerFactory to obtain a Channel for
* @param closeTimeout the producer close timeout.
* @param <K> the key type.
* @param <V> the value type.
* @return the resource holder.
* @since 2.1.14
*/
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
final ProducerFactory<K, V> producerFactory, Duration closeTimeout) {

Assert.notNull(producerFactory, "ProducerFactory must not be null");

@SuppressWarnings("unchecked")
Expand All @@ -63,19 +103,19 @@ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
producer.beginTransaction();
}
catch (RuntimeException e) {
producer.close();
producer.close(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
throw e;
}

resourceHolder = new KafkaResourceHolder<K, V>(producer);
resourceHolder = new KafkaResourceHolder<K, V>(producer, closeTimeout);
bindResourceToTransaction(resourceHolder, producerFactory);
}
return resourceHolder;
}

public static <K, V> void releaseResources(@Nullable KafkaResourceHolder<K, V> resourceHolder) {
if (resourceHolder != null) {
resourceHolder.getProducer().close();
resourceHolder.close();
}
}

Expand Down

0 comments on commit fd2166e

Please sign in to comment.