Skip to content

Commit

Permalink
Merge branch 'master' of github.com:nkallen/querulous
Browse files Browse the repository at this point in the history
  • Loading branch information
Robey Pointer committed Sep 16, 2010
2 parents 7e54ca5 + 6c1aeea commit 0dc8550
Show file tree
Hide file tree
Showing 23 changed files with 279 additions and 109 deletions.
7 changes: 6 additions & 1 deletion config/test.conf
Original file line number Original file line Diff line number Diff line change
@@ -1,4 +1,9 @@
db { db {
hostname = "localhost"
username = "root" username = "root"
password = "" password = ""
} url_options {
useUnicode = "true"
characterEncoding = "UTF-8"
}
}
4 changes: 2 additions & 2 deletions project/build.properties
Original file line number Original file line Diff line number Diff line change
@@ -1,8 +1,8 @@
#Project properties #Project properties
#Sat Apr 17 13:53:31 PDT 2010 #Wed Sep 08 16:18:46 PDT 2010
project.organization=com.twitter project.organization=com.twitter
project.name=querulous project.name=querulous
sbt.version=0.7.4 sbt.version=0.7.4
project.version=1.1.13 project.version=1.2.1
build.scala.versions=2.7.7 build.scala.versions=2.7.7
project.initialize=false project.initialize=false
2 changes: 1 addition & 1 deletion project/plugins/Plugins.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import sbt._


class Plugins(info: ProjectInfo) extends PluginDefinition(info) { class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val twitterNest = "com.twitter" at "http://www.lag.net/nest" val twitterNest = "com.twitter" at "http://www.lag.net/nest"
val defaultProject = "com.twitter" % "standard-project" % "0.7.0" val defaultProject = "com.twitter" % "standard-project" % "0.7.1"
} }
3 changes: 0 additions & 3 deletions project/plugins/project/build.properties

This file was deleted.

37 changes: 37 additions & 0 deletions src/main/scala/com/twitter/querulous/AutoDisabler.scala
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.twitter.querulous

import com.twitter.xrayspecs.{Time, Duration}
import com.twitter.xrayspecs.TimeConversions._
import java.sql.{SQLException, SQLIntegrityConstraintViolationException}


trait AutoDisabler {
protected val disableErrorCount: Int
protected val disableDuration: Duration

private var disabledUntil: Time = Time.never
private var consecutiveErrors = 0

protected def throwIfDisabled(throwMessage: String): Unit = {
synchronized {
if (Time.now < disabledUntil) {
throw new SQLException("Server is temporarily disabled: " + throwMessage)
}
}
}

protected def throwIfDisabled(): Unit = { throwIfDisabled("") }

protected def noteOperationOutcome(success: Boolean) {
synchronized {
if (success) {
consecutiveErrors = 0
} else {
consecutiveErrors += 1
if (consecutiveErrors >= disableErrorCount) {
disabledUntil = disableDuration.fromNow
}
}
}
}
}
42 changes: 42 additions & 0 deletions src/main/scala/com/twitter/querulous/ConnectionDestroying.scala
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.twitter.querulous.query

import java.sql.Connection
import org.apache.commons.dbcp.{DelegatingConnection => DBCPConnection}
import com.mysql.jdbc.{ConnectionImpl => MySQLConnection}


// Emergency connection destruction toolkit

trait ConnectionDestroying {
def destroyConnection(conn: Connection) {
if ( !conn.isClosed )
conn match {
case c: DBCPConnection =>
destroyDbcpWrappedConnection(c)
case c: MySQLConnection =>
destroyMysqlConnection(c)
case _ => error("Unsupported driver type, cannot reliably timeout.")
}
}

def destroyDbcpWrappedConnection(conn: DBCPConnection) {
val inner = conn.getInnermostDelegate

if ( inner != null ) {
destroyConnection(inner)
} else {
// this should never happen if we use our own ApachePoolingDatabase to get connections.
error("Could not get access to the delegate connection. Make sure the dbcp connection pool allows access to underlying connections.")
}

// "close" the wrapper so that it updates its internal bookkeeping, just do it
try { conn.close } catch { case _ => }
}

def destroyMysqlConnection(conn: MySQLConnection) {
val abort = Class.forName("com.mysql.jdbc.ConnectionImpl").getDeclaredMethod("abortInternal")
abort.setAccessible(true)

abort.invoke(conn)
}
}
12 changes: 8 additions & 4 deletions src/main/scala/com/twitter/querulous/Timeout.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import com.twitter.xrayspecs.Duration
class TimeoutException extends Exception class TimeoutException extends Exception


object Timeout { object Timeout {
val timer = new Timer("Timer thread", true) val defaultTimer = new Timer("Timer thread", true)


def apply[T](timeout: Duration)(f: => T)(onTimeout: => Unit): T = { def apply[T](timer: Timer, timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
@volatile var cancelled = false @volatile var cancelled = false
val task = if (timeout.inMillis > 0) val task = if (timeout.inMillis > 0)
Some(schedule(timeout, { cancelled = true; onTimeout })) Some(schedule(timer, timeout, { cancelled = true; onTimeout }))
else None else None


try { try {
Expand All @@ -26,7 +26,11 @@ object Timeout {
} }
} }


private def schedule(timeout: Duration, f: => Unit) = { def apply[T](timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
apply(defaultTimer, timeout)(f)(onTimeout)
}

private def schedule(timer: Timer, timeout: Duration, f: => Unit) = {
val task = new TimerTask() { val task = new TimerTask() {
override def run() { f } override def run() { f }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ class ApachePoolingDatabaseFactory(
checkConnectionHealthOnReservation: Boolean, checkConnectionHealthOnReservation: Boolean,
evictConnectionIfIdleFor: Duration) extends DatabaseFactory { evictConnectionIfIdleFor: Duration) extends DatabaseFactory {


def apply(dbhosts: List[String], dbname: String, username: String, password: String) = { def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
val pool = new ApachePoolingDatabase( val pool = new ApachePoolingDatabase(
dbhosts, dbhosts,
dbname, dbname,
username, username,
password, password,
urlOptions,
minOpenConnections, minOpenConnections,
maxOpenConnections, maxOpenConnections,
checkConnectionHealthWhenIdleFor, checkConnectionHealthWhenIdleFor,
Expand All @@ -27,15 +28,14 @@ class ApachePoolingDatabaseFactory(
evictConnectionIfIdleFor) evictConnectionIfIdleFor)
pool pool
} }

def apply(dbhosts: List[String], username: String, password: String) = apply(dbhosts, null, username, password)
} }


class ApachePoolingDatabase( class ApachePoolingDatabase(
dbhosts: List[String], dbhosts: List[String],
dbname: String, dbname: String,
username: String, username: String,
password: String, password: String,
urlOptions: Map[String, String],
minOpenConnections: Int, minOpenConnections: Int,
maxOpenConnections: Int, maxOpenConnections: Int,
checkConnectionHealthWhenIdleFor: Duration, checkConnectionHealthWhenIdleFor: Duration,
Expand All @@ -52,12 +52,14 @@ class ApachePoolingDatabase(
config.maxWait = maxWaitForConnectionReservation.inMillis config.maxWait = maxWaitForConnectionReservation.inMillis


config.timeBetweenEvictionRunsMillis = checkConnectionHealthWhenIdleFor.inMillis config.timeBetweenEvictionRunsMillis = checkConnectionHealthWhenIdleFor.inMillis
config.testWhileIdle = true config.testWhileIdle = false
config.testOnBorrow = checkConnectionHealthOnReservation config.testOnBorrow = checkConnectionHealthOnReservation
config.minEvictableIdleTimeMillis = evictConnectionIfIdleFor.inMillis config.minEvictableIdleTimeMillis = evictConnectionIfIdleFor.inMillis


config.lifo = false

private val connectionPool = new GenericObjectPool(null, config) private val connectionPool = new GenericObjectPool(null, config)
private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname), username, password) private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname, urlOptions), username, password)
private val poolableConnectionFactory = new PoolableConnectionFactory( private val poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, connectionFactory,
connectionPool, connectionPool,
Expand All @@ -66,6 +68,7 @@ class ApachePoolingDatabase(
false, false,
true) true)
private val poolingDataSource = new PoolingDataSource(connectionPool) private val poolingDataSource = new PoolingDataSource(connectionPool)
poolingDataSource.setAccessToUnderlyingConnectionAllowed(true)


def close(connection: Connection) { def close(connection: Connection) {
try { try {
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.twitter.querulous.database

import com.twitter.xrayspecs.Duration
import com.twitter.xrayspecs.TimeConversions._
import java.sql.{Connection, SQLException, SQLIntegrityConstraintViolationException}


class AutoDisablingDatabaseFactory(databaseFactory: DatabaseFactory, disableErrorCount: Int, disableDuration: Duration) extends DatabaseFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
new AutoDisablingDatabase(
databaseFactory(dbhosts, dbname, username, password, urlOptions),
dbhosts.first,
disableErrorCount,
disableDuration)
}
}

class AutoDisablingDatabase(database: Database, dbhost: String, protected val disableErrorCount: Int, protected val disableDuration: Duration) extends Database with AutoDisabler {
def open() = {
throwIfDisabled(dbhost)
try {
val rv = database.open()
noteOperationOutcome(true)
rv
} catch {
case e: SQLException =>
noteOperationOutcome(false)
throw e
case e: Exception =>
throw e
}
}

def close(connection: Connection) { database.close(connection) }
}
21 changes: 15 additions & 6 deletions src/main/scala/com/twitter/querulous/database/Database.scala
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ object DatabaseFactory {
} }


trait DatabaseFactory { trait DatabaseFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String): Database def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]): Database
def apply(dbhosts: List[String], username: String, password: String): Database
def apply(dbhosts: List[String], dbname: String, username: String, password: String): Database =
apply(dbhosts, dbname, username, password, null)

def apply(dbhosts: List[String], username: String, password: String): Database =
apply(dbhosts, null, username, password, null)
} }


trait Database { trait Database {
Expand All @@ -52,10 +57,14 @@ trait Database {
} }
} }


protected def url(dbhosts: List[String], dbname: String) = { protected def url(dbhosts: List[String], dbname: String, urlOptions: Map[String, String]) = {
val dbnameSegment = if (dbname == null) "" else ("/" + dbname) val dbnameSegment = if (dbname == null) "" else ("/" + dbname)
"jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + "?" + urlOptions val urlOptsSegment = if (urlOptions == null) {
} "?useUnicode=true&characterEncoding=UTF-8"
} else {
"?" + urlOptions.keys.map( k => k + "=" + urlOptions(k) ).mkString("&")
}


def urlOptions = "useUnicode=true&characterEncoding=UTF-8" "jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + urlOptsSegment
}
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import scala.collection.mutable
class MemoizingDatabaseFactory(databaseFactory: DatabaseFactory) extends DatabaseFactory { class MemoizingDatabaseFactory(databaseFactory: DatabaseFactory) extends DatabaseFactory {
private val databases = new mutable.HashMap[String, Database] with mutable.SynchronizedMap[String, Database] private val databases = new mutable.HashMap[String, Database] with mutable.SynchronizedMap[String, Database]


def apply(dbhosts: List[String], dbname: String, username: String, password: String) = synchronized { def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = synchronized {
databases.getOrElseUpdate( databases.getOrElseUpdate(
dbhosts.first + "/" + dbname, dbhosts.first + "/" + dbname,
databaseFactory(dbhosts, dbname, username, password)) databaseFactory(dbhosts, dbname, username, password, urlOptions))
} }


def apply(dbhosts: List[String], username: String, password: String) = databaseFactory(dbhosts, username, password) // cannot memoize a connection without specifying a database
override def apply(dbhosts: List[String], username: String, password: String) = databaseFactory(dbhosts, username, password)
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import java.sql.{SQLException, Connection}




class SingleConnectionDatabaseFactory extends DatabaseFactory { class SingleConnectionDatabaseFactory extends DatabaseFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String) = { def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
new SingleConnectionDatabase(dbhosts, dbname, username, password) new SingleConnectionDatabase(dbhosts, dbname, username, password, urlOptions)
} }
def apply(dbhosts: List[String], username: String, password: String) = apply(dbhosts, null, username, password)
} }


class SingleConnectionDatabase(dbhosts: List[String], dbname: String, username: String, password: String) class SingleConnectionDatabase(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String])
extends Database { extends Database {
Class.forName("com.mysql.jdbc.Driver") Class.forName("com.mysql.jdbc.Driver")
private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname), username, password) private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname, urlOptions), username, password)


def close(connection: Connection) { def close(connection: Connection) {
try { try {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ class StatsCollectingDatabaseFactory(
databaseFactory: DatabaseFactory, databaseFactory: DatabaseFactory,
stats: StatsCollector) extends DatabaseFactory { stats: StatsCollector) extends DatabaseFactory {


def apply(dbhosts: List[String], dbname: String, username: String, password: String) = { def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
new StatsCollectingDatabase(databaseFactory(dbhosts, dbname, username, password), stats) new StatsCollectingDatabase(databaseFactory(dbhosts, dbname, username, password, urlOptions), stats)
}

def apply(dbhosts: List[String], username: String, password: String) = {
new StatsCollectingDatabase(databaseFactory(dbhosts, username, password), stats)
} }
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import net.lag.logging.Logger
class SqlDatabaseTimeoutException(msg: String, val timeout: Duration) extends SQLException(msg) class SqlDatabaseTimeoutException(msg: String, val timeout: Duration) extends SQLException(msg)


class TimingOutDatabaseFactory(databaseFactory: DatabaseFactory, poolSize: Int, queueSize: Int, openTimeout: Duration, initialTimeout: Duration, maxConnections: Int) extends DatabaseFactory { class TimingOutDatabaseFactory(databaseFactory: DatabaseFactory, poolSize: Int, queueSize: Int, openTimeout: Duration, initialTimeout: Duration, maxConnections: Int) extends DatabaseFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String) = { def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
new TimingOutDatabase(databaseFactory(dbhosts, dbname, username, password), dbhosts, dbname, poolSize, queueSize, openTimeout, initialTimeout, maxConnections) val dbLabel = if (dbname != null) dbname else "(null)"
}


def apply(dbhosts: List[String], username: String, password: String) = { new TimingOutDatabase(databaseFactory(dbhosts, dbname, username, password, urlOptions), dbhosts, dbLabel, poolSize, queueSize, openTimeout, initialTimeout, maxConnections)
new TimingOutDatabase(databaseFactory(dbhosts, username, password), dbhosts, "(null)", poolSize, queueSize, openTimeout, initialTimeout, maxConnections)
} }
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@ class AutoDisablingQueryEvaluatorFactory(
disableDuration) disableDuration)
} }


def apply(dbhosts: List[String], dbname: String, username: String, password: String) = { def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
chainEvaluator(queryEvaluatorFactory(dbhosts, dbname, username, password)) chainEvaluator(queryEvaluatorFactory(dbhosts, dbname, username, password, urlOptions))
}

def apply(dbhosts: List[String], username: String, password: String) = {
chainEvaluator(queryEvaluatorFactory(dbhosts, username, password))
} }
} }


class AutoDisablingQueryEvaluator( class AutoDisablingQueryEvaluator (
queryEvaluator: QueryEvaluator, queryEvaluator: QueryEvaluator,
disableErrorCount: Int, protected val disableErrorCount: Int,
disableDuration: Duration) extends QueryEvaluatorProxy(queryEvaluator) { protected val disableDuration: Duration) extends QueryEvaluatorProxy(queryEvaluator) with AutoDisabler {


private var disabledUntil: Time = Time.never private var disabledUntil: Time = Time.never
private var consecutiveErrors = 0 private var consecutiveErrors = 0
Expand All @@ -56,24 +52,4 @@ class AutoDisablingQueryEvaluator(
} }
} }


private def noteOperationOutcome(success: Boolean) {
synchronized {
if (success) {
consecutiveErrors = 0
} else {
consecutiveErrors += 1
if (consecutiveErrors >= disableErrorCount) {
disabledUntil = disableDuration.fromNow
}
}
}
}

private def throwIfDisabled() {
synchronized {
if (Time.now < disabledUntil) {
throw new SQLException("Server is temporarily disabled")
}
}
}
} }
Loading

0 comments on commit 0dc8550

Please sign in to comment.