Skip to content
Permalink
Browse files

Add concurrency stress test for streaming API:

https://travis-ci.org/slick/slick/builds/44488287 shows a test failure
of createPublisher1MustProduceAStreamOfExactly1Element for JdbcBackend:

Publisher (...) produced no elements expected [true] but found [false]

In response to this failure I added a concurrency stress test in the
hope of being able to reproduce it. `StreamsStressTest` runs the same
scenario as the failed test, except many times over and in parallel. On
my MBP I ran into issues with H2 on JDK 8 with as little as 2 threads
and a few hundred iterations on each, which looks like a bug in either
H2 or the version of JDK 8 I have installed. Local tests with up to 16
threads and 10000 iterations on each thread ran successfully against
H2 on JDK 6 and 7, and against Derby on JDK 6, 7 and 8.
  • Loading branch information...
szeiger committed Dec 19, 2014
1 parent 80929a6 commit 4d4252800991e26c3a7ac5a14209ed99958545d4
@@ -241,6 +241,7 @@ object SlickBuild extends Build {
("postgresql" % "postgresql" % "9.1-901.jdbc4" % "test") +:
("mysql" % "mysql-connector-java" % "5.1.23" % "test") +:
Dependencies.junit ++:
Dependencies.reactiveStreamsTCK % "test" +:
(Dependencies.logback +: Dependencies.testDBs).map(_ % "test") ++:
(Dependencies.logback +: Dependencies.testDBs).map(_ % "codegen"),
// Run the Queryable tests (which need macros) on a forked JVM
@@ -0,0 +1,53 @@
package scala.slick.benchmark

import java.util.concurrent.atomic.AtomicInteger

import org.reactivestreams.tck.TestEnvironment

object StreamsStressTest extends App {
/*import scala.slick.driver.DerbyDriver.api._
val url = "jdbc:derby:memory:StreamsStressTest;create=true"
val driver = "org.apache.derby.jdbc.EmbeddedDriver"*/
import scala.slick.driver.H2Driver.api._
val url = "jdbc:h2:mem:StreamsStressTest"
val driver = "org.h2.Driver"

val repeats = 10000
val numThreads = 16

val env = new TestEnvironment(10000)
val entityNum = new AtomicInteger()
val db = Database.forURL(url, driver = driver, keepAliveConnection = true)
try {
val threads = 1.to(numThreads).toVector.map { i =>
new Thread(new Runnable {
def run(): Unit = {
for(j <- 1 to repeats) {
run1
if(j % 100 == 0) println(s"Thread $i: Stream $j successful")
}
}
})
}
threads.foreach(_.start())
threads.foreach(_.join())
println("All threads finished")
} finally db.close

def run1: Unit = {
val sub = env.newManualSubscriber(createPublisher(1L))
sub.requestNextElementOrEndOfStream("Timeout while waiting for next element from Publisher")
sub.requestEndOfStream()
}

def createPublisher(elements: Long) = {
val tableName = "data_" + elements + "_" + entityNum.incrementAndGet()
class Data(tag: Tag) extends Table[Int](tag, tableName) {
def id = column[Int]("id")
def * = id
}
val data = TableQuery[Data]
val a = data.schema.create >> (data ++= Range.apply(0, elements.toInt)) >> data.sortBy(_.id).map(_.id).result
db.stream(a.withPinnedSession)
}
}
@@ -92,8 +92,8 @@ trait JdbcBackend extends RelationalBackend {
}

/** Create a Database that uses the DriverManager to open new connections. */
def forURL(url:String, user:String = null, password:String = null, prop: Properties = null, driver:String = null, executor: AsyncExecutor = AsyncExecutor.default()): DatabaseDef =
forSource(new DriverJdbcDataSource(url, user, password, prop, driverName = driver), executor)
def forURL(url:String, user:String = null, password:String = null, prop: Properties = null, driver:String = null, executor: AsyncExecutor = AsyncExecutor.default(), keepAliveConnection: Boolean = false): DatabaseDef =
forSource(new DriverJdbcDataSource(url, user, password, prop, driverName = driver, keepAliveConnection = keepAliveConnection), executor)

/** Create a Database that uses the DriverManager to open new connections. */
def forURL(url:String, prop: Map[String, String]): Database = {
@@ -46,7 +46,7 @@ Using a JDBC URL
________________

You can provide a JDBC URL to
:api:`forURL <scala.slick.jdbc.JdbcBackend$DatabaseFactoryDef@forURL(String,String,String,Properties,String,AsyncExecutor):DatabaseDef>`.
:api:`forURL <scala.slick.jdbc.JdbcBackend$DatabaseFactoryDef@forURL(String,String,String,Properties,String,AsyncExecutor,Boolean):DatabaseDef>`.
(see your database's JDBC driver's documentation for the correct URL syntax).

.. includecode:: code/Connection.scala#forURL

0 comments on commit 4d42528

Please sign in to comment.
You can’t perform that action at this time.