Skip to content
Permalink
Browse files

Refactoring of ActionContext:

Abstract over the whole Context type as part of the DatabaseComponent,
not just over the Session. This avoids some casts and paves the road for
the next iteration so that Session can be removed entirely.
  • Loading branch information
szeiger committed Jan 26, 2015
1 parent e29e83b commit 3315a3bcd187e504110a28f027532b3710b8e217
@@ -186,26 +186,26 @@ abstract class AsyncTest[TDB >: Null <: TestDB](implicit TdbClass: ClassTag[TDB]

/** Test Action: Get the current database session */
object GetSession extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, TDB#Driver#Backend#Session, NoStream] {
def run(context: ActionContext[TDB#Driver#Backend]) = context.session
def run(context: TDB#Driver#Backend#Context) = context.session
def getDumpInfo = DumpInfo(name = "<GetSession>")
}

/** Test Action: Check if the current database session is pinned */
object IsPinned extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, Boolean, NoStream] {
def run(context: ActionContext[TDB#Driver#Backend]) = context.isPinned
def run(context: TDB#Driver#Backend#Context) = context.isPinned
def getDumpInfo = DumpInfo(name = "<IsPinned>")
}

/** Test Action: Get the current transactionality level and autoCommit flag */
object GetTransactionality extends SynchronousDatabaseAction[JdbcBackend, Effect, (Int, Boolean), NoStream] {
def run(context: ActionContext[JdbcBackend]) =
def run(context: JdbcBackend#Context) =
context.session.asInstanceOf[JdbcBackend#BaseSession].getTransactionality
def getDumpInfo = DumpInfo(name = "<GetTransactionality>")
}

/** Test Action: Get the current statement parameters */
object GetStatementParameters extends SynchronousDatabaseAction[JdbcBackend, Effect, JdbcBackend.StatementParameters, NoStream] {
def run(context: ActionContext[JdbcBackend]) = {
def run(context: JdbcBackend#Context) = {
val s = context.session.asInstanceOf[JdbcBackend#Session]
JdbcBackend.StatementParameters(s.resultSetType, s.resultSetConcurrency, s.resultSetHoldability)
}
@@ -224,7 +224,7 @@ abstract class AsyncTest[TDB >: Null <: TestDB](implicit TdbClass: ClassTag[TDB]

def asAction[R](f: tdb.profile.Backend#Session => R): EffectfulAction[Effect, R, NoStream] =
new SynchronousDatabaseAction[tdb.profile.Backend, Effect, R, NoStream] {
def run(context: ActionContext[tdb.profile.Backend]): R = f(context.session)
def run(context: tdb.profile.Backend#Context): R = f(context.session)
def getDumpInfo = DumpInfo(name = "<asAction>")
}

@@ -1,5 +1,7 @@
package scala.slick.action

import org.reactivestreams.Subscription

import scala.language.higherKinds

import scala.collection.generic.CanBuildFrom
@@ -94,7 +96,9 @@ sealed trait EffectfulAction[-E <: Effect, +R, +S <: NoStream] extends Dumpable
def failed: EffectfulAction[E, Throwable, NoStream] = FailedAction[E](this)

/** Convert a successful result `v` of this Action into a successful result `Success(v)` and a
* failure `t` into a successful result `Failure(t)` */
* failure `t` into a successful result `Failure(t)`. This is the most generic combinator that
* can be used for error recovery. If possible, use [[andFinally]] or [[cleanUp]] instead,
* because those combinators, unlike `asTry`, support streaming. */
def asTry: EffectfulAction[E, Try[R], NoStream] = AsTryAction[E, R](this)

/** Run this Action with a pinned database session. If this action is composed of multiple
@@ -145,13 +149,13 @@ object Action {

/** An Action that pins the current session */
private[slick] object Pin extends SynchronousDatabaseAction[DatabaseComponent, Effect, Unit, NoStream] {
def run(context: ActionContext[DatabaseComponent]): Unit = context.pin
def run(context: DatabaseComponent#Context): Unit = context.pin
def getDumpInfo = DumpInfo(name = "SynchronousDatabaseAction.Pin")
}

/** An Action that unpins the current session */
private[slick] object Unpin extends SynchronousDatabaseAction[DatabaseComponent, Effect, Unit, NoStream] {
def run(context: ActionContext[DatabaseComponent]): Unit = context.unpin
def run(context: DatabaseComponent#Context): Unit = context.unpin
def getDumpInfo = DumpInfo(name = "SynchronousDatabaseAction.Unpin")
}

@@ -169,15 +173,15 @@ trait DatabaseAction[-E <: Effect, +R, +S <: NoStream] extends EffectfulAction[E
}

/** An Action that returns a constant value. */
case class SuccessAction[+R](value: R) extends SynchronousDatabaseAction[Nothing, Effect, R, NoStream] {
case class SuccessAction[+R](value: R) extends SynchronousDatabaseAction[DatabaseComponent, Effect, R, NoStream] {
def getDumpInfo = DumpInfo("success", String.valueOf(value))
def run(ctx: ActionContext[Nothing]): R = value
def run(ctx: DatabaseComponent#Context): R = value
}

/** An Action that fails. */
case class FailureAction(t: Throwable) extends SynchronousDatabaseAction[Nothing, Effect, Nothing, NoStream] {
case class FailureAction(t: Throwable) extends SynchronousDatabaseAction[DatabaseComponent, Effect, Nothing, NoStream] {
def getDumpInfo = DumpInfo("failure", String.valueOf(t))
def run(ctx: ActionContext[Nothing]): Nothing = throw t
def run(ctx: DatabaseComponent#Context): Nothing = throw t
}

/** An asynchronous Action that returns the result of a Future. */
@@ -222,8 +226,9 @@ case class NamedAction[-E <: Effect, +R, +S <: NoStream](a: EffectfulAction[E, R
override def isLogged = true
}

/** The context object passed to database actions by the execution engine. */
trait ActionContext[+B <: DatabaseComponent] {
/** The base trait for the context object passed to synchronous database actions by the execution
* engine. */
trait ActionContext {
private[this] var stickiness = 0

/** Check if the session is pinned. May only be called from a synchronous action context. */
@@ -239,19 +244,16 @@ trait ActionContext[+B <: DatabaseComponent] {

/** Unpin this session once. May only be called from a synchronous action context. */
final def unpin: Unit = stickiness -= 1

/** Get the current database session for this context. Inside a single action this value will
* never change. If the session has not been pinned, a subsequent action will see a different
* session (unless the actions have been fused). May only be called from a synchronous action
* context. */
def session: B#Session
}

/** An ActionContext with extra functionality required for streaming Actions. */
trait StreamingActionContext[+B <: DatabaseComponent] extends ActionContext[B] {
trait StreamingActionContext extends ActionContext {
/** Emit a single result of the stream. Any Exception thrown by this method should be passed on
* to the caller. */
def emit(v: Any)

/** Get the Subscription for this stream. */
def subscription: Subscription
}

/** A synchronous database action provides a function from an `ActionContext` to the result
@@ -264,14 +266,14 @@ trait StreamingActionContext[+B <: DatabaseComponent] extends ActionContext[B] {
* The execution engine ensures that an [[ActionContext]] is never used concurrently and that
* all state changes performed by one invocation of a SynchronousDatabaseAction are visible
* to the next invocation of the same or a different SynchronousDatabaseAction. */
trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <: NoStream] extends DatabaseAction[E, R, S] { self =>
trait SynchronousDatabaseAction[-B <: DatabaseComponent, -E <: Effect, +R, +S <: NoStream] extends DatabaseAction[E, R, S] { self =>
/** The type used by this action for the state of a suspended stream. A call to `emitStream`
* produces such a state which is then fed back into the next call. */
type StreamState >: Null <: AnyRef

/** Run this action synchronously and produce a result, or throw an Exception to indicate a
* failure. */
def run(context: ActionContext[B]): R
def run(context: B#Context): R

/** Run this action synchronously and emit results to the context. This methods may throw an
* Exception to indicate a failure.
@@ -281,13 +283,13 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:
* a new stream should be produced.
* @return A stream state if there are potentially more results available, or null if the
* stream is finished. */
def emitStream(context: StreamingActionContext[B], limit: Long, state: StreamState): StreamState =
def emitStream(context: B#StreamingContext, limit: Long, state: StreamState): StreamState =
throw new SlickException("Internal error: Streaming is not supported by this Action")

/** Dispose of a `StreamState` when a streaming action is cancelled. Whenever `emitStream`
* returns `null` or throws an Exception, it needs to dispose of the state itself. This
* method will not be called in these cases. */
def cancelStream(context: StreamingActionContext[B], state: StreamState): Unit = ()
def cancelStream(context: B#StreamingContext, state: StreamState): Unit = ()

/** Whether or not this Action supports streaming results. An Action with a `Streaming` result
* type must either support streaming directly or have a [[nonFusedEquivalentAction]] which
@@ -297,7 +299,7 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:
private[this] def superAndThen[E2 <: Effect, R2, S2 <: NoStream](a: EffectfulAction[E2, R2, S2]) = super.andThen[E2, R2, S2](a)
override def andThen[E2 <: Effect, R2, S2 <: NoStream](a: EffectfulAction[E2, R2, S2]): EffectfulAction[E with E2, R2, S2] = a match {
case a: SynchronousDatabaseAction[_, _, _, _] => new SynchronousDatabaseAction.Fused[B, E with E2, R2, S2] {
def run(context: ActionContext[B]): R2 = {
def run(context: B#Context): R2 = {
self.run(context)
a.asInstanceOf[SynchronousDatabaseAction[B, E2, R2, S2]].run(context)
}
@@ -309,7 +311,7 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:
private[this] def superZip[E2 <: Effect, R2](a: EffectfulAction[E2, R2, NoStream]) = super.zip[E2, R2](a)
override def zip[E2 <: Effect, R2](a: EffectfulAction[E2, R2, NoStream]): EffectfulAction[E with E2, (R, R2), NoStream] = a match {
case a: SynchronousDatabaseAction[_, _, _, _] => new SynchronousDatabaseAction.Fused[B, E with E2, (R, R2), NoStream] {
def run(context: ActionContext[B]): (R, R2) = {
def run(context: B#Context): (R, R2) = {
val r1 = self.run(context)
val r2 = a.asInstanceOf[SynchronousDatabaseAction[B, E2, R2, NoStream]].run(context)
(r1, r2)
@@ -322,7 +324,7 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:
private[this] def superAndFinally[E2 <: Effect](a: EffectfulAction[E2, _, NoStream]) = super.andFinally[E2](a)
override def andFinally[E2 <: Effect](a: EffectfulAction[E2, _, NoStream]): EffectfulAction[E with E2, R, S] = a match {
case a: SynchronousDatabaseAction[_, _, _, _] => new SynchronousDatabaseAction.Fused[B, E with E2, R, S] {
def run(context: ActionContext[B]): R = {
def run(context: B#Context): R = {
val res = try self.run(context) catch {
case NonFatal(ex) =>
try a.asInstanceOf[SynchronousDatabaseAction[B, E2, Any, NoStream]].run(context) catch ignoreFollowOnError
@@ -338,7 +340,7 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:

private[this] def superWithPinnedSession = super.withPinnedSession
override def withPinnedSession: EffectfulAction[E, R, S] = new SynchronousDatabaseAction.Fused[B, E, R, S] {
def run(context: ActionContext[B]): R = {
def run(context: B#Context): R = {
context.pin
val res = try self.run(context) catch {
case NonFatal(ex) =>
@@ -353,7 +355,7 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:

private[this] def superFailed: EffectfulAction[E, Throwable, NoStream] = super.failed
override def failed: EffectfulAction[E, Throwable, NoStream] = new SynchronousDatabaseAction.Fused[B, E, Throwable, NoStream] {
def run(context: ActionContext[B]): Throwable = {
def run(context: B#Context): Throwable = {
var ok = false
try {
self.run(context)
@@ -368,7 +370,7 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R, +S <:

private[this] def superAsTry: EffectfulAction[E, Try[R], NoStream] = super.asTry
override def asTry: EffectfulAction[E, Try[R], NoStream] = new SynchronousDatabaseAction.Fused[B, E, Try[R], NoStream] {
def run(context: ActionContext[B]): Try[R] = {
def run(context: B#Context): Try[R] = {
try Success(self.run(context)) catch {
case NonFatal(ex) => Failure(ex)
}

0 comments on commit 3315a3b

Please sign in to comment.
You can’t perform that action at this time.