Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' into improved_stats

Conflicts:
	project/build.properties
  • Loading branch information...
commit b1e60d9ba048a37d1e47e28fd76ea6e472e47568 2 parents b88a640 + ae05bd7
@freels freels authored
Showing with 647 additions and 111 deletions.
  1. +27 −0 README.markdown
  2. +2 −2 project/build.properties
  3. +15 −14 project/build/QuerulousProject.scala
  4. +1 −1  project/plugins/Plugins.scala
  5. +4 −0 project/release.properties
  6. +22 −0 src/main/scala/com/twitter/querulous/DaemonThreadFactory.scala
  7. +9 −18 src/main/scala/com/twitter/querulous/FutureTimeout.scala
  8. +27 −0 src/main/scala/com/twitter/querulous/async/AsyncDatabase.scala
  9. +112 −0 src/main/scala/com/twitter/querulous/async/AsyncQueryEvaluator.scala
  10. +76 −0 src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
  11. +87 −0 src/main/scala/com/twitter/querulous/async/StandardAsyncQueryEvaluator.scala
  12. +30 −0 src/main/scala/com/twitter/querulous/config/AsyncQueryEvaluator.scala
  13. +1 −1  src/main/scala/com/twitter/querulous/config/Database.scala
  14. +10 −9 src/main/scala/com/twitter/querulous/database/ApachePoolingDatabase.scala
  15. +8 −3 src/main/scala/com/twitter/querulous/database/AutoDisablingDatabase.scala
  16. +32 −14 src/main/scala/com/twitter/querulous/database/Database.scala
  17. +13 −4 src/main/scala/com/twitter/querulous/database/SingleConnectionDatabase.scala
  18. +3 −2 src/main/scala/com/twitter/querulous/database/StatsCollectingDatabase.scala
  19. +8 −8 src/main/scala/com/twitter/querulous/database/ThrottledPoolingDatabase.scala
  20. +18 −9 src/main/scala/com/twitter/querulous/database/TimingOutDatabase.scala
  21. +1 −1  src/main/scala/com/twitter/querulous/evaluator/QueryEvaluator.scala
  22. +1 −1  src/main/scala/com/twitter/querulous/evaluator/Transaction.scala
  23. +1 −1  src/main/scala/com/twitter/querulous/query/TimingOutQuery.scala
  24. +12 −2 src/main/scala/com/twitter/querulous/test/FakeDatabase.scala
  25. +0 −14 src/test/scala/com/twitter/querulous/unit/DatabaseSpec.scala
  26. +2 −2 src/test/scala/com/twitter/querulous/unit/QueryEvaluatorSpec.scala
  27. +116 −0 src/test/scala/com/twitter/querulous/unit/StandardAsyncQueryEvaluatorSpec.scala
  28. +5 −2 src/test/scala/com/twitter/querulous/unit/StatsCollectingDatabaseSpec.scala
  29. +4 −3 src/test/scala/com/twitter/querulous/unit/TimingOutDatabaseSpec.scala
View
27 README.markdown
@@ -122,6 +122,33 @@ Suppose you want to automatically disable all connections to a particular host a
val queryEvaluatorFactory = new AutoDisablingQueryEvaluatorFactory(new StandardQueryEvaluatorFactory(databaseFactory, queryFactory))
+### Async API
+
+Querulous also contains an async API based on
+[`com.twitter.util.Future`](http://github.com/twitter/util). The trait
+`AsyncQueryEvaluator` mirrors `QueryEvaluator` in terms of
+functionality, the key difference being that methods immediately
+return values wrapped in a `Future`. Internally, blocking JDBC calls
+are executed within a thread pool.
+
+ // returns Future[Seq[User]]
+ val future = queryEvaluator.select("SELECT * FROM users WHERE id IN (?) OR name = ?", List(1,2,3), "Jacques") { row =>
+ new User(row.getInt("id"), row.getString("name"))
+ }
+
+ // Futures support a functional, monadic interface:
+ val tweetsFuture = future flatMap { users =>
+ queryEvaluator.select("SELECT * FROM tweets WHERE user_id IN (?)", users.map(_.id)) { row =>
+ new Tweet(row.getInt("id"), row.getString("text"))
+ }
+ }
+
+ // futures only block when unwrapped.
+ val tweets = tweetsFuture.apply()
+
+See [the Future API reference](http://twitter.github.com/util/util-core/target/site/doc/main/api/com/twitter/util/Future.html)
+for more information.
+
### Recommended Configuration Options
* Set minActive equal to maxActive. This ensures that the system is fully utilizing the connection resource even when the system is idle. This is good because you will not be surprised by connection usage (and e.g., unexpectedly hit server-side connection limits) during peak load.
View
4 project/build.properties
@@ -1,8 +1,8 @@
#Project properties
-#Thu May 19 13:06:29 PDT 2011
+#Mon Jun 06 13:52:29 PDT 2011
project.organization=com.twitter
project.name=querulous
sbt.version=0.7.4
-project.version=2.1.6-stats-alpha1-SNAPSHOT
+project.version=2.2.1-stats-alpha2-SNAPSHOT
build.scala.versions=2.8.1
project.initialize=false
View
29 project/build/QuerulousProject.scala
@@ -2,22 +2,23 @@ import sbt._
import Process._
import com.twitter.sbt._
-class QuerulousProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher {
- override def filterScalaJars = false
+class QuerulousProject(info: ProjectInfo) extends StandardLibraryProject(info)
+with DefaultRepos
+with SubversionPublisher {
- val util = "com.twitter" % "util" % "1.8.11"
+ val utilCore = "com.twitter" % "util-core" % "1.8.13"
+ val dbcp = "commons-dbcp" % "commons-dbcp" % "1.4"
+ val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.13"
+ val pool = "commons-pool" % "commons-pool" % "1.5.4"
- val dbcp = "commons-dbcp" % "commons-dbcp" % "1.4"
- val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.13"
- val pool = "commons-pool" % "commons-pool" % "1.5.4"
-
- val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1" % "test"
- val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test"
- val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test"
- val objenesis = "org.objenesis" % "objenesis" % "1.1" % "test"
- val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
- val cglib = "cglib" % "cglib" % "2.2" % "test"
- val asm = "asm" % "asm" % "1.5.3" % "test"
+ val utilEval = "com.twitter" % "util-eval" % "1.8.13" % "test"
+ val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1" % "test"
+ val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test"
+ val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test"
+ val objenesis = "org.objenesis" % "objenesis" % "1.1" % "test"
+ val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
+ val cglib = "cglib" % "cglib" % "2.2" % "test"
+ val asm = "asm" % "asm" % "1.5.3" % "test"
override def subversionRepository = Some("http://svn.local.twitter.com/maven-public/")
}
View
2  project/plugins/Plugins.scala
@@ -2,5 +2,5 @@ import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val twitter = "twitter.com" at "http://maven.twttr.com/"
- val defaultProject = "com.twitter" % "standard-project" % "0.7.17"
+ val standardProject = "com.twitter" % "standard-project" % "0.12.7"
}
View
4 project/release.properties
@@ -0,0 +1,4 @@
+#Automatically generated by ReleaseManagement
+#Mon Jun 06 13:52:29 PDT 2011
+version=2.2.0
+sha1=c8773aac4ff6f4890d8b7dbb59fd68ea60d53118
View
22 src/main/scala/com/twitter/querulous/DaemonThreadFactory.scala
@@ -0,0 +1,22 @@
+package com.twitter.querulous
+
+import java.util.concurrent.{ThreadFactory, TimeoutException => JTimeoutException, _}
+import java.util.concurrent.atomic.AtomicInteger
+import com.twitter.util.Duration
+
+
+class DaemonThreadFactory extends ThreadFactory {
+ val group = new ThreadGroup(Thread.currentThread().getThreadGroup(), "querulous")
+ val threadNumber = new AtomicInteger(1)
+
+ def newThread(r: Runnable) = {
+ val thread = new Thread(group, r, "querulous-" + threadNumber.getAndIncrement())
+ if (!thread.isDaemon) {
+ thread.setDaemon(true)
+ }
+ if (thread.getPriority != Thread.NORM_PRIORITY) {
+ thread.setPriority(Thread.NORM_PRIORITY)
+ }
+ thread
+ }
+}
View
27 src/main/scala/com/twitter/querulous/FutureTimeout.scala
@@ -4,25 +4,16 @@ import java.util.concurrent.{ThreadFactory, TimeoutException => JTimeoutExceptio
import java.util.concurrent.atomic.AtomicInteger
import com.twitter.util.Duration
-class FutureTimeout(poolSize: Int, queueSize: Int) {
- object DaemonThreadFactory extends ThreadFactory {
- val group = new ThreadGroup(Thread.currentThread().getThreadGroup(), "querulous")
- val threadNumber = new AtomicInteger(1)
- def newThread(r: Runnable) = {
- val thread = new Thread(group, r, "querulous-" + threadNumber.getAndIncrement())
- if (!thread.isDaemon) {
- thread.setDaemon(true)
- }
- if (thread.getPriority != Thread.NORM_PRIORITY) {
- thread.setPriority(Thread.NORM_PRIORITY)
- }
- thread
- }
- }
- private val executor = new ThreadPoolExecutor(1, poolSize, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue[Runnable](queueSize),
- DaemonThreadFactory)
+class FutureTimeout(poolSize: Int, queueSize: Int) {
+ private val executor = new ThreadPoolExecutor(
+ 1,
+ poolSize,
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable](queueSize),
+ new DaemonThreadFactory()
+ )
class Task[T](f: => T)(onTimeout: T => Unit) extends Callable[T] {
private var cancelled = false
View
27 src/main/scala/com/twitter/querulous/async/AsyncDatabase.scala
@@ -0,0 +1,27 @@
+package com.twitter.querulous.async
+
+import java.sql.Connection
+import com.twitter.util.Future
+
+
+trait AsyncDatabaseFactory {
+ def apply(
+ hosts: List[String],
+ name: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncDatabase
+
+ def apply(hosts: List[String], name: String, username: String, password: String): AsyncDatabase = {
+ apply(hosts, name, username, password, Map.empty)
+ }
+
+ def apply(hosts: List[String], username: String, password: String): AsyncDatabase = {
+ apply(hosts, null, username, password, Map.empty)
+ }
+}
+
+trait AsyncDatabase {
+ def withConnection[R](f: Connection => R): Future[R]
+}
View
112 src/main/scala/com/twitter/querulous/async/AsyncQueryEvaluator.scala
@@ -0,0 +1,112 @@
+package com.twitter.querulous.async
+
+import java.util.concurrent.Executors
+import java.sql.ResultSet
+import com.twitter.util.{Future, FuturePool}
+import com.twitter.querulous.config.{Connection => ConnectionConfig}
+import com.twitter.querulous.DaemonThreadFactory
+import com.twitter.querulous.evaluator._
+import com.twitter.querulous.query.{QueryClass, SqlQueryFactory}
+import com.twitter.querulous.database.ThrottledPoolingDatabaseFactory
+import com.twitter.conversions.time._
+
+
+object AsyncQueryEvaluator extends AsyncQueryEvaluatorFactory {
+ lazy val defaultFuturePool = FuturePool(Executors.newCachedThreadPool(new DaemonThreadFactory))
+
+ private def createEvaluatorFactory() = {
+ new StandardAsyncQueryEvaluatorFactory(
+ new BlockingDatabaseWrapperFactory(
+ defaultFuturePool,
+ new ThrottledPoolingDatabaseFactory(10, 100.millis, 10.seconds, 1.second)
+ ),
+ new SqlQueryFactory
+ )
+ }
+
+ def apply(
+ dbhosts: List[String],
+ dbname: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncQueryEvaluator = {
+ createEvaluatorFactory()(dbhosts, dbname, username, password, urlOptions)
+ }
+}
+
+trait AsyncQueryEvaluatorFactory {
+ def apply(
+ dbhosts: List[String],
+ dbname: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncQueryEvaluator
+
+ def apply(dbhost: String, dbname: String, username: String, password: String, urlOptions: Map[String, String]): AsyncQueryEvaluator = {
+ apply(List(dbhost), dbname, username, password, urlOptions)
+ }
+
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String): AsyncQueryEvaluator = {
+ apply(dbhosts, dbname, username, password, Map[String,String]())
+ }
+
+ def apply(dbhost: String, dbname: String, username: String, password: String): AsyncQueryEvaluator = {
+ apply(List(dbhost), dbname, username, password, Map[String,String]())
+ }
+
+ def apply(dbhost: String, username: String, password: String): AsyncQueryEvaluator = {
+ apply(List(dbhost), null, username, password, Map[String,String]())
+ }
+
+ def apply(dbhosts: List[String], username: String, password: String): AsyncQueryEvaluator = {
+ apply(dbhosts, null, username, password, Map[String,String]())
+ }
+
+ def apply(connection: ConnectionConfig): AsyncQueryEvaluator = {
+ apply(connection.hostnames.toList, connection.database, connection.username, connection.password, connection.urlOptions)
+ }
+}
+
+trait AsyncQueryEvaluator {
+ def select[A](queryClass: QueryClass, query: String, params: Any*)(f: ResultSet => A): Future[Seq[A]]
+
+ def select[A](query: String, params: Any*)(f: ResultSet => A): Future[Seq[A]] = {
+ select(QueryClass.Select, query, params: _*)(f)
+ }
+
+ def selectOne[A](queryClass: QueryClass, query: String, params: Any*)(f: ResultSet => A): Future[Option[A]]
+
+ def selectOne[A](query: String, params: Any*)(f: ResultSet => A): Future[Option[A]] = {
+ selectOne(QueryClass.Select, query, params: _*)(f)
+ }
+
+ def count(queryClass: QueryClass, query: String, params: Any*): Future[Int]
+
+ def count(query: String, params: Any*): Future[Int] = {
+ count(QueryClass.Select, query, params: _*)
+ }
+
+ def execute(queryClass: QueryClass, query: String, params: Any*): Future[Int]
+
+ def execute(query: String, params: Any*): Future[Int] = {
+ execute(QueryClass.Execute, query, params: _*)
+ }
+
+ def executeBatch(queryClass: QueryClass, query: String)(f: ParamsApplier => Unit): Future[Int]
+
+ def executeBatch(query: String)(f: ParamsApplier => Unit): Future[Int] = {
+ executeBatch(QueryClass.Execute, query)(f)
+ }
+
+ def nextId(tableName: String): Future[Long]
+
+ def insert(queryClass: QueryClass, query: String, params: Any*): Future[Long]
+
+ def insert(query: String, params: Any*): Future[Long] = {
+ insert(QueryClass.Execute, query, params: _*)
+ }
+
+ def transaction[T](f: Transaction => T): Future[T]
+}
View
76 src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
@@ -0,0 +1,76 @@
+package com.twitter.querulous.async
+
+import java.util.concurrent.Executors
+import java.sql.Connection
+import com.twitter.util.{Return, Throw, Future, Promise, FuturePool, JavaTimer, TimeoutException}
+import com.twitter.querulous.DaemonThreadFactory
+import com.twitter.querulous.database.{Database, DatabaseFactory}
+
+
+class BlockingDatabaseWrapperFactory(pool: => FuturePool, factory: DatabaseFactory)
+extends AsyncDatabaseFactory {
+ def apply(
+ hosts: List[String],
+ name: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncDatabase = {
+ new BlockingDatabaseWrapper(
+ pool,
+ factory(hosts, name, username, password, urlOptions)
+ )
+ }
+}
+
+private object AsyncConnectionCheckout {
+ lazy val checkoutTimer = new JavaTimer(true)
+}
+
+class BlockingDatabaseWrapper(
+ pool: FuturePool,
+ protected[async] val database: Database)
+extends AsyncDatabase {
+
+ import AsyncConnectionCheckout._
+
+ // XXX: this probably should be configurable as well.
+ private val checkoutPool = FuturePool(Executors.newSingleThreadExecutor(new DaemonThreadFactory))
+ private val openTimeout = database.openTimeout
+
+ def withConnection[R](f: Connection => R) = {
+ checkoutConnection() flatMap { conn =>
+ pool {
+ try {
+ f(conn)
+ } finally {
+ database.close(conn)
+ }
+ }
+ }
+ }
+
+ private def checkoutConnection(): Future[Connection] = {
+ val promise = new Promise[Connection]
+
+ checkoutPool(database.open()) respond { rv =>
+ // if the promise has already timed out, we need to close the connection here.
+ if (!promise.updateIfEmpty(rv)) rv foreach database.close
+ }
+
+ checkoutTimer.schedule(openTimeout.fromNow) {
+ promise.updateIfEmpty(Throw(new TimeoutException(openTimeout.toString)))
+ }
+
+ promise
+ }
+
+ // equality overrides
+
+ override def equals(other: Any) = other match {
+ case other: BlockingDatabaseWrapper => database eq other.database
+ case _ => false
+ }
+
+ override def hashCode = database.hashCode
+}
View
87 src/main/scala/com/twitter/querulous/async/StandardAsyncQueryEvaluator.scala
@@ -0,0 +1,87 @@
+package com.twitter.querulous.async
+
+import java.sql.ResultSet
+import com.twitter.util.Future
+import com.twitter.querulous.query.{QueryClass, QueryFactory}
+import com.twitter.querulous.evaluator.{Transaction, ParamsApplier}
+
+
+class StandardAsyncQueryEvaluatorFactory(
+ databaseFactory: AsyncDatabaseFactory,
+ queryFactory: QueryFactory)
+extends AsyncQueryEvaluatorFactory {
+ def apply(
+ hosts: List[String],
+ name: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncQueryEvaluator = {
+ new StandardAsyncQueryEvaluator(
+ databaseFactory(hosts, name, username, password, urlOptions),
+ queryFactory
+ )
+ }
+}
+
+class StandardAsyncQueryEvaluator(val database: AsyncDatabase, queryFactory: QueryFactory)
+extends AsyncQueryEvaluator {
+ def select[A](queryClass: QueryClass, query: String, params: Any*)(f: ResultSet => A) = {
+ withTransaction(_.select(queryClass, query, params: _*)(f))
+ }
+
+ def selectOne[A](queryClass: QueryClass, query: String, params: Any*)(f: ResultSet => A) = {
+ withTransaction(_.selectOne(queryClass, query, params: _*)(f))
+ }
+
+ def count(queryClass: QueryClass, query: String, params: Any*) = {
+ withTransaction(_.count(queryClass, query, params: _*))
+ }
+
+ def execute(queryClass: QueryClass, query: String, params: Any*) = {
+ withTransaction(_.execute(queryClass, query, params: _*))
+ }
+
+ def executeBatch(queryClass: QueryClass, query: String)(f: ParamsApplier => Unit) = {
+ withTransaction(_.executeBatch(queryClass, query)(f))
+ }
+
+ def nextId(tableName: String) = {
+ withTransaction(_.nextId(tableName))
+ }
+
+ def insert(queryClass: QueryClass, query: String, params: Any*) = {
+ withTransaction(_.insert(queryClass, query, params: _*))
+ }
+
+ def transaction[T](f: Transaction => T) = {
+ withTransaction { transaction =>
+ transaction.begin()
+ try {
+ val rv = f(transaction)
+ transaction.commit()
+ rv
+ } catch {
+ case e: Throwable => {
+ try {
+ transaction.rollback()
+ } catch { case _ => () }
+ throw e
+ }
+ }
+ }
+ }
+
+ private def withTransaction[R](f: Transaction => R): Future[R] = {
+ database.withConnection { c => f(new Transaction(queryFactory, c)) }
+ }
+
+ // equality overrides
+
+ override def equals(other: Any) = other match {
+ case other: StandardAsyncQueryEvaluator => database eq other.database
+ case _ => false
+ }
+
+ override def hashCode = database.hashCode
+}
View
30 src/main/scala/com/twitter/querulous/config/AsyncQueryEvaluator.scala
@@ -0,0 +1,30 @@
+package com.twitter.querulous.config
+
+import com.twitter.util
+import com.twitter.querulous
+import com.twitter.querulous.async
+
+trait FuturePool {
+ def apply(): util.FuturePool
+}
+
+object DefaultFuturePool extends FuturePool {
+ def apply() = async.AsyncQueryEvaluator.defaultFuturePool
+}
+
+class AsyncQueryEvaluator {
+ var futurePool: FuturePool = DefaultFuturePool
+ var database: Database = new Database
+ var query: Query = new Query
+
+ def apply(stats: querulous.StatsCollector): async.AsyncQueryEvaluatorFactory = {
+ val db = new async.BlockingDatabaseWrapperFactory(
+ futurePool(),
+ database(stats)
+ )
+
+ new async.StandardAsyncQueryEvaluatorFactory(db, query(stats))
+ }
+
+ def apply(): async.AsyncQueryEvaluatorFactory = apply(querulous.NullStatsCollector)
+}
View
2  src/main/scala/com/twitter/querulous/config/Database.scala
@@ -42,7 +42,7 @@ class TimingOutDatabase {
var open: Duration = 1.second
def apply(factory: DatabaseFactory) = {
- new TimingOutDatabaseFactory(factory, poolSize, queueSize, open, poolSize)
+ new TimingOutDatabaseFactory(factory, poolSize, queueSize, open)
}
}
View
19 src/main/scala/com/twitter/querulous/database/ApachePoolingDatabase.scala
@@ -43,17 +43,18 @@ class ApachePoolingDatabaseFactory(
}
class ApachePoolingDatabase(
- dbhosts: List[String],
- dbname: String,
- username: String,
+ val hosts: List[String],
+ val name: String,
+ val username: String,
password: String,
- urlOptions: Map[String, String],
+ val extraUrlOptions: Map[String, String],
minOpenConnections: Int,
maxOpenConnections: Int,
checkConnectionHealthWhenIdleFor: Duration,
- maxWaitForConnectionReservation: Duration,
+ val openTimeout: Duration,
checkConnectionHealthOnReservation: Boolean,
- evictConnectionIfIdleFor: Duration) extends Database {
+ evictConnectionIfIdleFor: Duration)
+extends Database {
Class.forName("com.mysql.jdbc.Driver")
@@ -61,7 +62,7 @@ class ApachePoolingDatabase(
config.maxActive = maxOpenConnections
config.maxIdle = maxOpenConnections
config.minIdle = minOpenConnections
- config.maxWait = maxWaitForConnectionReservation.inMillis
+ config.maxWait = openTimeout.inMillis
config.timeBetweenEvictionRunsMillis = checkConnectionHealthWhenIdleFor.inMillis
config.testWhileIdle = false
@@ -71,7 +72,7 @@ class ApachePoolingDatabase(
config.lifo = false
private val connectionPool = new GenericObjectPool(null, config)
- private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname, urlOptions), username, password)
+ private val connectionFactory = new DriverManagerConnectionFactory(url(hosts, name, urlOptions), username, password)
private val poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory,
connectionPool,
@@ -92,5 +93,5 @@ class ApachePoolingDatabase(
def open() = poolingDataSource.getConnection()
- override def toString = dbhosts.head + "_" + dbname
+ override def toString = hosts.head + "_" + name
}
View
11 src/main/scala/com/twitter/querulous/database/AutoDisablingDatabase.scala
@@ -10,15 +10,20 @@ class AutoDisablingDatabaseFactory(val databaseFactory: DatabaseFactory, val dis
def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]) = {
new AutoDisablingDatabase(
databaseFactory(dbhosts, dbname, username, password, urlOptions),
- dbhosts.head,
disableErrorCount,
disableDuration)
}
}
-class AutoDisablingDatabase(database: Database, dbhost: String, protected val disableErrorCount: Int, protected val disableDuration: Duration) extends Database with AutoDisabler {
+class AutoDisablingDatabase(
+ val database: Database,
+ protected val disableErrorCount: Int,
+ protected val disableDuration: Duration)
+extends Database
+with DatabaseProxy
+with AutoDisabler {
def open() = {
- throwIfDisabled(dbhost)
+ throwIfDisabled(database.hosts.head)
try {
val rv = database.open()
noteOperationOutcome(true)
View
46 src/main/scala/com/twitter/querulous/database/Database.scala
@@ -1,9 +1,17 @@
package com.twitter.querulous.database
-import com.twitter.querulous._
import java.sql.Connection
import com.twitter.querulous.StatsCollector
-import com.twitter.util.TimeConversions._
+import com.twitter.util.Duration
+
+
+object Database {
+ val defaultUrlOptions = Map(
+ "useUnicode" -> "true",
+ "characterEncoding" -> "UTF-8",
+ "connectTimeout" -> "500"
+ )
+}
trait DatabaseFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String]): Database
@@ -15,7 +23,25 @@ trait DatabaseFactory {
apply(dbhosts, null, username, password, Map.empty)
}
+trait DatabaseProxy extends Database {
+ def database: Database
+
+ def hosts = database.hosts
+ def name = database.name
+ def username = database.username
+ def extraUrlOptions = database.extraUrlOptions
+ def openTimeout = database.openTimeout
+}
+
trait Database {
+ def hosts: List[String]
+ def name: String
+ def username: String
+ def extraUrlOptions: Map[String, String]
+ def openTimeout: Duration
+
+ def urlOptions = Database.defaultUrlOptions ++ extraUrlOptions
+
def open(): Connection
def close(connection: Connection)
@@ -29,18 +55,10 @@ trait Database {
}
}
- val defaultUrlOptions = Map(
- "useUnicode" -> "true",
- "characterEncoding" -> "UTF-8",
- "connectTimeout" -> "500"
- )
-
- protected def url(dbhosts: List[String], dbname: String, urlOptions: Map[String, String]) = {
- val dbnameSegment = if (dbname == null) "" else ("/" + dbname)
-
- val finalUrlOpts = defaultUrlOptions ++ urlOptions
- val urlOptsSegment = finalUrlOpts.map(Function.tupled((k, v) => k+"="+v )).mkString("&")
+ protected def url(hosts: List[String], name: String, urlOptions: Map[String, String]) = {
+ val nameSegment = if (name == null) "" else ("/" + name)
+ val urlOptsSegment = urlOptions.map(Function.tupled((k, v) => k+"="+v )).mkString("&")
- "jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + "?" + urlOptsSegment
+ "jdbc:mysql://" + hosts.mkString(",") + nameSegment + "?" + urlOptsSegment
}
}
View
17 src/main/scala/com/twitter/querulous/database/SingleConnectionDatabase.scala
@@ -2,6 +2,7 @@ package com.twitter.querulous.database
import org.apache.commons.dbcp.DriverManagerConnectionFactory
import java.sql.{SQLException, Connection}
+import com.twitter.conversions.time._
class SingleConnectionDatabaseFactory(defaultUrlOptions: Map[String, String]) extends DatabaseFactory {
@@ -19,10 +20,18 @@ class SingleConnectionDatabaseFactory(defaultUrlOptions: Map[String, String]) ex
}
}
-class SingleConnectionDatabase(dbhosts: List[String], dbname: String, username: String, password: String, urlOptions: Map[String, String])
- extends Database {
+class SingleConnectionDatabase(
+ val hosts: List[String],
+ val name: String,
+ val username: String,
+ password: String,
+ val extraUrlOptions: Map[String, String])
+extends Database {
Class.forName("com.mysql.jdbc.Driver")
- private val connectionFactory = new DriverManagerConnectionFactory(url(dbhosts, dbname, urlOptions), username, password)
+
+ val openTimeout = urlOptions("connectTimeout").toInt.millis
+
+ private val connectionFactory = new DriverManagerConnectionFactory(url(hosts, name, urlOptions), username, password)
def close(connection: Connection) {
try {
@@ -33,5 +42,5 @@ class SingleConnectionDatabase(dbhosts: List[String], dbname: String, username:
}
def open() = connectionFactory.createConnection()
- override def toString = dbhosts.head + "_" + dbname
+ override def toString = hosts.head + "_" + name
}
View
5 src/main/scala/com/twitter/querulous/database/StatsCollectingDatabase.scala
@@ -12,8 +12,9 @@ class StatsCollectingDatabaseFactory(
}
}
-class StatsCollectingDatabase(database: Database, stats: StatsCollector)
- extends Database {
+class StatsCollectingDatabase(val database: Database, stats: StatsCollector)
+extends Database
+with DatabaseProxy {
override def open(): Connection = {
stats.time("db-open-timing") {
View
16 src/main/scala/com/twitter/querulous/database/ThrottledPoolingDatabase.scala
@@ -153,13 +153,13 @@ class ThrottledPoolingDatabaseFactory(
}
class ThrottledPoolingDatabase(
- dbhosts: List[String],
- dbname: String,
- username: String,
+ val hosts: List[String],
+ val name: String,
+ val username: String,
password: String,
- urlOptions: Map[String, String],
+ val extraUrlOptions: Map[String, String],
numConnections: Int,
- openTimeout: Duration,
+ val openTimeout: Duration,
idleTimeout: Duration,
repopulateInterval: Duration) extends Database {
@@ -169,7 +169,7 @@ class ThrottledPoolingDatabase(
private val poolingDataSource = new PoolingDataSource(pool)
poolingDataSource.setAccessToUnderlyingConnectionAllowed(true)
private val watchdogTask = new PoolWatchdog(pool)
- private val watchdog = new Timer(dbhosts.mkString(",") + "-pool-watchdog", true)
+ private val watchdog = new Timer(hosts.mkString(",") + "-pool-watchdog", true)
watchdog.scheduleAtFixedRate(watchdogTask, 0, repopulateInterval.inMillis)
def open() = {
@@ -177,7 +177,7 @@ class ThrottledPoolingDatabase(
poolingDataSource.getConnection()
} catch {
case e: PoolTimeoutException =>
- throw new SqlDatabaseTimeoutException(dbhosts.mkString(",") + "/" + dbname, openTimeout)
+ throw new SqlDatabaseTimeoutException(hosts.mkString(",") + "/" + name, openTimeout)
}
}
@@ -186,6 +186,6 @@ class ThrottledPoolingDatabase(
}
protected def mkConnection(): Connection = {
- DriverManager.getConnection(url(dbhosts, dbname, urlOptions), username, password)
+ DriverManager.getConnection(url(hosts, name, urlOptions), username, password)
}
}
View
27 src/main/scala/com/twitter/querulous/database/TimingOutDatabase.scala
@@ -12,24 +12,33 @@ class TimingOutDatabaseFactory(
val databaseFactory: DatabaseFactory,
val poolSize: Int,
val queueSize: Int,
- val openTimeout: Duration,
- val maxConnections: Int)
+ val openTimeout: Duration)
extends DatabaseFactory {
private def newTimeoutPool() = new FutureTimeout(poolSize, queueSize)
def apply(dbhosts: List[String], dbname: String, username: String, password: String,
urlOptions: Map[String, String]) = {
- val dbLabel = if (dbname != null) dbname else "(null)"
- new TimingOutDatabase(databaseFactory(dbhosts, dbname, username, password, urlOptions),
- dbhosts, dbLabel, newTimeoutPool(), openTimeout, maxConnections)
+ new TimingOutDatabase(
+ databaseFactory(dbhosts, dbname, username, password, urlOptions),
+ newTimeoutPool(),
+ openTimeout
+ )
}
}
-class TimingOutDatabase(database: Database, dbhosts: List[String], dbname: String,
- timeout: FutureTimeout, openTimeout: Duration,
- maxConnections: Int) extends Database {
+class TimingOutDatabase(
+ val database: Database,
+ timeout: FutureTimeout,
+ openTimeout: Duration)
+extends Database
+with DatabaseProxy {
+ val label = database.name match {
+ case null => database.hosts.mkString(",") +"/ (null)"
+ case name => database.hosts.mkString(",") +"/"+ name
+ }
+
private def getConnection(wait: Duration) = {
try {
timeout(wait) {
@@ -39,7 +48,7 @@ class TimingOutDatabase(database: Database, dbhosts: List[String], dbname: Strin
}
} catch {
case e: TimeoutException =>
- throw new SqlDatabaseTimeoutException(dbhosts.mkString(",") + "/" + dbname, wait)
+ throw new SqlDatabaseTimeoutException(label, wait)
}
}
View
2  src/main/scala/com/twitter/querulous/evaluator/QueryEvaluator.scala
@@ -49,7 +49,7 @@ trait QueryEvaluatorFactory {
}
class ParamsApplier(query: Query) {
- def apply(params: Any*) = query.addParams(params)
+ def apply(params: Any*) = query.addParams(params: _*)
}
trait QueryEvaluator {
View
2  src/main/scala/com/twitter/querulous/evaluator/Transaction.scala
@@ -23,7 +23,7 @@ class Transaction(queryFactory: QueryFactory, connection: Connection) extends Qu
def executeBatch(queryClass: QueryClass, queryString: String)(f: ParamsApplier => Unit) = {
val query = queryFactory(connection, queryClass, queryString)
f(new ParamsApplier(query))
- query.execute
+ query.execute()
}
def nextId(tableName: String) = {
View
2  src/main/scala/com/twitter/querulous/query/TimingOutQuery.scala
@@ -40,7 +40,7 @@ class PerQueryTimingOutQueryFactory(queryFactory: QueryFactory, val timeouts: Ma
}
private object QueryCancellation {
- val cancelTimer = new java.util.Timer("global query cancellation timer", true)
+ lazy val cancelTimer = new java.util.Timer("global query cancellation timer", true)
}
/**
View
14 src/main/scala/com/twitter/querulous/test/FakeDatabase.scala
@@ -2,10 +2,20 @@ package com.twitter.querulous.test
import java.sql.Connection
import com.twitter.util.{Duration, Time}
-import com.twitter.util.TimeConversions._
+import com.twitter.conversions.time._
import com.twitter.querulous.database.Database
-class FakeDatabase(connection: Connection, before: Option[String => Unit]) extends Database {
+trait FakeDatabase extends Database {
+ val hosts = List("fakehost")
+ val name = "fake"
+ val username = "fakeuser"
+ val openTimeout = 500.millis
+ val extraUrlOptions: Map[String,String] = Map.empty
+}
+
+class FakeDBConnectionWrapper(connection: Connection, before: Option[String => Unit])
+extends Database
+with FakeDatabase {
def this(connection: Connection) = this(connection, None)
def this(connection: Connection, before: String => Unit) = this(connection, Some(before))
View
14 src/test/scala/com/twitter/querulous/unit/DatabaseSpec.scala
@@ -54,18 +54,4 @@ class DatabaseSpec extends ConfiguredSpecification with JMocker with ClassMocker
testFactory(factory)
}
-
- "Database#url" should {
- val fake = new Object with Database {
- def open() = null
- def close(connection: Connection) = ()
- }.asInstanceOf[{def url(a: List[String], b:String, c:Map[String, String]): String}]
-
- "add default unicode urlOptions" in {
- val url = fake.url(List("host"), "db", Map())
-
- url mustMatch "useUnicode=true"
- url mustMatch "characterEncoding=UTF-8"
- }
- }
}
View
4 src/test/scala/com/twitter/querulous/unit/QueryEvaluatorSpec.scala
@@ -7,7 +7,7 @@ import com.twitter.querulous.{StatsCollector, TestEvaluator}
import com.twitter.querulous.database.{ApachePoolingDatabaseFactory, MemoizingDatabaseFactory, Database}
import com.twitter.querulous.evaluator.{StandardQueryEvaluator, StandardQueryEvaluatorFactory, QueryEvaluator}
import com.twitter.querulous.query._
-import com.twitter.querulous.test.FakeDatabase
+import com.twitter.querulous.test.FakeDBConnectionWrapper
import com.twitter.querulous.ConfiguredSpecification
import com.twitter.util.Time
import com.twitter.util.TimeConversions._
@@ -32,7 +32,7 @@ class QueryEvaluatorSpec extends ConfiguredSpecification with JMocker with Class
"connection pooling" in {
val connection = mock[Connection]
- val database = new FakeDatabase(connection)
+ val database = new FakeDBConnectionWrapper(connection)
"transactionally" >> {
val queryEvaluator = new StandardQueryEvaluator(database, queryFactory)
View
116 src/test/scala/com/twitter/querulous/unit/StandardAsyncQueryEvaluatorSpec.scala
@@ -0,0 +1,116 @@
+package com.twitter.querulous.unit
+
+import org.specs.Specification
+import org.specs.mock.{ClassMocker, JMocker}
+import java.sql.{Connection, ResultSet}
+import com.twitter.conversions.time._
+import com.twitter.querulous.TestEvaluator
+import com.twitter.querulous.evaluator.{QueryEvaluator, ParamsApplier, Transaction}
+import com.twitter.querulous.database.Database
+import com.twitter.querulous.query._
+import com.twitter.querulous.async._
+
+
+class StandardAsyncQueryEvaluatorSpec extends Specification with JMocker with ClassMocker {
+
+ val futurePool = AsyncQueryEvaluator.defaultFuturePool
+
+ val database = mock[Database]
+ val connection = mock[Connection]
+ val query = mock[Query]
+ val queryFactory = mock[QueryFactory]
+
+ def newEvaluator() = {
+ new StandardAsyncQueryEvaluator(
+ new BlockingDatabaseWrapper(futurePool, database),
+ queryFactory
+ )
+ }
+
+ // operator functions. Declared here so that identity equality works for expectations
+ val fromRow = (r: ResultSet) => r.getInt("1")
+
+ "BlockingEvaluatorWrapper" should {
+ "select" in {
+ expect {
+ one(database).openTimeout willReturn 500.millis
+ one(database).open() willReturn connection
+ one(queryFactory).apply(connection, QueryClass.Select, "SELECT 1") willReturn query
+ one(query).select(fromRow) willReturn Seq(1)
+ one(database).close(connection)
+ }
+
+ newEvaluator().select("SELECT 1")(fromRow).get()
+ }
+
+ "selectOne" in {
+ expect {
+ one(database).openTimeout willReturn 500.millis
+ one(database).open() willReturn connection
+ one(queryFactory).apply(connection, QueryClass.Select, "SELECT 1") willReturn query
+ one(query).select(fromRow) willReturn Seq(1)
+ one(database).close(connection)
+ }
+
+ newEvaluator().selectOne("SELECT 1")(fromRow).get()
+ }
+
+ "count" in {
+ expect {
+ one(database).openTimeout willReturn 500.millis
+ one(database).open() willReturn connection
+ one(queryFactory).apply(connection, QueryClass.Select, "SELECT 1") willReturn query
+ one(query).select(any[ResultSet => Int]) willReturn Seq(1)
+ one(database).close(connection)
+ }
+
+ newEvaluator().count("SELECT 1").get()
+ }
+
+ "execute" in {
+ val sql = "INSERT INTO foo (id) VALUES (1)"
+
+ expect {
+ one(database).openTimeout willReturn 500.millis
+ one(database).open() willReturn connection
+ one(queryFactory).apply(connection, QueryClass.Execute, sql) willReturn query
+ one(query).execute() willReturn 1
+ one(database).close(connection)
+ }
+
+ newEvaluator().execute("INSERT INTO foo (id) VALUES (1)").get()
+ }
+
+ "executeBatch" in {
+ val sql = "INSERT INTO foo (id) VALUES (?)"
+
+ expect {
+ one(database).openTimeout willReturn 500.millis
+ one(database).open() willReturn connection
+ one(queryFactory).apply(connection, QueryClass.Execute, sql) willReturn query
+ one(query).addParams(1)
+ one(query).execute() willReturn 1
+ one(database).close(connection)
+ }
+
+ newEvaluator().executeBatch("INSERT INTO foo (id) VALUES (?)")(_(1)).get()
+ }
+
+ "transaction" in {
+ val sql = "INSERT INTO foo (id) VALUES (1)"
+
+ expect {
+ one(database).openTimeout willReturn 500.millis
+ one(database).open() willReturn connection
+ one(connection).setAutoCommit(false)
+ one(queryFactory).apply(connection, QueryClass.Execute, sql) willReturn query
+ one(query).execute() willReturn 1
+ one(connection).commit()
+ one(connection).setAutoCommit(true)
+ one(database).close(connection)
+ }
+
+ newEvaluator().transaction(_.execute("INSERT INTO foo (id) VALUES (1)")).get()
+ }
+ }
+}
View
7 src/test/scala/com/twitter/querulous/unit/StatsCollectingDatabaseSpec.scala
@@ -5,7 +5,7 @@ import java.sql.Connection
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
import com.twitter.querulous.database.{SqlDatabaseTimeoutException, StatsCollectingDatabase}
-import com.twitter.querulous.test.{FakeStatsCollector, FakeDatabase}
+import com.twitter.querulous.test.{FakeStatsCollector, FakeDBConnectionWrapper}
import com.twitter.util.Time
import com.twitter.util.TimeConversions._
@@ -15,7 +15,10 @@ class StatsCollectingDatabaseSpec extends Specification with JMocker with ClassM
val latency = 1.second
val connection = mock[Connection]
val stats = new FakeStatsCollector
- def pool(callback: String => Unit) = new StatsCollectingDatabase(new FakeDatabase(connection, callback), stats)
+ def pool(callback: String => Unit) = new StatsCollectingDatabase(
+ new FakeDBConnectionWrapper(connection, callback),
+ stats
+ )
"collect stats" in {
"when closing" >> {
View
7 src/test/scala/com/twitter/querulous/unit/TimingOutDatabaseSpec.scala
@@ -4,7 +4,8 @@ import com.twitter.querulous._
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.sql.Connection
import com.twitter.querulous.TimeoutException
-import com.twitter.querulous.database.{SqlDatabaseTimeoutException, TimingOutDatabase, Database}
+import com.twitter.querulous.database.{SqlDatabaseTimeoutException, TimingOutDatabase}
+import com.twitter.querulous.test.FakeDatabase
import com.twitter.util.Time
import com.twitter.util.TimeConversions._
import org.specs.Specification
@@ -18,7 +19,7 @@ class TimingOutDatabaseSpec extends Specification with JMocker with ClassMocker
var shouldWait = false
val connection = mock[Connection]
val future = new FutureTimeout(1, 1)
- val database = new Database {
+ val database = new FakeDatabase {
def open() = {
if (shouldWait) latch.await(100.seconds.inMillis, TimeUnit.MILLISECONDS)
connection
@@ -30,7 +31,7 @@ class TimingOutDatabaseSpec extends Specification with JMocker with ClassMocker
// one(connection).close()
}
- val timingOutDatabase = new TimingOutDatabase(database, List("dbhost"), "dbname", future, timeout, 1)
+ val timingOutDatabase = new TimingOutDatabase(database, future, timeout)
shouldWait = true
"timeout" in {
Please sign in to comment.
Something went wrong with that request. Please try again.