Skip to content

Commit

Permalink
GH-1615: Add maxAge to Producers
Browse files Browse the repository at this point in the history
Resolves #1615

**cherry-pick to 2.5.x**
  • Loading branch information
garyrussell authored and artembilan committed Nov 10, 2020
1 parent 7ad38c0 commit 078474e
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 13 deletions.
Expand Up @@ -152,6 +152,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory

private String clientIdPrefix;

private long maxAge;

private volatile CloseSafeProducer<K, V> producer;

/**
Expand Down Expand Up @@ -350,6 +352,16 @@ public List<ProducerPostProcessor<K, V>> getPostProcessors() {
return Collections.unmodifiableList(this.postProcessors);
}

/**
* Set the maximum age for a producer; useful when using transactions and the broker
* might expire a {@code transactional.id} due to inactivity.
* @param maxAge the maxAge to set
* @since 2.5.8
*/
public void setMaxAge(Duration maxAge) {
this.maxAge = maxAge.toMillis();
}

/**
* Add a listener.
* @param listener the listener.
Expand Down Expand Up @@ -499,7 +511,8 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
if (this.threadBoundProducerEpochs.get() == null) {
this.threadBoundProducerEpochs.set(this.epoch.get());
}
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
if (tlProducer != null
&& (this.epoch.get() != this.threadBoundProducerEpochs.get() || expire(tlProducer))) {
closeThreadBoundProducer();
tlProducer = null;
}
Expand All @@ -515,6 +528,9 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
return tlProducer;
}
synchronized (this) {
if (this.producer != null && expire(this.producer)) {
this.producer = null;
}
if (this.producer == null) {
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout, this.beanName);
Expand Down Expand Up @@ -553,14 +569,15 @@ protected Producer<K, V> createTransactionalProducerForPartition(String txIdPref
}
else {
synchronized (this.consumerProducers) {
if (!this.consumerProducers.containsKey(suffix)) {
CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);
if (consumerProducer == null || expire(consumerProducer)) {
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(txIdPrefix, suffix,
this::removeConsumerProducer);
this.consumerProducers.put(suffix, newProducer);
return newProducer;
}
else {
return this.consumerProducers.get(suffix);
return consumerProducer;
}
}
}
Expand Down Expand Up @@ -617,7 +634,15 @@ protected Producer<K, V> createTransactionalProducer() {
protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
BlockingQueue<CloseSafeProducer<K, V>> queue = getCache(txIdPrefix);
Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
Producer<K, V> cachedProducer = queue.poll();
CloseSafeProducer<K, V> cachedProducer = queue.poll();
while (cachedProducer != null) {
if (expire(cachedProducer)) {
cachedProducer = queue.poll();
}
else {
break;
}
}
if (cachedProducer == null) {
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
}
Expand All @@ -626,6 +651,14 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
}
}

private boolean expire(CloseSafeProducer<K, V> producer) {
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
if (expired) {
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
}
return expired;
}

boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
if (producerToRemove.closed) {
producerToRemove.closeDelegate(timeout, this.listeners);
Expand Down Expand Up @@ -728,6 +761,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

final String txIdPrefix; // NOSONAR

final long created;

private final Duration closeTimeout;

final String clientId; // NOSONAR
Expand Down Expand Up @@ -762,6 +797,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
id = "unknown";
}
this.clientId = factoryName + "." + id;
this.created = System.currentTimeMillis();
LOGGER.debug(() -> "Created new Producer: " + this);
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.Duration;
Expand Down Expand Up @@ -122,9 +123,9 @@ protected Producer createRawProducer(Map configs) {

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testResetSingle() {
void testResetSingle() throws InterruptedException {
final Producer producer = mock(Producer.class);
ProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
protected Producer createRawProducer(Map configs) {
Expand All @@ -134,13 +135,19 @@ protected Producer createRawProducer(Map configs) {
};
Producer aProducer = pf.createProducer();
assertThat(aProducer).isNotNull();
Producer bProducer = pf.createProducer();
assertThat(bProducer).isSameAs(aProducer);
aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNotNull();
pf.setMaxAge(Duration.ofMillis(10));
Thread.sleep(50);
aProducer = pf.createProducer();
assertThat(aProducer).isNotSameAs(bProducer);
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
assertThat(cache.size()).isEqualTo(0);
pf.reset();
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
verify(producer).close(any(Duration.class));
verify(producer, times(2)).close(any(Duration.class));
}

@Test
Expand All @@ -161,28 +168,34 @@ protected Producer createRawProducer(Map configs) {
Producer aProducer = pf.createProducer();
assertThat(aProducer).isNotNull();
aProducer.close();
Producer bProducer = pf.createProducer();
assertThat(bProducer).isSameAs(aProducer);
bProducer.close();
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
assertThat(cache.size()).isEqualTo(1);
Queue queue = (Queue) cache.get("foo");
assertThat(queue.size()).isEqualTo(1);
pf.setMaxAge(Duration.ofMillis(10));
Thread.sleep(50);
aProducer = pf.createProducer();
assertThat(aProducer).isNotSameAs(bProducer);
pf.onApplicationEvent(new ContextStoppedEvent(ctx));
assertThat(queue.size()).isEqualTo(0);
verify(producer).close(any(Duration.class));
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testThreadLocal() {
void testThreadLocal() throws InterruptedException {
final Producer producer = mock(Producer.class);
AtomicBoolean created = new AtomicBoolean();
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

boolean created;

@Override
protected Producer createKafkaProducer() {
assertThat(this.created).isFalse();
this.created = true;
assertThat(created.get()).isFalse();
created.set(true);
return producer;
}

Expand All @@ -196,9 +209,14 @@ protected Producer createKafkaProducer() {
bProducer.close();
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
assertThat(KafkaTestUtils.getPropertyValue(pf, "threadBoundProducers", ThreadLocal.class).get()).isNotNull();
pf.setMaxAge(Duration.ofMillis(10));
Thread.sleep(50);
created.set(false);
aProducer = pf.createProducer();
assertThat(aProducer).isNotSameAs(bProducer);
pf.closeThreadBoundProducer();
assertThat(KafkaTestUtils.getPropertyValue(pf, "threadBoundProducers", ThreadLocal.class).get()).isNull();
verify(producer).close(any(Duration.class));
verify(producer, times(3)).close(any(Duration.class));
}

@Test
Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -3137,6 +3137,11 @@ Also see <<transaction-id-prefix>>.

With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property - Boot will automatically configure a `KafkaTransactionManager` bean and wire it into the listener container.

IMPORTANT: Starting with version 2.5.8, you can now configure the `maxAge` property on the producer factory.
This is useful when using transactional producers that might lay idle for the broker's `transactional.id.expiration.ms`.
With current `kafka-clients`, this can cause a `ProducerFencedException` without a rebalance.
By setting the `maxAge` to less than `transactional.id.expiration.ms`, the factory will refresh the producer if it is past it's max age.

===== Using `KafkaTransactionManager`

The `KafkaTransactionManager` is an implementation of Spring Framework's `PlatformTransactionManager`.
Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Expand Up @@ -37,3 +37,8 @@ See <<seek>> for more information.

Subclasses of `FailedRecordProcessor` (e.g. `SeekToCurrentErrorHandler`, `DefaultAfterRollbackProcessor`, `RecoveringBatchErrorHandler`) can now be configured to reset the retry state if the exception is a different type to that which occurred previously with this record.
See <<seek-to-current>>, <<after-rollback>>, <<recovering-batch-eh>> for more information.

==== Producer Factory Changes

You can now set a maximum age for producers after which they will be closed and recreated.
See <<transactions>> for more information.

0 comments on commit 078474e

Please sign in to comment.