Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge pull request #47 from twitter/cache_connections
Browse files Browse the repository at this point in the history
Implement per-thread connection caching in async querulous
  • Loading branch information
Raghavendra Prabhu committed Mar 27, 2012
2 parents 7117b98 + 402df4c commit 3b8ba05
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.querulous.async

import java.util.logging.{Logger, Level}
import java.util.concurrent.{Executors, CancellationException}
import java.util.concurrent.{Executors, CancellationException, ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.sql.Connection
import com.twitter.util.{Future, FuturePool, JavaTimer, TimeoutException}
Expand Down Expand Up @@ -41,10 +41,30 @@ class BlockingDatabaseWrapper(
extends AsyncDatabase {
import AsyncConnectionCheckout._

private val workPool = FuturePool(Executors.newFixedThreadPool(
workPoolSize, new DaemonThreadFactory("asyncWorkPool-" + database.hosts.mkString(","))))
// Note: Our executor is similar to what you'd get via Executors.newFixedThreadPool(), but the latter
// returns an ExecutorService, which unfortunately doesn't give us as much visibility into stats as
// the ThreadPoolExecutor, so we create one ourselves.
private val executor = {
val e = new ThreadPoolExecutor(workPoolSize, workPoolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](),
new DaemonThreadFactory("asyncWorkPool-" + database.hosts.mkString(",")));
stats.addGauge("db-async-active-threads")(e.getActiveCount)
stats.addGauge("db-async-waiters")(e.getQueue.size)
e
}
private val workPool = FuturePool(executor)
private val openTimeout = database.openTimeout

// We cache the connection checked out from the underlying database in a thread local so
// that each workPool thread can hold on to a connection for its lifetime. This saves expensive
// context switches in borrowing/returning connections from the underlying database per request.
private val tlConnection = new ThreadLocal[Connection] {
override def initialValue() = {
stats.incr("db-async-cached-connection-acquire", 1)
database.open()
}
}

// Basically all we need to do is offload the real work to workPool. However, there is one
// complication - enforcement of DB open timeout. If a connection is not available, most
// likely neither is a thread to do the work, so requests would queue up in the future pool.
Expand All @@ -57,11 +77,24 @@ extends AsyncDatabase {
val future = workPool {
val isRunnable = startCoordinator.compareAndSet(true, false)
if (isRunnable) {
val connection = database.open()
val connection = tlConnection.get()
try {
f(connection)
} finally {
database.close(connection)
} catch {
case e => {
// An exception occurred. To be safe, we return our cached connection back to the pool. This
// protects us in case either the connection has been killed or our thread is going to be
// terminated with an unhandled exception. If neither is the case (e.g. this was a benign
// exception like a SQL constraint violation), it still doesn't hurt much to return/re-borrow
// the connection from the underlying database, given that this should be rare.
// TODO: Handle possible connection leakage if this thread is destroyed in some other way.
// (Note that leaking an exception from here will not kill the thread since the FuturePool
// will swallow it and wrap with a Throw()).
stats.incr("db-async-cached-connection-release", 1)
database.close(connection)
tlConnection.remove()
throw e
}
}
} else {
throw new CancellationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ class BlockingDatabaseWrapperSpec extends Specification {
def reset() { totalOpens.set(0); openConns.set(0) }
}

val wrapper = new BlockingDatabaseWrapper(1, database)
val numThreads = 2
val wrapper = new BlockingDatabaseWrapper(numThreads, database)

doBefore { database.reset() }

"withConnection should follow connection lifecycle" in {
wrapper withConnection { _ => "Done" } apply()

database.totalOpens.get mustEqual 1
database.openConns.get mustEqual 1
}

"withConnection should return connection on exception" in {
wrapper withConnection { _ => throw new Exception } handle { case _ => "Done with Exception" } apply()

database.totalOpens.get mustEqual 1
database.openConns.get mustEqual 0
}
Expand All @@ -51,7 +59,7 @@ class BlockingDatabaseWrapperSpec extends Specification {

result mustBe "Done"
database.totalOpens.get mustEqual 1
database.openConns.get mustEqual 0
database.openConns.get mustEqual 1
}

"withConnection should follow lifecycle regardless of cancellation" in {
Expand All @@ -77,10 +85,10 @@ class BlockingDatabaseWrapperSpec extends Specification {
println("Ran block: "+ hitBlock.get)
println("Cancelled: "+ (100000 - completed.size))
println("Completed: "+ completed.size)
println("Leaked: "+ database.openConns.get)
println("Cached: "+ database.openConns.get)

database.totalOpens.get mustEqual completed.size
database.openConns.get mustEqual 0
database.totalOpens.get mustEqual numThreads
database.openConns.get mustEqual numThreads
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with Cl
one(database).open() willReturn connection
one(queryFactory).apply(connection, QueryClass.Select, "SELECT 1") willReturn query
one(query).select(fromRow) willReturn Seq(1)
one(database).close(connection)
}

newEvaluator().select("SELECT 1")(fromRow).get()
Expand All @@ -48,7 +47,6 @@ class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with Cl
one(database).open() willReturn connection
one(queryFactory).apply(connection, QueryClass.Select, "SELECT 1") willReturn query
one(query).select(fromRow) willReturn Seq(1)
one(database).close(connection)
}

newEvaluator().selectOne("SELECT 1")(fromRow).get()
Expand All @@ -61,7 +59,6 @@ class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with Cl
one(database).open() willReturn connection
one(queryFactory).apply(connection, QueryClass.Select, "SELECT 1") willReturn query
one(query).select(any[ResultSet => Int]) willReturn Seq(1)
one(database).close(connection)
}

newEvaluator().count("SELECT 1").get()
Expand All @@ -76,7 +73,6 @@ class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with Cl
one(database).open() willReturn connection
one(queryFactory).apply(connection, QueryClass.Execute, sql) willReturn query
one(query).execute() willReturn 1
one(database).close(connection)
}

newEvaluator().execute("INSERT INTO foo (id) VALUES (1)").get()
Expand All @@ -92,7 +88,6 @@ class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with Cl
one(queryFactory).apply(connection, QueryClass.Execute, sql) willReturn query
one(query).addParams(1)
one(query).execute() willReturn 1
one(database).close(connection)
}

newEvaluator().executeBatch("INSERT INTO foo (id) VALUES (?)")(_(1)).get()
Expand All @@ -110,7 +105,6 @@ class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with Cl
one(query).execute() willReturn 1
one(connection).commit()
one(connection).setAutoCommit(true)
one(database).close(connection)
}

newEvaluator().transaction(_.execute("INSERT INTO foo (id) VALUES (1)")).get()
Expand Down

0 comments on commit 3b8ba05

Please sign in to comment.