-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Affects Version(s): 2.7.6
When non-listener-container transaction is processed and exception is thrown during commit to kafka then:
- database transaction is commited (OK)
- error is logged (by
TransactionSynchronizationUtils.invokeAfterCompletion
) (OK) - client is unaware of exception (NOT OK)
So (real world scenario) e.g. when you process HTTP request then client will receive HTTP 200 even though Spring failed to commit Kafka transaction.
It seems that the problem is in KafkaResourceSynchronization
class. Can you explain why commit/rollback is performed during afterCompletion
and not during afterCommit
?
Please have a look at AbstractPlatformTransactionManager.processCommit()
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); // here
}
triggerAfterCompletion
does call TransactionSynchronizationUtils.invokeAfterCompletion()
which is swallowing the exception (thus client never knows about the problem during commit):
for (TransactionSynchronization synchronization : synchronizations) {
try {
synchronization.afterCompletion(completionStatus);
}
catch (Throwable ex) {
logger.debug("TransactionSynchronization.afterCompletion threw exception", ex); //here
}
}
If spring-kafka moves commit/rollback logic to afterCommit
then exception could be thrown to the caller.
One scenario where this problem occurs is to trigger ProducerFencedException
(due to misconfiguration of spring.kafka.producer.transaction-id-prefix
).
Even if you fix misconfiguration issue, kafka may still throw other exceptions, so the problem remains.
You can use this repo to test the issue https://github.com/MariuszCwikla/kafka-jpa-producerfencedexception/tree/producerfencedexception-demo:
docker compose up
mvn package
then run two instances of the application:
java -jar target/kafka-jpa-producerfencedexception-0.0.1-SNAPSHOT.jar | grep ProducerFencedException &
java -jar target/kafka-jpa-producerfencedexception-0.0.1-SNAPSHOT.jar | grep ProducerFencedException
At one moment one of them will output following error and continue exception without notyfing the caller (instead exception should be thrown from createPersonAndSend()
.
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
org.apache.kafka.common.errors.ProducerFencedException: The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId