Browse files

add back descrete promise creation

  • Loading branch information...
1 parent 1b5be24 commit 7610ae99d211d936be5937b89737e7b3903bef19 @freels freels committed Dec 1, 2011
View
16 querulous-core/src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
@@ -4,7 +4,7 @@ import java.util.logging.{Logger, Level}
import java.util.concurrent.{Executors, RejectedExecutionException}
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
import java.sql.Connection
-import com.twitter.util.{Throw, Future, Promise}
+import com.twitter.util.{Try, Throw, Future, Promise}
import com.twitter.util.{FuturePool, ExecutorServiceFuturePool, JavaTimer, TimeoutException}
import com.twitter.querulous.{StatsCollector, NullStatsCollector, DaemonThreadFactory}
import com.twitter.querulous.database.{Database, DatabaseFactory}
@@ -62,14 +62,22 @@ extends AsyncDatabase {
workPool {
f(conn)
} ensure {
- database.close(conn)
+ closeConnection(conn)
}
}
}
private def checkoutConnection(): Future[Connection] = {
- val conn = stats.timeFutureMillis("db-async-open-timing") {
- checkoutPool { database.open() }
+
+ // XXX: there is a bug in FuturePool before util 1.12.5 that
+ // causes it to potentially drop completed work when cancelled. As
+ // a workaround, we create a promise explicitly and set it from
+ // the future pool in order to prevent the checkout FuturePool
+ // from receiving cancellation signals.
+ val conn = new Promise[Connection]()
+
+ stats.timeFutureMillis("db-async-open-timing") {
+ checkoutPool { conn() = Try(database.open()) }
}
// Return within a specified timeout. If within times out, that
View
4 querulous-core/src/test/scala/com/twitter/querulous/unit/BlockingDatabaseWrapperSpec.scala
@@ -49,7 +49,7 @@ class BlockingDatabaseWrapperSpec extends Specification {
"withConnection should follow lifecycle regardless of cancellation" in {
val hitBlock = new AtomicInteger(0)
- val futures = for (i <- 1 to 1000) yield {
+ val futures = for (i <- 1 to 100000) yield {
val f = wrapper.withConnection { _ =>
hitBlock.incrementAndGet
"Done"
@@ -68,7 +68,7 @@ class BlockingDatabaseWrapperSpec extends Specification {
// println debugging
println("Opened: "+ database.totalOpens.get)
println("Ran block: "+ hitBlock.get)
- println("Cancelled: "+ (1000 - completed.size))
+ println("Cancelled: "+ (100000 - completed.size))
println("Completed: "+ completed.size)
println("Leaked: "+ database.openConns.get)
View
24 querulous-core/src/test/scala/com/twitter/querulous/unit/ThrottledPoolingDatabaseSpec.scala
@@ -49,45 +49,58 @@ class ThrottledPoolSpec extends Specification with JMocker {
"create and populate" in {
val pool = createPool(5)
+
pool.getTotal() mustEqual 5
}
"successfully construct if connections fail to create" in {
val pool = new ThrottledPool( { () => throw new Exception("blah!") }, 5, 10.millis, 50.millis, "test")
+
pool.getTotal() mustEqual 0
}
"checkout" in {
val pool = createPool(5)
+
pool.getTotal() mustEqual 5
+
pool.borrowObject()
+
pool.getNumActive() mustEqual 1
pool.getNumIdle() mustEqual 4
}
"return" in {
val pool = createPool(5)
+
pool.getTotal() mustEqual 5
+
val conn = pool.borrowObject()
+
pool.getNumActive() mustEqual 1
+
pool.returnObject(conn)
+
pool.getNumActive() mustEqual 0
pool.getNumIdle() mustEqual 5
}
"timeout" in {
val pool = createPool(1)
+
pool.getTotal() mustEqual 1
+
pool.borrowObject()
+
pool.getNumIdle() mustEqual 0
pool.borrowObject() must throwA[PoolTimeoutException]
}
- "fast fail when the pool is empty" {
+ "fast fail when the pool is empty" in {
val pool = createPool(0)
+
pool.getTotal() mustEqual 0
pool.borrowObject() must throwA[PoolEmptyException]
- 1
}
"eject idle" in {
@@ -96,18 +109,24 @@ class ThrottledPoolSpec extends Specification with JMocker {
}
val pool = createPool(5)
+
pool.getTotal() mustEqual 5
+
Thread.sleep(idleTimeout.inMillis + 5)
pool.borrowObject()
+
pool.getTotal() mustEqual 1
}
"repopulate" in {
val pool = createPool(2)
val conn = pool.borrowObject()
+
pool.invalidateObject(conn)
pool.getTotal() mustEqual 1
+
val conn2 = pool.borrowObject()
+
pool.invalidateObject(conn2)
pool.getTotal() mustEqual 0
new PoolWatchdogThread(pool, List(""), repopulateInterval).start()
@@ -116,6 +135,7 @@ class ThrottledPoolSpec extends Specification with JMocker {
pool.getTotal() must eventually(4, TimeConversions.intToRichLong(100).millis) (be_==(1))
pool.getTotal() must eventually(4, TimeConversions.intToRichLong(100).millis) (be_==(2))
Thread.sleep(repopulateInterval.inMillis + 100)
+
// make sure that the watchdog thread won't add more connections than the size of the pool
pool.getTotal() must be_==(2)
}

0 comments on commit 7610ae9

Please sign in to comment.