Skip to content

Commit

Permalink
Merge pull request #1122 from slick/tmp/reactive-streams-rc4
Browse files Browse the repository at this point in the history
Upgrade to Reactive Streams 1.0.0.RC4:
  • Loading branch information
szeiger committed Apr 2, 2015
2 parents a95c6e3 + 1d77b9d commit d2ea6b3
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 58 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object SlickBuild extends Build {
val slf4j = "org.slf4j" % "slf4j-api" % "1.6.4"
val logback = "ch.qos.logback" % "logback-classic" % "0.9.28"
val typesafeConfig = "com.typesafe" % "config" % "1.2.1"
val reactiveStreamsVersion = "1.0.0.RC3"
val reactiveStreamsVersion = "1.0.0.RC4"
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion
val reactiveStreamsTCK = "org.reactivestreams" % "reactive-streams-tck" % reactiveStreamsVersion
val pools = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,5 @@ import slick.memory.MemoryDriver
class HeapPublisherTest extends RelationalPublisherTest[MemoryDriver](MemoryDriver, 300L) {
import driver.api._

@BeforeClass def setUpDB: Unit =
db = Database(ExecutionContext.global)

@AfterClass def tearDownDB: Unit =
db.close()
def createDB = Database(ExecutionContext.global)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@ package slick.test.stream

import org.testng.annotations.{AfterClass, BeforeClass}

import slick.driver.{H2Driver, JdbcProfile}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import slick.driver.{H2Driver, JdbcProfile}
import scala.util.control.NonFatal

class JdbcPublisherTest extends RelationalPublisherTest[JdbcProfile](H2Driver, 1000L) {
import driver.api._

@BeforeClass def setUpDB: Unit = {
db = Database.forURL("jdbc:h2:mem:DatabasePublisherTest", driver = "org.h2.Driver")
//db = Database.forURL("jdbc:derby:memory:JdbcPublisherTest;create=true", driver = "org.apache.derby.jdbc.EmbeddedDriver")
def createDB = {
val db = Database.forURL("jdbc:h2:mem:DatabasePublisherTest", driver = "org.h2.Driver", keepAliveConnection = true)
// Wait until the database has been initialized and can process queries:
try { Await.result(db.run(sql"select 1".as[Int]), Duration.Inf) } catch { case NonFatal(ex) => }
db
}

@AfterClass def tearDownDB: Unit =
db.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,39 @@ import org.testng.annotations.{AfterClass, BeforeClass}

import slick.profile.RelationalProfile

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

abstract class RelationalPublisherTest[P <: RelationalProfile](val driver: P, timeout: Long) extends PublisherVerification[Int](new TestEnvironment(timeout), 1000L) {
import driver.api._

override def maxElementsFromPublisher = 73L
override def boundedDepthOfOnNextAndRequestRecursion = 1

class Data(tableName: String)(tag: Tag) extends Table[Int](tag, tableName) {
def id = column[Int]("id")
def * = id
}
lazy val data = TableQuery(new Data("data")(_))
lazy val dataErr = TableQuery(new Data("data_err")(_))

var db: Database = _
val entityNum = new AtomicInteger()

def createPublisher(elements: Long) = {
val tableName = "data_" + elements + "_" + entityNum.incrementAndGet()
class Data(tag: Tag) extends Table[Int](tag, tableName) {
def id = column[Int]("id")
def * = id
}
val data = TableQuery[Data]
val a = data.schema.create >> (data ++= Range.apply(0, elements.toInt)) >> data.sortBy(_.id).map(_.id).result
db.stream(a.withPinnedSession)
}
def createDB: Database

def createErrorStatePublisher = {
val p = createPublisher(0)
p.subscribe(new Subscriber[Int] {
def onSubscribe(s: Subscription): Unit = s.cancel
def onComplete(): Unit = ()
def onError(t: Throwable): Unit = ()
def onNext(t: Int): Unit = ()
})
p
@BeforeClass def setUpDB: Unit = {
db = createDB
Await.result(db.run(data.schema.create >> (data ++= (1 to maxElementsFromPublisher.toInt))), Duration.Inf)
}

override def maxElementsFromPublisher = 73L
@AfterClass def tearDownDB: Unit =
db.close()

override def boundedDepthOfOnNextAndRequestRecursion = 1
def createPublisher(elements: Long) =
db.stream(data.filter(_.id <= elements.toInt).sortBy(_.id).result)

def createFailedPublisher =
db.stream(dataErr.result)
}
42 changes: 18 additions & 24 deletions slick/src/main/scala/slick/backend/DatabaseComponent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,19 @@ trait DatabaseComponent { self =>
catch { case NonFatal(ex) => Future.failed(ex) }

/** Create a `Publisher` for Reactive Streams which, when subscribed to, will run the specified
* Action and return the result directly as a stream without buffering everything first. This
* method is only supported for streaming Actions.
* `DBIOAction` and return the result directly as a stream without buffering everything first.
* This method is only supported for streaming actions.
*
* The Publisher itself is just a stub that holds a reference to the Action and this Database.
* The Action does not actually start to run until the call to `onSubscribe` returns, after
* The Publisher itself is just a stub that holds a reference to the action and this Database.
* The action does not actually start to run until the call to `onSubscribe` returns, after
* which the Subscriber is responsible for reading the full response or cancelling the
* Subscription. The created Publisher will only serve a single Subscriber and cannot be
* reused (because multiple runs of an Action can produce different results, which is not
* allowed for a Publisher).
* Subscription. The created Publisher can be reused to serve a multiple Subscribers,
* each time triggering a new execution of the action.
*
* For the purpose of combinators such as `andFinally` which can run after a stream has been
* produced, consuming the stream is always considered to be successful, even when cancelled
* by the Subscriber. For example, there is no way for the Subscriber to cause a rollback when
* streaming the results of `someQuery.result.transactionally`.
* For the purpose of combinators such as `cleanup` which can run after a stream has been
* produced, cancellation of a stream by the Subscriber is not considered an error. For
* example, there is no way for the Subscriber to cause a rollback when streaming the
* results of `someQuery.result.transactionally`.
*
* When using a JDBC back-end, all `onNext` calls are done synchronously and the ResultSet row
* is not advanced before `onNext` returns. This allows the Subscriber to access LOB pointers
Expand All @@ -103,7 +102,6 @@ trait DatabaseComponent { self =>

/** Create a Reactive Streams `Publisher` using the given context factory. */
protected[this] def createPublisher[T](a: DBIOAction[_, Streaming[T], Nothing], createCtx: Subscriber[_ >: T] => StreamingContext): DatabasePublisher[T] = new DatabasePublisher[T] {
private[this] val used = new AtomicBoolean()
def subscribe(s: Subscriber[_ >: T]) = {
if(s eq null) throw new NullPointerException("Subscriber is null")
val ctx = createCtx(s)
Expand All @@ -114,16 +112,12 @@ trait DatabaseComponent { self =>
false
}
if(subscribed) {
if(used.getAndSet(true)) {
ctx.tryOnError(new IllegalStateException("Database Action Publisher may not be subscribed to more than once"))
} else {
try {
runInContext(a, ctx, true, true).onComplete {
case Success(_) => ctx.tryOnComplete
case Failure(t) => ctx.tryOnError(t)
}(DBIO.sameThreadExecutionContext)
} catch { case NonFatal(ex) => ctx.tryOnError(ex) }
}
try {
runInContext(a, ctx, true, true).onComplete {
case Success(_) => ctx.tryOnComplete
case Failure(t) => ctx.tryOnError(t)
}(DBIO.sameThreadExecutionContext)
} catch { case NonFatal(ex) => ctx.tryOnError(ex) }
}
}
}
Expand Down Expand Up @@ -301,8 +295,8 @@ trait DatabaseComponent { self =>
else streamLogger.debug(s"Sent ${str(realDemand)} elements, more available - Performing atomic state transition")
}
demand = ctx.delivered(demand)
realDemand = demand
} while ((state ne null) && demand > 0)
realDemand = if(demand < 0) demand - Long.MinValue else demand
} while ((state ne null) && realDemand > 0)
if(debug) {
if(state ne null) streamLogger.debug("Suspending streaming action with continuation (more data available)")
else streamLogger.debug("Finished streaming action")
Expand Down

0 comments on commit d2ea6b3

Please sign in to comment.