Skip to content
Permalink
Browse files

Add basic transaction support for database actions.

- The `transactionally` combinator in JdbcProfile corresponds to
  `withTransaction` in the old API: It ensures that an Action is run
  inside a transaction but does not guarantee failures coming out of it
  to be atomic. In case of nested `transactionally` actions, only the
  outermost one controls a real database transactions.

- New Action combinators `filter`, `failed` and `asTry` work similarly
  to their Future counterparts.

- `transactionally`, `failed` and `asTry` can be fused into
  SynchronousDatabaseActions.

- Renamed Action.const to Action.successful and added Action.failed,
  similar to the API for the Future values produced by running these
  Actions.

- Fixed return types for JDBC insert Actions.

Test in TransactionTest (OldTransactionTest now contains the tests for
the old API which were previously located in TransactionTest).
  • Loading branch information...
szeiger committed Nov 15, 2014
1 parent 09590fd commit 8ef6cde487c00a1c7d4c520d6baf844ce698da43
@@ -47,6 +47,7 @@ testkit {
${testPackage}.SequenceTest
${testPackage}.TemplateTest
${testPackage}.TransactionTest
${testPackage}.OldTransactionTest
${testPackage}.UnionTest
]
}
@@ -0,0 +1,34 @@
package com.typesafe.slick.testkit.tests

import com.typesafe.slick.testkit.util.{JdbcTestDB, TestkitTest}
import org.junit.Assert._

class OldTransactionTest extends TestkitTest[JdbcTestDB] {
import tdb.profile.simple._

def test {

class T(tag: Tag) extends Table[Int](tag, "t") {
def a = column[Int]("a")
def * = a
}
val ts = TableQuery[T]

ts.ddl.create

implicitSession withTransaction {
ts.insert(42)
assertEquals(Some(42), ts.firstOption)
implicitSession.rollback()
}
assertEquals(None, ts.firstOption)

ts.insert(1)
implicitSession withTransaction {
ts.delete
assertEquals(None, ts.firstOption)
implicitSession.rollback()
}
assertEquals(Some(1), ts.firstOption)
}
}
@@ -1,34 +1,63 @@
package com.typesafe.slick.testkit.tests

import org.junit.Assert._
import com.typesafe.slick.testkit.util.{JdbcTestDB, TestkitTest}
import com.typesafe.slick.testkit.util.{AsyncTest, JdbcTestDB}

class TransactionTest extends TestkitTest[JdbcTestDB] {
import tdb.profile.simple._

def test {
class TransactionTest extends AsyncTest[JdbcTestDB] {
import tdb.profile.api._

def testTransactions = {
class T(tag: Tag) extends Table[Int](tag, "t") {
def a = column[Int]("a")
def a = column[Int]("a", O.PrimaryKey)
def * = a
}
val ts = TableQuery[T]

ts.ddl.create

implicitSession withTransaction {
ts.insert(42)
assertEquals(Some(42), ts.firstOption)
implicitSession.rollback()
}
assertEquals(None, ts.firstOption)
class ExpectedException extends RuntimeException

ts.insert(1)
implicitSession withTransaction {
ts.delete
assertEquals(None, ts.firstOption)
implicitSession.rollback()
ts.ddl.create andThen { // failed transaction
(for {
_ <- ts += 1
_ <- ts.result.map(_ shouldBe Seq(1))
_ <- GetTransactionality.map(_ shouldBe (1, false))
_ = throw new ExpectedException
} yield ()).transactionally.failed.map(_ should (_.isInstanceOf[ExpectedException]))
} andThen {
ts.result.map(_ shouldBe Nil) andThen
GetTransactionality.map(_ shouldBe (0, true))
} andThen { // successful transaction
(for {
_ <- ts += 2
_ <- ts.result.map(_ shouldBe Seq(2))
_ <- GetTransactionality.map(_ shouldBe (1, false))
} yield ()).transactionally
} andThen {
ts.result.map(_ shouldBe Seq(2))
} andThen { // nested successful transaction
(for {
_ <- ts += 3
_ <- ts.to[Set].result.map(_ shouldBe Set(2, 3))
_ <- GetTransactionality.map(_ shouldBe (2, false))
} yield ()).transactionally.transactionally
} andThen {
ts.to[Set].result.map(_ shouldBe Set(2, 3))
} andThen { // failed nested transaction
(for {
_ <- ts += 4
_ <- ts.to[Set].result.map(_ shouldBe Set(2, 3, 4))
_ <- GetTransactionality.map(_ shouldBe (2, false))
_ = throw new ExpectedException
} yield ()).transactionally.transactionally.failed.map(_ should (_.isInstanceOf[ExpectedException]))
} andThen { // fused successful transaction
(ts += 5).andThen(ts += 6).transactionally
} andThen {
ts.to[Set].result.map(_ shouldBe Set(2, 3, 5, 6)) andThen
GetTransactionality.map(_ shouldBe (0, true))
} andThen { // fused failed transaction
(ts += 7).andThen(ts += 6).transactionally.failed
} andThen {
ts.to[Set].result.map(_ shouldBe Set(2, 3, 5, 6)) andThen
GetTransactionality.map(_ shouldBe (0, true))
}
assertEquals(Some(1), ts.firstOption)
}
}
@@ -10,6 +10,7 @@ import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.slick.action._
import scala.slick.jdbc.JdbcBackend
import scala.slick.util.DumpInfo
import scala.util.control.NonFatal
import scala.slick.profile.{RelationalProfile, SqlProfile, Capability}
@@ -159,6 +160,8 @@ sealed abstract class GenericTest[TDB >: Null <: TestDB](implicit TdbClass: Clas
def shouldBe(o: Any): Unit = fixStack(Assert.assertEquals(o, v))

def shouldNotBe(o: Any): Unit = fixStack(Assert.assertNotSame(o, v))

def should(f: Any => Boolean): Unit = fixStack(Assert.assertTrue(f(v)))
}

implicit class StringContextExtensionMethods(s: StringContext) {
@@ -191,13 +194,22 @@ abstract class AsyncTest[TDB >: Null <: TestDB](implicit TdbClass: ClassTag[TDB]

protected implicit def asyncTestExecutionContext = ExecutionContext.global

/** Test Action: Get the current database session */
object GetSession extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, TDB#Driver#Backend#Session] {
def run(context: ActionContext[TDB#Driver#Backend]) = context.session
def getDumpInfo = DumpInfo(name = "GetSession")
def getDumpInfo = DumpInfo(name = "<GetSession>")
}

/** Test Action: Check if the current database session is pinned */
object IsPinned extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, Boolean] {
def run(context: ActionContext[TDB#Driver#Backend]) = context.isPinned
def getDumpInfo = DumpInfo(name = "IsPinned")
def getDumpInfo = DumpInfo(name = "<IsPinned>")
}

/** Test Action: Get the current transactionality level and autoCommit flag */
object GetTransactionality extends SynchronousDatabaseAction[JdbcBackend, Effect, (Int, Boolean)] {
def run(context: ActionContext[JdbcBackend]) =
context.session.asInstanceOf[JdbcBackend#BaseSession].getTransactionality
def getDumpInfo = DumpInfo(name = "<GetTransactionality>")
}
}
@@ -4,6 +4,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.slick.SlickException
import scala.slick.backend.DatabaseComponent
import scala.slick.util.{DumpInfo, Dumpable}
import scala.util.{Try, Failure, Success}
import scala.util.control.NonFatal

/** Abstract type for Actions. Allows separation of execution logic and resource usage
@@ -16,7 +17,7 @@ sealed trait Action[-E <: Effect, +R] extends Dumpable {
/** Transform the result of a successful execution of this action. If this action fails, the
* resulting action also fails. */
def map[R2](f: R => R2)(implicit executor: ExecutionContext): Action[E, R2] =
flatMap[E, R2](r => ConstantAction[R2](f(r)))
flatMap[E, R2](r => SuccessAction[R2](f(r)))

/** Use the result produced by the successful execution of this action to compute and then
* run the next action in sequence. The resulting action fails if either this action, the
@@ -45,29 +46,46 @@ sealed trait Action[-E <: Effect, +R] extends Dumpable {
final def >> [E2 <: Effect, R2](a: Action[E2, R2]): Action[E with E2, R2] =
andThen[E2, R2](a)

/** Filter the result of this Action with the given predicate If the predicate matches, the
* original result is returned, otherwise the resulting Action fails with a
* NoSuchElementException. */
final def filter(p: R => Boolean)(implicit executor: ExecutionContext): Action[E, R] =
withFilter(p)

def withFilter(p: R => Boolean)(implicit executor: ExecutionContext): Action[E, R] =
flatMap(v => if(p(v)) ConstantAction(v) else throw new SlickException("Action.withFilter failed"))
flatMap(v => if(p(v)) SuccessAction(v) else throw new NoSuchElementException("Action.withFilter failed"))

/** Return an Action which contains the Throwable with which this Action failed as its result.
* If this Action succeeded, the resulting Action fails with a NoSuchElementException. */
def failed: Action[E, Throwable] = FailedAction[E](this)

/** Convert a successful result `v` of this Action into a successful result `Success(v)` and a
* failure `t` into a successful result `Failure(t)` */
def asTry: Action[E, Try[R]] = AsTryAction[E, R](this)

/** Run this Action with a pinned database session. If this action is composed of multiple
* database actions, they will all use the same session, even when sequenced with non-database
* actions. For non-composite or non-database actions, this has no effect. */
def withPinnedSession: Action[E, R] =
(Action.Pin andThen this andFinally Action.Unpin).asInstanceOf[Action[E, R]]

/** Whether or not this is a control flow action. These actions are not logged by default. */
def isControlFlowAction = false

/** Get the equivalent non-fused Action if this Action has been fused, otherwise this
* Action is returned. */
def nonFusedEquivalentAction: Action[E, R] = this

/** Whether of not this action should be included in log output by default. */
def isLogged: Boolean = false
}

object Action {
/** Convert a [[scala.concurrent.Future]] to an [[Action]]. */
def from[R](f: Future[R]): Action[Effect, R] = FutureAction[R](f)

/** Lift a constant value to an [[Action]]. */
def const[R](v: R): Action[Effect, R] = ConstantAction[R](v)
def successful[R](v: R): Action[Effect, R] = SuccessAction[R](v)

/** Create an [[Action]] that always fails. */
def failed(t: Throwable): Action[Effect, Nothing] = FailureAction(t)

/** An Action that pins the current session */
private object Pin extends SynchronousDatabaseAction[DatabaseComponent, Effect, Unit] {
@@ -80,44 +98,65 @@ object Action {
def run(context: ActionContext[DatabaseComponent]): Unit = context.unpin
def getDumpInfo = DumpInfo(name = "SynchronousDatabaseAction.Unpin")
}

/** An ExecutionContext used internally for executing plumbing operations during Action
* composition. */
private[slick] object sameThreadExecutionContext extends ExecutionContext {
override def execute(runnable: Runnable): Unit = runnable.run()
override def reportFailure(t: Throwable): Unit = throw t
}
}

/** An Action that represents a database operation. Concrete implementations are backend-specific
* and therefore carry a `BackendType` effect. */
trait DatabaseAction[B <: DatabaseComponent, -E <: Effect, +R] extends Action[E with Effect.BackendType[B], R]
trait DatabaseAction[B <: DatabaseComponent, -E <: Effect, +R] extends Action[E with Effect.BackendType[B], R] {
override def isLogged = true
}

/** An Action that returns a constant value. */
case class ConstantAction[+R](value: R) extends Action[Effect, R] {
def getDumpInfo = DumpInfo("const", String.valueOf(value))
case class SuccessAction[+R](value: R) extends Action[Effect, R] {
def getDumpInfo = DumpInfo("success", String.valueOf(value))
}

/** An Action that fails. */
case class FailureAction(t: Throwable) extends Action[Effect, Nothing] {
def getDumpInfo = DumpInfo("failure", String.valueOf(t))
}

/** An asynchronous Action that returns the result of a Future. */
case class FutureAction[+R](f: Future[R]) extends Action[Effect, R] {
def getDumpInfo = DumpInfo("future", String.valueOf(f))
override def isLogged = true
}

/** An Action that represents a `flatMap` operation for sequencing in the Action monad. */
case class FlatMapAction[-E <: Effect, +R, P](base: Action[E, P], f: P => Action[E, R], executor: ExecutionContext) extends Action[E, R] {
def getDumpInfo = DumpInfo("flatMap", String.valueOf(f), children = Vector(("base", base)))
override def isControlFlowAction = true
}

/** An Action that represents an `andThen` operation for sequencing in the Action monad. */
case class AndThenAction[-E <: Effect, +R](a1: Action[E, _], a2: Action[E, R]) extends Action[E, R] {
def getDumpInfo = DumpInfo("andThen", children = Vector(("1", a1), ("2", a2)))
override def isControlFlowAction = true
}

/** An Action that represents a `zip` operation for sequencing in the Action monad. */
case class ZipAction[-E <: Effect, +R1, +R2](a1: Action[E, R1], a2: Action[E, R2]) extends Action[E, (R1, R2)] {
def getDumpInfo = DumpInfo("zip", children = Vector(("1", a1), ("2", a2)))
override def isControlFlowAction = true
}

/** An Action that represents an `andFinally` operation for sequencing in the Action monad. */
case class AndFinallyAction[-E <: Effect, +R](a1: Action[E, R], a2: Action[E, _]) extends Action[E, R] {
def getDumpInfo = DumpInfo("andFinally", children = Vector(("try", a1), ("finally", a2)))
override def isControlFlowAction = true
}

/** An Action that represents a `failed` operation. */
case class FailedAction[-E <: Effect](a: Action[E, _]) extends Action[E, Throwable] {
def getDumpInfo = DumpInfo("failed", children = Vector(("base", a)))
}

/** An Action that represents an `asTry` operation. */
case class AsTryAction[-E <: Effect, +R](a: Action[E, R]) extends Action[E, Try[R]] {
def getDumpInfo = DumpInfo("asTry")
}

/** The context object passed to database actions by the execution engine. */
@@ -147,8 +186,8 @@ trait ActionContext[+B <: DatabaseComponent] {
* type. `DatabaseComponent.DatabaseDef.run` supports this kind of action out of the box
* through `DatabaseComponent.DatabaseDef.runSynchronousDatabaseAction` so that `run` does not
* need to be extended if all primitive database actions can be expressed in this way. These
* actions also implement construction-time fusion for the @andThen, @andFinally, @zip and
* @withPinnedSession operations. */
* actions also implement construction-time fusion for the @andThen, @andFinally, @zip, @failed,
* @asTry and @withPinnedSession operations. */
trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R] extends DatabaseAction[B, E, R] { self =>
def run(context: ActionContext[B]): R

@@ -208,6 +247,31 @@ trait SynchronousDatabaseAction[B <: DatabaseComponent, -E <: Effect, +R] extend
}
override def nonFusedEquivalentAction = superWithPinnedSession
}

private[this] def superFailed: Action[E with Effect.BackendType[B], Throwable] = super.failed
override def failed: Action[E with Effect.BackendType[B], Throwable] = new SynchronousDatabaseAction.Fused[B, E, Throwable] {
def run(context: ActionContext[B]): Throwable = {
var ok = false
try {
self.run(context)
ok = true
throw new NoSuchElementException("Action.failed (fused) not completed with a Throwable")
} catch {
case NonFatal(ex) if !ok => ex
}
}
override def nonFusedEquivalentAction = superFailed
}

private[this] def superAsTry: Action[E with Effect.BackendType[B], Try[R]] = super.asTry
override def asTry: Action[E with Effect.BackendType[B], Try[R]] = new SynchronousDatabaseAction.Fused[B, E, Try[R]] {
def run(context: ActionContext[B]): Try[R] = {
try Success(self.run(context)) catch {
case NonFatal(ex) => Failure(ex)
}
}
override def nonFusedEquivalentAction = superAsTry
}
}

object SynchronousDatabaseAction {
Oops, something went wrong.

0 comments on commit 8ef6cde

Please sign in to comment.
You can’t perform that action at this time.