Skip to content

turing85/quarkus-camel-transactions

Repository files navigation

quarkus-camel-transactions

This repository demonstrates a breaking change from quarkus 3.8.1 to 3.8.2.

Description

The application consists of a single camel route that transfers data from one database to another database:

Camel route to transfer data from one database to another database
  @Override
  public void configure() {
    // @formatter:off
    from(
        scheduler("read-clean-write")
            .delay(Duration.ofSeconds(10).toMillis()))
        .id("scheduler -> db read -> db clean -> db write")
        .log("reading...")
        .transacted()
        .to(sql("SELECT * FROM data")
            .dataSource(source)
            .outputType(SqlOutputType.SelectList))
        .log("done")
        .choice()
            .when(header(SqlConstants.SQL_ROW_COUNT).isGreaterThan(0))
                .log("${headers.%s} entries to transfer".formatted(SqlConstants.SQL_ROW_COUNT))
                .process(toQueryTransformer)
                .setProperty(QUERY, simple("${body}"))
                .log("deleting...")
                .to(sql("DELETE FROM data")
                    .dataSource(source))
                .log("done")
                .log("transferring...")
                .setBody(exchangeProperty(QUERY))
                .removeProperty(QUERY)
                .to(sql("query-in-body")
                    .dataSource(target)
                    .useMessageBodyForSql(true))
                .log("done")
            .otherwise()
                .log("No entries to transfer");
    // @formatter:on
  }

And the datasources are both configured with …​.jdbc.transactions = enabled (the default):

application.properties
quarkus.datasource."source".db-kind = postgresql
quarkus.datasource."source".devservices.enabled = true
quarkus.datasource."source".devservices.image-name = docker.io/postgres:16.2-alpine3.18
quarkus.datasource."source".devservices.command = --max_prepared_transactions=64
quarkus.datasource."source".devservices.db-name = source
quarkus.datasource."source".devservices.init-script-path = source.sql
quarkus.datasource."source".devservices.port = 15432
quarkus.datasource."source".devservices.reuse = false
quarkus.datasource."source".devservices.username = quarkus
quarkus.datasource."source".devservices.password = quarkus
quarkus.datasource."source".jdbc.transactions = enabled

quarkus.datasource."target".db-kind = postgresql
quarkus.datasource."target".devservices.enabled = true
quarkus.datasource."target".devservices.image-name = docker.io/postgres:16.2-alpine3.18
quarkus.datasource."target".devservices.command = --max_prepared_transactions=64
quarkus.datasource."target".devservices.db-name = target
quarkus.datasource."target".devservices.init-script-path = target.sql
quarkus.datasource."target".devservices.port = 25432
quarkus.datasource."target".devservices.reuse = false
quarkus.datasource."target".devservices.username = quarkus
quarkus.datasource."target".devservices.password = quarkus
quarkus.datasource."target".jdbc.transactions = enabled

When executed with quarkus 3.8.1, the route works as expected. When executed with quarkus 3.8.2, however, it fails with:

Exception thrown in quarkus 3.8.2
2024-03-23 03:00:05,016 WARN  [com.arj.ats.jta] (Camel (camel-1) thread #1 - scheduler://read-clean-write) ARJUNA016045: attempted rollback of < formatId=131077, gtrid_length=35, bqual_length=36, tx_uid=0:ffff7f000101:b349:65fe37a4:0, node_name=quarkus, branch_uid=0:ffff7f000101:b349:65fe37a4:5, subordinatenodename=null, eis_name=0 > (io.agroal.narayana.LocalXAResource@d01bbed) failed with exception code XAException.XAER_RMERR: javax.transaction.xa.XAException: Error trying to transactionRollback local transaction: Enlisted connection used without active transaction
	at io.agroal.narayana.XAExceptionUtils.xaException(XAExceptionUtils.java:20)
	at io.agroal.narayana.XAExceptionUtils.xaException(XAExceptionUtils.java:8)
	at io.agroal.narayana.LocalXAResource.rollback(LocalXAResource.java:89)
	at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelAbort(XAResourceRecord.java:338)
	at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.enlistResource(TransactionImple.java:644)
	at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.enlistResource(TransactionImple.java:398)
	at io.agroal.narayana.NarayanaTransactionIntegration.associate(NarayanaTransactionIntegration.java:120)
	at io.agroal.pool.ConnectionPool.getConnection(ConnectionPool.java:257)
	at io.agroal.pool.DataSource.getConnection(DataSource.java:86)
	at io.agroal.api.AgroalDataSource_syYGL47zwCt2LQzSIsPp-v9wXhg_Synthetic_ClientProxy.getConnection(Unknown Source)
	at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:160)
	at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:118)
	at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:653)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:695)
	at org.apache.camel.component.sql.SqlProducer.processInternal(SqlProducer.java:150)
	at org.apache.camel.component.sql.SqlProducer.process(SqlProducer.java:137)
	at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:65)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:210)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.handleFirst(RedeliveryErrorHandler.java:462)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:438)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeFromQueue(DefaultReactiveExecutor.java:240)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.executeFromQueue(DefaultReactiveExecutor.java:77)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.await(DefaultAsyncProcessorAwaitManager.java:95)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:84)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:200)
	at org.apache.camel.impl.engine.CamelInternalProcessor.processTransacted(CamelInternalProcessor.java:397)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:327)
	at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:102)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeFromQueue(DefaultReactiveExecutor.java:240)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.executeFromQueue(DefaultReactiveExecutor.java:77)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.await(DefaultAsyncProcessorAwaitManager.java:95)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:84)
	at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:32)
	at org.apache.camel.impl.engine.CamelInternalProcessor.processTransacted(CamelInternalProcessor.java:397)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:327)
	at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:102)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeFromQueue(DefaultReactiveExecutor.java:240)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.executeFromQueue(DefaultReactiveExecutor.java:77)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.await(DefaultAsyncProcessorAwaitManager.java:95)
	at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:84)
	at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:32)
	at org.apache.camel.jta.TransactionErrorHandler.processByErrorHandler(TransactionErrorHandler.java:234)
	at org.apache.camel.jta.TransactionErrorHandler$1.run(TransactionErrorHandler.java:197)
	at org.apache.camel.quarkus.component.jta.TransactionalJtaTransactionPolicy.runWithTransaction(TransactionalJtaTransactionPolicy.java:47)
	at org.apache.camel.quarkus.component.jta.RequiredJtaTransactionPolicy.run(RequiredJtaTransactionPolicy.java:26)
	at org.apache.camel.jta.TransactionErrorHandler.doInTransactionTemplate(TransactionErrorHandler.java:187)
	at org.apache.camel.jta.TransactionErrorHandler.processInTransaction(TransactionErrorHandler.java:138)
	at org.apache.camel.jta.TransactionErrorHandler.process(TransactionErrorHandler.java:102)
	at org.apache.camel.jta.TransactionErrorHandler.process(TransactionErrorHandler.java:111)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.handleFirst(RedeliveryErrorHandler.java:462)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:438)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
	at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
	at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:354)
	at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:330)
	at org.apache.camel.component.scheduler.SchedulerConsumer.sendTimerExchange(SchedulerConsumer.java:70)
	at org.apache.camel.component.scheduler.SchedulerConsumer.poll(SchedulerConsumer.java:50)
	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:204)
	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:118)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.sql.SQLException: Enlisted connection used without active transaction
	at io.agroal.pool.ConnectionHandler.verifyEnlistment(ConnectionHandler.java:381)
	at io.agroal.pool.ConnectionHandler.transactionRollback(ConnectionHandler.java:352)
	at io.agroal.narayana.LocalXAResource.rollback(LocalXAResource.java:86)
	... 67 more

Furthermore, if we set quarkus.datasource."target".jdbc.transactions = disabled, the application also fails with quarkus 3.8.2, but succeeds with 3.8.1.

To trigger the exception, we can run

Run application
./mvnw quarkus:dev

Contributors ✨

Thanks goes to these wonderful people (emoji key):

Marco Bungart
Marco Bungart

💻 🚧 📖
Clemens Quoss
Clemens Quoss

🚇

This project follows the all-contributors specification. Contributions of any kind welcome!

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages