New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
No means to synchronize when cancelling a TransactionalOperator Subscription (Publisher) #23304
Comments
Thanks Mark for raising this ticket. We have the exact same problem in SDN-RX: We solved this by wrapping the implicit transactions on the repositories in another transactional operator (neo4j/sdn-rx@44f5937) but apparently, this fix works just by coincidence (changing the timing of things). The implementation of our reactive transaction manager (see here https://github.com/neo4j/sdn-rx/blob/master/spring-data-neo4j-rx/src/main/java/org/neo4j/springframework/data/core/transaction/ReactiveNeo4jTransactionManager.java#L185-L200) uses the new Neo4j driver. The driver provides publishers for both commit and rollback, which eventually issue asynchronous calls to the database (for reference https://github.com/neo4j/neo4j-java-driver/blob/2.0/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java#L67-L91 and https://github.com/neo4j/neo4j-java-driver/blob/2.0/driver/src/main/java/org/neo4j/driver/internal/async/ExplicitTransaction.java#L220-L228). We observed this on the wire as well: Our step verifiers finished, our separate verification of the data much like Marks call with the JDBC template started, began a new transaction and depending on the load of the machine, the acknowledgement of the previous transaction was received on the client. |
TransactionOperator.as(Mono) now no longer short-cuts via a Flux.next() but provides an implementation via Mono.usingWhen(…). The short-cut previously issued a cancellation signal to the transactional Mono causing the transaction cleanup to happen without a handle for synchronization. Using Mono.usingWhen(…) initiates transaction cleanup when the Mono completes eliminating the need for cancellation of the transactional Publisher. This change does not fully fix spring-projectsgh-23304 but it softens its impact because TransactionalOperator.transactional(Mono) avoids cancellation.
TransactionOperator.as(Mono) now no longer short-cuts via a Flux.next() but provides an implementation via Mono.usingWhen(…). The short-cut previously issued a cancellation signal to the transactional Mono causing the transaction cleanup to happen without a handle for synchronization. Using Mono.usingWhen(…) initiates transaction cleanup when the Mono completes eliminating the need for cancellation of the transactional Publisher. This change does not fully fix gh-23304 but it softens its impact because TransactionalOperator.transactional(Mono) avoids cancellation.
Canceling a
Subscription
(in reactive transactions) leads to a state where transaction cleanup happens asynchronously and detached from completion signals.Consider the following code:
Initially, the table is empty and both,
DatabaseClient
andJdbcTemplate
are configured to point to the same database.The assertion with
queryForMap
typically fails withEmptyResultDataAccessException
. This is, because callingintegerFlux.next()
cancels the upstream subscription while emitting completion as soon as an element was emitted.TransactionalOperator
and its backingReactiveTransactionManager
implementations issue a commit to clean up the transaction that happens asynchronously, without an ability to await commit completion.Not sure whether we can fix the problem at all or whether we can mitigate it. One approach could be with
TransactionalOperator.transactional(Mono)
to cancel the innermostPublisher
and hand out aMono
. This change would return a properly boundedMono
and cancellation would happen on the innermostPublisher
and preventing cancellation of thePublisher
that is returned fromTransactionalOperator
.This ticket is an opportunity to discuss that effect and its potential impact on cancellation of
Publishers
which are enhanced for transactions./cc @smaldini @simonbasle
The text was updated successfully, but these errors were encountered: