Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

DS-144 #81

Open
wants to merge 5 commits into from

2 participants

@stuhood
Collaborator

Moves writeEdge outside the critical section which updates the metadata count, to avoid or eliminate cache misses while the metadata is locked. Hasn't had any performance testing yet... sometime this week.

src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
@@ -441,14 +441,14 @@ extends Shard {
transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " +
"position = ?, count = 0, state = ? " +
"WHERE source_id = ? AND destination_id = ? AND " +
- "updated_at <= ?",
+ "updated_at <= ? LIMIT 1",

Could you clarify the reason for adding the "LIMIT 1" to all of these UPDATEs? Given that they all specify "source_id = ? AND destination_id = ?", and (source_id, destination_id) is a unique key, this doesn't really change anything, but kind of muddles the understanding of the query (i.e. the human reading may think that this could match multiple rows and only the first should be updated).

Might be worthwhile as a minor tweak to move this onto the WHERE line above just for symmetry with the below two examples that are line broken differently.

@stuhood Collaborator
stuhood added a note

My understanding was originally muddied by the fact that the code directly returned the 'updateCount', so it wasn't clear that it was always either 0 or 1: the case statement below this probably now makes it clear enough that the update statement should hit exactly 0 or 1, so we can remove the limit again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@jeremycole

Assuming that your description of the re-ordered transaction is correct, this looks good.

@stuhood
Collaborator

Hey @eaceaser: would you mind checking this out?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
2  src/main/scala/com/twitter/flockdb/State.scala
@@ -16,7 +16,7 @@
package com.twitter.flockdb
-abstract class State(val id: Int, val name: String, val ordinal: Int) extends Ordered[State] {
+sealed abstract class State(val id: Int, val name: String, val ordinal: Int) extends Ordered[State] {
def max(other: State) = if (this > other) this else other
def compare(s: State) = ordinal.compare(s.ordinal)
}
View
1  src/main/scala/com/twitter/flockdb/shards/Optimism.scala
@@ -118,6 +118,7 @@ object LockingNodeSet {
implicit def asLockingNodeSet(n: NodeSet[Shard]) = new LockingNodeSet(n)
}
+// TODO: metadataForWrite does not lock the metadata?
class LockingNodeSet(node: NodeSet[Shard]) extends OptimisticStateMonitor {
def getMetadatas(id: Long) = node.all { _.getMetadataForWrite(id)() }
}
View
222 src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
@@ -115,7 +115,7 @@ CREATE TABLE IF NOT EXISTS %s (
/**
* All methods are externally asynchronous via Futures, but Transactions are only available in a
- * context where it is safe to block (a FuturePool), so private methods may take Transactions, with
+ * context where it is safe to block (a FuturePool), so private methods may take Transactions with
* the understanding that they will be executed in a blocking fashion.
*/
class SqlShard(
@@ -128,6 +128,7 @@ extends Shard {
private val tablePrefix = shardInfo.tablePrefix
private val randomGenerator = new Random
+ type EdgeStateChange = (Option[State],State)
import QueryClass._
def get(sourceId: Long, destinationId: Long) = {
@@ -136,14 +137,13 @@ extends Shard {
}
}
- def getMetadata(sourceId: Long) = {
- lowLatencyQueryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
- new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
- }
- }
+ def getMetadata(sourceId: Long) = getMetadata(lowLatencyQueryEvaluator, sourceId)
- def getMetadataForWrite(sourceId: Long) = {
- queryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
+ def getMetadataForWrite(sourceId: Long) = getMetadata(queryEvaluator, sourceId)
+
+ /** TODO: separate effectively-static methods like this into a companion object. */
+ private def getMetadata(localEvaluator: AsyncQueryEvaluator, sourceId: Long) = {
+ localEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
}
}
@@ -180,29 +180,30 @@ extends Shard {
}
f flatMap {
- _ map (Future.value(_)) getOrElse {
- populateMetadata(sourceId, Normal)
- count(sourceId, states)
- }
+ case Some(count) =>
+ Future.value(count)
+ case None =>
+ // insert metadata, and directly return the computed count
+ queryEvaluator.transaction { txn =>
+ populateMetadata(txn, sourceId, Normal)
+ }.map(_.count).rescue {
+ case e: SQLIntegrityConstraintViolationException =>
+ // lost a race: recurse to use the newly inserted value
+ count(sourceId, states)
+ }
}
}
- private def populateMetadata(sourceId: Long, state: State): Future[Unit] =
- populateMetadata(sourceId, state, Time.epoch)
-
/** TODO: bulk insert? */
- private def populateMetadata(sourceId: Long, state: State, updatedAt: Time): Future[Unit] = {
- val f = computeCount(sourceId, state) flatMap { count =>
- queryEvaluator.execute(
- "INSERT INTO " + tablePrefix + "_metadata (source_id, count, state, updated_at) VALUES (?, ?, ?, ?)",
- sourceId,
- count,
- state.id,
- updatedAt.inSeconds)
- }
- f.unit handle {
- case e: SQLIntegrityConstraintViolationException => ()
- }
+ private def populateMetadata(transaction: Transaction, sourceId: Long, state: State, updatedAt: Time = Time.epoch): Metadata = {
+ val count = computeCount(transaction, sourceId, state)
+ transaction.execute(
+ "INSERT INTO " + tablePrefix + "_metadata (source_id, count, state, updated_at) VALUES (?, ?, ?, ?)",
+ sourceId,
+ count,
+ state.id,
+ updatedAt.inSeconds)
+ new Metadata(sourceId, state, count, updatedAt)
}
private def computeCount(transaction: Transaction, sourceId: Long, state: State): Int = {
@@ -390,13 +391,12 @@ extends Shard {
}
- private def insertEdge(transaction: Transaction, metadata: Metadata, edge: Edge): Int = {
- val insertedRows =
- transaction.execute("INSERT INTO " + tablePrefix + "_edges (source_id, position, " +
- "updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)",
- edge.sourceId, edge.position, edge.updatedAt.inSeconds,
- edge.destinationId, edge.count, edge.state.id)
- if (edge.state == metadata.state) insertedRows else 0
+ private def insertEdge(transaction: Transaction, edge: Edge): EdgeStateChange = {
+ transaction.execute("INSERT INTO " + tablePrefix + "_edges (source_id, position, " +
+ "updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)",
+ edge.sourceId, edge.position, edge.updatedAt.inSeconds,
+ edge.destinationId, edge.count, edge.state.id)
+ (None, edge.state)
}
def bulkUnsafeInsertEdges(edges: Seq[Edge]): Future[Unit] = {
@@ -429,9 +429,9 @@ extends Shard {
}
}
- private def updateEdge(transaction: Transaction, metadata: Metadata, edge: Edge,
- oldEdge: Edge): Int = {
- if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state) return 0
+ private def updateEdge(transaction: Transaction, edge: Edge, oldEdge: Edge): EdgeStateChange = {
+ if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state)
+ return (Some(oldEdge.state), oldEdge.state)
val updatedRows = if (
oldEdge.state != Archived && // Only update position when coming from removed or negated into normal
@@ -440,8 +440,7 @@ extends Shard {
) {
transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " +
"position = ?, count = 0, state = ? " +
- "WHERE source_id = ? AND destination_id = ? AND " +
- "updated_at <= ?",
+ "WHERE source_id = ? AND destination_id = ? AND updated_at <= ?",
edge.updatedAt.inSeconds, edge.position, edge.state.id,
edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds)
} else {
@@ -463,28 +462,36 @@ extends Shard {
edge.destinationId, edge.updatedAt.inSeconds)
}
}
- if (edge.state != oldEdge.state &&
- (oldEdge.state == metadata.state || edge.state == metadata.state)) updatedRows else 0
- }
- // returns +1, 0, or -1, depending on how the metadata count should change after this operation.
- // `predictExistence`=true for normal operations, false for copy/migrate.
+ val newEdgeState =
+ updatedRows match {
+ case 1 => edge.state
+ case 0 => oldEdge.state
+ case x =>
+ throw new AssertionError(
+ "Invalid update count " + x + ": querying by primary key should make this impossible?"
+ )
+ }
+ (Some(oldEdge.state), newEdgeState)
+ }
- private def writeEdge(transaction: Transaction, metadata: Metadata, edge: Edge,
- predictExistence: Boolean): Int = {
- val countDelta = if (predictExistence) {
+ // returns the old and new edge states. `predictExistence`=true for normal
+ // operations, false for copy/migrate
+ private def writeEdge(transaction: Transaction, edge: Edge,
+ predictExistence: Boolean): EdgeStateChange = {
+ if (predictExistence) {
transaction.selectOne(SelectModify,
"SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
"and destination_id = ?", edge.sourceId, edge.destinationId) { row =>
makeEdge(row)
}.map { oldRow =>
- updateEdge(transaction, metadata, edge, oldRow)
+ updateEdge(transaction, edge, oldRow)
}.getOrElse {
- insertEdge(transaction, metadata, edge)
+ insertEdge(transaction, edge)
}
} else {
try {
- insertEdge(transaction, metadata, edge)
+ insertEdge(transaction, edge)
} catch {
case e: SQLIntegrityConstraintViolationException =>
transaction.selectOne(SelectModify,
@@ -492,11 +499,13 @@ extends Shard {
"and destination_id = ?", edge.sourceId, edge.destinationId) { row =>
makeEdge(row)
}.map { oldRow =>
- updateEdge(transaction, metadata, edge, oldRow)
- }.getOrElse(0)
+ updateEdge(transaction, edge, oldRow)
+ }.getOrElse {
+ // edge removed within transaction: nothing obvious to do
+ throw new RuntimeException("Edge disappeared during transaction?", e)
+ }
}
}
- if (edge.state == metadata.state) countDelta else -countDelta
}
private def write(edge: Edge): Future[Unit] = {
@@ -504,15 +513,22 @@ extends Shard {
}
private def write(edge: Edge, tries: Int, predictExistence: Boolean): Future[Unit] = {
- try {
- atomically(edge.sourceId) { (transaction, metadata) =>
- val countDelta = writeEdge(transaction, metadata, edge, predictExistence)
- if (countDelta != 0) {
- transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " +
- "WHERE source_id = ?", countDelta, edge.sourceId)
+ queryEvaluator.transaction { transaction =>
+ // insert the edge, and then acquire the metadata to update/populate its count
+ val preAndPostStates = writeEdge(transaction, edge, predictExistence)
+ atomically(transaction, edge.sourceId) { metadataOption =>
+ metadataOption.map { metadata =>
+ // metadata already existed: update its count
+ val countDelta = countDeltaFor(preAndPostStates, metadata.state)
+ if (countDelta != 0) {
+ updateCount(transaction, edge.sourceId, countDelta)
+ }
+ }.getOrElse {
+ // metadata doesn't exist: populate it from scratch (post-edge-insert)
+ populateMetadata(transaction, edge.sourceId, Normal)
}
}
- } catch {
+ }.unit.rescue {
case e: MySQLTransactionRollbackException if (tries > 0) =>
write(edge, tries - 1, predictExistence)
case e: SQLIntegrityConstraintViolationException if (tries > 0) =>
@@ -549,8 +565,17 @@ extends Shard {
}
}
+ private def countDeltaFor(oldAndNewEdgeState: EdgeStateChange, metadataState: State): Int =
+ oldAndNewEdgeState match {
+ case (None, `metadataState`) => 1
+ case (Some(o), n) if o == n => 0
+ case (Some(_), `metadataState`) => 1
+ case (Some(`metadataState`), _) => -1
+ case (_, _) => 0
+ }
+
private def updateCount(transaction: Transaction, sourceId: Long, countDelta: Int) = {
- transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = count + ? " +
+ transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " +
"WHERE source_id = ?", countDelta, sourceId)
}
@@ -591,7 +616,8 @@ extends Shard {
currentSourceId = edge.sourceId
countDelta = 0
}
- countDelta += writeEdge(transaction, metadataById(edge.sourceId), edge, false)
+ val preAndPostStates = writeEdge(transaction, edge, false)
+ countDelta += countDeltaFor(preAndPostStates, metadataById(edge.sourceId).state)
}
updateCount(transaction, currentSourceId, countDelta)
}
@@ -602,45 +628,53 @@ extends Shard {
}
}
- private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): Future[A] = {
+ private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): Future[A] =
atomically(Seq(sourceId)) { (t, map) => f(t, map(sourceId)) }
- }
+ /**
+ * Acquire the given metadata sourceIds FOR UPDATE if they exist, and create them
+ * if they do not exist.
+ */
private def atomically[A](sourceIds: Seq[Long])(f: (Transaction, Map[Long, Metadata]) => A): Future[A] = {
- queryEvaluator.transaction { transaction =>
-
- val mdMapBuilder = Map.newBuilder[Long, Metadata]
-
- transaction.select(
- SelectModify,
- "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) FOR UPDATE",
- sourceIds
- ) { row =>
- val md = new Metadata(
- row.getLong("source_id"),
- State(row.getInt("state")),
- row.getInt("count"),
- Time.fromSeconds(row.getInt("updated_at"))
- )
-
- mdMapBuilder += (row.getLong("source_id") -> md)
+ queryEvaluator.transaction { txn =>
+ atomically(txn, sourceIds) { partialMd =>
+ val fullMd =
+ if (partialMd.size == sourceIds.length) {
+ partialMd
+ } else {
+ val missingIds = sourceIds.filterNot(partialMd.contains _)
+ // TODO: populate should definitely be bulk for this usecase
+ partialMd ++ missingIds.map { id => (id, populateMetadata(txn, id, Normal)) }
+ }
+ f(txn, fullMd)
}
+ }
+ }
- val mdMap = mdMapBuilder.result
+ private def atomically[A](transaction: Transaction, sourceId: Long)(f: Option[Metadata] => A): A =
+ atomically(transaction, Seq(sourceId)) { md => f(md.get(sourceId)) }
- if (mdMap.size < sourceIds.length) {
- Left(sourceIds filterNot { mdMap contains _ })
- } else {
- Right(f(transaction, mdMap))
- }
- } flatMap {
- case Left(missingMeta) =>
- // insert metadata in parallel, then recurse to retry TODO: termination?
- Future.join(missingMeta map { populateMetadata(_, Normal) }) flatMap { _ =>
- atomically(sourceIds)(f)
- }
- case Right(rv) => Future.value(rv)
+ /**
+ * Acquire the given metadata sourceIds FOR UPDATE if they exist: if they do not exist,
+ * they will be missing from the output map.
+ */
+ private def atomically[A](transaction: Transaction, sourceIds: Seq[Long])(f: Map[Long, Metadata] => A): A = {
+ val mdMapBuilder = Map.newBuilder[Long, Metadata]
+ transaction.select(
+ SelectModify,
+ "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) FOR UPDATE",
+ sourceIds
+ ) { row =>
+ val sourceId = row.getLong("source_id")
+ val md = new Metadata(
+ sourceId,
+ State(row.getInt("state")),
+ row.getInt("count"),
+ Time.fromSeconds(row.getInt("updated_at"))
+ )
+ mdMapBuilder += (sourceId -> md)
}
+ f(mdMapBuilder.result)
}
def writeMetadata(metadata: Metadata): Future[Unit] = {
Something went wrong with that request. Please try again.