Skip to content
Browse files

Users can now add annotations to queries. These get passed along with…

… the queries as sql comments. Useful for finding the services or trace ids involved in long running queries.
  • Loading branch information...
1 parent 65f1804 commit 475d31a280bfcd5a3bf7fc88269c8e71123e257b @johanoskarsson johanoskarsson committed Feb 7, 2012
View
9 querulous-core/src/main/scala/com/twitter/querulous/query/Query.scala
@@ -1,6 +1,7 @@
package com.twitter.querulous.query
import java.sql.{ResultSet, Connection}
+import scala.collection.immutable.Map
trait QueryFactory {
def apply(connection: Connection, queryClass: QueryClass, queryString: String, params: Any*): Query
@@ -12,4 +13,12 @@ trait Query {
def execute(): Int
def addParams(params: Any*)
def cancel()
+
+ /**
+ * Add an annotation you want to be sent along with the query as a comment.
+ * Could for example be information you want to find in a slow query log.
+ */
+ private var ann = Map[String, String]()
+ def addAnnotation(key: String, value:String) = ann = ann + (key -> value)
+ def annotations: Map[String, String] = ann
}
View
77 querulous-core/src/main/scala/com/twitter/querulous/query/SqlQuery.scala
@@ -4,6 +4,7 @@ import java.sql.{Connection, PreparedStatement, ResultSet, SQLException, Timesta
import java.lang.reflect.{Field, Modifier}
import java.util.regex.Pattern
import scala.collection.mutable
+import json.JSONObject
class SqlQueryFactory extends QueryFactory {
def apply(connection: Connection, queryClass: QueryClass, query: String, params: Any*) = {
@@ -48,9 +49,35 @@ class SqlQuery(connection: Connection, val query: String, params: Any*) extends
}
var paramsInitialized = false
- var statement = buildStatement(connection, query, params: _*)
+ // we need to delay creation of the statement till it's needed
+ // so users can add annotations if they want to
+ var statementInstance: Option[PreparedStatement] = None
+ var additionalParams: Seq[Seq[Any]] = Seq[Seq[Any]]()
var batchMode = false
+ /**
+ * Get the statement from the connection, query and params specified.
+ */
+ def statement = {
+ statementInstance.getOrElse {
+ val s = buildStatement(connection, query, params: _*)
+ // if the user has added more params we need to add those as separate batches
+ if (!additionalParams.isEmpty) {
+ // add a batch if params have been initialized already
+ if (paramsInitialized) s.addBatch()
+
+ // add a batch for each of the additional params
+ additionalParams foreach { p =>
+ setBindVariable(s, 1, p)
+ s.addBatch()
+ }
+ }
+
+ statementInstance = Some(s)
+ s
+ }
+ }
+
def select[A](f: ResultSet => A): Seq[A] = {
withStatement {
statement.executeQuery()
@@ -68,11 +95,7 @@ class SqlQuery(connection: Connection, val query: String, params: Any*) extends
}
def addParams(params: Any*) = {
- if(paramsInitialized && !batchMode) {
- statement.addBatch()
- }
- setBindVariable(statement, 1, params)
- statement.addBatch()
+ additionalParams = additionalParams :+ params
batchMode = true
}
@@ -107,8 +130,48 @@ class SqlQuery(connection: Connection, val query: String, params: Any*) extends
}
}
+ /**
+ * Convert the annotations into an sql comment.
+ */
+ private def annotationsAsComment: String = {
+ if (annotations.isEmpty) {
+ ""
+ } else {
+ " /*~{" + annotations.map({ case (k,v) => "\"" + quoteString(k) + "\" : \"" +
+ quoteString(v) + "\"" }).mkString(", ") + "}*/"
+ }
+ }
+
+ /**
+ * TODO remove: this was lifted from Parser.scala in Scala 2.9, since the 2.8 version didn't escape
+ *
+ * This function can be used to properly quote Strings
+ * for JSON output.
+ */
+ def quoteString (s : String) : String =
+ s.map {
+ case '"' => "\\\""
+ case '\\' => "\\\\"
+ case '/' => "\\/"
+ case '\b' => "\\b"
+ case '\f' => "\\f"
+ case '\n' => "\\n"
+ case '\r' => "\\r"
+ case '\t' => "\\t"
+ /* We'll unicode escape any control characters. These include:
+ * 0x0 -> 0x1f : ASCII Control (C0 Control Codes)
+ * 0x7f : ASCII DELETE
+ * 0x80 -> 0x9f : C1 Control Codes
+ *
+ * Per RFC4627, section 2.5, we're not technically required to
+ * encode the C1 codes, but we do to be safe.
+ */
+ case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
+ case c => c
+ }.mkString
+
private def buildStatement(connection: Connection, query: String, params: Any*) = {
- val statement = connection.prepareStatement(expandArrayParams(query, params: _*))
+ val statement = connection.prepareStatement(expandArrayParams(query + annotationsAsComment, params: _*))
setBindVariable(statement, 1, params)
statement
}
View
14 querulous-core/src/test/scala/com/twitter/querulous/integration/QuerySpec.scala
@@ -36,6 +36,20 @@ class QuerySpec extends ConfiguredSpecification {
} mustEqual 2
}
+ "add annotations" >> {
+ val connection = testDatabaseFactory(config.hostnames.toList, config.database, config.username,
+ config.password, config.urlOptions, config.driverName).open()
+
+ try {
+ val query = testQueryFactory(connection, QueryClass.Select, "SELECT 1")
+ query.addAnnotation("key", "value")
+ query.addAnnotation("key2", "value2")
+ query.select(rv => rv.getInt(1) mustEqual 1)
+ } finally {
+ connection.close()
+ }
+ }
+
"with just the right number of arguments" >> {
queryEvaluator.select("SELECT 1 FROM DUAL WHERE 1 IN (?)", List(1, 2, 3))(_.getInt(1)).toList mustEqual List(1)
}
View
30 querulous-core/src/test/scala/com/twitter/querulous/unit/SqlQuerySpec.scala
@@ -93,9 +93,10 @@ class SqlQuerySpec extends Specification with JMocker with ClassMocker {
one(statement).setInt(3, 0x03) then
one(statement).setLong(4, 4) then
one(statement).setDouble(5, 5.0)
+ one(statement).executeUpdate()
}
- new SqlQuery(connection, queryString, "one", 2, 0x03, 4L, 5.0)
+ new SqlQuery(connection, queryString, "one", 2, 0x03, 4L, 5.0).execute()
}
"insert nulls" in {
@@ -110,9 +111,10 @@ class SqlQuerySpec extends Specification with JMocker with ClassMocker {
one(statement).setNull(4, Types.BOOLEAN)
one(statement).setNull(5, Types.BIGINT)
one(statement).setNull(6, Types.VARBINARY)
+ one(statement).executeUpdate()
}
- new SqlQuery(connection, queryString, NullString, NullInt, NullDouble, NullBoolean, NullLong, NullValues(Types.VARBINARY))
+ new SqlQuery(connection, queryString, NullString, NullInt, NullDouble, NullBoolean, NullLong, NullValues(Types.VARBINARY)).execute()
}
"handle exceptions" in {
@@ -124,7 +126,7 @@ class SqlQuerySpec extends Specification with JMocker with ClassMocker {
expect {
one(connection).prepareStatement(queryString) willReturn statement
}
- new SqlQuery(connection, queryString, unrecognizedType) must throwAn[IllegalArgumentException]
+ new SqlQuery(connection, queryString, unrecognizedType).execute() must throwAn[IllegalArgumentException]
}
"throw chained-exception" in {
val expectedCauseException = new SQLException("")
@@ -133,7 +135,7 @@ class SqlQuerySpec extends Specification with JMocker with ClassMocker {
one(statement).setString(1, "one") willThrow expectedCauseException
}
try {
- new SqlQuery(connection, queryString, "one")
+ new SqlQuery(connection, queryString, "one").execute()
fail("should throw")
} catch {
case e: Exception => {
@@ -143,5 +145,25 @@ class SqlQuerySpec extends Specification with JMocker with ClassMocker {
}
}
}
+
+ "add annotations to query" in {
+ val queryString = "select * from table"
+ val connection = mock[Connection]
+ val statement = mock[PreparedStatement]
+
+ expect {
+ one(connection).prepareStatement("select * from table /*~{\"key\" : \"value2\", " +
+ "\"key2\" : \"*\\/select 1\", \"key3\" : \"{:}\"}*/") willReturn statement then
+ one(statement).executeQuery() then
+ one(statement).getResultSet
+ }
+
+ val query = new SqlQuery(connection, queryString)
+ query.addAnnotation("key", "value")
+ query.addAnnotation("key", "value2") // we'll only keep this
+ query.addAnnotation("key2", "*/select 1") // trying to end the comment early
+ query.addAnnotation("key3", "{:}") // going all json on your ass
+ query.select(result => fail("should not return any data"))
+ }
}
}
View
3 ...lous-tracing/src/main/scala/com/twitter/querulous/config/TracingAsyncQueryEvaluator.scala
@@ -8,9 +8,10 @@ class TracingAsyncQueryEvaluator extends AsyncQueryEvaluator {
var tracerFactory: tracing.Tracer.Factory = tracing.NullTracer.factory
var serviceName: String = ""
+ var annotateQuery: Boolean = true // send info such as service name, ip and trace id with query
protected override def newQueryFactory(stats: querulous.StatsCollector) = {
- new TracingQueryFactory(super.newQueryFactory(stats), serviceName, tracerFactory())
+ new TracingQueryFactory(super.newQueryFactory(stats), serviceName, tracerFactory(), annotateQuery)
}
}
View
50 querulous-tracing/src/main/scala/com/twitter/querulous/query/TracingQuery.scala
@@ -8,9 +8,21 @@ import java.nio.ByteBuffer
/**
* Adds trace annotations to capture data about the query.
* This data is then processed and sent off with a Finagle compatible tracer.
+ * @param query The query to execute
+ * @param connection Connection to execute query on
+ * @param queryClass Class of Query
+ * @param serviceName The service name we want to associate with the traces generated by this query
+ * @param tracer The tracer we want to send traces to
+ * @param annotateQuery Do we want to annotate the query or not? Annotate attaches information about this
+ * trace to the queries we send. For example we attach the trace id so that we can look up
+ * information about where this query comes from.w
*/
-class TracingQuery(query: Query, connection: Connection, queryClass: QueryClass,
- serviceName: String, tracer: Tracer) extends QueryProxy(query: Query) {
+class TracingQuery(query: Query,
+ connection: Connection,
+ queryClass: QueryClass,
+ serviceName: String,
+ tracer: Tracer,
+ annotateQuery: Boolean) extends QueryProxy(query: Query) {
override protected def delegate[A](f: => A) = {
Trace.unwind {
@@ -20,12 +32,19 @@ class TracingQuery(query: Query, connection: Connection, queryClass: QueryClass,
val sampled = Trace.id.sampled orElse tracer.sampleTrace(nextId)
Trace.pushId(nextId.copy(sampled = sampled))
- try {
- // don't know the port
- Trace.recordClientAddr(new InetSocketAddress(
- InetAddress.getByName(connection.getClientInfo("ClientHostname")), 0))
- } catch {
- case e: UnknownHostException => () // just don't set it if we can't find it
+ val address = getLocalAddress(connection)
+ address foreach { Trace.recordClientAddr(_) }
+
+ // do we want to annotate this query at all?
+ if (annotateQuery) {
+ // set the ip and service name to help debugging
+ address foreach { addr => query.addAnnotation("client_host", addr.getAddress.getHostAddress) }
+ query.addAnnotation("service_name", serviceName)
+
+ // only set trace id if we have decided to sample this trace
+ if (sampled.getOrElse(false)) {
+ query.addAnnotation("trace_id", nextId.traceId.toString())
+ }
}
// we want to know which query caused these timings
@@ -43,15 +62,26 @@ class TracingQuery(query: Query, connection: Connection, queryClass: QueryClass,
rv
}
}
+
+ def getLocalAddress(connection: Connection): Option[InetSocketAddress] = {
+ try {
+ // don't know the port
+ Some(new InetSocketAddress(
+ InetAddress.getByName(connection.getClientInfo("ClientHostname")), 0))
+ } catch {
+ case e: UnknownHostException => None
+ }
+ }
}
class TracingQueryFactory(queryFactory: QueryFactory,
serviceName: String,
- tracer: Tracer) extends QueryFactory {
+ tracer: Tracer,
+ annotateQuery: Boolean) extends QueryFactory {
def apply(connection: Connection, queryClass: QueryClass, query: String, params: Any*) = {
new TracingQuery(queryFactory(connection, queryClass, query, params: _*),
- connection, queryClass, serviceName, tracer)
+ connection, queryClass, serviceName, tracer, annotateQuery)
}
override def shutdown() = { tracer.release() }
View
9 querulous-tracing/src/test/scala/com/twitter/querulous/unit/TracingQuerySpec.scala
@@ -3,8 +3,8 @@ package com.twitter.querulous.unit
import java.sql.Connection
import org.specs.Specification
import org.specs.mock.JMocker
-import com.twitter.finagle.tracing.{Record, TraceId, Tracer}
import com.twitter.querulous.query._
+import com.twitter.finagle.tracing._
class TracingQuerySpec extends Specification with JMocker {
"TracingQuery" should {
@@ -13,17 +13,18 @@ class TracingQuerySpec extends Specification with JMocker {
val queryString = "select * from users"
val tracer = mock[Tracer]
val connection = mock[Connection]
+ Trace.pushId(TraceId(Some(SpanId(1)), None, SpanId(1), Some(true)))
expect {
- one(tracer).sampleTrace(a[TraceId])
one(connection).getClientInfo("ClientHostname")
- one(connection).prepareStatement(queryString)
+ one(connection).prepareStatement("select * from users /*~{\"client_host\" : \"127.0.0.1\", " +
+ "\"service_name\" : \"service\", \"trace_id\" : \"0000000000000001\"}*/")
exactly(5).of(tracer).record(a[Record])
}
val query = new SqlQuery(connection, queryString)
val tracingQuery = new TracingQuery(query, connection, QueryClass.Select,
- "service", tracer)
+ "service", tracer, true)
tracingQuery.execute()
}
}

0 comments on commit 475d31a

Please sign in to comment.
Something went wrong with that request. Please try again.