First Request to Reactive SQL fail #14709

Closed
hernael opened this issue Jan 29, 2021 · 69 comments
Labels
area/hibernate-reactive Hibernate Reactive kind/bug Something isn't working

Comments

@hernael

hernael commented Jan 29, 2021

Describe the bug
The first time a request makes concurrent calls through the reactive SQL client, it fails.
The second time, it works fine.

Expected behavior
It should also work the first time.

Actual behavior
After the application starts up and receives its first request to a method that makes concurrent reactive SQL calls, the connection fails.

To Reproduce
In a Reactive SQL Client application with the resteasy, reactive-pg-client and resteasy-mutiny extensions:

  1. Create a method like this:
var query = findAll();
var list = query.page(Page.of(1, 10)).list();
var total = query.count();

return Uni.combine().all().unis(list, total).asTuple().onItem().transform((tuple) -> {
    return PageResponse.newBuilder().setElements(tuple.getItem1()).setTotalCount(tuple.getItem2()).build();
});
  2. Start Quarkus.
  3. Send a request.
  4. The request fails on the line "var total = query.count();"

The error is:

Caused by: java.lang.IllegalStateException: session is currently connecting to database
	at org.hibernate.reactive.pool.impl.ProxyConnection.withConnection(ProxyConnection.java:52)
	at org.hibernate.reactive.pool.impl.ProxyConnection.selectJdbc(ProxyConnection.java:109)
	at org.hibernate.reactive.loader.ReactiveLoader.executeReactiveQueryStatement(ReactiveLoader.java:106)
	at org.hibernate.reactive.loader.ReactiveLoader.doReactiveQueryAndInitializeNonLazyCollections(ReactiveLoader.java:63)
	at org.hibernate.reactive.loader.CachingReactiveLoader.doReactiveList(CachingReactiveLoader.java:62)
	at org.hibernate.reactive.loader.CachingReactiveLoader.reactiveListIgnoreQueryCache(CachingReactiveLoader.java:80)
	at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader.reactiveList(ReactiveQueryLoader.java:113)
	at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader.reactiveList(ReactiveQueryLoader.java:87)
	at org.hibernate.reactive.session.impl.ReactiveQueryTranslatorImpl.reactiveList(ReactiveQueryTranslatorImpl.java:122)
	at org.hibernate.reactive.session.impl.ReactiveHQLQueryPlan.performReactiveList(ReactiveHQLQueryPlan.java:114)
	at org.hibernate.reactive.session.impl.ReactiveSessionImpl.lambda$reactiveList$6(ReactiveSessionImpl.java:389)
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1106)
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235)
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:143)
	at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveList(ReactiveSessionImpl.java:389)
	at org.hibernate.reactive.session.impl.ReactiveQueryImpl.doReactiveList(ReactiveQueryImpl.java:133)
	at org.hibernate.reactive.session.impl.ReactiveQueryImpl.getReactiveResultList(ReactiveQueryImpl.java:109)
	at org.hibernate.reactive.session.impl.ReactiveQueryImpl.getReactiveSingleResult(ReactiveQueryImpl.java:84)
	at org.hibernate.reactive.mutiny.impl.MutinyQueryImpl.getSingleResult(MutinyQueryImpl.java:168)
	at io.quarkus.hibernate.reactive.panache.common.runtime.CommonPanacheQueryImpl.lambda$count$3(CommonPanacheQueryImpl.java:204)
	at io.quarkus.hibernate.reactive.panache.common.runtime.CommonPanacheQueryImpl.applyFilters(CommonPanacheQueryImpl.java:323)
	at io.quarkus.hibernate.reactive.panache.common.runtime.CommonPanacheQueryImpl.count(CommonPanacheQueryImpl.java:204)
	at io.quarkus.hibernate.reactive.panache.runtime.PanacheQueryImpl.count(PanacheQueryImpl.java:144)
	at com.example.template.repository.reactive.panache.CotPanacheRepository.getPage(CotPanacheRepository.java:46)
	at com.example.template.repository.reactive.panache.CotPanacheRepository_ClientProxy.getPage(CotPanacheRepository_ClientProxy.zig:1614)
	at com.example.template.core.impl.CotCoreImpl.getPage(CotCoreImpl.java:37)
	at com.example.template.core.impl.CotCoreImpl_ClientProxy.getPage(CotCoreImpl_ClientProxy.zig:349)
	at com.example.template.api.CotApi.getPage(CotApi.java:48)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:170)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:130)
	at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:643)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:507)
	... 25 more

Environment (please complete the following information):

  • Output of uname -a or ver: Linux 5.3.18-lp152.60-default x86_64 x86_64 x86_64 GNU/Linux
  • Output of java -version: OpenJDK 64-Bit Server VM Corretto-11.0.10.9.1 (build 11.0.10+9-LTS, mixed mode)
  • GraalVM version (if different from Java):
  • Quarkus version or git rev: 1.10.5, 1.11.0 and 1.11.1
  • Build tool (ie. output of mvnw --version or gradlew --version): 3.6.3
@hernael hernael added the kind/bug Something isn't working label Jan 29, 2021
@ghost ghost added the triage/needs-triage label Jan 29, 2021
@cescoffier cescoffier added area/hibernate-reactive Hibernate Reactive and removed triage/needs-triage labels Feb 1, 2021
@gsmet
Member

gsmet commented Feb 1, 2021

/cc @Sanne @gavinking @DavideD

@DavideD
Contributor

DavideD commented Feb 1, 2021

I'm having a look

@DavideD
Contributor

DavideD commented Feb 1, 2021

Is there a test project somewhere that I can look at?

@hernael
Author

hernael commented Feb 1, 2021

Not yet, but it is easy to reproduce: make two concurrent reactive requests to the database (PostgreSQL 13 in my case), start the application and call a reactive service.
If you wish, I could create a demo test repo.

@DavideD
Contributor

DavideD commented Feb 1, 2021

If you wish, I could create a demo test repo

That would be great and save me a lot of time

@hernael
Author

hernael commented Feb 1, 2021

OK, give me about 10 minutes.

@DavideD
Contributor

DavideD commented Feb 1, 2021

Thanks

@Sanne
Member

Sanne commented Feb 1, 2021

This looks like the same issue as others have reported: not using a transaction and/or not delegating to the right thread.

IMO it's Panache Reactive which should not allow this usage and throw a better exception.

@DavideD
Contributor

DavideD commented Feb 1, 2021

This looks like the same issue as others have reported: not using a transaction and/or not delegating to the right thread.

I think so, but I prefer to check with the test case

@Sanne
Member

Sanne commented Feb 1, 2021

(I don't know for sure and will let @DavideD reproduce and confirm; I'm just suspecting this might be the case since I see RESTEasy being used in the stack trace.)

@hernael
Author

hernael commented Feb 1, 2021

@hernael
Author

hernael commented Feb 1, 2021

In the repository example it is even worse, since every request fails :(

2021-02-01 11:59:08,151 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-23) could not execute query: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Session/EntityManager is closed
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1081)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.hibernate.reactive.pool.impl.Handlers.lambda$toCompletionStage$0(Handlers.java:26)
	at io.vertx.sqlclient.impl.SqlResultHandler.complete(SqlResultHandler.java:98)
	at io.vertx.sqlclient.impl.SqlResultHandler.handle(SqlResultHandler.java:87)
	at io.vertx.sqlclient.impl.SqlResultHandler.handle(SqlResultHandler.java:33)
	at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:241)
	at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:88)
	at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:101)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
	at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:357)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:163)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:78)
	at io.vertx.pgclient.impl.codec.PgCommandCodec.handleReadyForQuery(PgCommandCodec.java:138)
	at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:226)
	at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:86)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed
	at org.hibernate.internal.AbstractSharedSessionContract.checkOpen(AbstractSharedSessionContract.java:375)
	at org.hibernate.engine.spi.SharedSessionContractImplementor.checkOpen(SharedSessionContractImplementor.java:148)
	at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1394)
	at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:381)
	at org.hibernate.internal.SessionImpl.getEntityUsingInterceptor(SessionImpl.java:563)
	at org.hibernate.loader.Loader.getRow(Loader.java:1609)
	at org.hibernate.loader.Loader.getRowFromResultSet(Loader.java:740)
	at org.hibernate.loader.Loader.getRowsFromResultSet(Loader.java:1039)
	at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader.getRowsFromResultSet(ReactiveQueryLoader.java:203)
	at org.hibernate.reactive.loader.ReactiveLoaderBasedResultSetProcessor.reactiveExtractResults(ReactiveLoaderBasedResultSetProcessor.java:73)
	at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader$1.reactiveExtractResults(ReactiveQueryLoader.java:69)
	at org.hibernate.reactive.loader.ReactiveLoader.reactiveProcessResultSet(ReactiveLoader.java:123)
	at org.hibernate.reactive.loader.ReactiveLoader.lambda$doReactiveQueryAndInitializeNonLazyCollections$0(ReactiveLoader.java:71)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
	... 39 more


Process finished with exit code 137 (interrupted by signal 9: SIGKILL)

@hernael
Author

hernael commented Feb 1, 2021

Please note that I just created a service called hello/first, which does not make concurrent calls. If "hello/first" is executed first and the "hello" service afterwards, no error occurs.

@DavideD
Contributor

DavideD commented Feb 1, 2021

Yes, this is the problem @Sanne was talking about.
When I run the test I have the following exception:

2021-02-01 17:25:34,176 ERROR [org.jbo.res.res.i18n] (executor-thread-5) RESTEASY002020: Unhandled asynchronous exception, sending back 500: org.jboss.resteasy.spi.ApplicationException: java.lang.AssertionError: This needs to be run on the Vert.x event loop
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:183)
	at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:130)
	at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:643)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:507)
	at org.jboss.resteasy.core.ResourceMethodInvoker.lambda$invokeOnTarget$2(ResourceMethodInvoker.java:457)
	at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:364)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:459)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:419)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:393)
	at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:68)
	at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:492)
	at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:261)
	at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:161)
	at org.jboss.resteasy.core.interception.jaxrs.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:364)
	at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:164)
	at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:247)
	at io.quarkus.resteasy.runtime.standalone.RequestDispatcher.service(RequestDispatcher.java:73)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler.dispatch(VertxRequestHandler.java:138)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler.access$000(VertxRequestHandler.java:41)
	at io.quarkus.resteasy.runtime.standalone.VertxRequestHandler$1.run(VertxRequestHandler.java:93)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2415)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
	at java.base/java.lang.Thread.run(Thread.java:832)
	at org.jboss.threads.JBossThread.run(JBossThread.java:501)
Caused by: java.lang.AssertionError: This needs to be run on the Vert.x event loop
	at org.hibernate.reactive.session.impl.ReactiveSessionImpl.<init>(ReactiveSessionImpl.java:129)
	at org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl.openSession(MutinySessionFactoryImpl.java:67)
	at io.quarkus.hibernate.reactive.runtime.ReactiveSessionProducer.createMutinySession(ReactiveSessionProducer.java:32)

@hernael
Author

hernael commented Feb 1, 2021

Are you saying the problem is RESTEasy? Even though the concurrent Panache Reactive code works fine if a non-concurrent Panache call is executed just before it?
Note: a unit test of only the repository layer also fails.

@DavideD
Contributor

DavideD commented Feb 1, 2021

This will work if you wrap everything in a transaction:

    public Uni<PageResponse<Entity>> getPage() {
        return Panache.withTransaction(() -> {
            var query = this.findAll();
            var list = query.page( Page.of( 1, 10 ) ).list();
            var total = query.count();
            return Uni.combine().all().unis(list, total).asTuple().onItem().transform((tuple) ->
                PageResponse.<Entity>newBuilder()
                    .setElements(tuple.getItem1())
                    .setTotalCount(tuple.getItem2())
                    .build() );
        });
    }

@hernael
Author

hernael commented Feb 1, 2021

OK, I will do it this way.
But I still think there is an issue somewhere. Maybe in future releases a transaction will no longer be needed in this case.
In any case, thank you very much!!!

@Sanne
Member

Sanne commented Feb 1, 2021

Are you saying the problem is RESTEasy? Even though the concurrent Panache Reactive code works fine if a non-concurrent Panache call is executed just before it?

Well, it's not RESTEasy's fault, but a limitation of our current integration: all Hibernate Reactive code is expected to run within a reactive context, which at this time means within the same Vert.x event loop as the Vert.x SQL client it uses underneath.

Failing to run it on the right thread will produce such obscure errors; we certainly need to fix this.

N.B. if you run your reproducer as a test it will fail, as there are assertions which explicitly check that this code runs on the right thread. One way we could fix this is to promote such assertions to hard requirements; we haven't done so yet, as we were exploring options and trying to figure out whether there is a way to allow this without being that restrictive.

@hernael
Author

hernael commented Feb 1, 2021

Now, after a simple request to Panache Reactive has been made, all subsequent concurrent requests to the database work fine!
I do not know how connection initialization works in Panache Reactive, but judging from the behavior and the error "Session/EntityManager is closed", this all looks like a connection initialization problem in Hibernate Reactive.
Is that right?

But anyway, thanks for helping me and giving me a solution to my issue.

@Sanne
Member

Sanne commented Feb 1, 2021

I do not know how connection initialization works in Panache Reactive, but judging from the behavior and the error "Session/EntityManager is closed", this all looks like a connection initialization problem in Hibernate Reactive.
Is that right?

The code is fine, but a single Session is not designed to be thread-safe; a consequence of running this on the wrong thread is that both your original thread and the Vert.x thread make state changes, likely leading to illegal state (and issues that are complex to diagnose).

So we need to ensure all operations are being scheduled on the right thread.
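A minimal sketch of that scheduling rule, using only the JDK: a single-threaded `ExecutorService` whose thread is named "event-loop" stands in for the Vert.x event loop, and the hypothetical `SessionState` class stands in for a session whose state is not thread-safe. Requests arrive on several worker threads, but each one hands its session work to the same executor, so only one thread ever touches that state. This is an illustration under those assumptions, not how Quarkus or Hibernate Reactive actually dispatch work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a session whose state is deliberately NOT thread-safe.
final class SessionState {
    final List<String> executed = new ArrayList<>();              // no locking on purpose
    final Set<String> threadsSeen = ConcurrentHashMap.newKeySet(); // records which threads touched the state

    void execute(String query) {
        threadsSeen.add(Thread.currentThread().getName());
        executed.add(query);
    }
}

final class EventLoopDispatch {
    // Callers run on several worker threads, but every session operation is
    // handed to one single-threaded executor standing in for the event loop.
    static SessionState runAll(List<String> queries, int callerThreads) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService workers = Executors.newFixedThreadPool(callerThreads);
        SessionState session = new SessionState();
        try {
            List<CompletableFuture<Void>> done = new ArrayList<>();
            for (String query : queries) {
                done.add(CompletableFuture
                        .supplyAsync(() -> query, workers)              // the "request" arrives on a worker
                        .thenAcceptAsync(session::execute, eventLoop)); // the session work runs on the event loop
            }
            CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
            return session;
        } finally {
            workers.shutdown();
            eventLoop.shutdown();
        }
    }
}
```

However many worker threads issue requests, `threadsSeen` ends up containing only "event-loop".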

@geoand
Contributor

geoand commented Feb 17, 2021

Is this still an issue?

@hernael
Author

hernael commented Feb 17, 2021

This still fails for me in version 1.11.3.

@FroMage
Member

FroMage commented Apr 14, 2021

So, I'm a bit puzzled by the state of this issue. What is wrong and what can we fix?

It looks like we're getting cryptic exceptions such as java.lang.IllegalStateException: session is currently connecting to database which is a totally bogus error message which does not at all reflect what's going on.

I was sort of assuming that the issue was that we were invoking an operation on a worker thread, which is forbidden, but I tried it in RESTEasy Reactive and got the same issue on an IO thread.

So let's ask this very clearly @Sanne @DavideD @gavinking:

  • what operations can I do on the reactive session without a transaction?
  • is there a different outcome depending on which thread is calling the operation? As in: should it be a worker thread or an IO thread?
  • who is going to fix the error message? I could fix it in the Panache layer when I access a reactive session: I can check the current thread, but I'm not sure whether I can check if there's a current transaction (how?). But won't this affect even users of the reactive session without the Panache layer? If so, then that error check belongs in the HR Quarkus extension, or in the HR session itself, no?

@DavideD
Contributor

DavideD commented Apr 20, 2021

what operations can I do on the reactive session without a transaction?

I don't think the problem in this case is the lack of a transaction; I think the issue gets solved because, when one calls withTransaction, the block of code runs in the right Vert.x context. As far as I know, one can do some reads without a transaction. What happens when we call Panache.withTransaction?

is there a different outcome depending on which thread is calling the operation? As in: should it be a worker thread or an IO thread?

In Hibernate Reactive we check two things:

  • The thread must be an event loop thread, checked with io.vertx.core.Context.isOnEventLoopThread()
  • The thread that created the session must be the same thread that is now doing the operation

I don't know if this answers your question.
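The second check can be pictured as a thread-affinity guard. The sketch below is hypothetical (`ThreadBoundSession` and `find` are made-up names, and the event-loop check is left out because it needs Vert.x); it fails fast with an explicit message instead of letting two threads mutate the same session. Since Sanne describes the real checks as assertions that could be promoted to hard requirements, the sketch throws an ordinary exception, which runs whether or not the JVM was started with `-ea`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical session that remembers the thread that created it.
final class ThreadBoundSession {
    private final Thread owner = Thread.currentThread();

    String find(long id) {
        checkThread();
        return "entity-" + id; // stand-in for a real lookup
    }

    private void checkThread() {
        Thread current = Thread.currentThread();
        if (current != owner) {
            // An explicit exception rather than a Java `assert`, so the check
            // also runs when assertions are disabled.
            throw new IllegalStateException("Session created on thread '" + owner.getName()
                    + "' was used from thread '" + current.getName() + "'");
        }
    }
}

final class ThreadBoundDemo {
    // Uses the session from a different thread and returns the resulting error message (null if none).
    static String useFromOtherThread(ThreadBoundSession session) {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.supplyAsync(() -> session.find(1L), other).join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        } finally {
            other.shutdown();
        }
    }
}
```

Called on its creating thread, `find` works; called from another thread, it fails with a message that names both threads, which is far easier to act on than "session is currently connecting to database".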

who is going to fix the error message? I could fix it in the Panache layer when I access a reactive session: I can check the current thread, but I'm not sure whether I can check if there's a current transaction (how?). But won't this affect even users of the reactive session without the Panache layer? If so, then that error check belongs in the HR Quarkus extension, or in the HR session itself, no?

We probably need better error messages on the Hibernate Reactive side. But first we need to understand a bit better why this is happening.

I didn't have much time to investigate this issue further, but I should be able to get back to it soon.

@geoand
Contributor

geoand commented Jan 26, 2022

Is this still a problem with the latest Quarkus?

@mkouba
Contributor

mkouba commented Jan 26, 2022

It's still a "problem". I can confirm that Sanne's solution works perfectly but it's not very obvious for beginners (like me ;-) and the error message is completely useless.

@hernael
Author

hernael commented Jan 26, 2022

Is this still a problem with the latest Quarkus?

Hi @geoand, yes, this is still a problem.
However, if we add the @ReactiveTransactional annotation to the method, it works correctly.
But I don't think this "trick" should be necessary, since there are only two SELECT queries and no transaction should be needed.

@geoand
Contributor

geoand commented Jan 26, 2022

Thanks for the info folks

@FroMage
Member

FroMage commented Jan 27, 2022

Pfff, to get back to @Sanne's issue, which is:

Without a transaction, if we obtain two Unis, each executing an operation, and execute them without chaining them, something gets broken in HR.

I mean, correct me if I'm wrong, but this seems to be a problem entirely unrelated with Quarkus or Panache.

In any case, to get back to the drawing board, I recall that Julien Viet and @cescoffier told me that with things like Netty's async operations, even if you don't chain the returned promises, they should still all be executed in the sequence that you obtained/invoked them in. Because Netty keeps a queue underneath, so invocation order is always preserved.

Now, this is true in the context of eager promises. But Uni are lazy, and only execute on subscription, which means it's the subscription order that should order the operations. OTOH, I'm pretty sure that HR uses CompletionStage underneath, which are eager, so I'm not sure about the order semantics of HR calls.
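The eager/lazy difference can be shown with the JDK alone. In this minimal sketch `eager` behaves like a `CompletionStage`-returning call (the work starts when the method is invoked), while `lazy` returns a `Supplier` that, like a `Uni`, starts the work only when it is "subscribed" via `get()`. `OrderLog`, `eager` and `lazy` are hypothetical names; Mutiny itself is not used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class OrderLog {
    final List<String> started = new ArrayList<>();

    // Eager, like a CompletionStage: the work starts as soon as the call is made.
    CompletableFuture<String> eager(String name) {
        started.add(name);
        return CompletableFuture.completedFuture(name);
    }

    // Lazy, like a Uni: returns a recipe; the work starts only when get() ("subscribe") is called.
    Supplier<CompletableFuture<String>> lazy(String name) {
        return () -> eager(name);
    }

    static List<String> demo() {
        OrderLog log = new OrderLog();
        log.eager("a");                                        // starts immediately
        log.eager("b");                                        // starts immediately
        Supplier<CompletableFuture<String>> c = log.lazy("c"); // nothing started yet
        Supplier<CompletableFuture<String>> d = log.lazy("d"); // nothing started yet
        d.get();                                               // subscribe to d first...
        c.get();                                               // ...then to c
        return log.started;                                    // [a, b, d, c]
    }
}
```

With lazy operations the subscription order, not the creation order, decides what runs first: `d` was created after `c` but runs before it.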

This really looks like a more fundamental question than this issue illustrates.

@Sanne
Member

Sanne commented Jan 27, 2022

Pfff, to get back to @Sanne's issue, which is:

Without a transaction, if we obtain two Unis, each executing an operation, and execute them without chaining them, something gets broken in HR.

This is most likely related to the ordering of events. When a transaction wraps all operations, beginning the transaction forces a connection to be opened, and from that point HR enforces that the connection-opening work and all subsequent operations are executed within the same Vert.x context.
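The "first request fails, later ones work" symptom fits a session that opens its connection lazily. The following is a toy model under that assumption, not Hibernate Reactive's actual `ProxyConnection`: the hypothetical `LazyConnectionSession` starts connecting on the first query, rejects any query that arrives while the connection is still being opened (reusing the error message from this issue), and lets queries through once connected. Calling `open()` first, roughly what beginning a transaction does, removes the window in which that error can occur.

```java
import java.util.concurrent.CompletableFuture;

// Toy model (single-threaded, no locking): the connection is opened lazily on first use.
final class LazyConnectionSession {
    private enum State { NOT_CONNECTED, CONNECTING, CONNECTED }

    private State state = State.NOT_CONNECTED;
    private final CompletableFuture<Void> connected = new CompletableFuture<>();

    // Starts opening the connection (or joins an opening already in progress),
    // roughly what beginning a transaction forces to happen up front.
    CompletableFuture<Void> open() {
        if (state == State.NOT_CONNECTED) {
            state = State.CONNECTING;
        }
        return connected;
    }

    CompletableFuture<String> query(String sql) {
        switch (state) {
            case CONNECTED:
                return CompletableFuture.completedFuture("result of " + sql);
            case CONNECTING:
                // A second operation arrives while the first one is still connecting.
                return CompletableFuture.failedFuture(
                        new IllegalStateException("session is currently connecting to database"));
            default:
                // First operation: trigger the connection and run once it is ready.
                return open().thenApply(ignored -> "result of " + sql);
        }
    }

    // Simulates the database handshake completing.
    void finishConnecting() {
        state = State.CONNECTED;
        connected.complete(null);
    }
}
```

In this model, once the connection exists, side-by-side queries no longer hit the connecting state, which matches the observation that a warm-up request makes later requests succeed. That does not make parallel use of a single session safe, as DavideD points out below.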

I mean, correct me if I'm wrong, but this seems to be a problem entirely unrelated with Quarkus or Panache.

It relates to Quarkus, as Quarkus should ensure different tasks are not re-ordered or dispatched to multiple Vert.x contexts, multiple verticles, other threads, etc.

In any case, to get back to the drawing board, I recall that Julien Viet and @cescoffier told me that with things like Netty's async operations, even if you don't chain the returned promises, they should still all be executed in the sequence that you obtained/invoked them in. Because Netty keeps a queue underneath, so invocation order is always preserved.

Sure. But Quarkus doesn't seem to honour that premise: there are multiple parallel threads executing multiple Vert.x contexts. We need to ensure all end-user tasks are dispatched consistently and in the expected order.

Now, this is true in the context of eager promises. But Uni are lazy, and only execute on subscription, which means it's the subscription order that should order the operations. OTOH, I'm pretty sure that HR uses CompletionStage underneath, which are eager, so I'm not sure about the order semantics of HR calls.

It's been a long time since I made the above "spot the difference" comment, so TBH I don't remember it all, but I think the gist was that one of the code examples triggers use of executeInVertxEventLoop in a way that throws off the sequential ordering guarantees one would expect.

@DavideD
Contributor

DavideD commented Jan 27, 2022

It's also not possible to use CompletableFuture.allOf( ) or Uni.combine().all() with Hibernate Reactive. That's because the order of the operations using the same session is not guaranteed. Maybe it works in this case but it's something one should not rely on.
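A minimal sketch of why side-by-side operations on one session are a problem, assuming a hypothetical `SingleUseSession` that can run only one operation at a time and, unlike a real session, reports overlapping use with an explicit error. Starting both operations up front, as `allOf` or `Uni.combine().all()` would, makes the second one fail; chaining them with `thenCompose` starts the second only after the first has completed. The 50 ms delay, the names and the error message are invented for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical session that rejects overlapping operations instead of corrupting its state.
final class SingleUseSession implements AutoCloseable {
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final ScheduledExecutorService db = Executors.newSingleThreadScheduledExecutor();

    CompletableFuture<String> run(String op) {
        if (!busy.compareAndSet(false, true)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("operation already in progress on this session"));
        }
        CompletableFuture<String> result = new CompletableFuture<>();
        // Simulated round trip to the database.
        db.schedule(() -> {
            busy.set(false);               // free the session before signalling completion
            result.complete(op + " done");
        }, 50, TimeUnit.MILLISECONDS);
        return result;
    }

    @Override
    public void close() {
        db.shutdownNow();
    }
}

final class CombineVsChain {
    // Both operations are started immediately, as with allOf / Uni.combine().all().
    static boolean parallelFails() {
        try (SingleUseSession s = new SingleUseSession()) {
            CompletableFuture<String> list = s.run("list");
            CompletableFuture<String> count = s.run("count");
            CompletableFuture.allOf(list, count).exceptionally(t -> null).join();
            return count.isCompletedExceptionally();
        }
    }

    // The second operation only starts once the first has completed.
    static String chained() {
        try (SingleUseSession s = new SingleUseSession()) {
            return s.run("list")
                    .thenCompose(list -> s.run("count").thenApply(count -> list + ", " + count))
                    .join();
        }
    }
}
```

A real session does not detect the overlap like this; it simply ends up in an inconsistent state, which is why the failures in this thread look so unrelated to their cause.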

That's why the first solution proposed by @Sanne works.
I would also expect the second example to work. I need to check what's going on with it.

We briefly explain this in the Hibernate Reactive guide.

Like @FroMage said, Hibernate Reactive internally uses the CompletionStage API, but the Mutiny layer on top of it should create a "lazy layer" that is only executed at subscription time.

@hernael
Author

hernael commented Jan 27, 2022

Please remember that if you first execute another simple method that establishes the connection and then call the complex method with Uni.combine().all() (with or without the @ReactiveTransactional annotation), everything works OK.

@clahres

clahres commented Aug 10, 2022

@DavideD, do I understand you correctly that I should not use a pattern like this with Hibernate Reactive, even if the @ReactiveTransactional annotation "fixes" the issue?

  @GET
  @ReactiveTransactional
  fun range(
      @QueryParam("startDate") startDate: LocalDate,
      @QueryParam("endDate") endDate: LocalDate
  ): Uni<List<Entry>> {
    return Multi.createFrom()
        .items(startDate.datesUntil(endDate.plusDays(1)))
        .onItem()
        .transformToUniAndMerge { entryResource.list(userRef = "puschel", date = it) }
        .collect()
        .asList()
        .onItem()
        .transform { it.flatten() }
  }

entryResource.list looks like:

  @GET
  fun list(
      @QueryParam("userRef") @DefaultValue("") userRef: String,
      @QueryParam("page") @DefaultValue("0") pageIndex: Int = 0,
      @QueryParam("size") @DefaultValue("100") pageSize: Int = 100,
      @QueryParam("date") date: LocalDate?
  ): Uni<List<Entry>> {
    val query = if (date == null) "userRef = :userRef" else "userRef = :userRef and date = :date"
    val params = mutableMapOf<String, Any>("userRef" to userRef)
    date?.let { params.put("date", it) }
    return repository.find(query, params).page<Entry>(Page.of(pageIndex, pageSize)).list()
  }

@einarjohnson

I am facing the same problem: I get the "session is currently connecting to database" error when combining Unis that perform read-only operations on the database. What is the best practice to avoid this error?

@DavideD
Contributor

DavideD commented Dec 20, 2022

@clahres That's correct, you shouldn't run code like that. The problem is that you are using transformToUniAndMerge while sharing the session: basically, you are running queries in parallel using the same session. This might work in some situations, but it's not something one should rely on. I think transformToUniAndConcatenate is the correct one to use (but I need to double-check that it actually runs the queries one after the other).
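Running the per-item queries strictly one after the other, which is the behaviour being asked of `transformToUniAndConcatenate`, can be sketched with plain `CompletableFuture`s by folding the list with `thenCompose`. This is a JDK-only illustration, not Mutiny: `findOne` is a hypothetical asynchronous lookup standing in for a repository call, and the sketch counts how many lookups are in flight to show that the count never exceeds one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class Sequential {
    // Applies `lookup` to each item in order; each call starts only after the previous one completed.
    static <T, R> CompletableFuture<List<R>> concatMap(List<T> items, Function<T, CompletableFuture<R>> lookup) {
        CompletableFuture<List<R>> acc = CompletableFuture.completedFuture(new ArrayList<>());
        for (T item : items) {
            acc = acc.thenCompose(results -> lookup.apply(item).thenApply(r -> {
                results.add(r);
                return results;
            }));
        }
        return acc;
    }
}

final class SequentialDemo {
    private final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical async lookup standing in for something like repository.findById(id).
    CompletableFuture<String> findOne(int id, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            try {
                Thread.sleep(10); // simulated query latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            return "shipment-" + id;
        }, pool);
    }

    List<String> run(List<Integer> ids) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return Sequential.concatMap(ids, id -> findOne(id, pool)).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Even with a four-thread pool available, the results come back in input order and `maxInFlight` stays at 1, because each lookup is only created inside the callback of the previous one.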

@einarjohnson I would have to look at the code, but this usually happens when a session is used in parallel somewhere.

@einarjohnson

einarjohnson commented Dec 22, 2022

@DavideD I have a test that reproduces this error.

@QuarkusTest
public class TestPanache {

    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(TestPanache.class);

    @Inject
    @Named("MySqlShipmentRepository")
    ShipmentRepository shipmentRepository;

    @Test
    public void testPanacheMutinyFind_SessionError(){
        List<Integer> ids = List.of(123,571,298109,1239,29,1,666,1337);
        UniJoin.Builder<Shipment> builder = Uni.join().builder();
        for(Integer id: ids){
            builder.add(shipmentRepository.findById(1));
        }
        List<Shipment> shipments = builder.joinAll().andCollectFailures().await().atMost(Duration.ofSeconds(30L));
        log.info("Results: {}", shipments);
    }
}

However, if I wrap the Uni-building block inside a @ReactiveTransactional-annotated method, the problem disappears.

@QuarkusTest
public class TestPanache {

    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(TestPanache.class);

    @Inject
    @Named("MySqlShipmentRepository")
    ShipmentRepository shipmentRepository;

    @Test
    public void testPanacheMutinyFind_SessionError(){
        var shipments = callDatabase().await().indefinitely();
    }

    @ReactiveTransactional
    public Uni<List<Shipment>> callDatabase() {
        List<Integer> ids = List.of(123,571,298109,1239,29,1,666,1337);
        UniJoin.Builder<Shipment> builder = Uni.join().builder();
        for(Integer id: ids){
            builder.add(shipmentRepository.findById(1));
        }
        return builder.joinAll().andCollectFailures();
    }
}

@DavideD
Contributor

DavideD commented Dec 22, 2022

That doesn't look correct to me.

I don't think joinAll guarantees the order of the operations, so even if it might work in this simple scenario, it's not something one should rely on. That's because all the operations in that loop use the same session when running findById.

This means that two or more findById calls could change the state of the session concurrently. I don't mean concurrently as in "on different threads"; I mean concurrently as in "happening in two different parallel execution streams".

Probably the transaction gives some guarantees about when the session gets opened and closed. That's why you no longer see the error.
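Here is a minimal sketch of why sharing one session across parallel streams breaks. It assumes a toy session that, like the reactive session in this thread, can serve only one operation at a time. ToySession, its 50 ms simulated round trip, and its error message are all hypothetical, and CompletableFuture again stands in for Uni. Starting every lookup at once (as joinAll does) makes all but the first fail. Chaining the lookups one after another on the same session succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class SharedSessionDemo {

    // Hypothetical toy session: it can only serve one operation at a time and
    // rejects a new one while another is still running.
    static class ToySession {
        private volatile CompletableFuture<String> current;

        CompletableFuture<String> find(int id) {
            if (current != null && !current.isDone()) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("session is busy with another operation"));
            }
            CompletableFuture<String> op = new CompletableFuture<>();
            current = op;
            // Simulated database round trip that answers after 50 ms.
            CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS)
                    .execute(() -> op.complete("row-" + id));
            return op;
        }
    }

    // Like joinAll on one session: every lookup is started before any has finished.
    static long countFailuresWhenParallel(List<Integer> ids) {
        ToySession session = new ToySession();
        List<CompletableFuture<String>> lookups = new ArrayList<>();
        for (int id : ids) {
            lookups.add(session.find(id));
        }
        return lookups.stream()
                .map(f -> f.handle((value, error) -> error != null).join())
                .filter(failed -> failed)
                .count();
    }

    // One lookup at a time on the same session: each starts after the previous one completed.
    static List<String> runSequentially(List<Integer> ids) {
        ToySession session = new ToySession();
        List<String> results = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int id : ids) {
            chain = chain.thenCompose(ignored -> session.find(id)).thenAccept(results::add);
        }
        chain.join();
        return results;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4);
        System.out.println("failures in parallel: " + countFailuresWhenParallel(ids));
        System.out.println("sequential results: " + runSequentially(ids));
    }
}
```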

@einarjohnson

einarjohnson commented Dec 22, 2022

In short, we are writing Lambda handlers that process a batch of messages from either Kafka or SQS. The idea is to iterate over all the elements in the batch and call the database, similar to how this test does it. In the end, we have to await these Unis since we are running inside an AWS Lambda.

@DavideD, what is the best practice here? Should we use some other operator instead of joinAll?

My current implementation is roughly like this:

var unis = new ArrayList<Uni<Boolean>>();
for (SQSEvent.SQSMessage currentMessage : sqsEvent.getRecords()) {
    unis.add(dbRepository.findById(currentMessage.id).onItem()
            .transformToUni(
                    dbItem -> dynamodbCache.updateCache(dbItem)));
}
if (unis.size() > 0) {
    var result = Uni.join().all(unis).andCollectFailures().await().atMost(Duration.ofSeconds(30));
    log.info("Results: {}", result);
}

Basically, it fetches information from the database and calls DynamoDB to update a table with the results.

@DavideD
Contributor

DavideD commented Dec 23, 2022

You need to run a query only after the previous one has completed:

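    @ReactiveTransactional
    public Uni<List<Shipment>> callDatabase() {
        List<Integer> ids = List.of(123,571,298109,1239,29,1,666,1337);
        List<Shipment> shipments = new ArrayList<>();
        // Start from an already-completed Uni and append one step per id
        Uni<Void> loop = Uni.createFrom().voidItem();
        for(Integer id: ids){
            // call() subscribes to findById only after the previous step has completed,
            // so the queries never overlap on the shared session
            loop = loop.call( () -> shipmentRepository
                            .findById(id)
                            .invoke(shipments::add));
        }
        return loop.replaceWith(shipments);
    }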

@einarjohnson

einarjohnson commented Dec 23, 2022

@DavideD, thank you very much for your feedback and help here. I will try this out.

@crionuke

crionuke commented Feb 3, 2023

Hello, everyone! I have the same issue.
So how can I achieve concurrency when working with the DB inside a "reactive" method?

Here is my example. It produces the error "Detected use of the reactive Session from a different Thread", while the same idea with more complex logic in a real application produces the error "session is currently connecting to database":

Uni<Void> dispatch() {
    List<Uni<Void>> handlers = Arrays.asList(
            createHandler(1),
            createHandler(2),
            createHandler(3));
    return Uni.join().all(handlers).andCollectFailures()
            .replaceWithVoid();
}

Uni<Void> createHandler(int index) {
    return Uni.createFrom().item(index)
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .invoke(i -> handle(i))
            .replaceWithVoid();
}

void handle(int index) {
    log.info("handle, {}", index);
    Panache.withTransaction(() -> eventRepository.deleteAll()) // just for instance
            .await().indefinitely();
}

Extra questions:

  • Is it possible to control when a reactive session is created?
  • Why does the error mention the wrong thread names? (My threads are "executor-thread-*", but the error text mentions "vert.x-eventloop-thread-5" and "vert.x-eventloop-thread-2".)

Logs

2023-02-03T15:31:37.738+04:00 [] (executor-thread-1) INFO  [co.om.pu.se.fe.sc.DispatcherScheduledTask.handle:64] handle, 2
2023-02-03T15:31:37.738+04:00 [] (executor-thread-0) INFO  [co.om.pu.se.fe.sc.DispatcherScheduledTask.handle:64] handle, 1
2023-02-03T15:31:37.738+04:00 [] (executor-thread-2) INFO  [co.om.pu.se.fe.sc.DispatcherScheduledTask.handle:64] handle, 3
2023-02-03T15:31:37.983+04:00 [] (main) ERROR [io.qu.ar.im.AbstractInstanceHandle.destroyInternal:89] Error occurred while destroying instance of bean
[io.quarkus.hibernate.reactive.runtime.ReactiveSessionProducer_ProducerMethod_createMutinySession_1321d110ee9e92bda147899150401e0a136779c7_Bean]: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session
- this suggests an invalid integration; original thread [163]: 'vert.x-eventloop-thread-5' current Thread [158]: 'vert.x-eventloop-thread-2'

@DavideD
Contributor

DavideD commented Feb 5, 2023

I will need to check, but I think using emitOn with the worker pool causes multiple event loop threads to get involved, because Hibernate Reactive only works on an event loop thread. The problem is that Panache.withTransaction won't create a new session each time, so you end up with multiple threads using the same session.
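Here is a rough model of the thread check behind HR000069. It assumes a session that remembers the thread it was opened on and rejects calls from any other thread. This is a toy class, not Hibernate Reactive's actual implementation, and its exception message is invented. It only shows why handing one shared session to work that runs on another thread fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundSession {
    // The thread that "opened" this toy session.
    private final Thread owner = Thread.currentThread();

    String find(long id) {
        if (Thread.currentThread() != owner) {
            // Loosely modeled on HR000069; this message is invented for the sketch.
            throw new IllegalStateException("opened on '" + owner.getName()
                    + "' but used from '" + Thread.currentThread().getName() + "'");
        }
        return "row-" + id;
    }

    // Returns true if the session refused to run a lookup submitted to the given pool.
    static boolean rejectedFrom(ThreadBoundSession session, ExecutorService pool) {
        try {
            CompletableFuture.supplyAsync(() -> session.find(2), pool).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        ThreadBoundSession shared = new ThreadBoundSession(); // opened on the main thread
        System.out.println(shared.find(1)); // same thread, so this works

        ExecutorService worker = Executors.newSingleThreadExecutor();
        System.out.println("rejected from worker: " + rejectedFrom(shared, worker));
        worker.shutdown();
    }
}
```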

If you want to execute tasks concurrently (in the sense of parallel reactive streams), you need to create a new session each time. I think you can only do this with the Hibernate Reactive session factory. Something like this should work:

@Inject
Mutiny.SessionFactory sf;

Uni<Void> dispatch() {
    // Each handler opens its own session, so the handlers can run in parallel
    return Uni.join()
            .all(Arrays.asList(createHandler(1), createHandler(2), createHandler(3)))
            .andCollectFailures()
            .replaceWithVoid();
}

Uni<Void> createHandler(int index) {
    log.info("handle, {}", index);
    return sf.openSession()
            .chain(session -> session
                    .withTransaction(tx -> deleteAll(session, tx))
                    // Close the session whether the transaction succeeded or failed
                    .eventually(session::close)
            );
}

private Uni<Void> deleteAll(Mutiny.Session session, Mutiny.Transaction tx) {
    try {
        return session.createQuery("...").executeUpdate().replaceWithVoid();
    }
    catch (Throwable t) {
        // We need to make sure that any exception gets caught so that it's
        // always possible to close the session in case of error
        return Uni.createFrom().failure(t);
    }
}

There is an issue with .eventually(...) not actually handling all possible scenarios, but it should be fixed in the next Mutiny release.
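The same idea can be modeled with plain JDK types. Each parallel task gets its own simulated session, and the close step is attached with whenComplete. Like Uni's eventually in the snippet above, whenComplete runs on both success and failure. The inNewSession helper and the open-session counter are hypothetical. Task 2 fails on purpose to show that its session is still released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

class PerTaskSessions {
    // Number of simulated sessions currently open.
    static final AtomicInteger openSessions = new AtomicInteger();

    // Hypothetical helper: opens a fresh session for one task and closes it when the
    // task's future completes, whether it succeeded or failed.
    static <T> CompletableFuture<T> inNewSession(int taskId, IntFunction<CompletableFuture<T>> work) {
        openSessions.incrementAndGet();
        return work.apply(taskId).whenComplete((value, error) -> openSessions.decrementAndGet());
    }

    // Simulated unit of work; task 2 fails asynchronously on purpose.
    static CompletableFuture<String> handle(int taskId) {
        return CompletableFuture.supplyAsync(() -> {
            if (taskId == 2) {
                throw new IllegalStateException("task " + taskId + " failed");
            }
            return "done-" + taskId;
        });
    }

    // Runs all tasks in parallel, each with its own session, and returns how many failed.
    static long runAll(List<Integer> taskIds) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int id : taskIds) {
            futures.add(inNewSession(id, PerTaskSessions::handle));
        }
        // Wait for every task, counting failures instead of stopping at the first one.
        return futures.stream()
                .map(f -> f.handle((value, error) -> error != null).join())
                .filter(failed -> failed)
                .count();
    }

    public static void main(String[] args) {
        System.out.println("failed tasks: " + runAll(List.of(1, 2, 3)));
        System.out.println("sessions still open: " + openSessions.get());
    }
}
```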

@crionuke

crionuke commented Feb 5, 2023

@DavideD
Yeah, thanks. I've also tried using the withTransaction method of Mutiny.SessionFactory, which gives me a session that is closed automatically, and it works. Unfortunately, I can't use Panache to work with the DB in that case.

@einarjohnson

einarjohnson commented Feb 13, 2023

@DavideD, I have tried the code you suggested, and the "session is currently connecting to database" error has disappeared, so thank you very much for the help on that issue. :)

However, I am now seeing a very strange issue crop up now and then when this code runs. It looks as if the shipmentRepository.findById(id) method sometimes returns "stale" or "cached" data that does not reflect the actual values stored in the MySQL database.

I can easily reproduce this locally by putting a breakpoint on the findById method at the onItem line.

public Uni<com.controlant.sls.core.domain.models.Shipment> findById(long shipmentId) {
    return find("shipment_id", shipmentId).firstResult()
            .call(shipment -> Mutiny.fetch(shipment.loggers))
            .onItem().transform(shipmentResponse -> shipmentResponse.toDomainShipment());
}

When I update the shipments table directly in MySQL, setting some columns to new values, and then re-run the findById method, I notice that after a couple of iterations my updates have no effect: I keep getting the same values in the shipmentResponse object even though the table itself has new values in its columns.

A simple test/reproducer I have been using:

@Named("MySqlShipmentRepository")
@Inject
ShipmentRepository shipmentRepository;

@Test
void reproduce_stale_data() throws InterruptedException {
    while(true) {
        var shipment = shipmentRepository.findById(1).await().indefinitely();
        log.info("Found with ref: {}", shipment.getReference());
        Thread.sleep(1000);
    }
}

I have tried wrapping the findById call inside Panache.withTransaction, without any luck. When I update the reference column of the table in MySQL, I would expect the log.info line to start printing the new value, but this doesn't happen: the first value that was read is always logged.

That said, I came across this discussion: #23166

When I add the Arc.container().instance(Mutiny.Session.class).destroy() statement to my test, everything seems to work as expected. I assume this isn't how we should avoid this problem, though?

This also seems to do the trick, and I get the updated values logged immediately:

@Named("MySqlShipmentRepository")
@Inject
ShipmentRepository shipmentRepository;

@Test
void reproduce_stale_data() throws InterruptedException {
    while(true) {
        var shipmentUni = Panache.getSession()
                .chain(s -> s.clear().flush())
                .chain(s -> shipmentRepository.findShipmentById(1));

        var shipment = shipmentUni.await().indefinitely();
        log.info("Ref is {}", shipment.getReference());
        Thread.sleep(1000);
    }
}

This whole thing feels like some kind of caching issue. I have tried setting quarkus.hibernate-orm.second-level-caching-enabled to false without any luck. Furthermore, the Panache entity does not use the @Cacheable annotation.

@DavideD
Contributor

DavideD commented Feb 14, 2023

You have already figured it out.
There is only one session: the first time you load the entity, it gets cached in the session, and it stays there until you destroy or clear the session.

Keep in mind that a session is a lightweight object that should represent a logical single unit of work.
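Here is a toy model of the first-level cache (identity map) described here. It assumes a session that remembers every entity it has loaded and only goes to the database for ids it hasn't seen yet. The table map, ToySession, and the reference values are hypothetical. The point is that a direct update to the row stays invisible to the same session until the session is cleared, or until a new session is used for the next unit of work.

```java
import java.util.HashMap;
import java.util.Map;

class IdentityMapDemo {
    // Stand-in for the database table: id -> value of the "reference" column.
    static final Map<Long, String> table = new HashMap<>();

    // Hypothetical toy session with a first-level cache (identity map).
    static class ToySession {
        private final Map<Long, String> cache = new HashMap<>();

        String find(long id) {
            // Reads the "table" only the first time this session sees the id.
            return cache.computeIfAbsent(id, table::get);
        }

        // Forget everything loaded so far, similar in spirit to Session.clear().
        void clear() {
            cache.clear();
        }
    }

    public static void main(String[] args) {
        table.put(1L, "REF-A");
        ToySession session = new ToySession();
        System.out.println(session.find(1L)); // REF-A, loaded from the table

        table.put(1L, "REF-B"); // the row is updated directly in the database
        System.out.println(session.find(1L)); // still REF-A, served from the session cache

        session.clear(); // or use a new session for the next unit of work
        System.out.println(session.find(1L)); // REF-B
    }
}
```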

I'll admit that in Quarkus, at the moment, it's hard to figure out when a session is created or destroyed. We are working on it.

@einarjohnson

einarjohnson commented Feb 15, 2023

Great, it's good to know I am at least on the right track with this. :)
I think it would be very beneficial to add this to the https://quarkus.io/guides/hibernate-reactive documentation, as it isn't obvious that you need this "magic" when querying the DB using the repository pattern. I may well be missing something in that guide, but this wasn't clear to me until I started digging through various GitHub threads and discussions.

@DavideD, do you know when your work might be released in Quarkus? I would like to stay up to date and keep the codebase as simple as possible if you are planning to make changes to the Hibernate/Panache API.

@DavideD
Contributor

DavideD commented Feb 15, 2023

The best way to keep track of the changes is to follow the labels area/panache or area/hibernate-reactive.

A PR with a big set of changes has already been sent: #29761, but it shouldn't require many changes to your code.

@DavideD
Contributor

DavideD commented Feb 15, 2023

I've just realized that if withTransaction throws an exception, the session might not get closed.
It's probably better to change the code to:

Uni<Void> createHandler(int index) {
    log.info("handle, {}", index);
    return sf.openSession()
            // eventually() is attached to the result of deleteAll, so the session
            // gets closed even if the transaction fails
            .chain(session -> deleteAll(session).eventually(session::close));
}

private Uni<Void> deleteAll(Mutiny.Session session) {
    try {
        return session.withTransaction(tx -> session.createQuery("...").executeUpdate()).replaceWithVoid();
    }
    catch (Throwable t) {
        // We need to make sure that any exception gets caught so that it's
        // always possible to close the session in case of error
        return Uni.createFrom().failure(t);
    }
}
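Why the try/catch matters can be sketched with CompletableFuture, again as a stand-in for Uni and with hypothetical names. If the code that builds the pipeline throws synchronously, the close callback is never attached. Converting the throw into a failed future lets the close step run anyway.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class CloseOnSyncFailure {

    // Naive: if building the work throws synchronously, whenComplete is never
    // attached, so the "session" is never closed.
    static CompletableFuture<Void> naive(Supplier<CompletableFuture<Void>> work, AtomicBoolean closed) {
        return work.get().whenComplete((value, error) -> closed.set(true));
    }

    // Safer: turn a synchronous throw into a failed future so the close step always runs.
    static CompletableFuture<Void> safe(Supplier<CompletableFuture<Void>> work, AtomicBoolean closed) {
        CompletableFuture<Void> stage;
        try {
            stage = work.get();
        } catch (Throwable t) {
            stage = CompletableFuture.failedFuture(t);
        }
        return stage.whenComplete((value, error) -> closed.set(true));
    }

    // Hypothetical work that fails while the pipeline is being built, not inside it.
    static CompletableFuture<Void> throwsEagerly() {
        throw new IllegalArgumentException("could not build the query");
    }

    public static void main(String[] args) {
        AtomicBoolean naiveClosed = new AtomicBoolean();
        try {
            naive(CloseOnSyncFailure::throwsEagerly, naiveClosed);
        } catch (IllegalArgumentException e) {
            // The exception escaped before any close callback was registered.
        }

        AtomicBoolean safeClosed = new AtomicBoolean();
        CompletableFuture<Void> result = safe(CloseOnSyncFailure::throwsEagerly, safeClosed);

        System.out.println("naive closed: " + naiveClosed.get()); // false
        System.out.println("safe closed: " + safeClosed.get()); // true
        System.out.println("safe failed: " + result.isCompletedExceptionally()); // true
    }
}
```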

@geoand
Contributor

geoand commented Sep 13, 2024

Do we know if this problem still exists in the latest versions of Quarkus?

@DavideD
Contributor

DavideD commented Sep 13, 2024

As far as I understand, the original problem was about using .combine().all(), which Hibernate Reactive doesn't support. I guess we can close this issue.

@geoand
Contributor

geoand commented Sep 13, 2024

Thanks @DavideD

geoand closed this as not planned on Sep 13, 2024