Skip to content
Browse files

workaround for broken util behavior, fix over-cancellation.

  • Loading branch information...
1 parent 8964b40 commit c4c0510b68789cedac68e4f33ca427f070dcddc6 @freels freels committed Nov 14, 2011
View
33 querulous-core/src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
@@ -1,9 +1,9 @@
package com.twitter.querulous.async
import java.util.concurrent.{Executors, RejectedExecutionException}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.sql.Connection
-import com.twitter.util.{Throw, Future, FuturePool, JavaTimer, TimeoutException}
+import com.twitter.util.{Throw, Future, Promise, FuturePool, JavaTimer, TimeoutException}
import com.twitter.querulous.DaemonThreadFactory
import com.twitter.querulous.database.{Database, DatabaseFactory}
@@ -45,28 +45,37 @@ extends AsyncDatabase {
def withConnection[R](f: Connection => R) = {
checkoutConnection() flatMap { conn =>
+
+ // As a workaround for a FuturePool bug where it may throw away
+ // work if cancelled but already in progress, therefore not
+ // allowing ensure to predictably clean up, use an AtomicBoolean
+ // to allow the finally block and ensure block to race to close
+ // the connection.
+ val closed = new AtomicBoolean(false)
+
workPool {
- f(conn)
+ try {
+ f(conn)
+ } finally {
+ if (!closed.getAndSet(true)) database.close(conn)
+ }
} ensure {
- database.close(conn)
+ if (!closed.getAndSet(true)) database.close(conn)
}
}
}
private def checkoutConnection(): Future[Connection] = {
- val result = checkoutPool {
- database.open()
- }
+ // creating a detached promise here so that cancellations do not
+ // propagate to the checkout pool.
+ val result = new Promise[Connection]
- // release the connection if future is cancelled
- result onCancellation {
- result foreach { database.close(_) }
- }
+ checkoutPool { database.open() } respond { result.update(_) }
// cancel future if it times out
result.within(checkoutTimer, openTimeout) onFailure { e =>
if (e.isInstanceOf[java.util.concurrent.TimeoutException]) {
- result.cancel()
+ result foreach { database.close(_) }
}
}
}

0 comments on commit c4c0510

Please sign in to comment.
Something went wrong with that request. Please try again.