Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Incorporate review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghavendra Prabhu committed Mar 23, 2012
1 parent ef48dc8 commit d2645c5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 25 deletions.
Expand Up @@ -55,8 +55,8 @@ extends AsyncDatabase {
def withConnection[R](f: Connection => R): Future[R] = {
val startCoordinator = new AtomicBoolean(true)
val future = workPool {
val runnable = startCoordinator.compareAndSet(true, false)
if (runnable) {
val isRunnable = startCoordinator.compareAndSet(true, false)
if (isRunnable) {
val connection = database.open()
try {
f(connection)
Expand All @@ -68,24 +68,19 @@ extends AsyncDatabase {
}
}

future.within(checkoutTimer, openTimeout) rescue { e =>
e match {
// openTimeout elapsed. If our task has still not started, cancel it and return the
// exception. If not, rescue the exception with the original future, as if nothing
// happened.
case e: TimeoutException => {
val cancellable = startCoordinator.compareAndSet(true, false)
if (cancellable) {
stats.incr("db-async-open-timeout-count", 1)
future.cancel()
Future.exception(e)
} else {
future
}
// If openTimeout elapsed and our task has still not started, cancel it and return the
// exception. If not, rescue the exception with the *original* future, as if nothing
// happened. Any other exception - just propagate unchanged.
future.within(checkoutTimer, openTimeout) rescue {
case e: TimeoutException => {
val isCancellable = startCoordinator.compareAndSet(true, false)
if (isCancellable) {
stats.incr("db-async-open-timeout-count", 1)
future.cancel()
Future.exception(e)
} else {
future // note: this is the original future not bounded by within().
}

// Any other exception - just propagate unchanged.
case _ => Future.exception(e)
}
}
}
Expand Down
Expand Up @@ -6,14 +6,14 @@ import com.twitter.querulous.async
import com.twitter.querulous.database.DatabaseFactory
import com.twitter.querulous.query.QueryFactory

class AsyncQueryEvaluator {
abstract class AsyncQueryEvaluator {
var database: Database = new Database
var query: Query = new Query
var singletonFactory = false

// Size of the work pool used by the AsyncDatabase to do all the DB query work.
// This will usually be the same size as the connection pool.
var workPoolSize: Option[Int] = None
// This should typically be the same size as the DB connection pool.
var workPoolSize: Int

private var memoizedFactory: Option[async.AsyncQueryEvaluatorFactory] = None

Expand All @@ -39,7 +39,7 @@ class AsyncQueryEvaluator {

memoizedFactory = memoizedFactory orElse {
var dbFactory: async.AsyncDatabaseFactory = new async.BlockingDatabaseWrapperFactory(
workPoolSize.get, // workPoolSize is a required setting.
workPoolSize,
newDatabaseFactory(stats, dbStatsFactory),
stats
)
Expand Down
Expand Up @@ -5,8 +5,7 @@ import com.twitter.querulous
import querulous.query.TracingQueryFactory
import com.twitter.querulous.query.QueryFactory

class TracingAsyncQueryEvaluator extends AsyncQueryEvaluator {

abstract class TracingAsyncQueryEvaluator extends AsyncQueryEvaluator {
var tracerFactory: tracing.Tracer.Factory = tracing.NullTracer.factory
var serviceName: String = ""
var annotateQuery: Boolean = true // send info such as service name, ip and trace id with query
Expand Down

0 comments on commit d2645c5

Please sign in to comment.