Permalink
Browse files

Merge branch 'connection_decoration'

Conflicts:
	src/main/scala/com/twitter/querulous/connectionpool/MemoizingConnectionPoolFactory.scala
  • Loading branch information...
2 parents b569d4e + 852c4d0 commit 0d8e2d8fb81723a349fed8b37995bcb893d9da40 Ed Ceaser committed Feb 12, 2010
Showing with 280 additions and 239 deletions.
  1. +14 −15 README.markdown
  2. +1 −1 ivy/ivy.xml
  3. +15 −0 src/main/scala/com/twitter/querulous/Connection.scala
  4. +0 −20 src/main/scala/com/twitter/querulous/connectionpool/ConnectionPool.scala
  5. +0 −18 src/main/scala/com/twitter/querulous/connectionpool/MemoizingConnectionPoolFactory.scala
  6. +0 −32 src/main/scala/com/twitter/querulous/connectionpool/StatsCollectingConnectionPool.scala
  7. +10 −14 ...tter/querulous/{connectionpool/ApacheConnectionPool.scala → database/ApachePoolingDatabase.scala}
  8. +25 −0 src/main/scala/com/twitter/querulous/database/Database.scala
  9. +13 −0 src/main/scala/com/twitter/querulous/database/MemoizingDatabase.scala
  10. +26 −0 src/main/scala/com/twitter/querulous/database/StatsCollectingDatabase.scala
  11. +3 −3 src/main/scala/com/twitter/querulous/evaluator/QueryEvaluator.scala
  12. +9 −18 src/main/scala/com/twitter/querulous/evaluator/StandardQueryEvaluator.scala
  13. +1 −1 src/main/scala/com/twitter/querulous/evaluator/Transaction.scala
  14. +1 −1 src/main/scala/com/twitter/querulous/query/DebuggingQuery.scala
  15. +1 −1 src/main/scala/com/twitter/querulous/query/Query.scala
  16. +1 −1 src/main/scala/com/twitter/querulous/query/RetryingQuery.scala
  17. +1 −1 src/main/scala/com/twitter/querulous/query/SqlQuery.scala
  18. +1 −1 src/main/scala/com/twitter/querulous/query/StatsCollectingQuery.scala
  19. +1 −1 src/main/scala/com/twitter/querulous/query/TimingOutQuery.scala
  20. +16 −0 src/main/scala/com/twitter/querulous/test/FakeDatabase.scala
  21. +1 −1 src/{test/scala/com/twitter/querulous/fake → main/scala/com/twitter/querulous/test}/FakeQuery.scala
  22. +1 −1 ...cala/com/twitter/querulous/fake → main/scala/com/twitter/querulous/test}/FakeQueryEvaluator.scala
  23. +1 −1 ...cala/com/twitter/querulous/fake → main/scala/com/twitter/querulous/test}/FakeStatsCollector.scala
  24. +0 −19 src/test/scala/com/twitter/querulous/fake/FakeConnectionPool.scala
  25. +1 −1 src/test/scala/com/twitter/querulous/integration/QuerySpec.scala
  26. +4 −4 src/test/scala/com/twitter/querulous/integration/TimeoutSpec.scala
  27. +1 −1 src/test/scala/com/twitter/querulous/unit/AutoDisablingQueryEvaluatorSpec.scala
  28. +54 −0 src/test/scala/com/twitter/querulous/unit/ConnectionSpec.scala
  29. +1 −1 src/test/scala/com/twitter/querulous/unit/DebuggingQuerySpec.scala
  30. +0 −29 src/test/scala/com/twitter/querulous/unit/MemoizingConnectionPoolFactorySpec.scala
  31. +29 −0 src/test/scala/com/twitter/querulous/unit/MemoizingDatabaseFactorySpec.scala
  32. +10 −14 src/test/scala/com/twitter/querulous/unit/QueryEvaluatorSpec.scala
  33. +1 −1 src/test/scala/com/twitter/querulous/unit/RetryingQuerySpec.scala
  34. +3 −3 src/test/scala/com/twitter/querulous/unit/SqlQuerySpec.scala
  35. +0 −32 src/test/scala/com/twitter/querulous/unit/StatsCollectingConnectionPoolSpec.scala
  36. +31 −0 src/test/scala/com/twitter/querulous/unit/StatsCollectingDatabaseSpec.scala
  37. +1 −1 src/test/scala/com/twitter/querulous/unit/StatsCollectingQuerySpec.scala
  38. +2 −2 src/test/scala/com/twitter/querulous/unit/TimingOutQuerySpec.scala
View
@@ -19,15 +19,15 @@ welcome.
## Understanding the Implementation
-`Querulous` is made out of three components: QueryEvaluators, Queries, and ConnectionPools.
+`Querulous` is made out of three components: QueryEvaluators, Queries, and Databases.
* QueryEvaluators are a convenient procedural interface for executing queries.
* Queries are objects representing a SELECT/UPDATE/INSERT/DELETE SQL Query. They are responsible for most type-casting, timeouts, and so forth. You will rarely interact with Queries directly.
-* ConnectionPools reserve and release connections to the database.
+* Databases reserve and release connections an actual database.
Each of these three kinds of objects implement an interface. Enhanced functionality is meant to be "layered-on" by wrapping decorators around these objects that implement the enhanced functionality and delegate the primitive functionality.
-Each of the three components are meant to be instantiated with their corresponding factories (e.g., QueryEvaluatorFactory, ConnectionPoolFactory, etc.). The system is made configurable by constructing factories that manufacture the Decorators you're interested in. For example,
+Each of the three components are meant to be instantiated with their corresponding factories (e.g., QueryEvaluatorFactory, DatabaseFactory, etc.). The system is made configurable by constructing factories that manufacture the Decorators you're interested in. For example,
val queryFactory = new DebuggingQueryFactory(new TimingOutQueryFactory(new SqlQueryFactory))
val query = queryFactory(...) // this query will have debugging information and timeouts!
@@ -66,19 +66,18 @@ For production-quality use of `Querulous` you'll want to set configuration optio
import com.twitter.querulous.evaluator._
import com.twitter.querulous.query._
- import com.twitter.querulous.connectionpool._
+ import com.twitter.querulous.database._
val queryFactory = new SqlQueryFactory
- val apacheConnectionPoolConfig = new ApacheConnectionPoolConfig()
- val connectionPoolFactory = new ApacheConnectionPoolFactory(
+ val apachePoolingDatabaseFactory = new apachePoolingDatabaseFactory(
minOpenConnections: Int, // minimum number of open/active connections at all times
maxOpenConnections: Int, // minimum number of open/active connections at all times
checkConnectionHealthWhenIdleFor: Duration, // asynchronously check the health of open connections every `checkConnectionHealthWhenIdleFor` amount of time
maxWaitForConnectionReservation: Duration, // maximum amount of time you're willing to wait to reserve a connection from the pool; throw an exception otherwise
checkConnectionHealthOnReservation: Boolean, // check connection health when reserving the connection from the pool
evictConnectionIfIdleFor: Duration // destroy connections if they are idle for longer than `evictConnectionIfIdleFor` amount of time
)
- val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(connectionPoolFactory, queryFactory)
+ val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(apachePoolingDatabaseFactory, queryFactory)
val queryEvaluator = queryEvaluatorFactory(List("primaryhost", "fallbackhost1", "fallbackhost2", ...), "username", "password")
Now comes the fun part.
@@ -106,22 +105,22 @@ Suppose you want to measure average and standard deviation of latency, and query
See the section [Statistics Collection] for more information.
-#### ConnectionPool Decorators
+#### Database Decorators
-Suppose you want to measure latency around the reserve/release operations of the ConnectionPool:
+Suppose you want to measure latency around the reserve/release operations of the Database:
val stats = new StatsCollector
- val connectionPoolFactory = new StatsCollectingConnectionPoolFactory(new ApacheConnectionPoolFactory(...), stats)
+ val databaseFactory = new StatsCollectingDatabase(new ApachePoolingDatabaseFactory(...), stats)
-Suppose you are actually dynamically connecting to dozens of hosts (because of a sharding strategy or something similar) and you want to maintain proper connection limits. You can memoize your connection pools like this:
+Suppose you are actually dynamically connecting to dozens of hosts (because of a sharding strategy or something similar) and you want to maintain proper connection limits. You can memoize your database connections like this:
- val connectionPoolFactory = new MemoizingConnectionPoolFactory(new ApacheConnectionPoolFactory(...))
+ val databaseFactory = new MemoizingDatabaseFactory(new ApachePoolingDatabaseFactory(...))
#### QueryEvaluator Decorators
Suppose you want to automatically disable all connections to a particular host after a certain number of SQL Exceptions (timeouts, etc.):
- val queryEvaluatorFactory = new AutoDisablingQueryEvaluatorFactory(new StandardQueryEvaluatorFactory(connectionPoolFactory, queryFactory))
+ val queryEvaluatorFactory = new AutoDisablingQueryEvaluatorFactory(new StandardQueryEvaluatorFactory(databaseFactory, queryFactory))
### Recommended Configuration Options
@@ -139,7 +138,7 @@ StatsCollector is actually just a trait that you'll need to implement using your
def incr(name: String, count: Int) = Stats.incr(name, count)
def time[A](name: String)(f: => A): A = Stats.time(name)(f)
}
- val connectionPoolFactory = new StatsCollectingConnectionPoolFactory(new ApacheConnectionPoolFactory(...), stats)
+ val databaseFactory = new StatsCollectingDatabaseFactory(new ApachePoolingDatabaseFactory(...), stats)
## Installation
@@ -170,4 +169,4 @@ The Github issue tracker is {here}[http://github.com/nkallen/querulous/issues].
* Nick Kallen
* Robey Pointer
* Ed Ceaser
-* Utkarsh Srivastava
+* Utkarsh Srivastava
View
@@ -1,7 +1,7 @@
<ivy-module version="1.0" xmlns:e="http://ant.apache.org/ivy/extra">
<info organisation="com.twitter"
module="querulous"
- revision="1.0.0"
+ revision="1.1.0"
e:buildpackage="com.twitter.querulous"
e:testclass="com.twitter.querulous.TestRunner"
e:jarclassname="com.twitter.querulous.Main"
@@ -0,0 +1,15 @@
+package com.twitter.querulous
+
+import java.sql.{Connection => JConnection, PreparedStatement}
+
+class Connection(connection: JConnection, val hosts: Seq[String]) {
+ def commit(): Unit = connection.commit()
+
+ def close(): Unit = connection.close()
+
+ def rollback(): Unit = connection.rollback()
+
+ def prepareStatement(query: String): PreparedStatement = connection.prepareStatement(query)
+
+ def setAutoCommit(value: Boolean): Unit = connection.setAutoCommit(value)
+}
@@ -1,20 +0,0 @@
-package com.twitter.querulous.connectionpool
-
-import java.sql.Connection
-
-trait ConnectionPoolFactory {
- def apply(dbhosts: List[String], dbname: String, username: String, password: String): ConnectionPool
-}
-
-trait ConnectionPool {
- def reserve(): Connection
-
- def release(connection: Connection)
-
- protected def url(dbhosts: List[String], dbname: String) = {
- val dbnameSegment = if (dbname == null) "" else ("/" + dbname)
- "jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + "?useUnicode=true&characterEncoding=UTF-8"
- }
-
- def close()
-}
@@ -1,18 +0,0 @@
-package com.twitter.querulous.connectionpool
-
-import scala.collection.mutable
-
-class MemoizingConnectionPoolFactory(connectionPoolFactory: ConnectionPoolFactory) extends ConnectionPoolFactory {
- private val connectionPools = new mutable.HashMap[String, ConnectionPool] with mutable.SynchronizedMap[String, ConnectionPool]
-
- def apply(dbhosts: List[String], dbname: String, username: String, password: String) = synchronized {
- connectionPools.getOrElseUpdate(
- dbhosts.first + "/" + dbname,
- connectionPoolFactory(dbhosts, dbname, username, password))
- }
-
- def close() {
- for ((name, pool) <- connectionPools) pool.close()
- connectionPools.clear()
- }
-}
@@ -1,32 +0,0 @@
-package com.twitter.querulous.connectionpool
-
-import java.sql.Connection
-
-class StatsCollectingConnectionPoolFactory(
- connectionPoolFactory: ConnectionPoolFactory,
- stats: StatsCollector) extends ConnectionPoolFactory {
-
- def apply(dbhosts: List[String], dbname: String, username: String, password: String): ConnectionPool = {
- new StatsCollectingConnectionPool(connectionPoolFactory(dbhosts, dbname, username, password), stats)
- }
-}
-
-class StatsCollectingConnectionPool(pool: ConnectionPool, stats: StatsCollector)
- extends ConnectionPool {
-
- override def reserve(): Connection = {
- stats.time("connection-pool-reserve-timing") {
- pool.reserve()
- }
- }
-
- override def release(connection: Connection) = {
- stats.time("connection-pool-release-timing") {
- pool.release(connection)
- }
- }
-
- override def close() = {
- pool.close()
- }
-}
@@ -1,20 +1,20 @@
-package com.twitter.querulous.connectionpool
+package com.twitter.querulous.database
-import java.sql.{Connection, SQLException}
+import java.sql.SQLException
import org.apache.commons.dbcp.{PoolableConnectionFactory, DriverManagerConnectionFactory, PoolingDataSource}
import org.apache.commons.pool.impl.{GenericObjectPool, StackKeyedObjectPoolFactory}
import com.twitter.xrayspecs.Duration
-class ApacheConnectionPoolFactory(
+class ApachePoolingDatabaseFactory(
minOpenConnections: Int,
maxOpenConnections: Int,
checkConnectionHealthWhenIdleFor: Duration,
maxWaitForConnectionReservation: Duration,
checkConnectionHealthOnReservation: Boolean,
- evictConnectionIfIdleFor: Duration) extends ConnectionPoolFactory {
+ evictConnectionIfIdleFor: Duration) extends DatabaseFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
- val pool = new ApacheConnectionPool(
+ val pool = new ApachePoolingDatabase(
dbhosts,
dbname,
username,
@@ -29,7 +29,7 @@ class ApacheConnectionPoolFactory(
}
}
-class ApacheConnectionPool(
+class ApachePoolingDatabase(
dbhosts: List[String],
dbname: String,
username: String,
@@ -39,7 +39,7 @@ class ApacheConnectionPool(
checkConnectionHealthWhenIdleFor: Duration,
maxWaitForConnectionReservation: Duration,
checkConnectionHealthOnReservation: Boolean,
- evictConnectionIfIdleFor: Duration) extends ConnectionPool {
+ evictConnectionIfIdleFor: Duration) extends Database {
Class.forName("com.mysql.jdbc.Driver")
@@ -65,20 +65,16 @@ class ApacheConnectionPool(
true)
private val poolingDataSource = new PoolingDataSource(connectionPool)
- def release(connection: Connection) {
+ def close(connection: Connection) {
try {
connection.close()
} catch {
case _: SQLException =>
}
}
- def reserve() = {
- poolingDataSource.getConnection()
- }
-
- def close() {
- connectionPool.close()
+ def open() = {
+ new Connection(poolingDataSource.getConnection(), dbhosts)
}
override def toString = dbhosts.first + "_" + dbname
@@ -0,0 +1,25 @@
+package com.twitter.querulous.database
+
+trait DatabaseFactory {
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String): Database
+}
+
+trait Database {
+ def open(): Connection
+
+ def close(connection: Connection)
+
+ def withConnection[A](f: Connection => A): A = {
+ val connection = open()
+ try {
+ f(connection)
+ } finally {
+ close(connection)
+ }
+ }
+
+ protected def url(dbhosts: List[String], dbname: String) = {
+ val dbnameSegment = if (dbname == null) "" else ("/" + dbname)
+ "jdbc:mysql://" + dbhosts.mkString(",") + dbnameSegment + "?useUnicode=true&characterEncoding=UTF-8"
+ }
+}
@@ -0,0 +1,13 @@
+package com.twitter.querulous.database
+
+import scala.collection.mutable
+
+class MemoizingDatabaseFactory(databaseFactory: DatabaseFactory) extends DatabaseFactory {
+ private val databases = new mutable.HashMap[String, Database] with mutable.SynchronizedMap[String, Database]
+
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String) = synchronized {
+ databases.getOrElseUpdate(
+ dbhosts.first + "/" + dbname,
+ databaseFactory(dbhosts, dbname, username, password))
+ }
+}
@@ -0,0 +1,26 @@
+package com.twitter.querulous.database
+
+class StatsCollectingDatabaseFactory(
+ databaseFactory: DatabaseFactory,
+ stats: StatsCollector) extends DatabaseFactory {
+
+ def apply(dbhosts: List[String], dbname: String, username: String, password: String): Database = {
+ new StatsCollectingDatabase(databaseFactory(dbhosts, dbname, username, password), stats)
+ }
+}
+
+class StatsCollectingDatabase(database: Database, stats: StatsCollector)
+ extends Database {
+
+ override def open(): Connection = {
+ stats.time("database-open-timing") {
+ database.open()
+ }
+ }
+
+ override def close(connection: Connection) = {
+ stats.time("database-close-timing") {
+ database.close(connection)
+ }
+ }
+}
@@ -2,14 +2,14 @@ package com.twitter.querulous.evaluator
import java.sql.ResultSet
import com.twitter.querulous.query.SqlQueryFactory
-import com.twitter.querulous.connectionpool.ApacheConnectionPoolFactory
+import com.twitter.querulous.database.ApachePoolingDatabaseFactory
import com.twitter.xrayspecs.TimeConversions._
object QueryEvaluator extends QueryEvaluatorFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
val queryFactory = new SqlQueryFactory
- val connectionPoolFactory = new ApacheConnectionPoolFactory(10, 10, 1.second, 10.millis, false, 0.seconds)
- val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(connectionPoolFactory, queryFactory)
+ val databaseFactory = new ApachePoolingDatabaseFactory(10, 10, 1.second, 10.millis, false, 0.seconds)
+ val queryEvaluatorFactory = new StandardQueryEvaluatorFactory(databaseFactory, queryFactory)
queryEvaluatorFactory(dbhosts, dbname, username, password)
}
}
@@ -1,22 +1,22 @@
package com.twitter.querulous.evaluator
-import java.sql.{Connection, ResultSet}
+import java.sql.ResultSet
import org.apache.commons.dbcp.{DriverManagerConnectionFactory, PoolableConnectionFactory, PoolingDataSource}
import org.apache.commons.pool.impl.GenericObjectPool
-import com.twitter.querulous.connectionpool.{ConnectionPool, ConnectionPoolFactory}
+import com.twitter.querulous.database.{Database, DatabaseFactory}
import com.twitter.querulous.query.QueryFactory
class StandardQueryEvaluatorFactory(
- connectionPoolFactory: ConnectionPoolFactory,
+ databaseFactory: DatabaseFactory,
queryFactory: QueryFactory) extends QueryEvaluatorFactory {
def apply(dbhosts: List[String], dbname: String, username: String, password: String) = {
- val connectionPool = connectionPoolFactory(dbhosts, dbname, username, password)
- new StandardQueryEvaluator(connectionPool, queryFactory)
+ val database = databaseFactory(dbhosts, dbname, username, password)
+ new StandardQueryEvaluator(database, queryFactory)
}
}
-class StandardQueryEvaluator(protected val connectionPool: ConnectionPool, queryFactory: QueryFactory)
+class StandardQueryEvaluator(protected val database: Database, queryFactory: QueryFactory)
extends QueryEvaluator {
def select[A](query: String, params: Any*)(f: ResultSet => A) = withTransaction(_.select(query, params: _*)(f))
@@ -42,26 +42,17 @@ class StandardQueryEvaluator(protected val connectionPool: ConnectionPool, query
}
private def withTransaction[A](f: Transaction => A) = {
- withConnection { connection => f(new Transaction(queryFactory, connection)) }
- }
-
- def withConnection[A](f: Connection => A): A = {
- val connection = connectionPool.reserve()
- try {
- f(connection)
- } finally {
- connectionPool.release(connection)
- }
+ database.withConnection { connection => f(new Transaction(queryFactory, connection)) }
}
override def equals(other: Any) = {
other match {
case other: StandardQueryEvaluator =>
- connectionPool eq other.connectionPool
+ database eq other.database
case _ =>
false
}
}
- override def hashCode = connectionPool.hashCode
+ override def hashCode = database.hashCode
}
Oops, something went wrong.

0 comments on commit 0d8e2d8

Please sign in to comment.