Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

DS-144 #81

wants to merge 5 commits into from
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/flockdb/State.scala
Expand Up @@ -16,7 +16,7 @@

package com.twitter.flockdb

abstract class State(val id: Int, val name: String, val ordinal: Int) extends Ordered[State] {
sealed abstract class State(val id: Int, val name: String, val ordinal: Int) extends Ordered[State] {
def max(other: State) = if (this > other) this else other
def compare(s: State) =
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/com/twitter/flockdb/shards/Optimism.scala
Expand Up @@ -118,6 +118,7 @@ object LockingNodeSet {
implicit def asLockingNodeSet(n: NodeSet[Shard]) = new LockingNodeSet(n)

// TODO: metadataForWrite does not lock the metadata?
class LockingNodeSet(node: NodeSet[Shard]) extends OptimisticStateMonitor {
def getMetadatas(id: Long) = node.all { _.getMetadataForWrite(id)() }
222 changes: 128 additions & 94 deletions src/main/scala/com/twitter/flockdb/shards/SqlShard.scala
Expand Up @@ -115,7 +115,7 @@ CREATE TABLE IF NOT EXISTS %s (

* All methods are externally asynchronous via Futures, but Transactions are only available in a
* context where it is safe to block (a FuturePool), so private methods may take Transactions, with
* context where it is safe to block (a FuturePool), so private methods may take Transactions with
* the understanding that they will be executed in a blocking fashion.
class SqlShard(
Expand All @@ -128,6 +128,7 @@ extends Shard {
private val tablePrefix = shardInfo.tablePrefix
private val randomGenerator = new Random

type EdgeStateChange = (Option[State],State)
import QueryClass._

def get(sourceId: Long, destinationId: Long) = {
Expand All @@ -136,14 +137,13 @@ extends Shard {

def getMetadata(sourceId: Long) = {
lowLatencyQueryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
def getMetadata(sourceId: Long) = getMetadata(lowLatencyQueryEvaluator, sourceId)

def getMetadataForWrite(sourceId: Long) = {
queryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
def getMetadataForWrite(sourceId: Long) = getMetadata(queryEvaluator, sourceId)

/** TODO: separate effectively-static methods like this into a companion object. */
private def getMetadata(localEvaluator: AsyncQueryEvaluator, sourceId: Long) = {
localEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
Expand Down Expand Up @@ -180,29 +180,30 @@ extends Shard {

f flatMap {
_ map (Future.value(_)) getOrElse {
populateMetadata(sourceId, Normal)
count(sourceId, states)
case Some(count) =>
case None =>
// insert metadata, and directly return the computed count
queryEvaluator.transaction { txn =>
populateMetadata(txn, sourceId, Normal)
}.map(_.count).rescue {
case e: SQLIntegrityConstraintViolationException =>
// lost a race: recurse to use the newly inserted value
count(sourceId, states)

private def populateMetadata(sourceId: Long, state: State): Future[Unit] =
populateMetadata(sourceId, state, Time.epoch)

/** TODO: bulk insert? */
private def populateMetadata(sourceId: Long, state: State, updatedAt: Time): Future[Unit] = {
val f = computeCount(sourceId, state) flatMap { count =>
"INSERT INTO " + tablePrefix + "_metadata (source_id, count, state, updated_at) VALUES (?, ?, ?, ?)",
f.unit handle {
case e: SQLIntegrityConstraintViolationException => ()
private def populateMetadata(transaction: Transaction, sourceId: Long, state: State, updatedAt: Time = Time.epoch): Metadata = {
val count = computeCount(transaction, sourceId, state)
"INSERT INTO " + tablePrefix + "_metadata (source_id, count, state, updated_at) VALUES (?, ?, ?, ?)",
new Metadata(sourceId, state, count, updatedAt)

private def computeCount(transaction: Transaction, sourceId: Long, state: State): Int = {
Expand Down Expand Up @@ -390,13 +391,12 @@ extends Shard {

private def insertEdge(transaction: Transaction, metadata: Metadata, edge: Edge): Int = {
val insertedRows =
transaction.execute("INSERT INTO " + tablePrefix + "_edges (source_id, position, " +
"updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)",
edge.sourceId, edge.position, edge.updatedAt.inSeconds,
edge.destinationId, edge.count,
if (edge.state == metadata.state) insertedRows else 0
private def insertEdge(transaction: Transaction, edge: Edge): EdgeStateChange = {
transaction.execute("INSERT INTO " + tablePrefix + "_edges (source_id, position, " +
"updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)",
edge.sourceId, edge.position, edge.updatedAt.inSeconds,
edge.destinationId, edge.count,
(None, edge.state)

def bulkUnsafeInsertEdges(edges: Seq[Edge]): Future[Unit] = {
Expand Down Expand Up @@ -429,9 +429,9 @@ extends Shard {

private def updateEdge(transaction: Transaction, metadata: Metadata, edge: Edge,
oldEdge: Edge): Int = {
if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state) return 0
private def updateEdge(transaction: Transaction, edge: Edge, oldEdge: Edge): EdgeStateChange = {
if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state)
return (Some(oldEdge.state), oldEdge.state)

val updatedRows = if (
oldEdge.state != Archived && // Only update position when coming from removed or negated into normal
Expand All @@ -440,8 +440,7 @@ extends Shard {
) {
transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " +
"position = ?, count = 0, state = ? " +
"WHERE source_id = ? AND destination_id = ? AND " +
"updated_at <= ?",
"WHERE source_id = ? AND destination_id = ? AND updated_at <= ?",
edge.updatedAt.inSeconds, edge.position,,
edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds)
} else {
Expand All @@ -463,56 +462,73 @@ extends Shard {
edge.destinationId, edge.updatedAt.inSeconds)
if (edge.state != oldEdge.state &&
(oldEdge.state == metadata.state || edge.state == metadata.state)) updatedRows else 0

// returns +1, 0, or -1, depending on how the metadata count should change after this operation.
// `predictExistence`=true for normal operations, false for copy/migrate.
val newEdgeState =
updatedRows match {
case 1 => edge.state
case 0 => oldEdge.state
case x =>
throw new AssertionError(
"Invalid update count " + x + ": querying by primary key should make this impossible?"
(Some(oldEdge.state), newEdgeState)

private def writeEdge(transaction: Transaction, metadata: Metadata, edge: Edge,
predictExistence: Boolean): Int = {
val countDelta = if (predictExistence) {
// returns the old and new edge states. `predictExistence`=true for normal
// operations, false for copy/migrate
private def writeEdge(transaction: Transaction, edge: Edge,
predictExistence: Boolean): EdgeStateChange = {
if (predictExistence) {
"SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
"and destination_id = ?", edge.sourceId, edge.destinationId) { row =>
}.map { oldRow =>
updateEdge(transaction, metadata, edge, oldRow)
updateEdge(transaction, edge, oldRow)
}.getOrElse {
insertEdge(transaction, metadata, edge)
insertEdge(transaction, edge)
} else {
try {
insertEdge(transaction, metadata, edge)
insertEdge(transaction, edge)
} catch {
case e: SQLIntegrityConstraintViolationException =>
"SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? " +
"and destination_id = ?", edge.sourceId, edge.destinationId) { row =>
}.map { oldRow =>
updateEdge(transaction, metadata, edge, oldRow)
updateEdge(transaction, edge, oldRow)
}.getOrElse {
// edge removed within transaction: nothing obvious to do
throw new RuntimeException("Edge disappeared during transaction?", e)
if (edge.state == metadata.state) countDelta else -countDelta

private def write(edge: Edge): Future[Unit] = {
write(edge, deadlockRetries, true)

private def write(edge: Edge, tries: Int, predictExistence: Boolean): Future[Unit] = {
try {
atomically(edge.sourceId) { (transaction, metadata) =>
val countDelta = writeEdge(transaction, metadata, edge, predictExistence)
if (countDelta != 0) {
transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " +
"WHERE source_id = ?", countDelta, edge.sourceId)
queryEvaluator.transaction { transaction =>
// insert the edge, and then acquire the metadata to update/populate its count
val preAndPostStates = writeEdge(transaction, edge, predictExistence)
atomically(transaction, edge.sourceId) { metadataOption => { metadata =>
// metadata already existed: update its count
val countDelta = countDeltaFor(preAndPostStates, metadata.state)
if (countDelta != 0) {
updateCount(transaction, edge.sourceId, countDelta)
}.getOrElse {
// metadata doesn't exist: populate it from scratch (post-edge-insert)
populateMetadata(transaction, edge.sourceId, Normal)
} catch {
}.unit.rescue {
case e: MySQLTransactionRollbackException if (tries > 0) =>
write(edge, tries - 1, predictExistence)
case e: SQLIntegrityConstraintViolationException if (tries > 0) =>
Expand Down Expand Up @@ -549,8 +565,17 @@ extends Shard {

private def countDeltaFor(oldAndNewEdgeState: EdgeStateChange, metadataState: State): Int =
oldAndNewEdgeState match {
case (None, `metadataState`) => 1
case (Some(o), n) if o == n => 0
case (Some(_), `metadataState`) => 1
case (Some(`metadataState`), _) => -1
case (_, _) => 0

private def updateCount(transaction: Transaction, sourceId: Long, countDelta: Int) = {
transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = count + ? " +
transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " +
"WHERE source_id = ?", countDelta, sourceId)

Expand Down Expand Up @@ -591,7 +616,8 @@ extends Shard {
currentSourceId = edge.sourceId
countDelta = 0
countDelta += writeEdge(transaction, metadataById(edge.sourceId), edge, false)
val preAndPostStates = writeEdge(transaction, edge, false)
countDelta += countDeltaFor(preAndPostStates, metadataById(edge.sourceId).state)
updateCount(transaction, currentSourceId, countDelta)
Expand All @@ -602,45 +628,53 @@ extends Shard {

private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): Future[A] = {
private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): Future[A] =
atomically(Seq(sourceId)) { (t, map) => f(t, map(sourceId)) }

* Acquire the given metadata sourceIds FOR UPDATE if they exist, and create them
* if they do not exist.
private def atomically[A](sourceIds: Seq[Long])(f: (Transaction, Map[Long, Metadata]) => A): Future[A] = {
queryEvaluator.transaction { transaction =>

val mdMapBuilder = Map.newBuilder[Long, Metadata]
"SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) FOR UPDATE",
) { row =>
val md = new Metadata(

mdMapBuilder += (row.getLong("source_id") -> md)
queryEvaluator.transaction { txn =>
atomically(txn, sourceIds) { partialMd =>
val fullMd =
if (partialMd.size == sourceIds.length) {
} else {
val missingIds = sourceIds.filterNot(partialMd.contains _)
// TODO: populate should definitely be bulk for this usecase
partialMd ++ { id => (id, populateMetadata(txn, id, Normal)) }
f(txn, fullMd)

val mdMap = mdMapBuilder.result
private def atomically[A](transaction: Transaction, sourceId: Long)(f: Option[Metadata] => A): A =
atomically(transaction, Seq(sourceId)) { md => f(md.get(sourceId)) }

if (mdMap.size < sourceIds.length) {
Left(sourceIds filterNot { mdMap contains _ })
} else {
Right(f(transaction, mdMap))
} flatMap {
case Left(missingMeta) =>
// insert metadata in parallel, then recurse to retry TODO: termination?
Future.join(missingMeta map { populateMetadata(_, Normal) }) flatMap { _ =>
case Right(rv) => Future.value(rv)
* Acquire the given metadata sourceIds FOR UPDATE if they exist: if they do not exist,
* they will be missing from the output map.
private def atomically[A](transaction: Transaction, sourceIds: Seq[Long])(f: Map[Long, Metadata] => A): A = {
val mdMapBuilder = Map.newBuilder[Long, Metadata]
"SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) FOR UPDATE",
) { row =>
val sourceId = row.getLong("source_id")
val md = new Metadata(
mdMapBuilder += (sourceId -> md)

def writeMetadata(metadata: Metadata): Future[Unit] = {
Expand Down