Skip to content

Commit

Permalink
Implement streaming via Reactive Streams API:
Browse files Browse the repository at this point in the history
- Add a streaming type parameter to Action which is either NoStream
  or Streaming[+T] with an element type T.

- Return the proper streaming types for collection-valued queries.

- Remove `synchronized` blocks for synchronous action contexts.
  Enforcing a happens-before relationship through volatile reads and
  writes should be enough.

- Add dependencies for Reactive Streams to the build.

- Add TestNG and related dependencies required for running the Reactive
  Streams TCK to the build.

Tests in ActionTest.testStreaming and reactive-streams-tests.
  • Loading branch information
szeiger committed Nov 27, 2014
1 parent 6a85767 commit 9ad62a9
Show file tree
Hide file tree
Showing 26 changed files with 738 additions and 203 deletions.
File renamed without changes.
Expand Up @@ -8,6 +8,7 @@
<appender-ref ref="STDOUT" />
</root>
<logger name="scala.slick.backend.DatabaseComponent.action" level="${log.action:-info}" />
<logger name="scala.slick.backend.DatabaseComponent.stream" level="${log.stream:-info}" />
<logger name="scala.slick.compiler" level="${log.qcomp:-info}" />
<logger name="scala.slick.compiler.QueryCompiler" level="${log.qcomp.phases:-inherited}" />
<logger name="scala.slick.compiler.QueryCompilerBenchmark" level="${log.qcomp.bench:-inherited}" />
Expand Down
33 changes: 28 additions & 5 deletions project/Build.scala
Expand Up @@ -9,6 +9,7 @@ import com.typesafe.tools.mima.plugin.MimaKeys.{previousArtifact, binaryIssueFil
import com.typesafe.tools.mima.core.{ProblemFilters, MissingClassProblem}
import com.typesafe.sbt.osgi.SbtOsgi.{osgiSettings, OsgiKeys}
import com.typesafe.sbt.sdlc.Plugin._
import de.johoop.testngplugin.TestNGPlugin._

object SlickBuild extends Build {

Expand All @@ -22,13 +23,19 @@ object SlickBuild extends Build {
"junit" % "junit-dep" % "4.10",
"com.novocode" % "junit-interface" % "0.10-M4"
)
val testngExtras = Seq(
"com.google.inject" % "guice" % "2.0"
)
val slf4j = "org.slf4j" % "slf4j-api" % "1.6.4"
val logback = "ch.qos.logback" % "logback-classic" % "0.9.28"
val typesafeConfig = "com.typesafe" % "config" % "1.2.1"
val reactiveStreamsVersion = "1.0.0.M1"
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion
val reactiveStreamsTCK = "org.reactivestreams" % "reactive-streams-tck" % reactiveStreamsVersion
val pools = Seq(
"com.zaxxer" % "HikariCP-java6" % "2.0.1"
)
val mainDependencies = Seq(slf4j, typesafeConfig) ++ pools.map(_ % "optional")
val mainDependencies = Seq(slf4j, typesafeConfig, reactiveStreams) ++ pools.map(_ % "optional")
val h2 = "com.h2database" % "h2" % "1.3.170"
val testDBs = Seq(
h2,
Expand Down Expand Up @@ -93,6 +100,7 @@ object SlickBuild extends Build {
organizationName := "Typesafe",
organization := "com.typesafe.slick",
resolvers += Resolver.sonatypeRepo("snapshots"),
//resolvers += Resolver.mavenLocal,
scalacOptions ++= List("-deprecation", "-feature"),
scalacOptions in (Compile, doc) <++= (version,sourceDirectory in Compile,name).map((v,src,n) => Seq(
"-doc-title", n,
Expand Down Expand Up @@ -154,13 +162,14 @@ object SlickBuild extends Build {
def testAll = Command.command("testAll")(runTasksSequentially(List(
test in (slickTestkitProject, Test),
test in (slickTestkitProject, DocTest),
test in (osgiTestProject, Test),
//test in (osgiTestProject, Test), // Temporarily disabled until we get Reactive Streams OSGi bundles
test in (reactiveStreamsTestProject, Test),
sdlc in slickProject,
sdlc in slickCodegenProject
)))

/* Project Definitions */
lazy val aRootProject = Project(id = "root", base = file("."),
lazy val aRootProject: Project = Project(id = "root", base = file("."),
settings = Project.defaultSettings ++ sharedSettings ++ extTarget("root", Some("target/root")) ++ Seq(
sourceDirectory := file("target/root-src"),
publishArtifact := false,
Expand Down Expand Up @@ -278,20 +287,34 @@ object SlickBuild extends Build {
scalacOptions in (Compile, doc) <++= version.map(v => Seq(
"-doc-source-url", "https://github.com/slick/slick/blob/"+v+"/slick-codegen/src/main€{FILE_PATH}.scala"
)),
unmanagedResourceDirectories in Test += (baseDirectory in aRootProject).value / "common-test-resources",
sdlcBase := s"http://slick.typesafe.com/doc/${version.value}/codegen-api/",
sdlcCheckDir := (target in (slickProject, com.typesafe.sbt.SbtSite.SiteKeys.makeSite)).value,
sdlc <<= sdlc dependsOn (doc in Compile, com.typesafe.sbt.SbtSite.SiteKeys.makeSite in slickProject)
)
) dependsOn(slickProject)

lazy val reactiveStreamsTestProject = Project(id = "reactive-streams-tests", base = file("reactive-streams-tests"),
settings = Project.defaultSettings ++ sharedSettings ++ testNGSettings ++ Seq(
name := "Slick-ReactiveStreamsTests",
unmanagedResourceDirectories in Test += (baseDirectory in aRootProject).value / "common-test-resources",
libraryDependencies ++=
(Dependencies.logback +: Dependencies.testDBs).map(_ % "test") ++:
Dependencies.reactiveStreamsTCK +:
Dependencies.testngExtras,
testNGSuites := Seq("reactive-streams-tests/src/test/resources/testng.xml")
)
) dependsOn(slickTestkitProject)

lazy val osgiBundleFiles = taskKey[Seq[File]]("osgi-bundles that our tests rely on using.")

lazy val osgiTestProject = (
Project(id = "osgitests", base = file("osgi-tests"))
settings(sharedSettings:_*)
settings(
name := "Slick-OsgiTests",
libraryDependencies ++= (Dependencies.h2 +: Dependencies.logback +: Dependencies.junit ++: Dependencies.paxExam).map(_ % "test"),
libraryDependencies ++= (Dependencies.h2 +: Dependencies.logback +: Dependencies.junit ++: Dependencies.reactiveStreams +: Dependencies.paxExam).map(_ % "test"),
unmanagedResourceDirectories in Test += (baseDirectory in aRootProject).value / "common-test-resources",
fork in Test := true,
testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v", "-s", "-a"),
javaOptions in Test ++= Seq(
Expand All @@ -301,7 +324,7 @@ object SlickBuild extends Build {
testGrouping <<= definedTests in Test map partitionTests,
osgiBundleFiles := Seq((OsgiKeys.bundle in slickProject).value),
osgiBundleFiles ++= (dependencyClasspath in Compile in slickProject).value.map(_.data).filterNot(_.isDirectory),
osgiBundleFiles ++= (dependencyClasspath in Test).value.map(_.data).filter(f => f.name.contains("logback-") || f.name.contains("h2")),
osgiBundleFiles ++= (dependencyClasspath in Test).value.map(_.data).filter(f => f.name.contains("logback-") || f.name.contains("h2") || f.name.contains("reactive-streams")),
publishArtifact := false
)
dependsOn(slickProject % "test")
Expand Down
2 changes: 2 additions & 0 deletions project/build.sbt
Expand Up @@ -7,3 +7,5 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
addSbtPlugin("com.typesafe.sbt" % "sbt-osgi" % "0.7.0")

addSbtPlugin("com.typesafe" % "sbt-sdlc" % "0.1")

addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.2")
Expand Up @@ -4,7 +4,9 @@
<pattern>*** \(%logger{30}\) %msg%n</pattern>
</encoder>
</appender>
<root level="${log.root:-warn}">
<root level="${log.root:-info}">
<appender-ref ref="STDOUT" />
</root>
<logger name="scala.slick.backend.DatabaseComponent.action" level="${log.action:-info}" />
<logger name="scala.slick.backend.DatabaseComponent.stream" level="${log.stream:-info}" />
</configuration>
14 changes: 14 additions & 0 deletions reactive-streams-tests/src/test/resources/testng.xml
@@ -0,0 +1,14 @@
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >

<suite name="ReactiveStreams" verbose="0" >
<test name="ReactiveStreams">
<classes>
<class name="scala.slick.test.stream.JdbcPublisherTest"/>
<class name="scala.slick.test.stream.HeapPublisherTest"/>
</classes>
</test>

<listeners>
<listener class-name="scala.slick.test.stream.TestNGConsoleListener" />
</listeners>
</suite>
@@ -0,0 +1,15 @@
package scala.slick.test.stream

import org.testng.annotations.{AfterClass, BeforeClass}

import scala.slick.memory.MemoryDriver

class HeapPublisherTest extends RelationalPublisherTest[MemoryDriver](MemoryDriver) {
import driver.api._

@BeforeClass def setUpDB: Unit =
db = Database()

@AfterClass def tearDownDB: Unit =
db.close()
}
@@ -0,0 +1,15 @@
package scala.slick.test.stream

import org.testng.annotations.{AfterClass, BeforeClass}

import scala.slick.driver.{H2Driver, JdbcProfile}

class JdbcPublisherTest extends RelationalPublisherTest[JdbcProfile](H2Driver) {
import driver.api._

@BeforeClass def setUpDB: Unit =
db = Database.forURL("jdbc:h2:mem:DatabasePublisherTest", driver = "org.h2.Driver")

@AfterClass def tearDownDB: Unit =
db.close()
}
@@ -0,0 +1,42 @@
package scala.slick.test.stream

import java.util.concurrent.atomic.AtomicInteger

import org.reactivestreams._
import org.reactivestreams.tck._
import org.testng.annotations.{AfterClass, BeforeClass}

import scala.slick.profile.RelationalProfile

abstract class RelationalPublisherTest[P <: RelationalProfile](val driver: P) extends PublisherVerification[Int](new TestEnvironment(300L), 1000L) {
import driver.api._

var db: Database = _
val entityNum = new AtomicInteger()

def createPublisher(elements: Long) = {
val tableName = "data_" + elements + "_" + entityNum.incrementAndGet()
class Data(tag: Tag) extends Table[Int](tag, tableName) {
def id = column[Int]("id")
def * = id
}
val data = TableQuery[Data]
val a = data.schema.create >> (data ++= Range.apply(0, elements.toInt)) >> data.sortBy(_.id).map(_.id).result
db.stream(a.withPinnedSession)
}

def createErrorStatePublisher = {
val p = createPublisher(0)
p.subscribe(new Subscriber[Int] {
def onSubscribe(s: Subscription): Unit = s.cancel
def onComplete(): Unit = ()
def onError(t: Throwable): Unit = ()
def onNext(t: Int): Unit = ()
})
p
}

override def maxElementsFromPublisher = 73L

override def boundedDepthOfOnNextAndRequestRecursion = 1
}
@@ -0,0 +1,38 @@
package scala.slick.test.stream

import org.testng.{ITestResult, TestListenerAdapter}

import scala.slick.util.GlobalConfig

class TestNGConsoleListener extends TestListenerAdapter {
val (normal, yellow, blue, cyan, red) =
if(GlobalConfig.ansiDump) ("\u001B[0m", "\u001B[33m", "\u001B[34m", "\u001B[36m", "\u001B[31m")
else ("", "", "", "", "")

override def onTestFailure(tr: ITestResult): Unit = {
printError(tr, tr.getThrowable, "failed", red)
}

override def onTestSuccess(tr: ITestResult): Unit = {
printError(tr, null, "succeeded", cyan)
}

override def onTestSkipped(tr: ITestResult): Unit = {
printError(tr, null, "skipped", blue)
}

def printError(tr: ITestResult, t: Throwable, msg: String, highlight: String): Unit = {
val cln = tr.getTestClass.getName
val sep = cln.lastIndexOf('.')
val cln2 = if(sep == -1) (yellow + cln) else cln.substring(0, sep+1) + yellow + cln.substring(sep+1)
val mn = tr.getMethod.getMethodName
val param = tr.getParameters.map(_.toString).mkString(",")
val param2 = if(param == "") "" else s"[$yellow$param$normal]"
print(s"Test $cln2$normal.$highlight$mn$normal$param2 $msg")
if(t eq null) println()
else {
print(": ")
t.printStackTrace(System.out)
}
}
}
1 change: 0 additions & 1 deletion slick-testkit/src/codegen/resources/logback-readme.txt

This file was deleted.

Expand Up @@ -68,4 +68,24 @@ class ActionTest extends AsyncTest[RelationalTestDB] {

aSetup andThen aNotPinned andThen aFused andThen aPinned
}

def testStreaming = {
class T(tag: Tag) extends Table[Int](tag, u"t") {
def a = column[Int]("a")
def * = a
}
val ts = TableQuery[T]
val q1 = ts.sortBy(_.a).map(_.a)

val p1 = db.stream {
ts.ddl.create >>
(ts ++= Seq(2, 3, 1, 5, 4)) >>
q1.result
}

for {
r1 <- materialize(p1)
_ = r1 shouldBe Vector(1, 2, 3, 4, 5)
} yield ()
}
}
Expand Up @@ -48,7 +48,7 @@ class TemplateTest extends AsyncTest[RelationalTestDB] {
val q5a = userNameByIDOrAll(Some(3))
val q5b = userNameByIDOrAll(None)

def asAction[E <: Effect, R](a: Action[E, R]): Action[E, R] = a
def asAction[E <: Effect, R](a: Action[E, R, NoStream]): Action[E, R, NoStream] = a

for {
_ <- (users.schema ++ orders.schema).create
Expand Down
@@ -1,8 +1,10 @@
package com.typesafe.slick.testkit.util

import org.reactivestreams.{Subscription, Subscriber, Publisher}

import scala.language.existentials

import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.{Promise, ExecutionContext, Await, Future}
import scala.reflect.ClassTag

import java.lang.reflect.Method
Expand Down Expand Up @@ -90,7 +92,7 @@ case class TestMethod(name: String, desc: Description, method: Method, cl: Class

case testObject: AsyncTest[_] =>
if(r == classOf[Future[_]]) await(method.invoke(testObject).asInstanceOf[Future[Any]])
else if(r == classOf[Action[_, _]]) await(testObject.db.run(method.invoke(testObject).asInstanceOf[Action[Effect, Any]]))
else if(r == classOf[Action[_, _, _]]) await(testObject.db.run(method.invoke(testObject).asInstanceOf[Action[Effect, Any, NoStream]]))
else throw new RuntimeException(s"Illegal return type: '${r.getName}' in test method '$name' -- AsyncTest methods must return Future or Action")
}
}
Expand Down Expand Up @@ -181,36 +183,49 @@ abstract class AsyncTest[TDB >: Null <: TestDB](implicit TdbClass: ClassTag[TDB]
protected implicit def asyncTestExecutionContext = ExecutionContext.global

/** Test Action: Get the current database session */
object GetSession extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, TDB#Driver#Backend#Session] {
object GetSession extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, TDB#Driver#Backend#Session, NoStream] {
def run(context: ActionContext[TDB#Driver#Backend]) = context.session
def getDumpInfo = DumpInfo(name = "<GetSession>")
}

/** Test Action: Check if the current database session is pinned */
object IsPinned extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, Boolean] {
object IsPinned extends SynchronousDatabaseAction[TDB#Driver#Backend, Effect, Boolean, NoStream] {
def run(context: ActionContext[TDB#Driver#Backend]) = context.isPinned
def getDumpInfo = DumpInfo(name = "<IsPinned>")
}

/** Test Action: Get the current transactionality level and autoCommit flag */
object GetTransactionality extends SynchronousDatabaseAction[JdbcBackend, Effect, (Int, Boolean)] {
object GetTransactionality extends SynchronousDatabaseAction[JdbcBackend, Effect, (Int, Boolean), NoStream] {
def run(context: ActionContext[JdbcBackend]) =
context.session.asInstanceOf[JdbcBackend#BaseSession].getTransactionality
def getDumpInfo = DumpInfo(name = "<GetTransactionality>")
}

def ifCap[E <: Effect, R](caps: Capability*)(f: => Action[E, R]): Action[E, Unit] =
def ifCap[E <: Effect, R](caps: Capability*)(f: => Action[E, R, NoStream]): Action[E, Unit, NoStream] =
if(caps.forall(c => tdb.capabilities.contains(c))) f.andThen(Action.successful(())) else Action.successful(())
def ifNotCap[E <: Effect, R](caps: Capability*)(f: => Action[E, R]): Action[E, Unit] =
def ifNotCap[E <: Effect, R](caps: Capability*)(f: => Action[E, R, NoStream]): Action[E, Unit, NoStream] =
if(!caps.forall(c => tdb.capabilities.contains(c))) f.andThen(Action.successful(())) else Action.successful(())

def asAction[R](f: tdb.profile.Backend#Session => R): Action[Effect.BackendType[tdb.profile.Backend], R] =
new SynchronousDatabaseAction[tdb.profile.Backend, Effect, R] {
def asAction[R](f: tdb.profile.Backend#Session => R): Action[Effect.BackendType[tdb.profile.Backend], R, NoStream] =
new SynchronousDatabaseAction[tdb.profile.Backend, Effect, R, NoStream] {
def run(context: ActionContext[tdb.profile.Backend]): R = f(context.session)
def getDumpInfo = DumpInfo(name = "<asAction>")
}

def seq[E <: Effect](actions: Action[E, _]*): Action[E, Unit] = Action.seq[E](actions: _*)
def seq[E <: Effect](actions: Action[E, _, NoStream]*): Action[E, Unit, NoStream] = Action.seq[E](actions: _*)

/** Consume a Reactive Stream and materialize it as a Vector */
def materialize[T](p: Publisher[T]): Future[Vector[T]] = {
val builder = Vector.newBuilder[T]
val pr = Promise[Vector[T]]()
try p.subscribe(new Subscriber[T] {
def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
def onComplete(): Unit = pr.success(builder.synchronized(builder.result()))
def onError(t: Throwable): Unit = pr.failure(t)
def onNext(t: T): Unit = builder.synchronized(builder += t)
}) catch { case NonFatal(ex) => pr.failure(ex) }
pr.future
}

implicit class AssertionExtensionMethods[T](v: T) {
private[this] val cln = getClass.getName
Expand Down

0 comments on commit 9ad62a9

Please sign in to comment.