-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
spring-kafka 3.16 but I believe most versions have the same issue
Describe the bug
When kafka template is called for the first time from a thread (in my case an SQS listerner), then the client is unable to load a custom authentication callback (in my case a custom OAUTH2).
I get the error:
org.apache.kafka.common.config.ConfigException: Invalid value xxx.yyy.zzz.XyzAuthenticateCallbackHandler for configuration sasl.login.callback.handler.class: Class xxx.yyy.zzz.XyzAuthenticateCallbackHandler could not be found.
To Reproduce
use a custom callback handler, packaged with the fat jar . Then call the kafka template from within an SQS listener.
Possibly also reproducible in @async calls? Or code running in a thread?
IF the producer is created from the "main" SB thread, then the template has no issues (apparently cached poroducer).
The following code:
@Bean
ApplicationRunner warmProducer(ProducerFactory<?, ?> pf) {
return args -> {
// Trigger producer instantiation once at startup
try (var p = pf.createProducer()) {
// no-op; producer is created and cached by the factory
}
};
}
forces the creation of the producer early.
An alternative:
@Bean
ProducerFactory<Object, Object> producerFactory(KafkaProperties kafkaProperties) {
Map<String, Object> cfg = kafkaProperties.buildProducerProperties(null);
return new DefaultKafkaProducerFactory<>(cfg) {
// This method is on the hot path shown in your stack trace
@Override
protected Producer<Object, Object> createRawProducer(Map<String, Object> configs) {
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
// Use the Boot loader that can see BOOT-INF/lib/* (your nested jars)
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
return super.createRawProducer(configs); // will new KafkaProducer<>(...)
} finally {
Thread.currentThread().setContextClassLoader(original);
}
}
};
}
or even:
method doSend(Data data){
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
kafkaTemplate.send(data);
}
fixes the issue.
Expected behavior
the jaas module class should be loadable by the producer factory/template, regardless of when/where the template is used....
Sample
See code above