Permalink
Browse files

add a query class for SelectModify ("select_modify") and use it for s…

…elect operations that happen inside a modify job.
  • Loading branch information...
1 parent a437fc2 commit bc0d34f2bac98651d047426f2bb2d00835b81233 Robey Pointer committed Nov 9, 2010
View
@@ -26,9 +26,10 @@ db {
seconds = 60
}
- query_timeout_default = 2000
- queries {
- select_source_id_for_update = ["SELECT * FROM ? WHERE source_id = ? FOR UPDATE", 2000]
+ timeouts {
+ select = 2000
+ select_modify = 2000
+ execute = 2000
}
}
View
@@ -26,9 +26,10 @@ db {
seconds = 60
}
- query_timeout_default = 3000
- queries {
- select_source_id_for_update = ["SELECT * FROM ? WHERE source_id = ? FOR UPDATE", 3000]
+ timeouts {
+ select = 3000
+ select_modify = 3000
+ execute = 3000
}
}
View
@@ -26,9 +26,10 @@ db {
seconds = 60
}
- query_timeout_default = 2000
- queries {
- select_source_id_for_update = ["SELECT * FROM ? WHERE source_id = ? FOR UPDATE", 2000]
+ timeouts {
+ select = 2000
+ select_modify = 2000
+ execute = 2000
}
}
@@ -30,7 +30,7 @@ import com.twitter.ostrich.{Stats, W3CStats}
import com.twitter.querulous.StatsCollector
import com.twitter.querulous.database.DatabaseFactory
import com.twitter.querulous.evaluator.QueryEvaluatorFactory
-import com.twitter.querulous.query.QueryFactory
+import com.twitter.querulous.query.{QueryClass, QueryFactory}
import com.twitter.flockdb.conversions.Edge._
import com.twitter.flockdb.conversions.EdgeQuery._
import com.twitter.flockdb.conversions.EdgeResults._
@@ -59,6 +59,9 @@ object FlockDB {
}
def apply(config: ConfigMap, w3c: W3CStats): FlockDB = {
+ // make a new query class for select-during-modify.
+ QueryClass.register(shards.SelectModify)
+
val stats = statsCollector(w3c)
val dbQueryEvaluatorFactory = QueryEvaluatorFactory.fromConfig(config.configMap("db"), Some(stats))
val materializingQueryEvaluatorFactory = QueryEvaluatorFactory.fromConfig(config.configMap("materializing_db"), Some(stats))
@@ -22,7 +22,7 @@ import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy
import com.twitter.gizzard.shards
import com.twitter.results.{Cursor, ResultWindow}
import com.twitter.querulous.evaluator.{QueryEvaluator, QueryEvaluatorFactory, Transaction}
-import com.twitter.querulous.query.SqlQueryTimeoutException
+import com.twitter.querulous.query.{QueryClass, SqlQueryTimeoutException}
import com.twitter.xrayspecs.Time
import com.twitter.xrayspecs.TimeConversions._
import com.mysql.jdbc.exceptions.MySQLTransactionRollbackException
@@ -31,6 +31,8 @@ import net.lag.logging.Logger
import State._
+object SelectModify extends QueryClass("select_modify")
+
class SqlShardFactory(instantiatingQueryEvaluatorFactory: QueryEvaluatorFactory, materializingQueryEvaluatorFactory: QueryEvaluatorFactory, config: ConfigMap)
extends shards.ShardFactory[Shard] {
@@ -182,7 +184,7 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
}
def selectIncludingArchived(sourceId: Long, count: Int, cursor: Cursor) = {
- select("destination_id", "unique_source_id_destination_id", count, cursor,
+ select(SelectModify, "destination_id", "unique_source_id_destination_id", count, cursor,
"source_id = ? AND state != ?",
sourceId, Removed.id)
}
@@ -193,14 +195,21 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
List(sourceId, states.map(_.id).toList): _*)
}
- private def select(cursorName: String, index: String, count: Int, cursor: Cursor, conditions: String, args: Any*) = {
+ private def select(cursorName: String, index: String, count: Int,
+ cursor: Cursor, conditions: String, args: Any*): ResultWindow[Long] = {
+ select(QueryClass.Select, cursorName, index, count, cursor, conditions, args: _*)
+ }
+
+ private def select(queryClass: QueryClass, cursorName: String, index: String, count: Int,
+ cursor: Cursor, conditions: String, args: Any*): ResultWindow[Long] = {
var edges = new mutable.ArrayBuffer[(Long, Cursor)]
val order = if (cursor < Cursor.Start) "ASC" else "DESC"
val inequality = if (order == "DESC") "<" else ">"
val (continueCursorQuery, args1) = query(cursorName, index, 1, cursor, opposite(order), opposite(inequality), conditions, args)
- val (edgesQuery, args2) = query(cursorName, index, count + 1, cursor, order, inequality, conditions, args)
- queryEvaluator.select(continueCursorQuery + " UNION " + edgesQuery, args1 ++ args2: _*) { row =>
+ val (edgesQuery, args2) = query(cursorName, index, count + 1, cursor, order, inequality, conditions, args)
+ val totalQuery = continueCursorQuery + " UNION " + edgesQuery
+ queryEvaluator.select(queryClass, totalQuery, args1 ++ args2: _*) { row =>
edges += (row.getLong("destination_id"), Cursor(row.getLong(cursorName)))
}
@@ -386,7 +395,8 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
private def writeEdge(transaction: Transaction, metadata: Metadata, edge: Edge,
predictExistence: Boolean): Int = {
val countDelta = if (predictExistence) {
- transaction.selectOne("SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
+ transaction.selectOne(SelectModify,
+ "SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
"and destination_id = ?", edge.sourceId, edge.destinationId) { row =>
makeEdge(row)
}.map { oldRow =>
@@ -399,7 +409,8 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
insertEdge(transaction, metadata, edge)
} catch {
case e: SQLIntegrityConstraintViolationException =>
- transaction.selectOne("SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
+ transaction.selectOne(SelectModify,
+ "SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
"and destination_id = ?", edge.sourceId, edge.destinationId) { row =>
makeEdge(row)
}.map { oldRow =>
@@ -473,7 +484,8 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): A = {
try {
queryEvaluator.transaction { transaction =>
- transaction.selectOne("SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ? FOR UPDATE", sourceId) { row =>
+ transaction.selectOne(SelectModify,
+ "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ? FOR UPDATE", sourceId) { row =>
f(transaction, Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time(row.getInt("updated_at").seconds)))
} getOrElse(throw new MissingMetadataRow)
}

0 comments on commit bc0d34f

Please sign in to comment.