Skip to content

No leaks #34

Merged
merged 6 commits into from Nov 22, 2011
View
72 querulous-core/src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
@@ -1,9 +1,10 @@
package com.twitter.querulous.async
+import java.util.logging.{Logger, Level}
import java.util.concurrent.{Executors, RejectedExecutionException}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.sql.Connection
-import com.twitter.util.{Throw, Future, FuturePool, JavaTimer, TimeoutException}
+import com.twitter.util.{Throw, Future, Promise, FuturePool, JavaTimer, TimeoutException}
import com.twitter.querulous.DaemonThreadFactory
import com.twitter.querulous.database.{Database, DatabaseFactory}
@@ -45,29 +46,70 @@ extends AsyncDatabase {
def withConnection[R](f: Connection => R) = {
checkoutConnection() flatMap { conn =>
+
+ // TODO: remove the atomic boolean and the close attempt in the
+ // try/finally block.
+ //
+ // As a workaround for a FuturePool bug where it may throw away
+ // work if cancelled but already in progress, therefore not
+ // allowing ensure to predictably clean up, use an AtomicBoolean
+ // to allow ensure and the work block to race to steal the
+ // connection.
+ val inProgress = new AtomicBoolean(false)
+
workPool {
- try {
- f(conn)
- } finally {
+ if(inProgress.compareAndSet(false, true)) {
+ try {
+ f(conn)
+ } finally {
+ database.close(conn)
+ }
+ } else {
+ // not truly an error in this case, but we need something
+ // that evalutates to Nothing here.
+ error("Lost race with ensure block. Connection closed.")
+ }
+ } ensure {
+ if (inProgress.compareAndSet(false, true)) {
database.close(conn)
}
}
}
}
private def checkoutConnection(): Future[Connection] = {
- val result = checkoutPool {
- database.open()
- }
+ // TODO: remove the explicit promise creation once twitter util
+ // gets updated.
+ //
+ // creating a detached promise here so that cancellations do not
+ // propagate to the checkout pool.
+ val result = new Promise[Connection]
- // release the connection if future is cancelled
- result onCancellation {
- result foreach { database.close(_) }
- }
+ checkoutPool { database.open() } respond { result.update(_) }
@9len
9len added a note Nov 15, 2011

I think if you're going to use an explicit promise, you have to explicitly link it to the checkoutPool future, so that cancellation is propagated properly.

@freels
freels added a note Nov 16, 2011

That's the point. I don't want cancellation to propagate in this case, in order to not trigger the regression in FuturePool.

@9len
9len added a note Nov 16, 2011

got it, though stevej will very shortly check in the future pool fix...

@stevej
stevej added a note Nov 17, 2011

the future pool fix is in.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
- // cancel future if it times out
- result.within(checkoutTimer, openTimeout) onFailure {
- case _: java.util.concurrent.TimeoutException => result.cancel()
+ // Return within a specified timeout. If within times out, that
+ // means the connection is never handed off, so we need to clean
+ // up ourselves.
+ result.within(checkoutTimer, openTimeout) onFailure { e =>
+ if (e.isInstanceOf[java.util.concurrent.TimeoutException]) {
@9len
9len added a note Nov 15, 2011

why not use a case matcher here?

@9len
9len added a note Nov 15, 2011

catching up, realize this is a matcher problem. why not just do:

case _: java.util.concurrent.TimeoutException =>
...
case _ => ()

@freels
freels added a note Nov 15, 2011

If this was using if/then, I'd do that. I don't like the explicit "otherwise, do nothing" match. Delete that and this starts breaking.

@9len
9len added a note Nov 16, 2011

sure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ // Cancel the checkout.
+ result.cancel()
+
+ // If the future is not cancelled in time, close the dangling
+ // connection.
+ result foreach { c =>
+ try {
+ database.close(c)
+ } catch {
+ case e => Logger.getLogger("querulous").log(
+ Level.WARNING,
+ "Exception on database close.",
+ e
+ )
+ }
+ }
+ }
}
}
View
37 querulous-core/src/main/scala/com/twitter/querulous/config/AsyncQueryEvaluator.scala
@@ -8,12 +8,15 @@ trait FuturePool {
def apply(): util.FuturePool
}
-object DefaultWorkPool extends FuturePool {
- def apply() = async.AsyncQueryEvaluator.defaultWorkPool
-}
-
class AsyncQueryEvaluator {
- var workPool: FuturePool = DefaultWorkPool
+ var workPool: FuturePool = new FuturePool {
+ def apply() = async.AsyncQueryEvaluator.defaultWorkPool
+ }
+
+ var checkoutPool: FuturePool = new FuturePool {
+ def apply() = async.AsyncQueryEvaluator.checkoutPool(maxWaiters)
+ }
+
var database: Database = new Database
var query: Query = new Query
var maxWaiters = async.AsyncQueryEvaluator.defaultMaxWaiters
@@ -25,10 +28,22 @@ class AsyncQueryEvaluator {
synchronized {
if (!singletonFactory) memoizedFactory = None
+ val workP = workPool()
+ val checkoutP = checkoutPool()
+
+ getExecutor(workP) foreach { e =>
+ stats.addGauge("db-async-active-threads")(e.getActiveCount.toDouble)
+ }
+
+ getExecutor(checkoutP) foreach { e =>
+ val q = e.getQueue
+ stats.addGauge("db-async-waiters")(q.size.toDouble)
+ }
+
memoizedFactory = memoizedFactory orElse {
val db = new async.BlockingDatabaseWrapperFactory(
- DefaultWorkPool(),
- async.AsyncQueryEvaluator.checkoutPool(maxWaiters),
+ workP,
+ checkoutP,
database(stats)
)
@@ -40,4 +55,12 @@ class AsyncQueryEvaluator {
}
def apply(): async.AsyncQueryEvaluatorFactory = apply(querulous.NullStatsCollector)
+
+ private def getExecutor(p: util.FuturePool) = p match {
+ case p: util.ExecutorServiceFuturePool => p.executor match {
+ case e: java.util.concurrent.ThreadPoolExecutor => Some(e)
+ case _ => None
+ }
+ case _ => None
+ }
}
View
80 querulous-core/src/test/scala/com/twitter/querulous/unit/BlockingDatabaseWrapperSpec.scala
@@ -0,0 +1,80 @@
+package com.twitter.querulous.unit
+
+import org.specs.Specification
+import java.sql.Connection
+import java.util.concurrent.atomic._
+import com.twitter.util.Future
+import com.twitter.querulous.database._
+import com.twitter.querulous.query._
+import com.twitter.querulous.async._
+import com.twitter.conversions.time._
+
+
+class BlockingDatabaseWrapperSpec extends Specification {
+ "BlockingDatabaseWrapper" should {
+ val database = new DatabaseProxy {
+ var database: Database = _ // the "real" database
+ val totalOpens = new AtomicInteger(0)
+ val openConns = new AtomicInteger(0)
+
+ // the one other method BlockingDatabaseWrapper uses, hence the
+ // override
+ override def openTimeout = 500.millis
+
+ def open() = {
+ totalOpens.incrementAndGet
+ openConns.incrementAndGet
+ null.asInstanceOf[Connection]
+ }
+
+ def close(c: Connection) { openConns.decrementAndGet }
+
+ def reset() { totalOpens.set(0); openConns.set(0) }
+ }
+
+ val wrapper = new BlockingDatabaseWrapper(
+ AsyncQueryEvaluator.defaultWorkPool,
+ AsyncQueryEvaluator.checkoutPool(50),
+ database
+ )
+
+ doBefore { database.reset() }
+
+ "withConnection should follow connection lifecycle" in {
+ wrapper withConnection { _ => "Done" } apply()
+
+ database.totalOpens.get mustEqual 1
+ database.openConns.get mustEqual 0
+ }
+
+ "withConnection should follow lifecycle regardless of cancellation" in {
+ val hitBlock = new AtomicInteger(0)
+ val futures = for (i <- 1 to 1000) yield {
+ val f = wrapper.withConnection { _ =>
+ hitBlock.incrementAndGet
+ "Done"
+ } handle {
+ case e => "Cancelled"
+ }
+
+ f.cancel()
+ f
+ }
+
+ val results = Future.collect(futures).apply()
+ val completed = results partition { _ == "Done" } _1
+
+
+ // println debugging
+ println("Opened: "+ database.totalOpens.get)
@9len
9len added a note Nov 15, 2011

you could just wrap this in "if database.openConns.get != 0", so you only get the useful debug when you need it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ println("Ran block: "+ hitBlock.get)
+ println("Cancelled: "+ (1000 - completed.size))
+ println("Completed: "+ completed.size)
+ println("Leaked: "+ database.openConns.get)
+
+ // TODO: commented out, but should pass with the fix in util
+ //database.totalOpens.get mustEqual completed.size
+ database.openConns.get mustEqual 0
+ }
+ }
+}
Something went wrong with that request. Please try again.