Skip to content
Browse files

add stats to blocking database wrapper

  • Loading branch information...
1 parent 8dc1a94 commit 56329419e123886fe2f3031478754659c5c73217 @freels freels committed Nov 29, 2011
View
39 querulous-core/src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
@@ -4,15 +4,17 @@ import java.util.logging.{Logger, Level}
import java.util.concurrent.{Executors, RejectedExecutionException}
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.sql.Connection
-import com.twitter.util.{Throw, Future, Promise, FuturePool, JavaTimer, TimeoutException}
-import com.twitter.querulous.DaemonThreadFactory
+import com.twitter.util.{Throw, Future, Promise}
+import com.twitter.util.{FuturePool, ExecutorServiceFuturePool, JavaTimer, TimeoutException}
+import com.twitter.querulous.{StatsCollector, NullStatsCollector, DaemonThreadFactory}
import com.twitter.querulous.database.{Database, DatabaseFactory}
class BlockingDatabaseWrapperFactory(
workPool: => FuturePool,
checkoutPool: => FuturePool,
- factory: DatabaseFactory)
+ factory: DatabaseFactory,
+ stats: StatsCollector = NullStatsCollector)
extends AsyncDatabaseFactory {
def apply(
hosts: List[String],
@@ -25,7 +27,8 @@ extends AsyncDatabaseFactory {
new BlockingDatabaseWrapper(
workPool,
checkoutPool,
- factory(hosts, name, username, password, urlOptions, driverName)
+ factory(hosts, name, username, password, urlOptions, driverName),
+ stats
)
}
}
@@ -37,11 +40,21 @@ private object AsyncConnectionCheckout {
class BlockingDatabaseWrapper(
workPool: FuturePool,
checkoutPool: FuturePool,
- protected[async] val database: Database)
+ protected[async] val database: Database,
+ stats: StatsCollector = NullStatsCollector)
extends AsyncDatabase {
import AsyncConnectionCheckout._
+ getExecutor(workPool) foreach { e =>
+ stats.addGauge("db-async-active-threads")(e.getActiveCount.toDouble)
+ }
+
+ getExecutor(checkoutPool) foreach { e =>
+ val q = e.getQueue
+ stats.addGauge("db-async-waiters")(q.size.toDouble)
+ }
+
private val openTimeout = database.openTimeout
def withConnection[R](f: Connection => R) = {
@@ -85,14 +98,20 @@ extends AsyncDatabase {
// propagate to the checkout pool.
val result = new Promise[Connection]
- checkoutPool { database.open() } respond { result.update(_) }
+ checkoutPool { database.open() } respond { result.update(_) } onFailure { e =>
+ if (e.isInstanceOf[java.util.concurrent.RejectedExecutionException]) {
+ stats.incr("db-async-open-rejected-count", 1)
+ }
+ }
// Return within a specified timeout. If within times out, that
// means the connection is never handed off, so we need to clean
// up ourselves.
result.within(checkoutTimer, openTimeout) onFailure { e =>
if (e.isInstanceOf[java.util.concurrent.TimeoutException]) {
+ stats.incr("db-async-open-timeout-count", 1)
+
// Cancel the checkout.
result.cancel()
@@ -113,6 +132,14 @@ extends AsyncDatabase {
}
}
+ private def getExecutor(p: FuturePool) = p match {
+ case p: ExecutorServiceFuturePool => p.executor match {
+ case e: java.util.concurrent.ThreadPoolExecutor => Some(e)
+ case _ => None
+ }
+ case _ => None
+ }
+
// equality overrides
override def equals(other: Any) = other match {
View
27 querulous-core/src/main/scala/com/twitter/querulous/config/AsyncQueryEvaluator.scala
@@ -36,23 +36,12 @@ class AsyncQueryEvaluator {
synchronized {
if (!singletonFactory) memoizedFactory = None
- val workP = workPool()
- val checkoutP = checkoutPool()
-
- getExecutor(workP) foreach { e =>
- stats.addGauge("db-async-active-threads")(e.getActiveCount.toDouble)
- }
-
- getExecutor(checkoutP) foreach { e =>
- val q = e.getQueue
- stats.addGauge("db-async-waiters")(q.size.toDouble)
- }
-
memoizedFactory = memoizedFactory orElse {
val db = new async.BlockingDatabaseWrapperFactory(
- workP,
- checkoutP,
- newDatabaseFactory(stats)
+ workPool(),
+ checkoutPool(),
+ newDatabaseFactory(stats),
+ stats
)
Some(new async.StandardAsyncQueryEvaluatorFactory(db, newQueryFactory(stats)))
@@ -63,12 +52,4 @@ class AsyncQueryEvaluator {
}
def apply(): async.AsyncQueryEvaluatorFactory = apply(querulous.NullStatsCollector)
-
- private def getExecutor(p: util.FuturePool) = p match {
- case p: util.ExecutorServiceFuturePool => p.executor match {
- case e: java.util.concurrent.ThreadPoolExecutor => Some(e)
- case _ => None
- }
- case _ => None
- }
}

0 comments on commit 5632941

Please sign in to comment.
Something went wrong with that request. Please try again.