Permalink
Browse files

Merge branch 'master' into improved_stats

Conflicts:
	project/build.properties
  • Loading branch information...
2 parents b88a640 + ae05bd7 commit b1e60d9ba048a37d1e47e28fd76ea6e472e47568 @freels freels committed Jun 28, 2011
Showing with 647 additions and 111 deletions.
  1. +27 −0 README.markdown
  2. +2 −2 project/build.properties
  3. +15 −14 project/build/QuerulousProject.scala
  4. +1 −1 project/plugins/Plugins.scala
  5. +4 −0 project/release.properties
  6. +22 −0 src/main/scala/com/twitter/querulous/DaemonThreadFactory.scala
  7. +9 −18 src/main/scala/com/twitter/querulous/FutureTimeout.scala
  8. +27 −0 src/main/scala/com/twitter/querulous/async/AsyncDatabase.scala
  9. +112 −0 src/main/scala/com/twitter/querulous/async/AsyncQueryEvaluator.scala
  10. +76 −0 src/main/scala/com/twitter/querulous/async/BlockingDatabaseWrapper.scala
  11. +87 −0 src/main/scala/com/twitter/querulous/async/StandardAsyncQueryEvaluator.scala
  12. +30 −0 src/main/scala/com/twitter/querulous/config/AsyncQueryEvaluator.scala
  13. +1 −1 src/main/scala/com/twitter/querulous/config/Database.scala
  14. +10 −9 src/main/scala/com/twitter/querulous/database/ApachePoolingDatabase.scala
  15. +8 −3 src/main/scala/com/twitter/querulous/database/AutoDisablingDatabase.scala
  16. +32 −14 src/main/scala/com/twitter/querulous/database/Database.scala
  17. +13 −4 src/main/scala/com/twitter/querulous/database/SingleConnectionDatabase.scala
  18. +3 −2 src/main/scala/com/twitter/querulous/database/StatsCollectingDatabase.scala
  19. +8 −8 src/main/scala/com/twitter/querulous/database/ThrottledPoolingDatabase.scala
  20. +18 −9 src/main/scala/com/twitter/querulous/database/TimingOutDatabase.scala
  21. +1 −1 src/main/scala/com/twitter/querulous/evaluator/QueryEvaluator.scala
  22. +1 −1 src/main/scala/com/twitter/querulous/evaluator/Transaction.scala
  23. +1 −1 src/main/scala/com/twitter/querulous/query/TimingOutQuery.scala
  24. +12 −2 src/main/scala/com/twitter/querulous/test/FakeDatabase.scala
  25. +0 −14 src/test/scala/com/twitter/querulous/unit/DatabaseSpec.scala
  26. +2 −2 src/test/scala/com/twitter/querulous/unit/QueryEvaluatorSpec.scala
  27. +116 −0 src/test/scala/com/twitter/querulous/unit/StandardAsyncQueryEvaluatorSpec.scala
  28. +5 −2 src/test/scala/com/twitter/querulous/unit/StatsCollectingDatabaseSpec.scala
  29. +4 −3 src/test/scala/com/twitter/querulous/unit/TimingOutDatabaseSpec.scala
View
@@ -122,6 +122,33 @@ Suppose you want to automatically disable all connections to a particular host a
val queryEvaluatorFactory = new AutoDisablingQueryEvaluatorFactory(new StandardQueryEvaluatorFactory(databaseFactory, queryFactory))
+### Async API
+
+Querulous also contains an async API based on
+[`com.twitter.util.Future`](http://github.com/twitter/util). The trait
+`AsyncQueryEvaluator` mirrors `QueryEvaluator` in terms of
+functionality, the key difference being that methods immediately
+return values wrapped in a `Future`. Internally, blocking JDBC calls
+are executed within a thread pool.
+
+ // returns Future[Seq[User]]
+ val future = queryEvaluator.select("SELECT * FROM users WHERE id IN (?) OR name = ?", List(1,2,3), "Jacques") { row =>
+ new User(row.getInt("id"), row.getString("name"))
+ }
+
+ // Futures support a functional, monadic interface:
+ val tweetsFuture = future flatMap { users =>
+ queryEvaluator.select("SELECT * FROM tweets WHERE user_id IN (?)", users.map(_.id)) { row =>
+ new Tweet(row.getInt("id"), row.getString("text"))
+ }
+ }
+
+ // futures only block when unwrapped.
+ val tweets = tweetsFuture.apply()
+
+See [the Future API reference](http://twitter.github.com/util/util-core/target/site/doc/main/api/com/twitter/util/Future.html)
+for more information.
+
### Recommended Configuration Options
* Set minActive equal to maxActive. This ensures that the system is fully utilizing the connection resource even when the system is idle. This is good because you will not be surprised by connection usage (and e.g., unexpectedly hit server-side connection limits) during peak load.
View
@@ -1,8 +1,8 @@
#Project properties
-#Thu May 19 13:06:29 PDT 2011
+#Mon Jun 06 13:52:29 PDT 2011
project.organization=com.twitter
project.name=querulous
sbt.version=0.7.4
-project.version=2.1.6-stats-alpha1-SNAPSHOT
+project.version=2.2.1-stats-alpha2-SNAPSHOT
build.scala.versions=2.8.1
project.initialize=false
@@ -2,22 +2,23 @@ import sbt._
import Process._
import com.twitter.sbt._
-class QuerulousProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher {
- override def filterScalaJars = false
+class QuerulousProject(info: ProjectInfo) extends StandardLibraryProject(info)
+with DefaultRepos
+with SubversionPublisher {
- val util = "com.twitter" % "util" % "1.8.11"
+ val utilCore = "com.twitter" % "util-core" % "1.8.13"
+ val dbcp = "commons-dbcp" % "commons-dbcp" % "1.4"
+ val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.13"
+ val pool = "commons-pool" % "commons-pool" % "1.5.4"
- val dbcp = "commons-dbcp" % "commons-dbcp" % "1.4"
- val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.13"
- val pool = "commons-pool" % "commons-pool" % "1.5.4"
-
- val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1" % "test"
- val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test"
- val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test"
- val objenesis = "org.objenesis" % "objenesis" % "1.1" % "test"
- val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
- val cglib = "cglib" % "cglib" % "2.2" % "test"
- val asm = "asm" % "asm" % "1.5.3" % "test"
+ val utilEval = "com.twitter" % "util-eval" % "1.8.13" % "test"
+ val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1" % "test"
+ val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test"
+ val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test"
+ val objenesis = "org.objenesis" % "objenesis" % "1.1" % "test"
+ val jmock = "org.jmock" % "jmock" % "2.4.0" % "test"
+ val cglib = "cglib" % "cglib" % "2.2" % "test"
+ val asm = "asm" % "asm" % "1.5.3" % "test"
override def subversionRepository = Some("http://svn.local.twitter.com/maven-public/")
}
@@ -2,5 +2,5 @@ import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val twitter = "twitter.com" at "http://maven.twttr.com/"
- val defaultProject = "com.twitter" % "standard-project" % "0.7.17"
+ val standardProject = "com.twitter" % "standard-project" % "0.12.7"
}
@@ -0,0 +1,4 @@
+#Automatically generated by ReleaseManagement
+#Mon Jun 06 13:52:29 PDT 2011
+version=2.2.0
+sha1=c8773aac4ff6f4890d8b7dbb59fd68ea60d53118
@@ -0,0 +1,22 @@
+package com.twitter.querulous
+
+import java.util.concurrent.{ThreadFactory, TimeoutException => JTimeoutException, _}
+import java.util.concurrent.atomic.AtomicInteger
+import com.twitter.util.Duration
+
+
+class DaemonThreadFactory extends ThreadFactory {
+ val group = new ThreadGroup(Thread.currentThread().getThreadGroup(), "querulous")
+ val threadNumber = new AtomicInteger(1)
+
+ def newThread(r: Runnable) = {
+ val thread = new Thread(group, r, "querulous-" + threadNumber.getAndIncrement())
+ if (!thread.isDaemon) {
+ thread.setDaemon(true)
+ }
+ if (thread.getPriority != Thread.NORM_PRIORITY) {
+ thread.setPriority(Thread.NORM_PRIORITY)
+ }
+ thread
+ }
+}
@@ -4,25 +4,16 @@ import java.util.concurrent.{ThreadFactory, TimeoutException => JTimeoutExceptio
import java.util.concurrent.atomic.AtomicInteger
import com.twitter.util.Duration
-class FutureTimeout(poolSize: Int, queueSize: Int) {
- object DaemonThreadFactory extends ThreadFactory {
- val group = new ThreadGroup(Thread.currentThread().getThreadGroup(), "querulous")
- val threadNumber = new AtomicInteger(1)
- def newThread(r: Runnable) = {
- val thread = new Thread(group, r, "querulous-" + threadNumber.getAndIncrement())
- if (!thread.isDaemon) {
- thread.setDaemon(true)
- }
- if (thread.getPriority != Thread.NORM_PRIORITY) {
- thread.setPriority(Thread.NORM_PRIORITY)
- }
- thread
- }
- }
- private val executor = new ThreadPoolExecutor(1, poolSize, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue[Runnable](queueSize),
- DaemonThreadFactory)
+class FutureTimeout(poolSize: Int, queueSize: Int) {
+ private val executor = new ThreadPoolExecutor(
+ 1,
+ poolSize,
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable](queueSize),
+ new DaemonThreadFactory()
+ )
class Task[T](f: => T)(onTimeout: T => Unit) extends Callable[T] {
private var cancelled = false
@@ -0,0 +1,27 @@
+package com.twitter.querulous.async
+
+import java.sql.Connection
+import com.twitter.util.Future
+
+
+trait AsyncDatabaseFactory {
+ def apply(
+ hosts: List[String],
+ name: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncDatabase
+
+ def apply(hosts: List[String], name: String, username: String, password: String): AsyncDatabase = {
+ apply(hosts, name, username, password, Map.empty)
+ }
+
+ def apply(hosts: List[String], username: String, password: String): AsyncDatabase = {
+ apply(hosts, null, username, password, Map.empty)
+ }
+}
+
+trait AsyncDatabase {
+ def withConnection[R](f: Connection => R): Future[R]
+}
@@ -0,0 +1,112 @@
+package com.twitter.querulous.async
+
+import java.util.concurrent.Executors
+import java.sql.ResultSet
+import com.twitter.util.{Future, FuturePool}
+import com.twitter.querulous.config.{Connection => ConnectionConfig}
+import com.twitter.querulous.DaemonThreadFactory
+import com.twitter.querulous.evaluator._
+import com.twitter.querulous.query.{QueryClass, SqlQueryFactory}
+import com.twitter.querulous.database.ThrottledPoolingDatabaseFactory
+import com.twitter.conversions.time._
+
+
+object AsyncQueryEvaluator extends AsyncQueryEvaluatorFactory {
+ lazy val defaultFuturePool = FuturePool(Executors.newCachedThreadPool(new DaemonThreadFactory))
+
+ private def createEvaluatorFactory() = {
+ new StandardAsyncQueryEvaluatorFactory(
+ new BlockingDatabaseWrapperFactory(
+ defaultFuturePool,
+ new ThrottledPoolingDatabaseFactory(10, 100.millis, 10.seconds, 1.second)
+ ),
+ new SqlQueryFactory
+ )
+ }
+
+ def apply(
+ dbhosts: List[String],
+ dbname: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncQueryEvaluator = {
+ createEvaluatorFactory()(dbhosts, dbname, username, password, urlOptions)
+ }
+}
+
+trait AsyncQueryEvaluatorFactory {
+ def apply(
+ dbhosts: List[String],
+ dbname: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncQueryEvaluator
+
+ def apply(dbhost: String, dbname: String, username: String, password: String, urlOptions: Map[String, String]): AsyncQueryEvaluator = {
+ apply(List(dbhost), dbname, username, password, urlOptions)
+ }
+
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String): AsyncQueryEvaluator = {
+ apply(dbhosts, dbname, username, password, Map[String,String]())
+ }
+
+ def apply(dbhost: String, dbname: String, username: String, password: String): AsyncQueryEvaluator = {
+ apply(List(dbhost), dbname, username, password, Map[String,String]())
+ }
+
+ def apply(dbhost: String, username: String, password: String): AsyncQueryEvaluator = {
+ apply(List(dbhost), null, username, password, Map[String,String]())
+ }
+
+ def apply(dbhosts: List[String], username: String, password: String): AsyncQueryEvaluator = {
+ apply(dbhosts, null, username, password, Map[String,String]())
+ }
+
+ def apply(connection: ConnectionConfig): AsyncQueryEvaluator = {
+ apply(connection.hostnames.toList, connection.database, connection.username, connection.password, connection.urlOptions)
+ }
+}
+
+trait AsyncQueryEvaluator {
+ def select[A](queryClass: QueryClass, query: String, params: Any*)(f: ResultSet => A): Future[Seq[A]]
+
+ def select[A](query: String, params: Any*)(f: ResultSet => A): Future[Seq[A]] = {
+ select(QueryClass.Select, query, params: _*)(f)
+ }
+
+ def selectOne[A](queryClass: QueryClass, query: String, params: Any*)(f: ResultSet => A): Future[Option[A]]
+
+ def selectOne[A](query: String, params: Any*)(f: ResultSet => A): Future[Option[A]] = {
+ selectOne(QueryClass.Select, query, params: _*)(f)
+ }
+
+ def count(queryClass: QueryClass, query: String, params: Any*): Future[Int]
+
+ def count(query: String, params: Any*): Future[Int] = {
+ count(QueryClass.Select, query, params: _*)
+ }
+
+ def execute(queryClass: QueryClass, query: String, params: Any*): Future[Int]
+
+ def execute(query: String, params: Any*): Future[Int] = {
+ execute(QueryClass.Execute, query, params: _*)
+ }
+
+ def executeBatch(queryClass: QueryClass, query: String)(f: ParamsApplier => Unit): Future[Int]
+
+ def executeBatch(query: String)(f: ParamsApplier => Unit): Future[Int] = {
+ executeBatch(QueryClass.Execute, query)(f)
+ }
+
+ def nextId(tableName: String): Future[Long]
+
+ def insert(queryClass: QueryClass, query: String, params: Any*): Future[Long]
+
+ def insert(query: String, params: Any*): Future[Long] = {
+ insert(QueryClass.Execute, query, params: _*)
+ }
+
+ def transaction[T](f: Transaction => T): Future[T]
+}
@@ -0,0 +1,76 @@
+package com.twitter.querulous.async
+
+import java.util.concurrent.Executors
+import java.sql.Connection
+import com.twitter.util.{Return, Throw, Future, Promise, FuturePool, JavaTimer, TimeoutException}
+import com.twitter.querulous.DaemonThreadFactory
+import com.twitter.querulous.database.{Database, DatabaseFactory}
+
+
+class BlockingDatabaseWrapperFactory(pool: => FuturePool, factory: DatabaseFactory)
+extends AsyncDatabaseFactory {
+ def apply(
+ hosts: List[String],
+ name: String,
+ username: String,
+ password: String,
+ urlOptions: Map[String, String]
+ ): AsyncDatabase = {
+ new BlockingDatabaseWrapper(
+ pool,
+ factory(hosts, name, username, password, urlOptions)
+ )
+ }
+}
+
+private object AsyncConnectionCheckout {
+ lazy val checkoutTimer = new JavaTimer(true)
+}
+
+class BlockingDatabaseWrapper(
+ pool: FuturePool,
+ protected[async] val database: Database)
+extends AsyncDatabase {
+
+ import AsyncConnectionCheckout._
+
+ // XXX: this probably should be configurable as well.
+ private val checkoutPool = FuturePool(Executors.newSingleThreadExecutor(new DaemonThreadFactory))
+ private val openTimeout = database.openTimeout
+
+ def withConnection[R](f: Connection => R) = {
+ checkoutConnection() flatMap { conn =>
+ pool {
+ try {
+ f(conn)
+ } finally {
+ database.close(conn)
+ }
+ }
+ }
+ }
+
+ private def checkoutConnection(): Future[Connection] = {
+ val promise = new Promise[Connection]
+
+ checkoutPool(database.open()) respond { rv =>
+ // if the promise has already timed out, we need to close the connection here.
+ if (!promise.updateIfEmpty(rv)) rv foreach database.close
+ }
+
+ checkoutTimer.schedule(openTimeout.fromNow) {
+ promise.updateIfEmpty(Throw(new TimeoutException(openTimeout.toString)))
+ }
+
+ promise
+ }
+
+ // equality overrides
+
+ override def equals(other: Any) = other match {
+ case other: BlockingDatabaseWrapper => database eq other.database
+ case _ => false
+ }
+
+ override def hashCode = database.hashCode
+}
Oops, something went wrong.

0 comments on commit b1e60d9

Please sign in to comment.