Skip to content

Commit

Permalink
Don't cache tx producers after reset()
Browse files Browse the repository at this point in the history
Producers were incorrectly returned to the cache after a `reset()`.

**I will back-port; conflicts expected**
  • Loading branch information
garyrussell committed Nov 24, 2020
1 parent e9c5203 commit 2b2bb89
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();

private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();

private final AtomicInteger epoch = new AtomicInteger();

private final AtomicInteger clientIdCounter = new AtomicInteger();
Expand Down Expand Up @@ -391,25 +389,21 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
}
if (this.producerPerThread) {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
if (this.threadBoundProducerEpochs.get() == null) {
this.threadBoundProducerEpochs.set(this.epoch.get());
}
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
if (tlProducer != null && this.epoch.get() != tlProducer.epoch) {
closeThreadBoundProducer();
tlProducer = null;
}
if (tlProducer == null) {
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout);
this.physicalCloseTimeout, this.epoch);
this.threadBoundProducers.set(tlProducer);
this.threadBoundProducerEpochs.set(this.epoch.get());
}
return tlProducer;
}
synchronized (this) {
if (this.producer == null) {
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout);
this.physicalCloseTimeout, this.epoch);
}
return this.producer;
}
Expand Down Expand Up @@ -516,7 +510,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
newProducer = createRawProducer(newProducerConfigs);
newProducer.initTransactions();
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout);
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout,
this.epoch);
}

protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
Expand Down Expand Up @@ -585,37 +580,57 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {

private final Duration closeTimeout;

final int epoch; // NOSONAR

private final AtomicInteger factoryEpoch;

private volatile Exception producerFailed;

private volatile boolean closed;

CloseSafeProducer(Producer<K, V> delegate, Consumer<CloseSafeProducer<K, V>> removeProducer,
Duration closeTimeout) {

this(delegate, null, removeProducer, null, closeTimeout);
this(delegate, null, removeProducer, null, closeTimeout, new AtomicInteger());
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
}

CloseSafeProducer(Producer<K, V> delegate, Consumer<CloseSafeProducer<K, V>> removeProducer,
Duration closeTimeout, AtomicInteger epoch) {

this(delegate, null, removeProducer, null, closeTimeout, epoch);
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
Duration closeTimeout) {
this(delegate, cache, null, closeTimeout);
this(delegate, cache, null, null, closeTimeout, new AtomicInteger());
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, Duration closeTimeout) {

this(delegate, cache, removeConsumerProducer, null, closeTimeout);
this(delegate, cache, removeConsumerProducer, null, closeTimeout, new AtomicInteger());
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId,
Duration closeTimeout) {

this(delegate, cache, removeProducer, txId, closeTimeout, new AtomicInteger());
}

CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
@Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId,
Duration closeTimeout, AtomicInteger epoch) {

this.delegate = delegate;
this.cache = cache;
this.removeProducer = removeProducer;
this.txId = txId;
this.closeTimeout = closeTimeout;
this.epoch = epoch.get();
this.factoryEpoch = epoch;
LOGGER.debug(() -> "Created new Producer: " + this);
}

Expand Down Expand Up @@ -749,8 +764,8 @@ public void close(@Nullable Duration timeout) {
else {
if (this.cache != null && this.removeProducer == null) { // dedicated consumer producers are not cached
synchronized (this) {
if (!this.cache.contains(this)
&& !this.cache.offer(this)) {
if (this.epoch != this.factoryEpoch.get()
|| (!this.cache.contains(this) && !this.cache.offer(this))) {
this.closed = true;
this.delegate.close(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,42 @@ protected Producer createTransactionalProducer(String txIdPrefix) {

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testThreadLocal() {
void dontReturnToCacheAfterReset() {
final Producer producer = mock(Producer.class);
ApplicationContext ctx = mock(ApplicationContext.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
protected Producer createRawProducer(Map configs) {
return producer;
}

};
pf.setApplicationContext(ctx);
pf.setTransactionIdPrefix("foo");
Producer aProducer = pf.createProducer();
assertThat(aProducer).isNotNull();
aProducer.close();
Producer bProducer = pf.createProducer();
assertThat(bProducer).isSameAs(aProducer);
bProducer.close();
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
assertThat(cache.size()).isEqualTo(1);
Queue queue = (Queue) cache.get("foo");
assertThat(queue.size()).isEqualTo(1);
bProducer = pf.createProducer();
assertThat(bProducer).isSameAs(aProducer);
assertThat(queue.size()).isEqualTo(0);
pf.reset();
bProducer.close();
assertThat(queue.size()).isEqualTo(0);
pf.destroy();
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testThreadLocal() throws InterruptedException {
final Producer producer = mock(Producer.class);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

Expand Down

0 comments on commit 2b2bb89

Please sign in to comment.