Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanly close down Reactive streams when the Database has been closed #1426

Open
nadavsamet opened this issue Jan 30, 2016 · 2 comments
Open
Milestone

Comments

@nadavsamet
Copy link

I've encountered occasional issues with Slick in our application - it seems that you make a non standard use of Reactive Streams:

akka.stream.impl.ReactiveStreamsCompliance$SignalThrewException: It is illegal to throw exceptions from request(), rule 3.16 at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:111) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.akka$stream$impl$fusing$ActorGraphInterpreter$BatchingActorInputBoundary$$dequeue(ActorGraphInterpreter.scala:120) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.onNext(ActorGraphInterpreter.scala:147) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:388) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:549) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] at akka.actor.Actor$class.aroundReceive(Actor.scala:480) ~[com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:495) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at akka.actor.ActorCell.invoke_aroundBody0(ActorCell.scala:495) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at akka.actor.ActorCell$AjcClosure1.run(ActorCell.scala:1) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149) [org.aspectj.aspectjrt-1.8.6.jar:na] at akka.kamon.instrumentation.ActorCellInstrumentation$$anonfun$aroundBehaviourInvoke$1.apply(ActorCellInstrumentation.scala:63) [io.kamon.kamon-akka_2.11-0.5.2.jar:0.5.2] at kamon.trace.Tracer$.withContext(TracerModule.scala:53) [io.kamon.kamon-core_2.11-0.5.2.jar:0.5.2] at akka.kamon.instrumentation.ActorCellInstrumentation.aroundBehaviourInvoke(ActorCellInstrumentation.scala:62) [io.kamon.kamon-akka_2.11-0.5.2.jar:0.5.2] at akka.actor.ActorCell.invoke(ActorCell.scala:488) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at akka.dispatch.Mailbox.run(Mailbox.scala:224) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at akka.dispatch.Mailbox.exec(Mailbox.scala:234) [com.typesafe.akka.akka-actor_2.11-2.4.1.jar:na] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.11.7.jar:na] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.11.7.jar:na] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.11.7.jar:na] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.11.7.jar:na] Caused by: java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$3@25fd8ebb rejected from java.util.concurrent.ThreadPoolExecutor@17bcf117[Terminated, pool size = 14, active threads = 0, queued tasks = 0, completed tasks = 972900] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_72-internal] at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[na:1.8.0_72-internal] at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[na:1.8.0_72-internal] at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136) ~[org.scala-lang.scala-library-2.11.7.jar:na] at slick.backend.DatabaseComponent$DatabaseDef$class.scheduleSynchronousStreaming(DatabaseComponent.scala:259) ~[com.typesafe.slick.slick_2.11-3.1.1.jar:na] at slick.jdbc.JdbcBackend$DatabaseDef.scheduleSynchronousStreaming(JdbcBackend.scala:38) ~[com.typesafe.slick.slick_2.11-3.1.1.jar:na] at slick.backend.DatabaseComponent$BasicStreamingActionContext.restartStreaming(DatabaseComponent.scala:449) ~[com.typesafe.slick.slick_2.11-3.1.1.jar:na] at slick.backend.DatabaseComponent$BasicStreamingActionContext.request(DatabaseComponent.scala:464) ~[com.typesafe.slick.slick_2.11-3.1.1.jar:na] at akka.stream.impl.ReactiveStreamsCompliance$.tryRequest(ReactiveStreamsCompliance.scala:110) ~[com.typesafe.akka.akka-stream-experimental_2.11-2.0.2.jar:na] ... 21 common frames omitted

@onsails
Copy link

onsails commented Sep 15, 2016

I have a bit similar problem.
Under load my app is getting java.util.concurrent.RejectedExecutionException with non-zero pool size but active threads and queued tasks are equal to 0.
Slick 3.1.1 with #1274 fix (https://github.com/kwark/slick/blob/3.1-deadlock/README.md)

@szeiger
Copy link
Member

szeiger commented Sep 16, 2016

The important bit is this:

java.util.concurrent.ThreadPoolExecutor@17bcf117[Terminated, pool size = 14, active threads = 0, queued tasks = 0, completed tasks = 972900]
                                                 ^^^^^^^^^^

The thread pool has been shut down asynchronously (presumably by calling database.close()). After shutting down, every action will be rejected, including stream continuations (which are otherwise guaranteed to be accepted). The only option for delivering this exception asynchronously would be to start a new thread specifically for this purpose, but this relies on all threads having already terminated when the thread is in the terminated state, otherwise we would again run afoul of the Reactive Streams spec. I'm not sure we can rely on this.

I think the best suggestion is: Don't shut the Database down while it's still being used.

@hvesalai hvesalai added this to the Future milestone Feb 28, 2018
@hvesalai hvesalai changed the title Reactive Streams standard violation - exception in request() Cleanly close down Reactive streams when the Database has been closed Feb 28, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants