Permalink
Browse files

trampoline chained actions

  • Loading branch information...
derekjw committed Mar 10, 2017
1 parent fdf9ce8 commit 70dfd2f8805405a3c7d0d64133feb171ac0869da
@@ -99,13 +99,17 @@ class ActionTest extends AsyncTest[RelationalTestDB] {
val a3 = DBIO.sequence((1 to 20).toSeq.map(i => if((i/4)%2 == 0) LiteralColumn(i).result else DBIO.from(Future.successful(i))))
val a4 = DBIO.seq((1 to 50000).toSeq.map(i => DBIO.successful("a4")): _*)
val a5 = (1 to 50000).toSeq.map(i => DBIO.successful("a5")).reduceLeft(_ andThen _)
val a6 = DBIO.fold((1 to 50000).toSeq.map(i => LiteralColumn(i).result), 0)(_ + _)
val a7 = (1 to 10000).map(_ => DBIO.successful("a7")).reduceLeft((a, b) => a flatMap (_ => b) andThen b)

DBIO.seq(
a1.map(_ shouldBe (1 to 5000).toSeq),
a2.map(_ shouldBe (1 to 20).toSeq),
a3.map(_ shouldBe (1 to 20).toSeq),
a4.map(_ shouldBe (())),
a5.map(_ shouldBe "a5")
a5.map(_ shouldBe "a5"),
a6.map(_ shouldBe (1 to 50000).sum),
a7.map(_ shouldBe "a7")
)
} else DBIO.successful(())

@@ -138,53 +138,81 @@ trait BasicBackend { self =>
* completed with `null` or failed after streaming has finished. This
* method should not call any `Subscriber` method other than `onNext`. */
protected[this] def runInContext[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean): Future[R] = {
runInContextSafe(a, ctx, streaming, topLevel, stackLevel = 0)
}

// If recursion has reached the limit then run next batch in trampoline
private[this] def runInContextSafe[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean, stackLevel: Int): Future[R] = {
if (stackLevel < 100) {
runInContextInline(a, ctx, streaming, topLevel, stackLevel + 1)
} else {
val promise = Promise[R]
val runnable = new Runnable {
override def run() = {
try {
promise.completeWith(runInContextInline(a, ctx, streaming, topLevel, stackLevel = 1))
} catch {
case NonFatal(ex) => promise.failure(ex)
}
}
}
DBIO.sameThreadExecutionContext.execute(runnable)
promise.future
}
}

private[this] def runInContextInline[R](a: DBIOAction[R, NoStream, Nothing], ctx: Context, streaming: Boolean, topLevel: Boolean, stackLevel: Int): Future[R] = {
logAction(a, ctx)
a match {
case SuccessAction(v) => Future.successful(v)
case FailureAction(t) => Future.failed(t)
case FutureAction(f) => f
case FlatMapAction(base, f, ec) =>
runInContext(base, ctx, false, topLevel).flatMap(v => runInContext(f(v), ctx, streaming, false))(ctx.getEC(ec))
runInContextSafe(base, ctx, false, topLevel, stackLevel).flatMap(v => runInContext(f(v), ctx, streaming, false))(ctx.getEC(ec))
case AndThenAction(actions) =>
val last = actions.length - 1

def run(pos: Int, v: Any): Future[Any] = {
val f1 = runInContext(actions(pos), ctx, streaming && pos == last, topLevel && pos == 0)
val f1 = runInContextSafe(actions(pos), ctx, streaming && pos == last, topLevel && pos == 0, stackLevel)

if(pos == last) f1
else f1.flatMap(run(pos + 1, _))(DBIO.sameThreadExecutionContext)
}

run(0, null).asInstanceOf[Future[R]]
case sa @ SequenceAction(actions) =>
case sa@SequenceAction(actions) =>
val len = actions.length
val results = new AtomicReferenceArray[Any](len)

def run(pos: Int): Future[Any] = {
if(pos == len) Future.successful {
if (pos == len) Future.successful {
val b = sa.cbf()
var i = 0
while(i < len) {
while (i < len) {
b += results.get(i)
i += 1
}
b.result()
}
else runInContext(actions(pos), ctx, false, topLevel && pos == 0).flatMap { (v: Any) =>
else runInContextSafe(actions(pos), ctx, false, topLevel && pos == 0, stackLevel).flatMap { (v: Any) =>
results.set(pos, v)
run(pos + 1)
} (DBIO.sameThreadExecutionContext)
}(DBIO.sameThreadExecutionContext)
}

run(0).asInstanceOf[Future[R]]
case CleanUpAction(base, f, keepFailure, ec) =>
val p = Promise[R]()
runInContext(base, ctx, streaming, topLevel).onComplete { t1 =>
runInContextSafe(base, ctx, streaming, topLevel, stackLevel).onComplete { t1 =>
try {
val a2 = f(t1 match {
case Success(_) => None
case Failure(t) => Some(t)
})
runInContext(a2, ctx, false, false).onComplete { t2 =>
if(t2.isFailure && (t1.isSuccess || !keepFailure)) p.complete(t2.asInstanceOf[Failure[R]])
if (t2.isFailure && (t1.isSuccess || !keepFailure)) p.complete(t2.asInstanceOf[Failure[R]])
else p.complete(t1)
} (DBIO.sameThreadExecutionContext)
}(DBIO.sameThreadExecutionContext)
} catch {
case NonFatal(ex) =>
val e = t1 match {
@@ -196,20 +224,20 @@ trait BasicBackend { self =>
actionLogger.warn("Exception after promise completed", e)
}
}
} (ctx.getEC(ec))
}(ctx.getEC(ec))
p.future
case FailedAction(a) =>
runInContext(a, ctx, false, topLevel).failed.asInstanceOf[Future[R]]
runInContextSafe(a, ctx, false, topLevel, stackLevel).failed.asInstanceOf[Future[R]]
case AsTryAction(a) =>
val p = Promise[R]()
runInContext(a, ctx, false, topLevel).onComplete(v => p.success(v.asInstanceOf[R]))(DBIO.sameThreadExecutionContext)
runInContextSafe(a, ctx, false, topLevel, stackLevel).onComplete(v => p.success(v.asInstanceOf[R]))(DBIO.sameThreadExecutionContext)
p.future
case NamedAction(a, _) =>
runInContext(a, ctx, streaming, topLevel)
runInContextSafe(a, ctx, streaming, topLevel, stackLevel)
case a: SynchronousDatabaseAction[_, _, _, _] =>
if(streaming) {
if(a.supportsStreaming) streamSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect]], ctx.asInstanceOf[StreamingContext], !topLevel).asInstanceOf[Future[R]]
else runInContext(CleanUpAction(AndThenAction(Vector(DBIO.Pin, a.nonFusedEquivalentAction)), _ => DBIO.Unpin, true, DBIO.sameThreadExecutionContext), ctx, streaming, topLevel)
if (streaming) {
if (a.supportsStreaming) streamSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect]], ctx.asInstanceOf[StreamingContext], !topLevel).asInstanceOf[Future[R]]
else runInContextSafe(CleanUpAction(AndThenAction(Vector(DBIO.Pin, a.nonFusedEquivalentAction)), _ => DBIO.Unpin, true, DBIO.sameThreadExecutionContext), ctx, streaming, topLevel, stackLevel)
} else runSynchronousDatabaseAction(a.asInstanceOf[SynchronousDatabaseAction[R, NoStream, This, _]], ctx, !topLevel)
case a: DatabaseAction[_, _, _] =>
throw new SlickException(s"Unsupported database action $a for $this")

0 comments on commit 70dfd2f

Please sign in to comment.