Description
I am using spring-kafka 2.6.3 (spring-boot-starter-parent 2.4.0), but the problem should be present in the current version as well.
Preconditions:
- The Kafka broker is not available.
- spring.kafka.producer.transaction-id-prefix is set in the properties
- A Kafka transaction is started, either with @Transactional or with KafkaTemplate.executeInTransaction()
Problem:
The DefaultKafkaProducerFactory creates a new KafkaProducer:
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
    KafkaProducer<K, V> kafkaProducer =
            new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
    this.postProcessors.forEach(pp -> pp.apply(kafkaProducer));
    return kafkaProducer;
}
Constructing the KafkaProducer automatically starts a new background thread, which runs in a loop until the producer is closed.
Immediately afterwards, doCreateTxProducer() calls initTransactions() on the new producer:
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
        BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
    Producer<K, V> newProducer;
    Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
    String txId = prefix + suffix;
    newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
    if (this.clientIdPrefix != null) {
        newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
    }
    checkBootstrap(newProducerConfigs);
    newProducer = createRawProducer(newProducerConfigs);
    newProducer.initTransactions();
    CloseSafeProducer<K, V> closeSafeProducer =
            new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName);
    this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
    return closeSafeProducer;
}
If the Kafka broker is not available, initTransactions() throws a TimeoutException. The exception propagates out of doCreateTxProducer() before the producer is wrapped or returned, so the producer is never closed and its background thread keeps running indefinitely.
The application does not recover from this even if the Kafka broker comes back, because the DefaultKafkaProducerFactory is not aware of these orphaned producers and therefore never closes them. This results in "dead" threads running in the background.
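One way to check for the leaked threads in a running application is to list live threads by name. The following is a minimal sketch, not part of spring-kafka: the class and method names are hypothetical, and it assumes the producer's background thread name starts with kafka-producer-network-thread, which should be verified against the Kafka client version in use.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ProducerThreadCheck {

    // Assumption: name prefix of the Kafka producer's background thread.
    static final String ASSUMED_PREFIX = "kafka-producer-network-thread";

    /** Returns the sorted names of all live threads whose name starts with the given prefix. */
    static List<String> liveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .map(Thread::getName)
                .filter(name -> name.startsWith(prefix))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Print one line per matching thread; a growing list while the broker
        // is down would be consistent with the leak described above.
        liveThreadsWithPrefix(ASSUMED_PREFIX).forEach(System.out::println);
    }
}
```

Calling liveThreadsWithPrefix() periodically (for example from a test or a debug endpoint) while the broker is down should show whether the number of matching threads keeps growing.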
Solution:
Wrap newProducer.initTransactions() in a try-catch for RuntimeException. In the catch block, close the producer and rethrow the exception:
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
        BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
    Producer<K, V> newProducer;
    Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
    String txId = prefix + suffix;
    newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
    if (this.clientIdPrefix != null) {
        newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
                this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
    }
    checkBootstrap(newProducerConfigs);
    newProducer = createRawProducer(newProducerConfigs);
    try {
        newProducer.initTransactions();
    } catch (RuntimeException e) {
        newProducer.close();
        throw e;
    }
    CloseSafeProducer<K, V> closeSafeProducer =
            new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName);
    this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
    return closeSafeProducer;
}
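The close-on-failure pattern can also be tried in isolation, without a broker. The sketch below uses a hypothetical FakeProducer as a stand-in for KafkaProducer (it only mimics "starts a thread in the constructor, stops it on close"), and the helper initOrClose() is likewise an illustration, not spring-kafka code. It deviates from the proposal above in two ways that are my assumptions about what might be desirable: close is given a bounded timeout, and a failure during close is attached to the original exception as suppressed instead of replacing it.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class CloseOnInitFailure {

    /** Hypothetical stand-in for KafkaProducer: owns a background thread until closed. */
    static class FakeProducer {
        private final CountDownLatch closed = new CountDownLatch(1);
        private final Thread ioThread;
        private final boolean brokerAvailable;

        FakeProducer(boolean brokerAvailable) {
            this.brokerAvailable = brokerAvailable;
            this.ioThread = new Thread(() -> {
                try {
                    closed.await(); // simulates the loop that runs until close()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "fake-producer-io");
            this.ioThread.setDaemon(true);
            this.ioThread.start();
        }

        /** Fails the way initTransactions() would when no broker can be reached. */
        void initTransactions() {
            if (!brokerAvailable) {
                throw new IllegalStateException("timeout while initializing transactions");
            }
        }

        /** Signals the background thread to stop and waits up to the timeout for it to end. */
        void close(Duration timeout) {
            closed.countDown();
            try {
                ioThread.join(timeout.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        boolean isIoThreadAlive() {
            return ioThread.isAlive();
        }
    }

    /** Initializes the producer; on failure closes it with a bounded timeout and rethrows. */
    static FakeProducer initOrClose(FakeProducer producer, Duration closeTimeout) {
        try {
            producer.initTransactions();
        } catch (RuntimeException initFailure) {
            try {
                producer.close(closeTimeout);
            } catch (RuntimeException closeFailure) {
                // Keep the original cause visible; record the close failure alongside it.
                initFailure.addSuppressed(closeFailure);
            }
            throw initFailure;
        }
        return producer;
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer(false);
        try {
            initOrClose(producer, Duration.ofSeconds(5));
        } catch (IllegalStateException e) {
            System.out.println("init failed, I/O thread alive: " + producer.isIoThreadAlive());
        }
    }
}
```

In the real factory, the equivalent of the bounded close would presumably be newProducer.close(this.physicalCloseTimeout), assuming the close(Duration) overload is available in the Kafka client version in use.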