Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
first pass at idle timeout behavior.
Browse files Browse the repository at this point in the history
  • Loading branch information
eaceaser committed May 6, 2011
1 parent 0d44e90 commit 62fadd3
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/main/scala/com/twitter/querulous/config/Database.scala
Expand Up @@ -19,6 +19,7 @@ class SimplePoolingDatabase {
var size: Int = 10
var openTimeout: Duration = 50.millis
var repopulateInterval: Duration = 2.seconds
var idleTimeout: Duration = 1.minute
}

class TimingOutDatabase {
Expand Down Expand Up @@ -56,6 +57,7 @@ class Database {
new SimplePoolingDatabaseFactory(
simpleConfig.size,
simpleConfig.openTimeout,
simpleConfig.idleTimeout,
simpleConfig.repopulateInterval
)
).getOrElse(new SingleConnectionDatabaseFactory))
Expand Down
Expand Up @@ -7,25 +7,45 @@ import org.apache.commons.dbcp.{PoolableConnection, PoolingDataSource}
import org.apache.commons.pool.{PoolableObjectFactory, ObjectPool}
import com.twitter.querulous.PeriodicBackgroundProcess
import com.twitter.util.Duration
import com.twitter.util.Time
import com.twitter.util.TimeConversions._

class PoolTimeoutException extends SQLException

class SimplePool[T <: AnyRef](factory: SimplePool[T] => T, val size: Int, timeout: Duration) extends ObjectPool {
private val pool = new LinkedBlockingQueue[T]()
class SimplePool(factory: () => Connection, val size: Int, timeout: Duration, idleTimeout: Duration) extends ObjectPool {
private val pool = new LinkedBlockingQueue[(Connection, Time)]()
private val currentSize = new AtomicInteger(0)

for (i <- (0.until(size))) addObject()

def addObject() {
pool.offer(factory(this))
pool.offer((factory(), Time.now))
currentSize.incrementAndGet()
}

def borrowObject(): Object = {
def borrowObject(): Connection = {
val rv = pool.poll(timeout.inMillis, TimeUnit.MILLISECONDS)
if (rv == null) throw new PoolTimeoutException
rv
val lastUse = rv._2
val connection = if ((Time.now - lastUse) > idleTimeout) {
val c = rv._1
// TODO: perhaps replace with forcible termination.
try { c.close() } catch { case _: SQLException => }
invalidateObject(c)
try {
borrowObject()
} catch {
case e: PoolTimeoutException =>
if (getTotal() == 0) {
val conn = factory()
currentSize.incrementAndGet()
conn
} else throw e
}
} else {
rv._1
}
new PoolableConnection(connection, this)
}

def clear() {
Expand Down Expand Up @@ -53,16 +73,16 @@ class SimplePool[T <: AnyRef](factory: SimplePool[T] => T, val size: Int, timeou
}

def returnObject(obj: Object) {
val conn = obj.asInstanceOf[T]
pool.offer(conn)
val conn = obj.asInstanceOf[Connection]
pool.offer((conn, Time.now))
}

def setFactory(factory: PoolableObjectFactory) {
// deprecated
}
}

class PoolWatchdog(pool: SimplePool[_], repopulateInterval: Duration, name: String) extends PeriodicBackgroundProcess(name, repopulateInterval) {
class PoolWatchdog(pool: SimplePool, repopulateInterval: Duration, name: String) extends PeriodicBackgroundProcess(name, repopulateInterval) {
def periodic() {
val delta = pool.size - pool.getTotal()
if (delta > 0) {
Expand All @@ -74,20 +94,25 @@ class PoolWatchdog(pool: SimplePool[_], repopulateInterval: Duration, name: Stri
class SimplePoolingDatabaseFactory(
size: Int,
openTimeout: Duration,
idleTimeout: Duration,
repopulateInterval: Duration,
defaultUrlOptions: Map[String, String]) extends DatabaseFactory {

def this(size: Int, openTimeout: Duration, repopulateInterval: Duration) = this(size, openTimeout, repopulateInterval, Map.empty)
def this(
size: Int,
openTimeout: Duration,
idleTimeout: Duration,
repopulateInterval: Duration) = this(size, openTimeout, idleTimeout, repopulateInterval, Map.empty)

def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
val finalUrlOptions =
val finalUrlOptions =
if (urlOptions eq null) {
defaultUrlOptions
} else {
defaultUrlOptions ++ urlOptions
}

new SimplePoolingDatabase(dbhosts, dbname, username, password, urlOptions, size, openTimeout, repopulateInterval)
new SimplePoolingDatabase(dbhosts, dbname, username, password, urlOptions, size, openTimeout, idleTimeout, repopulateInterval)
}
}

Expand All @@ -99,11 +124,12 @@ class SimplePoolingDatabase(
urlOptions: Map[String, String],
numConnections: Int,
openTimeout: Duration,
idleTimeout: Duration,
repopulateInterval: Duration) extends Database {

Class.forName("com.mysql.jdbc.Driver")

private val pool = new SimplePool(mkConnection, numConnections, openTimeout)
private val pool = new SimplePool(mkConnection, numConnections, openTimeout, idleTimeout)
private val poolingDataSource = new PoolingDataSource(pool)
poolingDataSource.setAccessToUnderlyingConnectionAllowed(true)
private val watchdog = new PoolWatchdog(pool, repopulateInterval, dbhosts.mkString(","))
Expand All @@ -118,12 +144,11 @@ class SimplePoolingDatabase(
}
}


def close(connection: Connection) {
try { connection.close() } catch { case e: SQLException => e.printStackTrace()}
try { connection.close() } catch { case _: SQLException => }
}

protected def mkConnection(p: SimplePool[Connection]): Connection = {
new PoolableConnection(DriverManager.getConnection(url(dbhosts, dbname, urlOptions), username, password), p)
protected def mkConnection(): Connection = {
DriverManager.getConnection(url(dbhosts, dbname, urlOptions), username, password)
}
}
Expand Up @@ -7,7 +7,7 @@ import com.twitter.querulous.evaluator.StandardQueryEvaluatorFactory
import com.twitter.util.TimeConversions._

object SimpleJdbcPoolSpec {
val testDatabaseFactory = new SimplePoolingDatabaseFactory(1, 1.second, 1.second, Map())
val testDatabaseFactory = new SimplePoolingDatabaseFactory(1, 1.second, 1.second, 1.second, Map.empty)
val testQueryFactory = new SqlQueryFactory
val testEvaluatorFactory = new StandardQueryEvaluatorFactory(testDatabaseFactory, testQueryFactory)
private val userEnv = System.getenv("DB_USERNAME")
Expand Down
Expand Up @@ -12,7 +12,8 @@ class SimpleJdbcPoolSpec extends Specification with JMocker {
val connection = mock[Connection]

val repopulateInterval = 250.millis
def createPool(size: Int) = { new SimplePool( { pool: SimplePool[Connection] => connection }, size, 10.millis) }
val idleTimeout = 50.millis
def createPool(size: Int) = { new SimplePool( { () => connection }, size, 10.millis, 50.millis) }

"create and populate" in {
val pool = createPool(5)
Expand Down Expand Up @@ -45,6 +46,26 @@ class SimpleJdbcPoolSpec extends Specification with JMocker {
pool.borrowObject() must throwA[PoolTimeoutException]
}

"eject idle" in {
expect {
one(connection).close()
one(connection).close()
}

val pool = createPool(1)
pool.getTotal() mustEqual 1
Thread.sleep(idleTimeout.inMillis + 5)
val conn = pool.borrowObject() mustNot throwA[PoolTimeoutException]
pool.getNumIdle() mustEqual 0
pool.getTotal() mustEqual 1

// we should throw a timeout exception when the pool isn't empty.
pool.addObject()
pool.getTotal() mustEqual 2
Thread.sleep(idleTimeout.inMillis + 5)
pool.borrowObject() must throwA[PoolTimeoutException]
}

"repopulate" in {
val pool = createPool(1)
val watchdog = new PoolWatchdog(pool, repopulateInterval, "test-watchdog-thread")
Expand Down

0 comments on commit 62fadd3

Please sign in to comment.