Permalink
Browse files

Backport #1745 to 3.2 (#1754)

* fixes race-condition using unpinned/flatMapped DBIOActions, leading to missing decreaseInUseCount calls and possible stall of ManagedBlockingArrayQueue
* Try "inUseCount" as field name
  • Loading branch information...
trevorsibanda committed Jul 12, 2017
1 parent d205216 commit f21a58af4b60c9aed465da6b6774f062ba416371
@@ -85,6 +85,17 @@ h2mem1 = {
keepAliveConnection = true
}
h2mem-inuse = {
url = "jdbc:h2:mem:test1"
driver = org.h2.Driver
connectionPool = "HikariCP"
numThreads = 5
maxConnections = 5
keepAliveConnection = true
registerMbeans = true
poolName = "inUseCount"
}
// MBeans-enabled configuration, used by MBeansTest
mbeans = {
url = "jdbc:h2:mem:test_mbeans"
@@ -0,0 +1,99 @@
package slick.test.jdbc.hikaricp
import java.lang.management.ManagementFactory
import java.util.concurrent.TimeUnit
import javax.management.ObjectName
import com.typesafe.slick.testkit.util.{AsyncTest, JdbcTestDB}
import org.junit.Assert.assertEquals
import org.junit.{After, Before, Ignore, Test}
import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import slick.lifted.{ProvenShape, TableQuery}
import slick.util.SlickLogger
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Try, Success}
class SlickInUseCountTest extends AsyncTest[JdbcTestDB] {
val poolName = "inUseCount"
val mbeanServer = ManagementFactory.getPlatformMBeanServer
val aeBeanName = new ObjectName(s"slick:type=AsyncExecutor,name=$poolName")
val poolBeanName = new ObjectName(s"com.zaxxer.hikari:type=Pool ($poolName)")
val logger = new SlickLogger(LoggerFactory.getLogger("slick.util.AsyncExecutor"))
class TestTable(tag: Tag) extends Table[(Int)](tag, "SDL") {
def id: Rep[Int] = column[Int]("ID")
def * : ProvenShape[(Int)] = id
}
var database: Database = _
val testTable: TableQuery[TestTable] = TableQuery[TestTable]
@Before
def openDatabase() = {
System.setProperty("com.zaxxer.hikari.housekeeping.periodMs", "5000")
database = Database.forConfig("h2mem-inuse")
Await.result(database.run(testTable.schema.create), Duration.Inf /*2.seconds*/)
}
@After
def closeDatabase() = {
Await.result(database.run(testTable.schema.drop), 2.seconds)
database.close()
}
@Test def slickInUseCount() {
val loops = 1000
val count = 100
1 to loops foreach { _ =>
val tasks = 1 to count map { i =>
val action = { testTable += i }
.flatMap { _ => testTable.length.result }
//.flatMap { _ => DBIO.successful(s"inserted value $i") }
database.run(action)
}
Await.result(Future.sequence(tasks), Duration(10, TimeUnit.SECONDS))
}
//we need to wait until there are no more active threads in the threadpool
//DBIOAction results might be available before the threads have completely finished their work
while (mbeanServer.getAttribute(aeBeanName, "ActiveThreads").asInstanceOf[Int] > 0) {
Thread.sleep(100)
}
assertEquals(0, inUseCount)
}
/**
* Use introspection to retrieve the inUseCount field of the ManagedArrayBlockingQueue
*/
def inUseCount: Int = {
val asyncExecutorField = database.getClass.getDeclaredField("executor")
asyncExecutorField.setAccessible(true)
val asyncExecutor = asyncExecutorField.get(database)
val threadPoolExecutorField = asyncExecutor.getClass.getDeclaredField("slick$util$AsyncExecutor$$anon$$executor")
threadPoolExecutorField.setAccessible(true)
val threadPoolExecutor = threadPoolExecutorField.get(asyncExecutor)
val queue = threadPoolExecutor.getClass.getMethod("getQueue").invoke(threadPoolExecutor)
val inUseCountField = Seq("inUseCount", "slick$util$ManagedArrayBlockingQueue$$inUseCount").collectFirst{name =>
Try(queue.getClass.getDeclaredField(name)) match{
case Success(field) => field
}
}.get
inUseCountField.setAccessible(true)
inUseCountField.get(queue).asInstanceOf[Int]
}
}
@@ -231,7 +231,11 @@ trait BasicBackend { self =>
protected[this] def runSynchronousDatabaseAction[R](a: SynchronousDatabaseAction[R, NoStream, This, _], ctx: Context, continuation: Boolean): Future[R] = {
val promise = Promise[R]()
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
def priority = ctx.priority(continuation)
def priority = {
ctx.readSync
ctx.priority(continuation)
}
def run: Unit =
try {
ctx.readSync
@@ -244,7 +248,7 @@ trait BasicBackend { self =>
releaseSession(ctx, false)
res
} finally {
if (!ctx.isPinned) connectionReleased = true
if (!ctx.isPinned && ctx.priority(continuation) != WithConnection) connectionReleased = true
ctx.sync = 0
}
promise.success(res)
@@ -265,7 +269,10 @@ trait BasicBackend { self =>
ctx.getEC(synchronousExecutionContext).prepare.execute(new AsyncExecutor.PrioritizedRunnable {
private[this] def str(l: Long) = if(l != Long.MaxValue) l else if(GlobalConfig.unicodeDump) "\u221E" else "oo"
def priority = ctx.priority(continuation)
def priority = {
ctx.readSync
ctx.priority(continuation)
}
def run: Unit = try {
val debug = streamLogger.isDebugEnabled
@@ -300,7 +307,7 @@ trait BasicBackend { self =>
throw ex
} finally {
ctx.streamState = state
if (!ctx.isPinned) connectionReleased = true
if (!ctx.isPinned && ctx.priority(continuation) != WithConnection) connectionReleased = true
ctx.sync = 0
}
if(debug) {
@@ -375,8 +382,6 @@ trait BasicBackend { self =>
private[BasicBackend] var currentSession: Session = null
private[BasicBackend] var releasedConnection = false
private[BasicBackend] def priority(continuation: Boolean): Priority = {
if (currentSession != null) WithConnection
else if (continuation) Continuation
@@ -83,9 +83,7 @@ object AsyncExecutor extends Logging {
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
(r, queue) match {
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) =>
if (pr.connectionReleased && pr.priority != WithConnection) q.decreaseInUseCount()
pr.inUseCounterSet = false
case (pr: PrioritizedRunnable, q: ManagedArrayBlockingQueue[Runnable]) if pr.connectionReleased => q.decreaseInUseCount()
case _ =>
}
}
@@ -152,7 +150,7 @@ object AsyncExecutor extends Logging {
case object Fresh extends Priority
/** Continuation is used for database actions that are a continuation of some previously executed actions */
case object Continuation extends Priority
/** WithContinuation is used for database actions that already have a JDBC connection associated. */
/** WithConnection is used for database actions that already have a JDBC connection associated. */
case object WithConnection extends Priority
trait PrioritizedRunnable extends Runnable {

0 comments on commit f21a58a

Please sign in to comment.