Skip to content

Commit

Permalink
Close thread bound producers after reset
Browse files Browse the repository at this point in the history
**cherry-pick to 2.4.x, 2.3.x**
  • Loading branch information
garyrussell authored and artembilan committed Apr 27, 2020
1 parent ffce143 commit cef665e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,

private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();

private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();

private final AtomicInteger epoch = new AtomicInteger();

private final AtomicInteger clientIdCounter = new AtomicInteger();

private Supplier<Serializer<K>> keySerializerSupplier;
Expand Down Expand Up @@ -331,6 +335,7 @@ public void destroy() {
(k, v) -> v.getDelegate().close(this.physicalCloseTimeout));
this.consumerProducers.clear();
}
this.epoch.incrementAndGet();
}

@Override
Expand Down Expand Up @@ -382,10 +387,18 @@ public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
}
if (this.producerPerThread) {
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
if (this.threadBoundProducerEpochs.get() == null) {
this.threadBoundProducerEpochs.set(this.epoch.get());
}
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
closeThreadBoundProducer();
tlProducer = null;
}
if (tlProducer == null) {
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
this.physicalCloseTimeout);
this.threadBoundProducers.set(tlProducer);
this.threadBoundProducerEpochs.set(this.epoch.get());
}
return tlProducer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,35 @@ protected Producer createKafkaProducer() {
verify(producer).close(any(Duration.class));
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testThreadLocalReset() {
Producer producer1 = mock(Producer.class);
Producer producer2 = mock(Producer.class);
ProducerFactory mockPf = mock(ProducerFactory.class);
given(mockPf.createProducer()).willReturn(producer1, producer2);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {

@Override
protected Producer createKafkaProducer() {
return mockPf.createProducer();
}

};
pf.setProducerPerThread(true);
Producer aProducer = pf.createProducer();
assertThat(aProducer).isNotNull();
aProducer.close();
Producer bProducer = pf.createProducer();
assertThat(bProducer).isSameAs(aProducer);
bProducer.close();
pf.reset();
bProducer = pf.createProducer();
assertThat(bProducer).isNotSameAs(aProducer);
bProducer.close();
verify(producer1).close(any(Duration.class));
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
void testCleanUpAfterTxFence() {
Expand Down

0 comments on commit cef665e

Please sign in to comment.