Permalink
Browse files

Trampolining for chained DBIOActions

- Change `AndThenAction` to support more than two sub-actions. It still
  returns the last result. These new semantics allow both, `andThen`
  and `DBIO.seq`, operations to be represented by a common
  implementation and combined into longer non-nested chains when either
  the left or right side (or both) of an `andThen` is already an
  `AndThenAction`.

- Add a special `SynchronousDatabaseAction.FusedAndThenAction` class for
  fused AndThenActions so that existing chains can be extended on both
  sides with more operations the same way it is done for non-fused
  AndThenActions.

- Implement fusion of synchronous subgroups in `DBIO.seq`, similar to
  the existing implementation in `DBIO.sequence`.

- Add trampolining logic to `DBIO.sameThreadExecutionContext` to run
  all chained non-fused actions in constant stack space.

Fixes #1186. Tests in ActionTest.testDeepRecursion.
  • Loading branch information...
szeiger committed Sep 11, 2015
1 parent 423988d commit 007e287059c8148b28e1ce9039267fb1fc4bc043
@@ -1,6 +1,6 @@
package com.typesafe.slick.testkit.tests
import com.typesafe.slick.testkit.util.{RelationalTestDB, AsyncTest}
import com.typesafe.slick.testkit.util.{StandardTestDBs, RelationalTestDB, AsyncTest}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
@@ -94,15 +94,19 @@ class ActionTest extends AsyncTest[RelationalTestDB] {
} yield ()
}
def testDeepRecursion = {
def testDeepRecursion = if(tdb == StandardTestDBs.H2Disk) {
val a1 = DBIO.sequence((1 to 5000).toSeq.map(i => LiteralColumn(i).result))
val a2 = DBIO.sequence((1 to 20).toSeq.map(i => if(i%2 == 0) LiteralColumn(i).result else DBIO.from(Future.successful(i))))
val a3 = DBIO.sequence((1 to 20).toSeq.map(i => if((i/4)%2 == 0) LiteralColumn(i).result else DBIO.from(Future.successful(i))))
val a4 = DBIO.seq((1 to 50000).toSeq.map(i => DBIO.successful("a4")): _*)
val a5 = (1 to 50000).toSeq.map(i => DBIO.successful("a5")).reduceLeft(_ andThen _)
DBIO.seq(
a1.map(_ shouldBe (1 to 5000).toSeq),
a2.map(_ shouldBe (1 to 20).toSeq),
a3.map(_ shouldBe (1 to 20).toSeq)
a3.map(_ shouldBe (1 to 20).toSeq),
a4.map(_ shouldBe ()),
a5.map(_ shouldBe "a5")
)
}
} else DBIO.successful(())
}
@@ -144,8 +144,14 @@ trait DatabaseComponent { self =>
case FutureAction(f) => f
case FlatMapAction(base, f, ec) =>
runInContext(base, ctx, false, topLevel).flatMap(v => runInContext(f(v), ctx, streaming, false))(ctx.getEC(ec))
case AndThenAction(a1, a2) =>
runInContext(a1, ctx, false, topLevel).flatMap(_ => runInContext(a2, ctx, streaming, false))(DBIO.sameThreadExecutionContext)
case AndThenAction(actions) =>
val last = actions.length - 1
def run(pos: Int, v: Any): Future[Any] = {
val f1 = runInContext(actions(pos), ctx, streaming && pos == last, pos == 0)
if(pos == last) f1
else f1.flatMap(run(pos + 1, _))(DBIO.sameThreadExecutionContext)
}
run(0, null).asInstanceOf[Future[R]]
case sa @ SequenceAction(actions) =>
val len = actions.length
val results = new AtomicReferenceArray[Any](len)
@@ -197,7 +203,7 @@ trait DatabaseComponent { self =>
case a: SynchronousDatabaseAction[_, _, _, _] =>
if(streaming) {
if(a.supportsStreaming) streamSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect]], ctx.asInstanceOf[StreamingContext], !topLevel).asInstanceOf[Future[R]]
else runInContext(CleanUpAction(AndThenAction(DBIO.Pin, a.nonFusedEquivalentAction), _ => DBIO.Unpin, true, DBIO.sameThreadExecutionContext), ctx, streaming, topLevel)
else runInContext(CleanUpAction(AndThenAction(Vector(DBIO.Pin, a.nonFusedEquivalentAction)), _ => DBIO.Unpin, true, DBIO.sameThreadExecutionContext), ctx, streaming, topLevel)
} else runSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[R, NoStream, This, _]], ctx, !topLevel)
case a: DatabaseAction[_, _, _] =>
throw new SlickException(s"Unsupported database action $a for $this")
@@ -2,6 +2,7 @@ package slick.dbio
import org.reactivestreams.Subscription
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.language.higherKinds
@@ -50,8 +51,10 @@ sealed trait DBIOAction[+R, +S <: NoStream, -E <: Effect] extends Dumpable {
/** Run another action after this action, if it completed successfully, and return the result
* of the second action. If either of the two actions fails, the resulting action also fails. */
def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] =
AndThenAction[R2, S2, E with E2](this, a)
def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
case AndThenAction(as2) => AndThenAction[R2, S2, E with E2](this +: as2)
case a => AndThenAction[R2, S2, E with E2](Vector(this, a))
}
/** Run another action after this action, if it completed successfully, and return the result
* of both actions. If either of the two actions fails, the resulting action also fails. */
@@ -130,25 +133,26 @@ object DBIOAction {
/** Create a [[DBIOAction]] that always fails. */
def failed(t: Throwable): DBIOAction[Nothing, NoStream, Effect] = FailureAction(t)
private[this] def groupBySynchronicity[R, E <: Effect](in: TraversableOnce[DBIOAction[R, NoStream, E]]): Vector[Vector[DBIOAction[R, NoStream, E]]] = {
var state = 0 // no current = 0, sync = 1, async = 2
var current: mutable.Builder[DBIOAction[R, NoStream, E], Vector[DBIOAction[R, NoStream, E]]] = null
val total = Vector.newBuilder[Vector[DBIOAction[R, NoStream, E]]]
(in: TraversableOnce[Any]).foreach { a =>
val msgState = if(a.isInstanceOf[SynchronousDatabaseAction[_, _, _, _]]) 1 else 2
if(msgState != state) {
if(state != 0) total += current.result()
current = Vector.newBuilder
state = msgState
}
current += a.asInstanceOf[DBIOAction[R, NoStream, E]]
}
if(state != 0) total += current.result()
total.result()
}
/** Transform a `TraversableOnce[ DBIO[R] ]` into a `DBIO[ TraversableOnce[R] ]`. */
def sequence[R, M[+_] <: TraversableOnce[_], E <: Effect](in: M[DBIOAction[R, NoStream, E]])(implicit cbf: CanBuildFrom[M[DBIOAction[R, NoStream, E]], R, M[R]]): DBIOAction[M[R], NoStream, E] = {
implicit val ec = DBIO.sameThreadExecutionContext
def groupBySynchronicity(in: M[DBIOAction[R, NoStream, E]]): Vector[Vector[DBIOAction[R, NoStream, E]]] = {
var state = 0 // no current = 0, sync = 1, async = 2
var current: mutable.Builder[DBIOAction[R, NoStream, E], Vector[DBIOAction[R, NoStream, E]]] = null
val total = Vector.newBuilder[Vector[DBIOAction[R, NoStream, E]]]
(in: TraversableOnce[Any]).foreach { a =>
val msgState = if(a.isInstanceOf[SynchronousDatabaseAction[_, _, _, _]]) 1 else 2
if(msgState != state) {
if(state != 0) total += current.result()
current = Vector.newBuilder
state = msgState
}
current += a.asInstanceOf[DBIOAction[R, NoStream, E]]
}
if(state != 0) total += current.result()
total.result()
}
def sequenceGroupAsM(g: Vector[DBIOAction[R, NoStream, E]]): DBIOAction[M[R], NoStream, E] = {
if(g.head.isInstanceOf[SynchronousDatabaseAction[_, _, _, _]]) { // fuse synchronous group
new SynchronousDatabaseAction.Fused[M[R], NoStream, DatabaseComponent, E] {
@@ -183,7 +187,7 @@ object DBIOAction {
} else SequenceAction[R, Seq[R], E](g)
}
}
val grouped = groupBySynchronicity(in)
val grouped = groupBySynchronicity[R, E](in.asInstanceOf[TraversableOnce[DBIOAction[R, NoStream, E]]])
grouped.length match {
case 0 => DBIO.successful(cbf().result())
case 1 => sequenceGroupAsM(grouped.head)
@@ -195,10 +199,34 @@ object DBIOAction {
}
/** A simpler version of `sequence` that takes a number of DBIOActions with any return type as
* varargs and returns a DBIOAction that performs the individual actions in sequence (using
* `andThen`), returning `()` in the end. */
def seq[E <: Effect](actions: DBIOAction[_, NoStream, E]*): DBIOAction[Unit, NoStream, E] =
(actions :+ SuccessAction(())).reduceLeft(_ andThen _).asInstanceOf[DBIOAction[Unit, NoStream, E]]
* varargs and returns a DBIOAction that performs the individual actions in sequence, returning
* `()` in the end. */
def seq[E <: Effect](actions: DBIOAction[_, NoStream, E]*): DBIOAction[Unit, NoStream, E] = {
def sequenceGroup(g: Vector[DBIOAction[Any, NoStream, E]], forceUnit: Boolean): DBIOAction[Any, NoStream, E] = {
if(g.length == 1 && !forceUnit) g.head
else if(g.head.isInstanceOf[SynchronousDatabaseAction[_, _, _, _]]) sequenceSync(g)
else if(forceUnit) AndThenAction[Any, NoStream, E](g :+ DBIO.successful(()))
else AndThenAction[Any, NoStream, E](g)
}
def sequenceSync(g: Vector[DBIOAction[Any, NoStream, E]]): DBIOAction[Unit, NoStream, E] = {
new SynchronousDatabaseAction.Fused[Unit, NoStream, DatabaseComponent, E] {
def run(context: DatabaseComponent#Context) = {
g.foreach(_.asInstanceOf[SynchronousDatabaseAction[Any, NoStream, DatabaseComponent, E]].run(context))
}
override def nonFusedEquivalentAction = AndThenAction[Unit, NoStream, E](g)
}
}
if(actions.isEmpty) DBIO.successful(()) else {
val grouped = groupBySynchronicity[Any, E](actions :+ DBIO.successful(()))
grouped.length match {
case 1 => sequenceGroup(grouped.head, true).asInstanceOf[DBIOAction[Unit, NoStream, E]]
case n =>
val last = grouped.length - 1
val as = grouped.iterator.zipWithIndex.map { case (g, i) => sequenceGroup(g, i == last) }.toVector
AndThenAction[Unit, NoStream, E](as)
}
}
}
/** Create a DBIOAction that runs some other actions in sequence and combines their results
* with the given function. */
@@ -220,7 +248,31 @@ object DBIOAction {
/** An ExecutionContext used internally for executing plumbing operations during DBIOAction
* composition. */
private[slick] object sameThreadExecutionContext extends ExecutionContext {
override def execute(runnable: Runnable): Unit = runnable.run()
private[this] val trampoline = new ThreadLocal[List[Runnable]]
private[this] def runTrampoline(first: Runnable): Unit = {
trampoline.set(Nil)
try {
var err: Throwable = null
var r = first
while(r ne null) {
try r.run() catch { case t: Throwable => err = t }
trampoline.get() match {
case r2 :: rest =>
trampoline.set(rest)
r = r2
case _ => r = null
}
}
if(err ne null) throw err
} finally trampoline.set(null)
}
override def execute(runnable: Runnable): Unit = trampoline.get() match {
case null => runTrampoline(runnable)
case r => trampoline.set(runnable :: r)
}
override def reportFailure(t: Throwable): Unit = throw t
}
}
@@ -253,12 +305,18 @@ case class FlatMapAction[+R, +S <: NoStream, P, -E <: Effect](base: DBIOAction[P
def getDumpInfo = DumpInfo("flatMap", String.valueOf(f), children = Vector(("base", base)))
}
/** A DBIOAction that represents an `andThen` operation for sequencing in the DBIOAction monad. */
case class AndThenAction[+R, +S <: NoStream, -E <: Effect](a1: DBIOAction[_, NoStream, E], a2: DBIOAction[R, S, E]) extends DBIOAction[R, S, E] {
def getDumpInfo = DumpInfo("andThen", children = Vector(("1", a1), ("2", a2)))
/** A DBIOAction that represents a `seq` or `andThen` operation for sequencing in the DBIOAction
* monad. Unlike `SequenceAction` it only keeps the last result. */
case class AndThenAction[R, +S <: NoStream, -E <: Effect](as: IndexedSeq[DBIOAction[Any, NoStream, E]]) extends DBIOAction[R, S, E] {
def getDumpInfo = DumpInfo("andThen", children = as.zipWithIndex.map { case (a, i) => (String.valueOf(i+1), a) })
override def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
case AndThenAction(as2) => AndThenAction[R2, S2, E with E2](as ++ as2)
case a => AndThenAction[R2, S2, E with E2](as :+ a)
}
}
/** A DBIOAction that represents a `sequence` operation for sequencing in the DBIOAction monad. */
/** A DBIOAction that represents a `sequence` or operation for sequencing in the DBIOAction monad. */
case class SequenceAction[R, +R2, -E <: Effect](as: IndexedSeq[DBIOAction[R, NoStream, E]])(implicit val cbf: CanBuild[R, R2]) extends DBIOAction[R2, NoStream, E] {
def getDumpInfo = DumpInfo("sequence", children = as.zipWithIndex.map { case (a, i) => (String.valueOf(i+1), a) })
}
@@ -354,16 +412,16 @@ trait SynchronousDatabaseAction[+R, +S <: NoStream, -B <: DatabaseComponent, -E
* supports streaming. This flag is not used if the Action has a `NoStream` result type. */
def supportsStreaming: Boolean = true
private[this] def superAndThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]) = super.andThen[R2, S2, E2](a)
override def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
case a: SynchronousDatabaseAction[_, _, _, _] => new SynchronousDatabaseAction.Fused[R2, S2, B, E with E2] {
def run(context: B#Context): R2 = {
self.run(context)
a.asInstanceOf[SynchronousDatabaseAction[R2, S2, B, E2]].run(context)
}
override def nonFusedEquivalentAction: DBIOAction[R2, S2, E with E2] = superAndThen(a)
}
case a => superAndThen(a)
case a: SynchronousDatabaseAction.FusedAndThenAction[_, _, _, _] =>
new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, B, E with E2](
self.asInstanceOf[SynchronousDatabaseAction[Any, S2, B, E with E2]] +:
a.as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, B, E with E2]]])
case a: SynchronousDatabaseAction[_, _, _, _] =>
new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, B, E with E2](
Vector(self.asInstanceOf[SynchronousDatabaseAction[Any, S2, B, E with E2]],
a.asInstanceOf[SynchronousDatabaseAction[Any, S2, B, E with E2]]))
case a => super.andThen[R2, S2, E2](a)
}
private[this] def superZip[R2, E2 <: Effect](a: DBIOAction[R2, NoStream, E2]) = super.zip[R2, E2](a)
@@ -444,6 +502,26 @@ object SynchronousDatabaseAction {
override def supportsStreaming: Boolean = false
}
class FusedAndThenAction[+R, +S <: NoStream, B <: DatabaseComponent, -E <: Effect](val as: IndexedSeq[SynchronousDatabaseAction[Any, S, B, E]]) extends Fused[R, S, B, E] {
def run(context: B#Context): R = {
var res: Any = null
as.foreach(a => res = a.run(context))
res.asInstanceOf[R]
}
override def nonFusedEquivalentAction: DBIOAction[R, S, E] = AndThenAction[R, S, E](as)
override def andThen[R2, S2 <: NoStream, E2 <: Effect](a: DBIOAction[R2, S2, E2]): DBIOAction[R2, S2, E with E2] = a match {
case a: SynchronousDatabaseAction.FusedAndThenAction[_, _, _, _] =>
new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, B, E with E2](
as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, B, E with E2]]] ++
a.as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, B, E with E2]]])
case a: SynchronousDatabaseAction[_, _, _, _] =>
new SynchronousDatabaseAction.FusedAndThenAction[R2, S2, B, E with E2](
as.asInstanceOf[IndexedSeq[SynchronousDatabaseAction[Any, S2, B, E with E2]]] :+
a.asInstanceOf[SynchronousDatabaseAction[Any, S2, B, E with E2]])
case a => super.andThen(a)
}
}
/** Fuse `flatMap` / `map`, `cleanUp` and `filter` / `withFilter` combinators if they use
* `DBIO.sameThreadExecutionContext` and produce a `SynchronousDatabaseAction` in their
* evaluation function (where applicable). This cannot be verified at fusion time, so a wrongly

0 comments on commit 007e287

Please sign in to comment.