Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
change throttledpool watchdog task from timer task to a dedicated thr…
Browse files Browse the repository at this point in the history
…ead waking up periodically, change default connection timeout to 100ms from 500ms
  • Loading branch information
yswu committed Aug 5, 2011
1 parent bf5af4a commit 04bdb32
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 37 deletions.
Expand Up @@ -10,7 +10,7 @@ object Database {
val defaultUrlOptions = Map(
"useUnicode" -> "true",
"characterEncoding" -> "UTF-8",
"connectTimeout" -> "500"
"connectTimeout" -> "100"
)
}

Expand Down
@@ -1,6 +1,5 @@
package com.twitter.querulous.database

import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import java.sql.{SQLException, DriverManager, Connection}
Expand All @@ -9,6 +8,7 @@ import org.apache.commons.pool.{PoolableObjectFactory, ObjectPool}
import com.twitter.util.Duration
import com.twitter.util.Time
import scala.annotation.tailrec
import java.lang.Thread

class PoolTimeoutException extends SQLException
class PoolEmptyException extends SQLException
Expand Down Expand Up @@ -118,16 +118,35 @@ class ThrottledPool(factory: () => Connection, val size: Int, timeout: Duration,
}
}

class PoolWatchdog(pool: ThrottledPool) extends TimerTask {
def run() {
try {
pool.addObjectUnlessFull()
} catch {
case e: Throwable =>
System.err.println("Watchdog task tried to throw an exception: " + e.toString())
e.printStackTrace(System.err) // output to stderr for now. will inject logging later.
class PoolWatchdogThread(
pool: ThrottledPool,
hosts: Seq[String],
repopulateInterval: Duration) extends Thread(hosts.mkString(",") + "-pool-watchdog") {

this.setDaemon(true)

override def run() {
var lastTimePoolPopulated = Time.now
while(true) {
try {
val timeToSleepInMills = (repopulateInterval - (Time.now - lastTimePoolPopulated)).inMillis
if (timeToSleepInMills > 0) {
Thread.sleep(timeToSleepInMills)
}
lastTimePoolPopulated = Time.now
pool.addObjectUnlessFull()
} catch {
case t: Throwable => {
System.err.println(Time.now.format("yyyy-MM-dd HH:mm:ss Z") + ": " +
Thread.currentThread().getName() +
" failed to add connection to the pool")
t.printStackTrace(System.err)
}
}
}
}

// TODO: provide a reliable way to have this thread exit when shutdown is implemented
}

class ThrottledPoolingDatabaseFactory(
Expand All @@ -151,7 +170,7 @@ class ThrottledPoolingDatabaseFactory(
defaultUrlOptions ++ urlOptions
}

new ThrottledPoolingDatabase(dbhosts, dbname, username, password, urlOptions, size, openTimeout, idleTimeout, repopulateInterval)
new ThrottledPoolingDatabase(dbhosts, dbname, username, password, finalUrlOptions, size, openTimeout, idleTimeout, repopulateInterval)
}
}

Expand All @@ -171,9 +190,7 @@ class ThrottledPoolingDatabase(
private val pool = new ThrottledPool(mkConnection, numConnections, openTimeout, idleTimeout)
private val poolingDataSource = new PoolingDataSource(pool)
poolingDataSource.setAccessToUnderlyingConnectionAllowed(true)
private val watchdogTask = new PoolWatchdog(pool)
private val watchdog = new Timer(hosts.mkString(",") + "-pool-watchdog", true)
watchdog.scheduleAtFixedRate(watchdogTask, 0, repopulateInterval.inMillis)
new PoolWatchdogThread(pool, hosts, repopulateInterval).start()

def open() = {
try {
Expand Down
Expand Up @@ -30,4 +30,3 @@ class ThrottledPoolingDatabaseSpec extends ConfiguredSpecification {
}
}
}

Expand Up @@ -12,8 +12,16 @@ import com.twitter.querulous.query.{SqlQueryTimeoutException, TimingOutQueryFact
object ThrottledPoolingDatabaseWithFakeConnSpec {
// configure repopulation interval to a minute to avoid conn repopulation when test running
val testDatabaseFactory = new ThrottledPoolingDatabaseFactory(1, 1.second, 1.second, 60.seconds, Map.empty)
// configure repopulation interval to 1 second so that we can verify the watchdog actually works
val testRepopulatedDatabaseFactory = new ThrottledPoolingDatabaseFactory(1, 1.second, 1.second, 1.second, Map.empty)
val testQueryFactory = new TimingOutQueryFactory(new SqlQueryFactory, 500.millis, true)
val testEvaluatorFactory = new StandardQueryEvaluatorFactory(testDatabaseFactory, testQueryFactory)
val testRepopulatedEvaluatorFactory = new StandardQueryEvaluatorFactory(testRepopulatedDatabaseFactory, testQueryFactory)
// configure repopulation interval to 100ms, and connection timeout to 2 seconds
val testRepopulatedLongConnTimeoutDbFactory = new ThrottledPoolingDatabaseFactory(1, 1.second,
1.second, 100.milliseconds, Map("connectTimeout" -> "2000"))
val testRepopulatedLongConnTimeoutEvaluatorFactory = new StandardQueryEvaluatorFactory(
testRepopulatedLongConnTimeoutDbFactory, testQueryFactory)
}

class ThrottledPoolingDatabaseWithFakeConnSpec extends ConfiguredSpecification {
Expand All @@ -23,19 +31,16 @@ class ThrottledPoolingDatabaseWithFakeConnSpec extends ConfiguredSpecification {

"ThrottledJdbcPoolSpec" should {
val host = config.hostnames.mkString(",") + "/" + config.database
val queryEvaluator = testEvaluatorFactory(config)

FakeContext.setQueryResult(host, "SELECT 1 FROM DUAL", Array(Array[java.lang.Object](1.asInstanceOf[AnyRef])))
FakeContext.setQueryResult(host, "SELECT 2 FROM DUAL", Array(Array[java.lang.Object](2.asInstanceOf[AnyRef])))
"execute some queries" >> {
val queryEvaluator = testEvaluatorFactory(config)

queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
queryEvaluator.select("SELECT 2 FROM DUAL") { r => r.getInt(1) } mustEqual List(2)
}

"failfast after a host is down" >> {
val queryEvaluator = testEvaluatorFactory(config)

queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
FakeContext.markServerDown(host)
try {
Expand All @@ -49,8 +54,6 @@ class ThrottledPoolingDatabaseWithFakeConnSpec extends ConfiguredSpecification {
}

"failfast after connections are closed due to query timeout" >> {
val queryEvaluator = testEvaluatorFactory(config)

queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
FakeContext.setTimeTakenToExecQuery(host, 1.second)
try {
Expand All @@ -63,6 +66,42 @@ class ThrottledPoolingDatabaseWithFakeConnSpec extends ConfiguredSpecification {
FakeContext.setTimeTakenToExecQuery(host, 0.second)
}
}

"repopulate the pool every repopulation interval" >> {
val queryEvaluator = testRepopulatedEvaluatorFactory(config)

queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
FakeContext.setTimeTakenToExecQuery(host, 1.second)
try {
// this will cause the underlying connection being destroyed
queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } must throwA[SqlQueryTimeoutException]
Thread.sleep(2000)
FakeContext.setTimeTakenToExecQuery(host, 0.second)
// after repopulation, biz as usual
queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
} finally {
FakeContext.setTimeTakenToExecQuery(host, 0.second)
}
}

"repopulate the pool even if it takes longer to establish a connection than repopulation interval" >> {
val queryEvaluator = testRepopulatedLongConnTimeoutEvaluatorFactory(config)

queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
FakeContext.markServerDown(host)
try {
// this will cause the underlying connection being destroyed
queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } must throwA[CommunicationsException]
FakeContext.setTimeTakenToOpenConn(host, 1.second)
FakeContext.markServerUp(host)
Thread.sleep(2000)
// after repopulation, biz as usual
queryEvaluator.select("SELECT 1 FROM DUAL") { r => r.getInt(1) } mustEqual List(1)
} finally {
FakeContext.setTimeTakenToOpenConn(host, 0.second)
FakeContext.markServerUp(host)
}
}
}

doAfterSpec { Database.driverName = "jdbc:mysql" }
Expand Down
@@ -1,12 +1,12 @@
package com.twitter.querulous.unit

import com.twitter.querulous.database.{PoolTimeoutException, PoolEmptyException, ThrottledPool, PoolWatchdog, PooledConnection}
import com.twitter.conversions.time._
import java.util.Timer
import java.sql.{SQLException, Connection}
import org.apache.commons.pool.ObjectPool
import org.specs.Specification
import org.specs.mock.JMocker
import org.specs.util.TimeConversions
import com.twitter.querulous.database._
import java.sql.{SQLException, Connection}

class PooledConnectionSpec extends Specification with JMocker {
"PooledConnectionSpec" should {
Expand Down Expand Up @@ -39,11 +39,8 @@ class PooledConnectionSpec extends Specification with JMocker {
}
}



class ThrottledPoolSpec extends Specification with JMocker {
"ThrottledPoolSpec" should {
val size = 1
val connection = mock[Connection]

val repopulateInterval = 250.millis
Expand All @@ -58,7 +55,7 @@ class ThrottledPoolSpec extends Specification with JMocker {
"checkout" in {
val pool = createPool(5)
pool.getTotal() mustEqual 5
val conn = pool.borrowObject()
pool.borrowObject()
pool.getNumActive() mustEqual 1
pool.getNumIdle() mustEqual 4
}
Expand All @@ -76,7 +73,7 @@ class ThrottledPoolSpec extends Specification with JMocker {
"timeout" in {
val pool = createPool(1)
pool.getTotal() mustEqual 1
val conn = pool.borrowObject()
pool.borrowObject()
pool.getNumIdle() mustEqual 0
pool.borrowObject() must throwA[PoolTimeoutException]
}
Expand All @@ -97,7 +94,7 @@ class ThrottledPoolSpec extends Specification with JMocker {
val pool = createPool(1)
pool.getTotal() mustEqual 1
Thread.sleep(idleTimeout.inMillis + 5)
val conn = pool.borrowObject()
pool.borrowObject()
pool.getNumIdle() mustEqual 0
pool.getTotal() mustEqual 1

Expand All @@ -110,19 +107,20 @@ class ThrottledPoolSpec extends Specification with JMocker {

"repopulate" in {
val pool = createPool(2)
val timer = new Timer(true)
val watchdog = new PoolWatchdog(pool)
val conn = pool.borrowObject()
pool.invalidateObject(conn)
pool.getTotal() mustEqual 1
val conn2 = pool.borrowObject()
pool.invalidateObject(conn2)
pool.getTotal() mustEqual 0
timer.scheduleAtFixedRate(watchdog, 0, repopulateInterval.inMillis)
pool.getTotal() must eventually(be_==(1))
Thread.sleep(repopulateInterval)
pool.getTotal() must eventually(be_==(2))
timer.cancel()
new PoolWatchdogThread(pool, List(""), repopulateInterval).start()
// 4 retries give us about 300ms to check whether the condition is finally met
// because first check is applied immediately
pool.getTotal() must eventually(4, TimeConversions.intToRichLong(100).millis) (be_==(1))
pool.getTotal() must eventually(4, TimeConversions.intToRichLong(100).millis) (be_==(2))
Thread.sleep(repopulateInterval.inMillis + 100)
// make sure that the watchdog thread won't add more connections than the size of the pool
pool.getTotal() must be_==(2)
}
}
}

0 comments on commit 04bdb32

Please sign in to comment.