Skip to content
Permalink
Browse files

Upgrade to Reactive Streams 1.0.0.RC3

- Re-enable transitive dependencies for the TCK

- Remove demand overflow detection. It required additional atomic
  transitions and is no longer required by the spec.
  • Loading branch information
szeiger committed Feb 17, 2015
1 parent 838af0d commit 12faf5df07a897ac1a59037bd66ee8416e175455
Showing with 4 additions and 17 deletions.
  1. +3 −3 project/Build.scala
  2. +1 −14 slick/src/main/scala/slick/backend/DatabaseComponent.scala
@@ -29,7 +29,7 @@ object SlickBuild extends Build {
val slf4j = "org.slf4j" % "slf4j-api" % "1.6.4"
val logback = "ch.qos.logback" % "logback-classic" % "0.9.28"
val typesafeConfig = "com.typesafe" % "config" % "1.2.1"
val reactiveStreamsVersion = "1.0.0.RC2"
val reactiveStreamsVersion = "1.0.0.RC3"
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion
val reactiveStreamsTCK = "org.reactivestreams" % "reactive-streams-tck" % reactiveStreamsVersion
val pools = Seq(
@@ -242,7 +242,7 @@ object SlickBuild extends Build {
//scalacOptions in Compile += "-Yreify-copypaste",
libraryDependencies ++=
Dependencies.junit ++:
(Dependencies.reactiveStreamsTCK % "test").intransitive() +:
(Dependencies.reactiveStreamsTCK % "test") +:
(Dependencies.logback +: Dependencies.testDBs).map(_ % "test") ++:
(Dependencies.logback +: Dependencies.testDBs).map(_ % "codegen"),
// Run the Queryable tests (which need macros) on a forked JVM
@@ -318,7 +318,7 @@ object SlickBuild extends Build {
unmanagedResourceDirectories in Test += (baseDirectory in aRootProject).value / "common-test-resources",
libraryDependencies ++=
(Dependencies.logback +: Dependencies.testDBs).map(_ % "test") ++:
Dependencies.reactiveStreamsTCK.intransitive() +:
Dependencies.reactiveStreamsTCK +:
Dependencies.testngExtras,
testNGSuites := Seq("reactive-streams-tests/src/test/resources/testng.xml")
)
@@ -432,11 +432,6 @@ trait DatabaseComponent { self =>
* the actual demand at that point. It is reset to 0 when the initial streaming ends. */
private[this] val remaining = new AtomicLong(Long.MinValue)

/** The number of remaining elements that are not in the current batch. Unlike `remaining`,
* which is decremented at the end of the batch, this is decremented at the beginning. It is
* only used for overflow detection according to Reactive Streams spec, 3.17. */
private[this] val remainingNotInBatch = new AtomicLong(0L)

/** An error that will be signaled to the Subscriber when the stream is cancelled or
* terminated. This is used for signaling demand overflow in `request()` while guaranteeing
* that the `onError` message does not overlap with an active `onNext` call. */
@@ -463,12 +458,7 @@ trait DatabaseComponent { self =>
* the current batch. When this value is negative, the initial streaming action is still
* running and the real demand can be computed by subtracting `Long.MinValue` from the
* returned value. */
def demandBatch: Long = {
val demand = remaining.get()
val realDemand = if(demand < 0L) demand - Long.MinValue else demand
remainingNotInBatch.addAndGet(-realDemand)
demand
}
def demandBatch: Long = remaining.get()

/** Whether the stream has been cancelled by the Subscriber */
def cancelled: Boolean = cancelRequested
@@ -517,9 +507,6 @@ trait DatabaseComponent { self =>
if(l <= 0) {
deferredError = new IllegalArgumentException("Requested count must not be <= 0 (see Reactive Streams spec, 3.9)")
cancel
} else if(remainingNotInBatch.addAndGet(l) < 0) {
deferredError = new IllegalStateException("Pending element count must not exceed 2^63-1 (see Reactive Streams spec, 3.17)")
cancel
} else {
if(!cancelRequested && remaining.getAndAdd(l) == 0L) restartStreaming
}

0 comments on commit 12faf5d

Please sign in to comment.
You can’t perform that action at this time.