Skip to content

Commit

Permalink
Improve Coroutines transaction API
Browse files Browse the repository at this point in the history
As a follow-up of gh-22915, the purpose of this commit is to improve
Coroutines programmatic transaction API to make it more consistent with
the Java one and more idiomatic.

For suspending functions, this commit changes the
TransactionalOperator.transactional extension with a suspending lambda
parameter to a TransactionalOperator.executeAndAwait one which is
conceptually closer to TransactionalOperator.execute Java API so more
consistent.

For Flow, the TransactionalOperator.transactional extension is correct
but would be more idiomatic as a Flow extension.

This commit also adds code samples to the reference documentation.

Closes gh-23627
  • Loading branch information
sdeleuze committed Sep 12, 2019
1 parent e62cb6b commit fc64806
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@ import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
import org.springframework.transaction.ReactiveTransaction

/**
* Coroutines variant of [TransactionalOperator.transactional] with a [Flow] parameter.
* Coroutines variant of [TransactionalOperator.transactional] as a [Flow] extension.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
fun <T : Any> TransactionalOperator.transactional(flow: Flow<T>): Flow<T> =
transactional(flow.asFlux()).asFlow()
fun <T : Any> Flow<T>.transactional(operator: TransactionalOperator): Flow<T> =
operator.transactional(asFlux()).asFlow()

/**
* Coroutines variant of [TransactionalOperator.transactional] with a suspending lambda
* Coroutines variant of [TransactionalOperator.execute] with a suspending lambda
* parameter.
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun <T : Any> TransactionalOperator.transactional(f: suspend () -> T?): T? =
transactional(mono(Dispatchers.Unconfined) { f() }).awaitFirstOrNull()
suspend fun <T : Any> TransactionalOperator.executeAndAwait(f: suspend (ReactiveTransaction) -> T?): T? =
execute { status -> mono(Dispatchers.Unconfined) { f(status) } }.awaitFirstOrNull()
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TransactionalOperatorExtensionsTests {
fun commitWithSuspendingFunction() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
operator.transactional {
operator.executeAndAwait {
delay(1)
true
}
Expand All @@ -48,7 +48,7 @@ class TransactionalOperatorExtensionsTests {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
try {
operator.transactional {
operator.executeAndAwait {
delay(1)
throw IllegalStateException()
}
Expand All @@ -72,7 +72,7 @@ class TransactionalOperatorExtensionsTests {
emit(4)
}
runBlocking {
val list = operator.transactional(flow).toList()
val list = flow.transactional(operator).toList()
assertThat(list).hasSize(4)
}
assertThat(tm.commit).isTrue()
Expand All @@ -89,7 +89,7 @@ class TransactionalOperatorExtensionsTests {
}
runBlocking {
try {
operator.transactional(flow).toList()
flow.transactional(operator).toList()
} catch (ex: IllegalStateException) {
assertThat(tm.commit).isFalse()
assertThat(tm.rollback).isTrue()
Expand Down
47 changes: 45 additions & 2 deletions src/docs/asciidoc/languages/kotlin.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,51 @@ class UserHandler(builder: WebClient.Builder) {
=== Transactions

Transactions on Coroutines are supported via the programmatic variant of the Reactive
transaction management provided as of Spring Framework 5.2. `TransactionalOperator.transactional`
extensions with suspending lambda and Kotlin `Flow` parameter are provided for that purpose.
transaction management provided as of Spring Framework 5.2.

For suspending functions, a `TransactionalOperator.executeAndAwait` extension is provided.

[source,kotlin,indent=0]
----
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
----

For Kotlin `Flow`, a `Flow<T>.transactional` extension is provided.

[source,kotlin,indent=0]
----
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}
----


[[kotlin-spring-projects-in-kotlin]]
== Spring Projects in Kotlin
Expand Down

0 comments on commit fc64806

Please sign in to comment.