Description
In some specific use cases, the configuration entries of a transactional producer may need to be updated before the producer is created.
To allow this, the DefaultKafkaProducerFactory#doCreateTxProducer method could be changed slightly, as follows:
// visibility changed: private -> protected
// new rawConfigs parameter
protected CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
        BiPredicate<CloseSafeProducer<K, V>, Duration> remover, Map<String, Object> rawConfigs) {
    // same code as now
}
This change would allow the configuration entries to be pre-processed in a subclass, as follows:
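public class CustomDefaultKafkaProducerFactory<K, V> extends DefaultKafkaProducerFactory<K, V> {

    // DefaultKafkaProducerFactory has no no-arg constructor, so one must be declared
    public CustomDefaultKafkaProducerFactory(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    protected CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
            BiPredicate<CloseSafeProducer<K, V>, Duration> remover, Map<String, Object> rawConfigs) {
        // copy the configs so the factory's own map is left untouched
        final Map<String, Object> newProducerConfigs = new HashMap<>(rawConfigs);
        // overrides: adjust newProducerConfigs entries here
        return super.doCreateTxProducer(prefix, suffix, remover, newProducerConfigs);
    }
}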
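For illustration, the override step in the subclass above could be delegated to a small helper such as the one sketched below. This is only a sketch under stated assumptions: `ProducerConfigOverrides` and `withOverrides` are hypothetical names that do not exist in spring-kafka, and which entries to override depends entirely on the use case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of spring-kafka.
final class ProducerConfigOverrides {

    private ProducerConfigOverrides() {
    }

    // Returns a new map holding rawConfigs with the overrides applied on top.
    // rawConfigs itself is never modified.
    static Map<String, Object> withOverrides(Map<String, Object> rawConfigs,
            Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(rawConfigs);
        merged.putAll(overrides);
        return merged;
    }
}
```

With the proposed signature, the overriding method could then build its map with something like `ProducerConfigOverrides.withOverrides(rawConfigs, Map.of("client.id", "custom-client"))` before calling `super.doCreateTxProducer(...)`. The `client.id` value here is only an example.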
This issue follows a Stack Overflow discussion described here.