Permalink
Browse files

fixes issue 1274

1 parent 39d53ac commit 84dc188ec460524cc8b81fdd730347f3c011815c @kwark kwark committed Mar 15, 2016
@@ -76,3 +76,11 @@ distrib2 {
keepAliveConnection = true
}
}
+
+h2mem1 = {
+ url = "jdbc:h2:mem:test1"
+ driver = org.h2.Driver
+ connectionPool = "HikariCP"
+ numThreads = 1
+ keepAliveConnection = true
+}
@@ -1,16 +1,25 @@
package slick.jdbc.hikaricp
-import java.sql.{Driver, Connection}
+import java.sql.{Connection, Driver}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
+
import com.typesafe.config.Config
import slick.SlickException
-import slick.jdbc.{JdbcDataSourceFactory, JdbcDataSource}
+import slick.jdbc.{JdbcDataSource, JdbcDataSourceFactory}
import slick.util.ConfigExtensionMethods._
+import slick.util.Logging
/** A JdbcDataSource for a HikariCP connection pool.
* See `slick.jdbc.JdbcBackend#Database.forConfig` for documentation on the config parameters. */
-class HikariCPJdbcDataSource(val ds: com.zaxxer.hikari.HikariDataSource, val hconf: com.zaxxer.hikari.HikariConfig) extends JdbcDataSource {
+class HikariCPJdbcDataSource(val ds: com.zaxxer.hikari.HikariDataSource, val hconf: com.zaxxer.hikari.HikariConfig)
+ extends JdbcDataSource {
+
def createConnection(): Connection = ds.getConnection()
+
def close(): Unit = ds.close()
+
+ override val maxConnections: Option[Int] = Some(ds.getMaximumPoolSize)
}
object HikariCPJdbcDataSource extends JdbcDataSourceFactory {
@@ -11,7 +11,7 @@ class DelegateConnection(conn: Connection) extends Connection {
def setAutoCommit(autoCommit: Boolean): Unit = conn.setAutoCommit(autoCommit)
def setHoldability(holdability: Int): Unit = conn.setHoldability(holdability)
def clearWarnings(): Unit = conn.clearWarnings()
- def getNetworkTimeout: Int = ??? //conn.getNetworkTimeout
+ def getNetworkTimeout: Int = conn.getNetworkTimeout
def createBlob(): Blob = conn.createBlob()
def createSQLXML(): SQLXML = conn.createSQLXML()
def setSavepoint(): Savepoint = conn.setSavepoint()
@@ -20,8 +20,8 @@ class DelegateConnection(conn: Connection) extends Connection {
def getTransactionIsolation: Int = conn.getTransactionIsolation
def getClientInfo(name: String): String = conn.getClientInfo(name)
def getClientInfo: Properties = conn.getClientInfo
- def getSchema: String = ??? //conn.getSchema
- def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit = ??? //conn.setNetworkTimeout(executor, milliseconds)
+ def getSchema: String = conn.getSchema
+ def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit = conn.setNetworkTimeout(executor, milliseconds)
def getMetaData: DatabaseMetaData = conn.getMetaData
def getTypeMap: util.Map[String, Class[_]] = conn.getTypeMap
def rollback(): Unit = conn.rollback()
@@ -45,7 +45,7 @@ class DelegateConnection(conn: Connection) extends Connection {
def setCatalog(catalog: String): Unit = conn.setCatalog(catalog)
def close(): Unit = conn.close()
def getAutoCommit: Boolean = conn.getAutoCommit
- def abort(executor: Executor): Unit = ??? //conn.abort(executor)
+ def abort(executor: Executor): Unit = conn.abort(executor)
def isValid(timeout: Int): Boolean = conn.isValid(timeout)
def prepareStatement(sql: String): PreparedStatement = conn.prepareStatement(sql)
def prepareStatement(sql: String, resultSetType: Int, resultSetConcurrency: Int): PreparedStatement = conn.prepareStatement(sql, resultSetType, resultSetConcurrency)
@@ -57,7 +57,7 @@ class DelegateConnection(conn: Connection) extends Connection {
def isClosed: Boolean = conn.isClosed
def createStruct(typeName: String, attributes: Array[AnyRef]): Struct = conn.createStruct(typeName, attributes)
def getWarnings: SQLWarning = conn.getWarnings
- def setSchema(schema: String): Unit = ??? //conn.setSchema(schema)
+ def setSchema(schema: String): Unit = conn.setSchema(schema)
def commit(): Unit = conn.commit()
def unwrap[T](iface: Class[T]): T = conn.unwrap[T](iface)
def isWrapperFor(iface: Class[_]): Boolean = conn.isWrapperFor(iface)
@@ -1,13 +1,16 @@
package slick.test.jdbc
-import java.sql.{SQLException, DriverPropertyInfo, Connection, Driver}
+import java.io.PrintWriter
+import java.sql.{Connection, Driver, DriverPropertyInfo, SQLException}
import java.util.Properties
import java.util.logging.Logger
+import javax.sql.DataSource
+import com.typesafe.config.ConfigFactory
import org.junit.Test
import org.junit.Assert._
import slick.basic.DatabaseConfig
-import slick.jdbc.{JdbcProfile, JdbcBackend}
+import slick.jdbc.{JdbcBackend, JdbcProfile}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -34,6 +37,7 @@ class DataSourceTest {
MockDriver.reset
val db = JdbcBackend.Database.forConfig("databaseUrl")
try {
+ assertEquals(Some(100), db.source.maxConnections)
try Await.result(db.run(sqlu"dummy"), Duration.Inf) catch { case ex: SQLException => }
val (url, info) = MockDriver.getLast.getOrElse(fail("No connection data recorded").asInstanceOf[Nothing])
assertEquals("jdbc:postgresql://host/dbname", url)
@@ -42,6 +46,54 @@ class DataSourceTest {
assertEquals("bar", info.getProperty("foo"))
} finally db.close
}
+
+ @Test def testMaxConnections: Unit = {
+ MockDriver.reset
+ val db = JdbcBackend.Database.forConfig("databaseUrl", ConfigFactory.parseString(
+ """
+ |databaseUrl {
+ | dataSourceClass = "slick.jdbc.DatabaseUrlDataSource"
+ | maxConnections = 20
+ | url = "postgres://user:pass@host/dbname"
+ |}
+ |""".stripMargin))
+ try {
+ assertEquals("maxConnections should be respected", Some(20), db.source.maxConnections)
+ } finally db.close
+ }
+
+ @Test def testMaxConnectionsNumThreads: Unit = {
+ MockDriver.reset
+ val db = JdbcBackend.Database.forConfig("databaseUrl", ConfigFactory.parseString(
+ """
+ |databaseUrl {
+ | dataSourceClass = "slick.jdbc.DatabaseUrlDataSource"
+ | numThreads = 10
+ | url = "postgres://user:pass@host/dbname"
+ |}
+ |""".stripMargin
+ ))
+ try {
+ assertEquals("maxConnections should be numThreads * 5", Some(50), db.source.maxConnections)
+ } finally db.close
+ }
+
+ @Test def testConnectionPoolDisabled: Unit = {
+ MockDriver.reset
+ val db = JdbcBackend.Database.forConfig("databaseUrl", ConfigFactory.parseString(
+ """
+ |databaseUrl {
+ | dataSourceClass = "slick.jdbc.DatabaseUrlDataSource"
+ | connectionPool = "disabled"
+ | url = "postgres://user:pass@host/dbname"
+ |}
+ |
+ """.stripMargin))
+ try {
+ assertEquals("maxConnections should be None when not using a pool", None, db.source.maxConnections)
+ } finally db.close
+ }
+
}
object MockDriver {
@@ -62,3 +114,4 @@ class MockDriver extends Driver {
}
def getMajorVersion: Int = 0
}
+
@@ -0,0 +1,86 @@
+package slick.test.jdbc.hikaricp
+
+import java.sql.Blob
+import java.util.concurrent.TimeUnit
+import javax.sql.rowset.serial.SerialBlob
+
+import com.typesafe.slick.testkit.util.{AsyncTest, JdbcTestDB}
+import org.junit.{After, Before, Ignore, Test}
+import slick.jdbc.H2Profile.api._
+import slick.lifted.{ProvenShape, TableQuery}
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
+
+class SlickDeadlockTest extends AsyncTest[JdbcTestDB] {
+
+ class TestTable(tag: Tag) extends Table[(Int)](tag, "SDL") {
+
+ def id: Rep[Int] = column[Int]("ID")
+ def * : ProvenShape[(Int)] = id
+
+ }
+
+ class BlobTable(tag: Tag) extends Table[(Int, Blob)](tag, "SDLIO") {
+ def id = column[Int]("id")
+ def data = column[Blob]("data")
+ def * = (id, data)
+ }
+
+
+ var database: Database = _
+ val testTable: TableQuery[TestTable] = TableQuery[TestTable]
+ val blobTable: TableQuery[BlobTable] = TableQuery[BlobTable]
+
+ @Before
+ def openDatabase() = {
+ database = Database.forConfig("h2mem1")
+ Await.result(database.run((testTable.schema ++ blobTable.schema).create), 2.seconds)
+ }
+
+ @After
+ def closeDatabase() = {
+ Await.result(database.run((testTable.schema ++ blobTable.schema).drop), 2.seconds)
+ database.close()
+ }
+
+
+ @Test def slickDoesNotDeadlock() {
+
+ val tasks = 1 to 100 map { i =>
+ val action = { testTable += i }
+ .flatMap { _ => testTable.length.result }
+ .flatMap { _ => DBIO.successful(s"inserted value $i") }
+
+ database.run(action.transactionally)
+ }
+ Await.result(Future.sequence(tasks), Duration(10, TimeUnit.SECONDS))
+ }
+
+ @Test def slickDoesNotDeadlockWithSleeps(): Unit = {
+ val tasks = 1 to 50 map { c =>
+ val action = sql"select $c".as[Int].head.map { i => Thread.sleep(if(c == 1) 100 else 200); i }
+
+ database.run(action.transactionally)
+ }
+ Await.result(Future.sequence(tasks), Duration(10, TimeUnit.SECONDS))
+
+ }
+
+ @Test def slickDoesNotDeadlockWithIo() {
+
+ Await.result(database.run((
+ (blobTable += (1, new SerialBlob(Array[Byte](1,2,3)))) >>
+ (blobTable += (2, new SerialBlob(Array[Byte](4,5)))) >>
+ blobTable.result
+ ).transactionally), Duration(2, TimeUnit.SECONDS))
+
+ val tasks = 1 to 100 map { i =>
+ materializeAsync[(Int, Blob), (Int, String)](database.stream(blobTable.result.transactionally, bufferNext = false),
+ { case (id, data) => database.io((id, data.getBytes(1, data.length.toInt).mkString)) })
+ }
+
+ Await.result(Future.sequence(tasks), Duration(10, TimeUnit.SECONDS))
+ }
+
+}
@@ -1,5 +1,7 @@
package slick.basic
+import slick.util.AsyncExecutor.{Priority, Continuation, Fresh, WithConnection}
+
import scala.language.existentials
import java.io.Closeable
@@ -43,6 +45,7 @@ trait BasicBackend { self =>
/** Create a Database instance through [[https://github.com/typesafehub/config Typesafe Config]].
* The supported config keys are backend-specific. This method is used by `DatabaseConfig`.
+ *
* @param path The path in the configuration file for the database configuration, or an empty
* string for the top level of the `Config` object.
* @param config The `Config` object to read from.
@@ -225,10 +228,10 @@ trait BasicBackend { self =>
}
/** Run a `SynchronousDatabaseAction` on this database. */
- protected[this] def runSynchronousDatabaseAction[R](a: SynchronousDatabaseAction[R, NoStream, This, _], ctx: Context, highPrio: Boolean): Future[R] = {
+ protected[this] def runSynchronousDatabaseAction[R](a: SynchronousDatabaseAction[R, NoStream, This, _], ctx: Context, continuation: Boolean): Future[R] = {
val promise = Promise[R]()
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
- def highPriority = highPrio
+ def priority = ctx.priority(continuation)
def run: Unit =
try {
ctx.readSync
@@ -240,26 +243,29 @@ trait BasicBackend { self =>
}
releaseSession(ctx, false)
res
- } finally { ctx.sync = 0 }
+ } finally {
+ if (!ctx.isPinned) connectionReleased = true
+ ctx.sync = 0
+ }
promise.success(res)
} catch { case NonFatal(ex) => promise.tryFailure(ex) }
})
promise.future
}
/** Stream a `SynchronousDatabaseAction` on this database. */
- protected[this] def streamSynchronousDatabaseAction(a: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect], ctx: StreamingContext, highPrio: Boolean): Future[Null] = {
+ protected[this] def streamSynchronousDatabaseAction(a: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect], ctx: StreamingContext, continuation: Boolean): Future[Null] = {
ctx.streamingAction = a
- scheduleSynchronousStreaming(a, ctx, highPrio)(null)
+ scheduleSynchronousStreaming(a, ctx, continuation)(null)
ctx.streamingResultPromise.future
}
/** Stream a part of the results of a `SynchronousDatabaseAction` on this database. */
- protected[BasicBackend] def scheduleSynchronousStreaming(a: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect], ctx: StreamingContext, highPrio: Boolean)(initialState: a.StreamState): Unit = try {
+ protected[BasicBackend] def scheduleSynchronousStreaming(a: SynchronousDatabaseAction[_, _ <: NoStream, This, _ <: Effect], ctx: StreamingContext, continuation: Boolean)(initialState: a.StreamState): Unit = try {
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
private[this] def str(l: Long) = if(l != Long.MaxValue) l else if(GlobalConfig.unicodeDump) "\u221E" else "oo"
- def highPriority = highPrio
+ def priority = ctx.priority(continuation)
def run: Unit = try {
val debug = streamLogger.isDebugEnabled
@@ -294,6 +300,7 @@ trait BasicBackend { self =>
throw ex
} finally {
ctx.streamState = state
+ if (!ctx.isPinned) connectionReleased = true
ctx.sync = 0
}
if(debug) {
@@ -368,6 +375,14 @@ trait BasicBackend { self =>
private[BasicBackend] var currentSession: Session = null
+ private[BasicBackend] var releasedConnection = false
+
+ private[BasicBackend] def priority(continuation: Boolean): Priority = {
+ if (currentSession != null) WithConnection
+ else if (continuation) Continuation
+ else Fresh
+ }
+
/** Used for the sequence counter in Action debug output. This variable is volatile because it
* is only updated sequentially but not protected by a synchronous action context. */
@volatile private[BasicBackend] var sequenceCounter = 0
@@ -448,7 +463,7 @@ trait BasicBackend { self =>
streamState = null
if(streamLogger.isDebugEnabled) streamLogger.debug("Scheduling stream continuation after transition from demand = 0")
val a = streamingAction
- database.scheduleSynchronousStreaming(a, this.asInstanceOf[StreamingContext], highPrio = true)(s.asInstanceOf[a.StreamState])
+ database.scheduleSynchronousStreaming(a, this.asInstanceOf[StreamingContext], continuation = true)(s.asInstanceOf[a.StreamState])
} else {
if(streamLogger.isDebugEnabled) streamLogger.debug("Saw transition from demand = 0, but no stream continuation available")
}
@@ -269,7 +269,9 @@ trait JdbcBackend extends RelationalBackend {
classLoader: ClassLoader = ClassLoaderUtil.defaultClassLoader): Database = {
val usedConfig = if(path.isEmpty) config else config.getConfig(path)
val source = JdbcDataSource.forConfig(usedConfig, driver, path, classLoader)
- val executor = AsyncExecutor(path, usedConfig.getIntOr("numThreads", 20), usedConfig.getIntOr("queueSize", 1000))
+ val numThreads = usedConfig.getIntOr("numThreads", 20)
+ val maxConnections = source.maxConnections.fold(numThreads*5)(identity)
+ val executor = AsyncExecutor(path, numThreads, numThreads, usedConfig.getIntOr("queueSize", 1000), maxConnections)
forSource(source, executor)
}
}
@@ -410,17 +412,11 @@ trait JdbcBackend extends RelationalBackend {
}
class BaseSession(val database: Database) extends SessionDef {
- protected var open = false
protected var inTransactionally = 0
- def isOpen = open
def isInTransaction = inTransactionally > 0
- lazy val conn = {
- val c = database.source.createConnection
- open = true
- c
- }
+ val conn = database.source.createConnection
lazy val metaData = conn.getMetaData()
@@ -434,9 +430,7 @@ trait JdbcBackend extends RelationalBackend {
}
}
- def close() {
- if(open) conn.close()
- }
+ def close() { conn.close() }
private[slick] def startInTransaction: Unit = {
if(!isInTransaction) conn.setAutoCommit(false)
Oops, something went wrong.

0 comments on commit 84dc188

Please sign in to comment.