Permalink
Browse files

Merge remote branch 'freels/new_version'

  • Loading branch information...
2 parents 4b452a0 + a283b66 commit 8931e1a7ef549c5ce3fc6190fe162bc7baa00861 Ed Ceaser committed Sep 9, 2010
Showing with 278 additions and 107 deletions.
  1. +6 −1 config/test.conf
  2. +2 −2 project/build.properties
  3. +1 −1 project/plugins/Plugins.scala
  4. +1 −1 project/plugins/project/build.properties
  5. +37 −0 src/main/scala/com/twitter/querulous/AutoDisabler.scala
  6. +42 −0 src/main/scala/com/twitter/querulous/ConnectionDestroying.scala
  7. +8 −4 src/main/scala/com/twitter/querulous/Timeout.scala
  8. +6 −5 src/main/scala/com/twitter/querulous/database/ApachePoolingDatabase.scala
  9. +35 −0 src/main/scala/com/twitter/querulous/database/AutoDisablingDatabase.scala
  10. +15 −6 src/main/scala/com/twitter/querulous/database/Database.scala
  11. +4 −3 src/main/scala/com/twitter/querulous/database/MemoizingDatabase.scala
  12. +4 −5 src/main/scala/com/twitter/querulous/database/SingleConnectionDatabase.scala
  13. +2 −6 src/main/scala/com/twitter/querulous/database/StatsCollectingDatabase.scala
  14. +3 −5 src/main/scala/com/twitter/querulous/database/TimingOutDatabase.scala
  15. +5 −29 src/main/scala/com/twitter/querulous/evaluator/AutoDisablingQueryEvaluator.scala
  16. +31 −12 src/main/scala/com/twitter/querulous/evaluator/QueryEvaluator.scala
  17. +2 −7 src/main/scala/com/twitter/querulous/evaluator/StandardQueryEvaluator.scala
  18. +37 −7 src/main/scala/com/twitter/querulous/query/TimingOutQuery.scala
  19. +17 −4 src/main/scala/com/twitter/querulous/query/TimingOutStatsCollectingQuery.scala
  20. +9 −3 src/test/scala/com/twitter/querulous/integration/QuerySpec.scala
  21. +2 −2 src/test/scala/com/twitter/querulous/unit/MemoizingDatabaseFactorySpec.scala
  22. +3 −2 src/test/scala/com/twitter/querulous/unit/QueryEvaluatorSpec.scala
  23. +6 −2 src/test/scala/com/twitter/querulous/unit/TimingOutQuerySpec.scala
View
@@ -1,4 +1,9 @@
db {
+ hostname = "localhost"
username = "root"
password = ""
-}
+ url_options {
+ useUnicode = "true"
+ characterEncoding = "UTF-8"
+ }
+}
View
@@ -1,8 +1,8 @@
#Project properties
-#Sat Apr 17 13:53:31 PDT 2010
+#Wed Sep 08 16:18:46 PDT 2010
project.organization=com.twitter
project.name=querulous
sbt.version=0.7.4
-project.version=1.1.13
+project.version=1.2.0
build.scala.versions=2.7.7
project.initialize=false
@@ -2,5 +2,5 @@ import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val twitterNest = "com.twitter" at "http://www.lag.net/nest"
- val defaultProject = "com.twitter" % "standard-project" % "0.7.0"
+ val defaultProject = "com.twitter" % "standard-project" % "0.7.1"
}
@@ -1,3 +1,3 @@
#Project properties
-#Wed Sep 01 16:30:10 PDT 2010
+#Wed Sep 08 16:18:11 PDT 2010
plugin.uptodate=true
@@ -0,0 +1,37 @@
+package com.twitter.querulous
+
+import com.twitter.xrayspecs.{Time, Duration}
+import com.twitter.xrayspecs.TimeConversions._
+import java.sql.{SQLException, SQLIntegrityConstraintViolationException}
+
+
+trait AutoDisabler {
+ protected val disableErrorCount: Int
+ protected val disableDuration: Duration
+
+ private var disabledUntil: Time = Time.never
+ private var consecutiveErrors = 0
+
+ protected def throwIfDisabled(throwMessage: String): Unit = {
+ synchronized {
+ if (Time.now < disabledUntil) {
+ throw new SQLException("Server is temporarily disabled: " + throwMessage)
+ }
+ }
+ }
+
+ protected def throwIfDisabled(): Unit = { throwIfDisabled("") }
+
+ protected def noteOperationOutcome(success: Boolean) {
+ synchronized {
+ if (success) {
+ consecutiveErrors = 0
+ } else {
+ consecutiveErrors += 1
+ if (consecutiveErrors >= disableErrorCount) {
+ disabledUntil = disableDuration.fromNow
+ }
+ }
+ }
+ }
+}
@@ -0,0 +1,42 @@
+package com.twitter.querulous.query
+
+import java.sql.Connection
+import org.apache.commons.dbcp.{DelegatingConnection => DBCPConnection}
+import com.mysql.jdbc.{ConnectionImpl => MySQLConnection}
+
+
+// Emergency connection destruction toolkit
+
+trait ConnectionDestroying {
+ def destroyConnection(conn: Connection) {
+ if ( !conn.isClosed )
+ conn match {
+ case c: DBCPConnection =>
+ destroyDbcpWrappedConnection(c)
+ case c: MySQLConnection =>
+ destroyMysqlConnection(c)
+ case _ => error("Unsupported driver type, cannot reliably timeout.")
+ }
+ }
+
+ def destroyDbcpWrappedConnection(conn: DBCPConnection) {
+ val inner = conn.getInnermostDelegate
+
+ if ( inner != null ) {
+ destroyConnection(inner)
+ } else {
+ // this should never happen if we use our own ApachePoolingDatabase to get connections.
+ error("Could not get access to the delegate connection. Make sure the dbcp connection pool allows access to underlying connections.")
+ }
+
+ // "close" the wrapper so that it updates its internal bookkeeping, just do it
+ try { conn.close } catch { case _ => }
+ }
+
+ def destroyMysqlConnection(conn: MySQLConnection) {
+ val abort = Class.forName("com.mysql.jdbc.ConnectionImpl").getDeclaredMethod("abortInternal")
+ abort.setAccessible(true)
+
+ abort.invoke(conn)
+ }
+}
@@ -7,12 +7,12 @@ import com.twitter.xrayspecs.Duration
class TimeoutException extends Exception
object Timeout {
- val timer = new Timer("Timer thread", true)
+ val defaultTimer = new Timer("Timer thread", true)
- def apply[T](timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
+ def apply[T](timer: Timer, timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
@volatile var cancelled = false
val task = if (timeout.inMillis > 0)
- Some(schedule(timeout, { cancelled = true; onTimeout }))
+ Some(schedule(timer, timeout, { cancelled = true; onTimeout }))
else None
try {
@@ -26,7 +26,11 @@ object Timeout {
}
}
- private def schedule(timeout: Duration, f: => Unit) = {
+ def apply[T](timeout: Duration)(f: => T)(onTimeout: => Unit): T = {
+ apply(defaultTimer, timeout)(f)(onTimeout)
+ }
+
+ private def schedule(timer: Timer, timeout: Duration, f: => Unit) = {
val task = new TimerTask() {
override def run() { f }
}
@@ -13,12 +13,13 @@ class ApachePoolingDatabaseFactory(
checkConnectionHealthOnReservation: Boolean,
evictConnectionIfIdleFor: Duration) extends DatabaseFactory {
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
val pool = new ApachePoolingDatabase(
dbhosts,
dbname,
username,
password,
+ urlOptions,
minOpenConnections,
maxOpenConnections,
checkConnectionHealthWhenIdleFor,
@@ -27,15 +28,14 @@ class ApachePoolingDatabaseFactory(
evictConnectionIfIdleFor)
pool
}
-
- def apply(dbhosts: List[String], username: String, password: String) = apply(dbhosts, null, username, password)
}
class ApachePoolingDatabase(
dbhosts: List[String],
dbname: String,
username: String,
password: String,
+ urlOptions: Map[String, String],
minOpenConnections: Int,
maxOpenConnections: Int,
checkConnectionHealthWhenIdleFor: Duration,
@@ -52,12 +52,12 @@ class ApachePoolingDatabase(
config.maxWait = maxWaitForConnectionReservation.inMillis
config.timeBetweenEvictionRunsMillis = checkConnectionHealthWhenIdleFor.inMillis
- config.testWhileIdle = true
+ config.testWhileIdle = false
config.testOnBorrow = checkConnectionHealthOnReservation
config.minEvictableIdleTimeMillis = evictConnectionIfIdleFor.inMillis
private val connectionPool = new GenericObjectPool(null, config)
- private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname), username, password)
+ private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname, urlOptions), username, password)
private val poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory,
connectionPool,
@@ -66,6 +66,7 @@ class ApachePoolingDatabase(
false,
true)
private val poolingDataSource = new PoolingDataSource(connectionPool)
+ poolingDataSource.setAccessToUnderlyingConnectionAllowed(true)
def close(connection: Connection) {
try {
@@ -0,0 +1,35 @@
+package com.twitter.querulous.database
+
+import com.twitter.xrayspecs.Duration
+import com.twitter.xrayspecs.TimeConversions._
+import java.sql.{Connection, SQLException, SQLIntegrityConstraintViolationException}
+
+
+class AutoDisablingDatabaseFactory(databaseFactory: DatabaseFactory, disableErrorCount: Int, disableDuration: Duration) extends DatabaseFactory {
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
+ new AutoDisablingDatabase(
+ databaseFactory(dbhosts, dbname, username, password, urlOptions),
+ dbhosts.first,
+ disableErrorCount,
+ disableDuration)
+ }
+}
+
+class AutoDisablingDatabase(database: Database, dbhost: String, protected val disableErrorCount: Int, protected val disableDuration: Duration) extends Database with AutoDisabler {
+ def open() = {
+ throwIfDisabled(dbhost)
+ try {
+ val rv = database.open()
+ noteOperationOutcome(true)
+ rv
+ } catch {
+ case e: SQLException =>
+ noteOperationOutcome(false)
+ throw e
+ case e: Exception =>
+ throw e
+ }
+ }
+
+ def close(connection: Connection) { database.close(connection) }
+}
@@ -34,8 +34,13 @@ object DatabaseFactory {
}
trait DatabaseFactory {
- def apply(dbhosts: List[String], dbname: String, username: String, password: String): Database
- def apply(dbhosts: List[String], username: String, password: String): Database
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]): Database
+
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String): Database =
+ apply(dbhosts, dbname, username, password, null)
+
+ def apply(dbhosts: List[String], username: String, password: String): Database =
+ apply(dbhosts, null, username, password, null)
}
trait Database {
@@ -52,10 +57,14 @@ trait Database {
}
}
- protected def url(dbhosts: List[String], dbname: String) = {
+ protected def url(dbhosts: List[String], dbname: String, urlOptions: Map[String, String]) = {
val dbnameSegment = if (dbname == null) "" else ("/" + dbname)
- "jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + "?" + urlOptions
- }
+ val urlOptsSegment = if (urlOptions == null) {
+ "?useUnicode=true&characterEncoding=UTF-8"
+ } else {
+ "?" + urlOptions.keys.map( k => k + "=" + urlOptions(k) ).mkString("&")
+ }
- def urlOptions = "useUnicode=true&characterEncoding=UTF-8"
+ "jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + urlOptsSegment
+ }
}
@@ -5,11 +5,12 @@ import scala.collection.mutable
class MemoizingDatabaseFactory(databaseFactory: DatabaseFactory) extends DatabaseFactory {
private val databases = new mutable.HashMap[String, Database] with mutable.SynchronizedMap[String, Database]
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = synchronized {
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = synchronized {
databases.getOrElseUpdate(
dbhosts.first + "/" + dbname,
- databaseFactory(dbhosts, dbname, username, password))
+ databaseFactory(dbhosts, dbname, username, password, urlOptions))
}
- def apply(dbhosts: List[String], username: String, password: String) = databaseFactory(dbhosts, username, password)
+ // cannot memoize a connection without specifying a database
+ override def apply(dbhosts: List[String], username: String, password: String) = databaseFactory(dbhosts, username, password)
}
@@ -5,16 +5,15 @@ import java.sql.{SQLException, Connection}
class SingleConnectionDatabaseFactory extends DatabaseFactory {
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
- new SingleConnectionDatabase(dbhosts, dbname, username, password)
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
+ new SingleConnectionDatabase(dbhosts, dbname, username, password, urlOptions)
}
- def apply(dbhosts: List[String], username: String, password: String) = apply(dbhosts, null, username, password)
}
-class SingleConnectionDatabase(dbhosts: List[String], dbname: String, username: String, password: String)
+class SingleConnectionDatabase(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String])
extends Database {
Class.forName("com.mysql.jdbc.Driver")
- private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname), username, password)
+ private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname, urlOptions), username, password)
def close(connection: Connection) {
try {
@@ -6,12 +6,8 @@ class StatsCollectingDatabaseFactory(
databaseFactory: DatabaseFactory,
stats: StatsCollector) extends DatabaseFactory {
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
- new StatsCollectingDatabase(databaseFactory(dbhosts, dbname, username, password), stats)
- }
-
- def apply(dbhosts: List[String], username: String, password: String) = {
- new StatsCollectingDatabase(databaseFactory(dbhosts, username, password), stats)
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
+ new StatsCollectingDatabase(databaseFactory(dbhosts, dbname, username, password, urlOptions), stats)
}
}
@@ -9,12 +9,10 @@ import net.lag.logging.Logger
class SqlDatabaseTimeoutException(msg: String) extends SQLException(msg)
class TimingOutDatabaseFactory(databaseFactory: DatabaseFactory, poolSize: Int, queueSize: Int, openTimeout: Duration, initialTimeout: Duration, maxConnections: Int) extends DatabaseFactory {
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
- new TimingOutDatabase(databaseFactory(dbhosts, dbname, username, password), dbhosts, dbname, poolSize, queueSize, openTimeout, initialTimeout, maxConnections)
- }
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
+ val dbLabel = if (dbname != null) dbname else "(null)"
- def apply(dbhosts: List[String], username: String, password: String) = {
- new TimingOutDatabase(databaseFactory(dbhosts, username, password), dbhosts, "(null)", poolSize, queueSize, openTimeout, initialTimeout, maxConnections)
+ new TimingOutDatabase(databaseFactory(dbhosts, dbname, username, password, urlOptions), dbhosts, dbLabel, poolSize, queueSize, openTimeout, initialTimeout, maxConnections)
}
}
@@ -18,19 +18,15 @@ class AutoDisablingQueryEvaluatorFactory(
disableDuration)
}
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
- chainEvaluator(queryEvaluatorFactory(dbhosts, dbname, username, password))
- }
-
- def apply(dbhosts: List[String], username: String, password: String) = {
- chainEvaluator(queryEvaluatorFactory(dbhosts, username, password))
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
+ chainEvaluator(queryEvaluatorFactory(dbhosts, dbname, username, password, urlOptions))
}
}
-class AutoDisablingQueryEvaluator(
+class AutoDisablingQueryEvaluator (
queryEvaluator: QueryEvaluator,
- disableErrorCount: Int,
- disableDuration: Duration) extends QueryEvaluatorProxy(queryEvaluator) {
+ protected val disableErrorCount: Int,
+ protected val disableDuration: Duration) extends QueryEvaluatorProxy(queryEvaluator) with AutoDisabler {
private var disabledUntil: Time = Time.never
private var consecutiveErrors = 0
@@ -56,24 +52,4 @@ class AutoDisablingQueryEvaluator(
}
}
- private def noteOperationOutcome(success: Boolean) {
- synchronized {
- if (success) {
- consecutiveErrors = 0
- } else {
- consecutiveErrors += 1
- if (consecutiveErrors >= disableErrorCount) {
- disabledUntil = disableDuration.fromNow
- }
- }
- }
- }
-
- private def throwIfDisabled() {
- synchronized {
- if (Time.now < disabledUntil) {
- throw new SQLException("Server is temporarily disabled")
- }
- }
- }
}
Oops, something went wrong.

0 comments on commit 8931e1a

Please sign in to comment.