
optimize sql

Kyle Maxwell committed Sep 28, 2010
1 parent 2f34bd1 commit 8c19cbd3033ef3eaedeed3d23d4df22974168690
@@ -3,6 +3,6 @@
project.organization=com.twitter
project.name=flockdb
sbt.version=0.7.4
-project.version=2.0.glock2-SNAPSHOT
+project.version=2.0.1.glock2-SNAPSHOT
build.scala.versions=2.7.7
project.initialize=false
@@ -5,9 +5,9 @@ import com.twitter.sbt._
class FlockDBProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher with NoisyDependencies{
val configgy = "net.lag" % "configgy" % "1.6.1"
val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2"
- val gizzard = "com.twitter" % "gizzard" % "1.3.18"
+ val gizzard = "com.twitter" % "gizzard" % "1.4.2-SNAPSHOT"
val kestrel = "net.lag" % "kestrel" % "1.2"
- val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.6"
+ val mysqljdbc = "mysql" % "mysql-connector-java" % "5.1.13"
// val ostrich = "com.twitter" % "ostrich" % "1.2.1"
val pool = "commons-pool" % "commons-pool" % "1.3"
val querulous = "com.twitter" % "querulous" % "1.3-SNAPSHOT"
@@ -3,5 +3,5 @@ import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val scalaTools = "scala-tools.org" at "http://scala-tools.org/repo-releases/"
val twitterMaven = "twitter.com" at "http://maven.twttr.com/"
- val defaultProject = "com.twitter" % "standard-project" % "0.7.5"
+ val defaultProject = "com.twitter" % "standard-project" % "0.7.7"
}
@@ -51,6 +51,9 @@ import thrift.FlockException
object FlockDB {
+ val arbitraryDuration = new Duration(6000)
+
+
def statsCollector(w3c: W3CStats) = {
new StatsCollector {
def incr(name: String, count: Int) = w3c.incr(name, count)
@@ -65,15 +68,15 @@ object FlockDB {
val replicationFuture = new Future("ReplicationFuture", config.configMap("edges.replication.future"))
val shardRepository = new nameserver.BasicShardRepository[shards.Shard](
- new shards.ReadWriteShardAdapter(_), replicationFuture)
+ new shards.ReadWriteShardAdapter(_), Some(replicationFuture), arbitraryDuration)
shardRepository += ("com.twitter.flockdb.SqlShard" -> new shards.SqlShardFactory(dbQueryEvaluatorFactory, materializingQueryEvaluatorFactory, config))
// for backward compat:
shardRepository.setupPackage("com.twitter.service.flock.edges")
shardRepository += ("com.twitter.service.flock.edges.SqlShard" -> new shards.SqlShardFactory(dbQueryEvaluatorFactory, materializingQueryEvaluatorFactory, config))
shardRepository += ("com.twitter.service.flock.edges.BlackHoleShard" -> new shards.BlackHoleShardFactory)
val nameServer = nameserver.NameServer(config.configMap("edges.nameservers"), Some(stats),
- shardRepository, replicationFuture)
+ shardRepository, Some(replicationFuture))
val polymorphicJobParser = new PolymorphicJobParser
val jobParser = new LoggingJobParser(Stats, w3c, new JobWithTasksParser(polymorphicJobParser))
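The gizzard 1.4.2-SNAPSHOT API evidently takes the replication future as an Option and wants an explicit timeout for the shard repository. A minimal sketch of the call shapes, assuming the Option exists so replication can be switched off; the None case is an assumption, since this commit only ever passes Some:

    // As wired above: replication enabled, with the new explicit timeout.
    val repo = new nameserver.BasicShardRepository[shards.Shard](
      new shards.ReadWriteShardAdapter(_), Some(replicationFuture), arbitraryDuration)

    // Hypothetical: if None is accepted, replication fan-out could be disabled.
    // val soloRepo = new nameserver.BasicShardRepository[shards.Shard](
    //   new shards.ReadWriteShardAdapter(_), None, arbitraryDuration)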
@@ -23,12 +23,14 @@ abstract case class State(id: Int, name: String, ordinal: Int) {
object State {
def apply(id: Int) = id match {
+ case Uninitialized.id => Uninitialized
case Normal.id => Normal
case Removed.id => Removed
case Archived.id => Archived
case Negative.id => Negative
}
+ case object Uninitialized extends State(-1, "Uninitialized", -1)
case object Normal extends State(0, "Normal", 0)
case object Negative extends State(3, "Negative", 1)
case object Archived extends State(2, "Archived", 2)
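The new Uninitialized sentinel matches the -1 state that initializeEdges seeds below, so State(id) now resolves for freshly inserted rows instead of throwing a MatchError. A quick illustration:

    // -1 is the placeholder state that INSERT IGNORE writes into _edges
    // (see initializeEdges in the SqlShard hunk below); it now round-trips.
    assert(State(-1) == State.Uninitialized)
    assert(State(0) == State.Normal)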
@@ -26,7 +26,7 @@ import com.twitter.xrayspecs.TimeConversions._
import net.lag.logging.Logger
import com.twitter.flockdb.shards.Metadata
import shards.Shard
-
+import net.lag.logging.Logger
object Copy {
type Cursor = (results.Cursor, results.Cursor)
@@ -88,12 +88,18 @@ object MetadataCopyParser extends gizzard.jobs.CopyParser[Shard] {
class MetadataCopy(sourceShardId: ShardId, destinationShardId: ShardId, cursor: MetadataCopy.Cursor,
count: Int)
extends gizzard.jobs.Copy[Shard](sourceShardId, destinationShardId, count) {
+
+ val log = Logger.get(getClass.getName)
+
def this(sourceShardId: ShardId, destinationShardId: ShardId, cursor: MetadataCopy.Cursor) =
this(sourceShardId, destinationShardId, cursor, Copy.COUNT)
-
+
def copyPage(sourceShard: Shard, destinationShard: Shard, count: Int) = {
+ log.info("selecting")
val (items, newCursor) = sourceShard.selectAllMetadata(cursor, count)
+ log.info("updating")
destinationShard.writeMetadataState(items)
+ log.info("updated")
Stats.incr("edges-copy", items.size)
if (newCursor == MetadataCopy.END)
Some(new Copy(sourceShardId, destinationShardId, Copy.START))
@@ -18,7 +18,7 @@ package com.twitter.flockdb.shards
import java.sql.{ResultSet, SQLException, SQLIntegrityConstraintViolationException}
import scala.collection.mutable
-import com.twitter.gizzard.proxy.SqlExceptionWrappingProxy
+import com.twitter.gizzard.proxy.ShardExceptionWrappingQueryEvaluator
import com.twitter.gizzard.shards
import com.twitter.results.{Cursor, ResultWindow}
import com.twitter.querulous.evaluator.{QueryEvaluator, QueryEvaluatorFactory, Transaction}
@@ -62,7 +62,7 @@ CREATE TABLE IF NOT EXISTS %s (
def instantiate(shardInfo: shards.ShardInfo, weight: Int, children: Seq[Shard]) = {
val queryEvaluator = instantiatingQueryEvaluatorFactory(List(shardInfo.hostname), config("edges.db_name"), config("db.username"), config("db.password"))
- SqlExceptionWrappingProxy[Shard](new SqlShard(queryEvaluator, shardInfo, weight, children, config))
+ new SqlShard(new ShardExceptionWrappingQueryEvaluator(shardInfo.id, queryEvaluator), shardInfo, weight, children, config)
}
def materialize(shardInfo: shards.ShardInfo) = {
@@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS %s (
queryEvaluator.execute(METADATA_TABLE_DDL.format(config("edges.db_name") + "." + shardInfo.tablePrefix + "_metadata", shardInfo.sourceType))
} catch {
case e: SQLException => throw new shards.ShardException(e.toString)
- case e: SqlQueryTimeoutException => throw new shards.ShardTimeoutException(e.timeout, e)
+ case e: SqlQueryTimeoutException => throw new shards.ShardTimeoutException(e.timeout, shardInfo.id, e)
}
}
}
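Exception wrapping moves from a proxy around the whole shard (SqlExceptionWrappingProxy) to a decorator around the QueryEvaluator itself, so thrown shard exceptions can carry the shard id. A hedged sketch of what the decorator presumably does; the class name comes from this diff, but the body below is an assumption, not gizzard's actual source:

    // Sketch only: one method shown; the real decorator would wrap every
    // evaluator operation with the same exception translation.
    class WrappingEvaluatorSketch(shardId: shards.ShardId, inner: QueryEvaluator) {
      def execute(query: String, params: Any*) =
        try inner.execute(query, params: _*)
        catch {
          case e: SqlQueryTimeoutException =>
            throw new shards.ShardTimeoutException(e.timeout, shardId, e)
          case e: SQLException =>
            throw new shards.ShardException(e.toString)
        }
    }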
@@ -272,31 +272,31 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
}
def add(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time) = {
- write(Seq(Edge(sourceId, destinationId, position, updatedAt, Normal)))
+ write(Set(Edge(sourceId, destinationId, position, updatedAt, Normal)))
}
def add(sourceId: Long, updatedAt: Time) {
updateMetadata(sourceId, Normal, updatedAt)
}
def negate(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time) = {
- write(Seq(Edge(sourceId, destinationId, position, updatedAt, Negative)))
+ write(Set(Edge(sourceId, destinationId, position, updatedAt, Negative)))
}
def negate(sourceId: Long, updatedAt: Time) {
updateMetadata(sourceId, Negative, updatedAt)
}
def remove(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time) = {
- write(Seq(Edge(sourceId, destinationId, position, updatedAt, Removed)))
+ write(Set(Edge(sourceId, destinationId, position, updatedAt, Removed)))
}
def remove(sourceId: Long, updatedAt: Time) {
updateMetadata(sourceId, Removed, updatedAt)
}
def archive(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time) = {
- write(Seq(Edge(sourceId, destinationId, position, updatedAt, Archived)))
+ write(Set(Edge(sourceId, destinationId, position, updatedAt, Archived)))
}
def archive(sourceId: Long, updatedAt: Time) {
@@ -316,7 +316,7 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
private class MissingMetadataRow extends Exception("Missing Count Row")
- private def write(edges: Seq[Edge]) {
+ private def write(edges: Set[Edge]) {
write(edges, config("errors.deadlock_retries").toInt)
}
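Seq becomes Set throughout the write path, which dedupes repeated edges before they reach the batch; a plausible motivation, given the deadlock retries below, is that writing the same row twice in one transaction invites lock contention. Assuming Edge is a case class, duplicates compare equal and collapse:

    // Two identical writes arrive for the same edge; as a Set they become one.
    val e = Edge(1L, 2L, 0L, Time.now, Normal)
    assert(Set(e, e).size == 1)  // the batch now touches each row exactly once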
@@ -327,81 +327,141 @@ class SqlShard(private val queryEvaluator: QueryEvaluator, val shardInfo: shards
private def state_priority(state: String): String = "-IF(" + state + "=0, 4, " + state + ")"
- private def write(edges: Seq[Edge], tries: Int) {
- try {
- initializeMetadata(edges.map(_.sourceId))
- initializeEdges(edges)
- val query = "UPDATE " + tablePrefix + "_metadata AS metadata, " + tablePrefix + "_edges AS edges " +
- "SET " +
- " metadata.count0 = metadata.count0 + " + incr(0, "?") + "," +
- " metadata.count1 = metadata.count1 + " + incr(1, "?") + "," +
- " metadata.count2 = metadata.count2 + " + incr(2, "?") + "," +
- " metadata.count3 = metadata.count3 + " + incr(3, "?") + "," +
- " edges.state = ?, " +
- " edges.position = ?, " +
- " edges.updated_at = ? " +
- "WHERE (edges.updated_at < ? OR (edges.updated_at = ? AND " +
- "(" + state_priority("edges.state") + " < " + state_priority("?") + ")))" +
- " AND edges.source_id = ? " +
- " AND edges.destination_id = ? " +
- " AND metadata.source_id = ? "
- queryEvaluator.executeBatch(query){ p =>
- edges.foreach { edge =>
- p(edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.position, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.state.id, edge.state.id, edge.sourceId, edge.destinationId, edge.sourceId)
+ private def write(edges: Set[Edge], tries: Int) {
+ if (!edges.isEmpty) {
+ try {
+ log.info("starting transaction")
+ queryEvaluator.transaction { transaction =>
+ log.info("inserting metadata")
+ initializeMetadata(transaction, edges.map(_.sourceId))
+ log.info("inserting edges")
+ initializeEdges(transaction, edges)
+ log.info("edges inserted")
+ val query = "UPDATE " + tablePrefix + "_metadata AS metadata, " + tablePrefix + "_edges AS edges " +
+ "SET " +
+ " metadata.count0 = metadata.count0 + " + incr(0, "?") + "," +
+ " metadata.count1 = metadata.count1 + " + incr(1, "?") + "," +
+ " metadata.count2 = metadata.count2 + " + incr(2, "?") + "," +
+ " metadata.count3 = metadata.count3 + " + incr(3, "?") + "," +
+ " edges.state = ?, " +
+ " edges.position = ?, " +
+ " edges.updated_at = ? " +
+ "WHERE (edges.updated_at < ? OR (edges.updated_at = ? AND " +
+ "(" + state_priority("edges.state") + " < " + state_priority("?") + ")))" +
+ " AND edges.source_id = ? " +
+ " AND edges.destination_id = ? " +
+ " AND metadata.source_id = ? "
+ transaction.executeBatch(query){ p =>
+ log.info("writing edges")
+ edges.foreach { edge =>
+ p(edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.state.id, edge.position, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.updatedAt.inSeconds, edge.state.id, edge.state.id, edge.sourceId, edge.destinationId, edge.sourceId)
+ }
+ log.info("executing batch")
+ }
+ log.info("ending transaction")
}
+ log.info("transaction ended")
+ } catch {
+ case e: MySQLTransactionRollbackException if (tries > 0) =>
+ write(edges, tries - 1)
}
- } catch {
- case e: MySQLTransactionRollbackException if (tries > 0) =>
- write(edges, tries - 1)
}
}
- def writeCopies(edges: Seq[Edge]) = write(edges)
+ def writeCopies(edges: Seq[Edge]) = write(Set(edges:_*))
+ def writeCopies(edges: Set[Edge]) = write(edges)
- @deprecated
- private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): A = {
- try {
- queryEvaluator.transaction { transaction =>
- transaction.selectOne("SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ? FOR UPDATE", sourceId) { row =>
- f(transaction, Metadata(sourceId, State(row.getInt("state")), row.getInt("count0"), row.getInt("count1"), row.getInt("count2"), row.getInt("count3"), Time(row.getInt("updated_at").seconds)))
- } getOrElse(throw new MissingMetadataRow)
- }
- } catch {
- case e: MissingMetadataRow =>
- initializeMetadata(sourceId)
- atomically(sourceId)(f)
+ def initializeMetadata(sourceId: Long): Unit = initializeMetadata(Seq(sourceId))
+
+ def initializeMetadata(sourceIds: Seq[Long]): Unit = initializeMetadata(Set(sourceIds: _*))
+
+ def initializeMetadata(sourceIds: Set[Long]): Unit = initializeMetadata(queryEvaluator, sourceIds)
+
+ def initializeMetadata(queryEvaluator: QueryEvaluator, sourceIds: Set[Long]): Unit = {
+ val newIds = sourceIds -- existingMetadata(sourceIds)
+ if (!newIds.isEmpty) {
+ val values = newIds.map("(" + _ + ")").mkString(",")
+ val query = "INSERT IGNORE INTO " + tablePrefix + "_metadata (source_id) VALUES " + values
+ queryEvaluator.execute(query)
}
}
+ private def existingMetadata(ids: Collection[Long]): Seq[Long] = {
+ queryEvaluator.select("SELECT source_id FROM " + tablePrefix + "_metadata WHERE source_id IN (?)", ids.toList) { row =>
+ row.getLong("source_id")
+ }
+ }
- def initializeMetadata(sourceId: Long): Unit = initializeMetadata(Seq(sourceId))
+ private def existingEdges(edges: Collection[Edge]) = {
+ val where = edges.map { edge => "(source_id = " + edge.sourceId + " AND destination_id = " + edge.destinationId + ")" }.mkString(" OR ")
+ val query = "SELECT source_id, destination_id FROM " + tablePrefix + "_edges WHERE " + where
- def initializeMetadata(sourceIds: Seq[Long]): Unit = {
- val values = sourceIds.map("(" + _ + ")").mkString(",")
- val query = "INSERT IGNORE INTO " + tablePrefix + "_metadata (source_id) VALUES " + values
- queryEvaluator.execute(query)
+ val set = new mutable.HashSet[(Long, Long)]
+ queryEvaluator.select(query) { row =>
+ set += ((row.getLong("source_id"), row.getLong("destination_id")))
+ }
+ set
}
- def initializeEdges(edges: Seq[Edge]) = {
- val values = edges.map{ edge => "(" + edge.sourceId + ", " + edge.destinationId + ", 0, "+edge.position+", -1)"}.mkString(",")
- val query = "INSERT IGNORE INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, position, state) VALUES " + values
- queryEvaluator.execute(query)
- }
+ def initializeEdges(edges: Set[Edge]): Unit = initializeEdges(queryEvaluator, edges)
- // writeMetadataState(Metadata(sourceId, Normal, 0, Time.now))
+ def initializeEdges(queryEvaluator: QueryEvaluator, edges: Set[Edge]) = {
+ if (!edges.isEmpty) {
+ val existing = existingEdges(edges)
+ val filtered = edges.filter{ edge => !existing.contains((edge.sourceId, edge.destinationId)) }
+ if (!filtered.isEmpty) {
+ val values = filtered.map { edge => "(" + edge.sourceId + ", " + edge.destinationId + ", 0, " + edge.position + ", -1)" }.mkString(",")
+ val query = "INSERT IGNORE INTO " + tablePrefix + "_edges (source_id, destination_id, updated_at, position, state) VALUES " + values
+ queryEvaluator.execute(query)
+ }
+ }
+ }
def writeMetadataState(metadatas: Seq[Metadata]) = {
- def update_value_if_newer_or_better(column: String) = column + "=IF(updated_at < VALUES(updated_at) OR (updated_at = VALUES(updated_at) AND "+state_priority("state") + " < "+state_priority("VALUES(state)")+"), VALUES(" + column + "), " + column + ")"
+ if (metadatas.length > 0) {
+ def update_value_if_newer_or_better(column: String) = {
+ column +
+ "=IF(updated_at < VALUES(updated_at) OR (updated_at = VALUES(updated_at) AND "+
+ state_priority("state") + " < "+state_priority("VALUES(state)")+"), VALUES(" + column + "), " + column + ")"
+ }
- val query = "INSERT INTO " + tablePrefix + "_metadata " +
- "(source_id, state, updated_at) VALUES " +
- List.make(metadatas.length, "(?, ?, ?)").mkString(", ") +
- " ON DUPLICATE KEY UPDATE " +
- update_value_if_newer_or_better("state") + " , " +
- update_value_if_newer_or_better("updated_at")
- val params = metadatas.foldLeft(List[Any]())((memo, m) => memo ++ List(m.sourceId, m.state.id, m.updatedAt.inSeconds))
+ val mSet = Set(metadatas:_*)
+ val existingIds = Set(existingMetadata(metadatas.map(_.sourceId)):_*)
+ val existingMetadatas = metadatas.filter{ metadata => existingIds.contains(metadata.sourceId) }
+ val insertable = mSet -- existingMetadatas
+
+ if (!insertable.isEmpty) {
+ val query = "INSERT INTO " + tablePrefix + "_metadata " +
+ "(source_id, state, updated_at) VALUES " +
+ List.make(insertable.size, "(?, ?, ?)").mkString(", ") +
+ " ON DUPLICATE KEY UPDATE " +
+ update_value_if_newer_or_better("state") + " , " +
+ update_value_if_newer_or_better("updated_at")
+
+ val params = insertable.foldLeft(List[Any]())((memo, m) => memo ++ List(m.sourceId, m.state.id, m.updatedAt.inSeconds))
+ log.info("executing insert")
+ queryEvaluator.execute(query, params: _*)
+ log.info("executed insert")
+ }
- queryEvaluator.execute(query, params: _*)
+ if (!existingMetadatas.isEmpty) {
+ val updateQuery = "UPDATE " + tablePrefix + "_metadata SET " +
+ "updated_at = ?, state = ? WHERE " +
+ "source_id = ? " +
+ "AND (updated_at < ? OR (updated_at = ? AND " +
+ state_priority("state") + " < " + state_priority("?") + "))"
+
+ log.info("batching update")
+ queryEvaluator.executeBatch(updateQuery){ p =>
+ log.info("appending rows")
+ existingMetadatas.foreach { data =>
+ p(data.updatedAt.inSeconds, data.state.id, data.sourceId, data.updatedAt.inSeconds, data.updatedAt.inSeconds, data.state.id, data.state.id)
+ }
+ log.info("executing update")
+ }
+ log.info("executed update")
+ }
+ }
}
def writeMetadataState(metadata: Metadata) = this.writeMetadataState(List(metadata))
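Taken together, the rewrite is: read first to find which rows already exist, INSERT IGNORE only the genuinely missing ones, then apply the conditional UPDATE as a single batch, all inside one transaction, retrying the whole transaction on a MySQL deadlock. A condensed sketch of that pattern under the same assumptions as the code above; writeSketch, updateQuery, and bindEdge are illustrative names, not from the commit:

    // Deadlock-retry around one transaction: seed missing rows cheaply,
    // then batch the real state change.
    private def writeSketch(edges: Set[Edge], tries: Int) {
      if (!edges.isEmpty) {
        try {
          queryEvaluator.transaction { t =>
            initializeMetadata(t, edges.map(_.sourceId)) // INSERT IGNORE missing metadata rows
            initializeEdges(t, edges)                    // INSERT IGNORE missing edges, state = -1
            t.executeBatch(updateQuery) { p => edges.foreach(bindEdge(p, _)) }
          }
        } catch {
          // MySQL aborts one victim of a deadlock with a rollback exception;
          // re-running the whole transaction a bounded number of times recovers.
          case e: MySQLTransactionRollbackException if (tries > 0) =>
            writeSketch(edges, tries - 1)
        }
      }
    }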
