Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a jooq-kotlin-coroutines module to support transactional coroutines based on reactive streams Publishers #9335

Closed
lukaseder opened this issue Oct 8, 2019 · 28 comments

Comments

@lukaseder
Copy link
Member

lukaseder commented Oct 8, 2019

It should be possible to add some Kotlin extensions directly in the jOOQ core library and ship them to users integrating with Kotlin, without affecting Java users. For example, we could support coroutines as DSLContext extensions or ResultQuery extensions

A proof of concept has been made here:
marshallpierce@72b0b3b

As suggested on the user group:
https://groups.google.com/d/msg/jooq-user/NS5anAYmIgI/FlV3H6gvAwAJ

@marshallpierce
Copy link

I think this shouldn't even be too nasty from a dependency perspective -- simply mark the kotlin stdlib and coroutine dependencies as optional in maven parlance, and users who want to use them will already have those things on their classpath anyway.

@lukaseder
Copy link
Member Author

We have a new module starting from jOOQ 3.14: jooq-kotlin (#6256). Let's see what can be done in there.

@lukaseder
Copy link
Member Author

Looking into this now. It seems that the main purpose of the suggested approach is to replace our call to blocking(() -> { ... }), which invokes ForkJoinPool::managedBlock by Kotlin's suspend syntax. The content that is inside of the transactionResult method here: marshallpierce@72b0b3b#diff-588eae5d26e55b16e058f61c3a1ce658 is the same as that of DefaultDSLContext.

At least, that seems to be it for these DSLContext.transaction() methods. But wouldn't the expectation here be for any database interaction to be "suspending", even without a transaction? Surely, leveraging this language feature for jOOQ's transaction API is better than nothing, but I'd say that a lot of people upvoting this issue would expect more.

@lukaseder
Copy link
Member Author

I don't want to rush anything here (as with reactive support, too: #6298). There seems to be a significant investment, making all sorts of API suspend-enabled for Kotlin users, and I don't fully understand what this means yet.

@marshallpierce
Copy link

My initial desire was to be able to call suspend functions inside a closure passed to transactionResult (and ideally propagate coroutine context from the caller as well).

Use case:

  • in a transaction, query some data
  • call some other thing (which suspends) with that data
  • update data with the response from the suspending call
  • commit

I agree that probably any closure-flavored API like that would benefit from taking a suspend closure, transaction or not.

@lukaseder
Copy link
Member Author

I agree that probably any closure-flavored API like that would benefit from taking a suspend closure, transaction or not.

Exactly. I think this is an all-or-nothing discussion. Supporting coroutines only in a few areas will not be enough. It's never ending, too. Because coroutines are not the same thing as asynchronous calls (e.g. using CompletionStage), which are again not the same thing as reactive Publisher types. For example, we should adapt DAO types to support the existing async/reactive APIs currently supported by jOOQ: #5916. Does that mean that we also need a SuspendingDAO, for Kotlin?

I personally have high hopes that Loom will clean this up for all JVM languages. These things shouldn't be solved on an API level. What's the take on Loom in the kotlin ecosystem?

@marshallpierce
Copy link

Re: Loom, I couldn't speak for the whole Kotlin community, but at least year's JCrete nobody seemed very optimistic. Even if it was shipping today, I at least would still prefer coroutines: APIs like coroutine context and scoped concurrency make them (to my mind) a better eventual goal than "threads, but more lightweight". We use coroutine context to propagate things like "current request id" across fanned out coroutines, something that requires clumsy and error prone manual work to do with threads and threadlocals.

@lukaseder
Copy link
Member Author

Re: Loom, I couldn't speak for the whole Kotlin community, but at least year's JCrete nobody seemed very optimistic

I personally think that Loom lacks the marketing and hype that other asynchronous and/or reactive APIs received for reasons I still don't fully understand.

Even if it was shipping today, I at least would still prefer coroutines: APIs like coroutine context and scoped concurrency make them (to my mind) a better eventual goal than "threads, but more lightweight"

I think this part of Loom is the most widely misunderstood. Loom is much more than "just" about lightweight threads. What it means specifically for the entire ecosystem and adoption is still ... looming, because it develops much slower than API based approaches, and there are no "success stories" yet. (See above comment on marketing and hype).

However, the idea that asynchronous models, reactive models, and suspending models would all be possible on existing APIs (at least, that's the promise of Loom) seems very interesting, because the ultimate show-stopper for any other approach that is the currently blocking JDBC API could be re-used, and with it, an entire ecosystem of proven technology. At least Oracle has been betting on this, and thus has abandoned ADBA.

What I meant by my previous doubts is that by using any of the other API alternatives, jOOQ has kept offering a wishy-washy solution to integrating jOOQ in the programming models (e.g. async via CompletionStage or reactive via Publisher) without being able to truly leverage the model on the implementation level. At the same time, 95% of the jOOQ API is blocking while only a few API elements pretend to be non-blocking. This is just not a good solution for jOOQ. When maintaining jOOQ, this incompleteness of vision just to be able to superficially check some boxes has been quite the source of frustration.

Add to that yet another approach, which are Kotlin co-routines, which attempt to solve the problems again on an API level, on a per-function basis. What is being requested here is again to be able to check some box on a superficial level, because in order to truly offer Kotlin coroutine support, I'm afraid the entirety of jOOQ's API would have to be offered in a variety of "red functions" flavours (see: https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function)

Having said so, if a user is happy accepting suspending Kotlin functions to fulfil programming model needs, without really getting the benefit throughout the stack, including within the database, then it would be much easier to just wrap jOOQ API calls (the query building) in some suspend services. If this is really "just" about the transaction API in jOOQ, then that is a part that can be very easily replaced by a few lines of custom code, or a Kotlin native transaction management library that leverages suspend.

We use coroutine context to propagate things like "current request id" across fanned out coroutines, something that requires clumsy and error prone manual work to do with threads and threadlocals.

I'm not sure what passing around context has to do with the thread model, though? For example, with the existing jOOQ transaction API, it would be possible to implement suspension in a jOOQ TransactionProvider, and pass on any context from the surrounding DSLContext to the TransactionCallable's Configuration object:

// Using Java syntax...

// Here, we're in some thread/fiber/execution context 1
ctx1.transactionResult(ctx2 -> {
  // Here, we're transparently in some thread/fiber/execution context 2
  ctx2.dsl().select(...).fetch();
});

We're not offering this out of the box yet, but again, it should be possible to achieve using SPIs. It's not easy to do (it's probably easier to use a third party library for suspending transaction support), but I'm trying to argue against the various API based solutions to these asynchronous execution models, which ripple through the entire stack, making existing tools like jOOQ or JDBC hard to use.

@lukaseder
Copy link
Member Author

Related topic: #6298

@lukaseder
Copy link
Member Author

For the record, great slide from a great talk
image

https://www.youtube.com/watch?v=23HjZBOIshY

@marshallpierce
Copy link

I think we might be talking past each other... I'm all in favor of Oracle (and Ron, who is very capable) pressing forward on Loom. It's just not all that relevant to me right now, even if it may end up being the bees knees for when it ships.

In projects I touch, I don't have a need for jOOQ (or JDBC) to be all nonblocking under the hood because with typical small DB connection pool pool sizes, it's no burden to maintain a similarly small thread pool to handle blocking i/o on those connections. While it would be nice, I suppose, to avoid that, it's not really an issue in the same way that HTTP clients and servers really benefit from multiplexing many connections onto a few threads: it is more typical to have 1000 (or 100,000) HTTP connections slowly trickling data back and forth than it is to do the same with a database, at least in my experience.

I don't follow how using suspending code in a TransactionalCallable could be addressed via SPI since TransactionalCallable's run() isn't suspending... what am I missing?

@lukaseder
Copy link
Member Author

It's just not all that relevant to me right now

I understand you'd like a quick win. My point is, jOOQ likely won't offer one for the reasons I've mentioned.

Here's an example of an extremely simple, alternative transaction manager that can be used with jOOQ: https://github.com/witoldsz/ultm

Here's another example of someone doing similar things for jOOQ/ZIO:
https://github.com/Shastick/zio-jooq

If the problem this feature request is trying to solve is isolated out of jOOQ and into a dedicated library like the above, then it would be really simple to implement. Adding support for Kotlin suspend functions inside of jOOQ is much more design work, which I find, at this point premature. I'm not saying it can't be done, and I'm not saying it shouldn't be done. I'm saying that these things should be addressed (if at all) in a much broader context of addressing alternative execution models in general, which include reactive models (#6298).

While this seems like a simple thing for you (and it probably is), it is absolutely not for jOOQ.

I don't follow how using suspending code in a TransactionalCallable could be addressed via SPI since TransactionalCallable's run() isn't suspending... what am I missing?

I was not discussing a Kotlin specific way of doing this. I was trying to hint at how this could be achieved with a Loom style suspension...

@marshallpierce
Copy link

I totally understand your reluctance to sprinkle suspend all over jOOQ. :) I think zio-jooq is basically the equivalent of withContext(Dispatcher.IO) { ... } for Kotlin coroutines (koroutines?), so that much at least is done. I suppose that brings us back to copy and pasting transactionResult as a Kotlin extension function that takes a suspend function type? 🤷‍♂️

@PedroSena
Copy link

I totally understand your reluctance to sprinkle suspend all over jOOQ. :) I think zio-jooq is basically the equivalent of withContext(Dispatcher.IO) { ... } for Kotlin coroutines (koroutines?), so that much at least is done. I suppose that brings us back to copy and pasting transactionResult as a Kotlin extension function that takes a suspend function type? man_shrugging

@marshallpierce Do you mind sharing the workaround you are using ? I'm on the same situation and not sure I fully understood what you are suggesting

@hantsy
Copy link

hantsy commented Mar 3, 2022

I browsed the PR, there is a fetchAwait fun to convert action to a Kotlin Coroutines. What is the difference of the following usage:

  • The dslContext.select().from().where().fetchAwait fun
  • dslContext.select().from().where().awaitSingle() , I am using this in my Spring WebFlux + Kotlin Coroutines project.

@lukaseder
Copy link
Member Author

@hantsy How does the latter work? Is that an extension method on Publisher?

@hantsy
Copy link

hantsy commented Mar 4, 2022

@hantsy How does the latter work? Is that an extension method on Publisher?

Reactor provides Kotlin extensions for ReactiveStreams and Reactor API.

@lukaseder
Copy link
Member Author

I've looked into these things now. Adding these dependencies:

<dependency>
    <groupId>io.projectreactor.kotlin</groupId>
    <artifactId>reactor-kotlin-extensions</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core-jvm</artifactId>
    <version>1.5.2</version>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
    <version>1.5.2</version>
</dependency>

It seems possible to do something like this:

public suspend fun x(): Int = coroutineScope {
    ctx.insertInto(T4).columns(T4.T4_ID, T4.T3_ID, T4.VAL).values(1, 1, "a").awaitSingle()
}

That seems to be sufficient in terms of bridging between the two worlds (reactive and coroutines). People will want to have reactive transactions in coroutine form, too, which currently isn't easy with jOOQ. It's necessary to revert to using R2DBC transaction API directly. It's a pending task here: #11717

As such, I think there's nothing more to do for jOOQ. The recommendation is:

  • To use jOOQ's R2DBC integration
  • To use the kotlinx-coroutines-reactor extensions

3.14 Better Kotlin and Scala Support automation moved this from To do to Done Mar 8, 2022
@lukaseder
Copy link
Member Author

I'm adding some coroutine demo code to the new demo here: https://github.com/jOOQ/demo

Can confirm, it's really as simple as adding those kotlinx-coroutines-xyz dependencies. Well done, kotlin!

@lukaseder lukaseder reopened this Jun 1, 2022
3.14 Better Kotlin and Scala Support automation moved this from Done to In progress Jun 1, 2022
@lukaseder
Copy link
Member Author

OK, I get it now. Some dogfooding never hurts. After implementing that demo and answering this question here: https://stackoverflow.com/a/72457496/521799, I can tell that even with the kotlinx-coroutines-reactive extensions, the amount of glue code one has to write to bridge between the reactive world and the coroutines world seems excessive if this has to be done all the time.

So, how about a jooq-kotlin-coroutines extensions module, which can be pulled in optionally, and which:

  • Adds the bridge extensions as dependencies
  • Adds extension functions such as transactionCoroutine

Specifically for transactions, it can be annoying to write this all the time:

suspend fun mySuspendFunction(jooqContext: DSLContext): Any {
    return jooqContext.transactionPublisher { config ->

        // Turn the suspension result into a Mono, which implements the reactive
        // streams Publisher<T> SPI, which jOOQ expects as a result from a 
        // TransactionalPublishable
        mono {
            anotherSuspendFunction(config)
        }
    }

    // Turn the Publisher<T> that is returned from transactionPublisher() back 
    // into a suspension result
    .awaitFirst()
}

Probably better:

suspend fun mySuspendFunction(jooqContext: DSLContext): Any {
    return jooqContext.transactionCoroutine { config ->
        anotherSuspendFunction(config)
    }
}

@lukaseder
Copy link
Member Author

In principle, the Publisher based implementation should work also if we don't have an R2DBC ConnectionFactory available. All of jOOQ's Publisher implementations have a blocking, JDBC backed fallback implementation. E.g. the R2DBC.BlockingTransactionSubscription looks like this:

static final class BlockingTransactionSubscription<T> extends AbstractSubscription<T> {
    final DSLContext                  ctx;
    final TransactionalPublishable<T> transactional;

    BlockingTransactionSubscription(
        DSLContext ctx,
        Subscriber<? super T> subscriber,
        TransactionalPublishable<T> transactional
    ) {
        super(subscriber);

        this.ctx = ctx;
        this.transactional = transactional;
    }

    @Override
    final void request0() {
        try {
            subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c))));
            subscriber.onComplete();
        }
        catch (Throwable t) {
            subscriber.onError(t);
        }
    }
}

@lukaseder lukaseder changed the title Support Kotlin coroutines as DSLContext extensions Add a jooq-kotlin-coroutines module to support transactional coroutines based on reactive streams Publishers Jun 1, 2022
@lukaseder
Copy link
Member Author

I still think that the bridge libraries are sufficient for ResultQuery and other usage (i.e. just add the awaitXYZ() method call to the end of a query). So, this issue here only adds convenience to transactions, for now..

3.14 Better Kotlin and Scala Support automation moved this from In progress to Done Jun 1, 2022
@hantsy

This comment was marked as off-topic.

@lukaseder

This comment was marked as resolved.

@rocketraman
Copy link

In case anyone stumbles on this issue and uses the code from comment #9335 (comment), please beware that code has a big issue in that it makes blocking calls (assuming the underlying driver is a standard JDBC driver) in a suspending function. For example, provider.begin(ctx) obtains connections from a connection pool, and can block. The transaction commit and rollback may block as well.

This will almost certainly cause hangs in a Kotlin application under load that uses coroutines, as those calls will block threads in the DefaultDispatcher.

This variation ensures those blocking calls are run on the IO dispatcher instead:

suspend fun <T> DSLContext.suspendingTransactionResult(block: suspend DSLContext.() -> T): T {
  val derivedConfiguration = configuration().derive()
  val ctx = DefaultTransactionContext(derivedConfiguration)
  val provider = ctx.configuration().transactionProvider()
  val listeners = TransactionListeners(ctx.configuration())
  var committed = false

  val result: T

  @Suppress("TooGenericExceptionCaught")
  try {
    // this can block (especially provider.begin because it retrieves a connection from the pool), move it to an IO thread
    withContext(Dispatchers.IO) {
      try {
        listeners.beginStart(ctx)
        provider.begin(ctx)
      } finally {
        listeners.beginEnd(ctx)
      }
    }

    result = block(DSL.using(derivedConfiguration))

    // this can block due to savepoints and stuff, move it to an IO thread
    withContext(Dispatchers.IO) {
      try {
        listeners.commitStart(ctx)
        provider.commit(ctx)
        committed = true
      } finally {
        listeners.commitEnd(ctx)
      }
    }
  }
  // [#6608] [#7167] Errors are no longer handled differently
  catch (cause: Throwable) {
    // [#8413] Avoid rollback logic if commit was successful (exception in commitEnd())
    if (!committed) {
      if (cause is Exception) {
        ctx.cause(cause)
      } else {
        ctx.causeThrowable(cause)
      }

      withContext(Dispatchers.IO) {
        listeners.rollbackStart(ctx)
        try {
          provider.rollback(ctx)
        } catch (suppress: Exception) {
          cause.addSuppressed(suppress)
        }
        listeners.rollbackEnd(ctx)
      }
    }

    // [#6608] [#7167] Errors are no longer handled differently
    when (cause) {
      is RuntimeException, is Error -> throw cause
      else -> throw DataAccessException(
        if (committed) "Exception after commit" else "Rollback caused",
        cause
      )
    }
  }

  return result
}

@lukaseder
Copy link
Member Author

Documentation has been added: https://www.jooq.org/doc/latest/manual/sql-building/kotlin-sql-building/kotlin-coroutines/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

No branches or pull requests

6 participants