Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Implement per-thread connection caching in BlockingDatabaseWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghavendra Prabhu committed Mar 26, 2012
1 parent 7117b98 commit d8c4a16
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Expand Up @@ -45,6 +45,13 @@ extends AsyncDatabase {
workPoolSize, new DaemonThreadFactory("asyncWorkPool-" + database.hosts.mkString(","))))
private val openTimeout = database.openTimeout

// We cache the connection checked out from the underlying database in a thread local so
// that each workPool thread can hold on to a connection for its lifetime. This saves expensive
// context switches in borrowing/returning connections from the underlying database per request.
private val tlConnection = new ThreadLocal[Connection] {
override def initialValue() = database.open()
}

// Basically all we need to do is offload the real work to workPool. However, there is one
// complication - enforcement of DB open timeout. If a connection is not available, most
// likely neither is a thread to do the work, so requests would queue up in the future pool.
Expand All @@ -57,11 +64,20 @@ extends AsyncDatabase {
val future = workPool {
val isRunnable = startCoordinator.compareAndSet(true, false)
if (isRunnable) {
val connection = database.open()
val connection = tlConnection.get()
try {
f(connection)
} finally {
database.close(connection)
} catch {
case e => {
// An exception occurred. To be safe, we return our cached connection back to the pool. This
// protects us in case either the connection has been killed or our thread is going to be
// terminated with an unhandled exception. If neither is the case (e.g. this was a benign
// exception like a SQL constraint violation), it still doesn't hurt much to return/re-borrow
// the connection from the underlying database, given that this should be rare.
database.close(connection)
tlConnection.remove()
throw e
}
}
} else {
throw new CancellationException
Expand Down
Expand Up @@ -32,13 +32,21 @@ class BlockingDatabaseWrapperSpec extends Specification {
def reset() { totalOpens.set(0); openConns.set(0) }
}

val wrapper = new BlockingDatabaseWrapper(1, database)
val numThreads = 2
val wrapper = new BlockingDatabaseWrapper(numThreads, database)

doBefore { database.reset() }

"withConnection should follow connection lifecycle" in {
wrapper withConnection { _ => "Done" } apply()

database.totalOpens.get mustEqual 1
database.openConns.get mustEqual 1
}

"withConnection should return connection on exception" in {
wrapper withConnection { _ => throw new Exception } handle { case _ => "Done with Exception" } apply()

database.totalOpens.get mustEqual 1
database.openConns.get mustEqual 0
}
Expand All @@ -51,7 +59,7 @@ class BlockingDatabaseWrapperSpec extends Specification {

result mustBe "Done"
database.totalOpens.get mustEqual 1
database.openConns.get mustEqual 0
database.openConns.get mustEqual 1
}

"withConnection should follow lifecycle regardless of cancellation" in {
Expand All @@ -77,10 +85,10 @@ class BlockingDatabaseWrapperSpec extends Specification {
println("Ran block: "+ hitBlock.get)
println("Cancelled: "+ (100000 - completed.size))
println("Completed: "+ completed.size)
println("Leaked: "+ database.openConns.get)
println("Cached: "+ database.openConns.get)

database.totalOpens.get mustEqual completed.size
database.openConns.get mustEqual 0
database.totalOpens.get mustEqual numThreads
database.openConns.get mustEqual numThreads
}
}
}

0 comments on commit d8c4a16

Please sign in to comment.