From ad9b0b6d644596fefc040be632350b755c98c238 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Thu, 1 Mar 2012 21:20:37 -0800 Subject: [PATCH 1/5] Extra metadata from the insert/update methods, to make them more reusable --- .../scala/com/twitter/flockdb/State.scala | 2 +- .../com/twitter/flockdb/shards/SqlShard.scala | 78 ++++++++++++------- 2 files changed, 50 insertions(+), 30 deletions(-) diff --git a/src/main/scala/com/twitter/flockdb/State.scala b/src/main/scala/com/twitter/flockdb/State.scala index f54fa093..e4599512 100644 --- a/src/main/scala/com/twitter/flockdb/State.scala +++ b/src/main/scala/com/twitter/flockdb/State.scala @@ -16,7 +16,7 @@ package com.twitter.flockdb -abstract class State(val id: Int, val name: String, val ordinal: Int) extends Ordered[State] { +sealed abstract class State(val id: Int, val name: String, val ordinal: Int) extends Ordered[State] { def max(other: State) = if (this > other) this else other def compare(s: State) = ordinal.compare(s.ordinal) } diff --git a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala index c211e8ee..044bbe4d 100644 --- a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala +++ b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala @@ -115,7 +115,7 @@ CREATE TABLE IF NOT EXISTS %s ( /** * All methods are externally asynchronous via Futures, but Transactions are only available in a - * context where it is safe to block (a FuturePool), so private methods may take Transactions, with + * context where it is safe to block (a FuturePool), so private methods may take Transactions with * the understanding that they will be executed in a blocking fashion. */ class SqlShard( @@ -128,6 +128,7 @@ extends Shard { private val tablePrefix = shardInfo.tablePrefix private val randomGenerator = new Random + type EdgeStateChange = (Option[State],State) import QueryClass._ def get(sourceId: Long, destinationId: Long) = { @@ -390,13 +391,12 @@ extends Shard { } - private def insertEdge(transaction: Transaction, metadata: Metadata, edge: Edge): Int = { - val insertedRows = - transaction.execute("INSERT INTO " + tablePrefix + "_edges (source_id, position, " + - "updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)", - edge.sourceId, edge.position, edge.updatedAt.inSeconds, - edge.destinationId, edge.count, edge.state.id) - if (edge.state == metadata.state) insertedRows else 0 + private def insertEdge(transaction: Transaction, edge: Edge): EdgeStateChange = { + transaction.execute("INSERT INTO " + tablePrefix + "_edges (source_id, position, " + + "updated_at, destination_id, count, state) VALUES (?, ?, ?, ?, ?, ?)", + edge.sourceId, edge.position, edge.updatedAt.inSeconds, + edge.destinationId, edge.count, edge.state.id) + (None, edge.state) } def bulkUnsafeInsertEdges(edges: Seq[Edge]): Future[Unit] = { @@ -429,9 +429,9 @@ extends Shard { } } - private def updateEdge(transaction: Transaction, metadata: Metadata, edge: Edge, - oldEdge: Edge): Int = { - if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state) return 0 + private def updateEdge(transaction: Transaction, edge: Edge, oldEdge: Edge): EdgeStateChange = { + if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state) + return (Some(oldEdge.state), oldEdge.state) val updatedRows = if ( oldEdge.state != Archived && // Only update position when coming from removed or negated into normal @@ -441,14 +441,14 @@ extends Shard 
{ transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " + "position = ?, count = 0, state = ? " + "WHERE source_id = ? AND destination_id = ? AND " + - "updated_at <= ?", + "updated_at <= ? LIMIT 1", edge.updatedAt.inSeconds, edge.position, edge.state.id, edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds) } else { try { transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " + "count = 0, state = ? " + - "WHERE source_id = ? AND destination_id = ? AND updated_at <= ?", + "WHERE source_id = ? AND destination_id = ? AND updated_at <= ? LIMIT 1", edge.updatedAt.inSeconds, edge.state.id, edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds) } catch { @@ -457,34 +457,40 @@ extends Shard { // FIXME: hacky. remove with the new schema. transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " + "count = 0, state = ?, position = position + ? " + - "WHERE source_id = ? AND destination_id = ? AND updated_at <= ?", + "WHERE source_id = ? AND destination_id = ? AND updated_at <= ? LIMIT 1", edge.updatedAt.inSeconds, edge.state.id, (randomGenerator.nextInt() % 999) + 1, edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds) } } - if (edge.state != oldEdge.state && - (oldEdge.state == metadata.state || edge.state == metadata.state)) updatedRows else 0 - } - // returns +1, 0, or -1, depending on how the metadata count should change after this operation. - // `predictExistence`=true for normal operations, false for copy/migrate. + val newEdgeState = + updatedRows match { + case 1 => edge.state + case 0 => oldEdge.state + case x => + throw new AssertionError("Invalid update count " + x + ": LIMIT statement not applied?") + } + (Some(oldEdge.state), newEdgeState) + } + // returns the old and new edge states. `predictExistence`=true for normal + // operations, false for copy/migrate private def writeEdge(transaction: Transaction, metadata: Metadata, edge: Edge, - predictExistence: Boolean): Int = { - val countDelta = if (predictExistence) { + predictExistence: Boolean): EdgeStateChange = { + if (predictExistence) { transaction.selectOne(SelectModify, "SELECT * FROM " + tablePrefix + "_edges WHERE source_id = ? 
" + "and destination_id = ?", edge.sourceId, edge.destinationId) { row => makeEdge(row) }.map { oldRow => - updateEdge(transaction, metadata, edge, oldRow) + updateEdge(transaction, edge, oldRow) }.getOrElse { - insertEdge(transaction, metadata, edge) + insertEdge(transaction, edge) } } else { try { - insertEdge(transaction, metadata, edge) + insertEdge(transaction, edge) } catch { case e: SQLIntegrityConstraintViolationException => transaction.selectOne(SelectModify, @@ -492,11 +498,13 @@ extends Shard { "and destination_id = ?", edge.sourceId, edge.destinationId) { row => makeEdge(row) }.map { oldRow => - updateEdge(transaction, metadata, edge, oldRow) - }.getOrElse(0) + updateEdge(transaction, edge, oldRow) + }.getOrElse { + // edge removed within transaction: nothing obvious to do + throw new RuntimeException("Edge disappeared during transaction?", e) + } } } - if (edge.state == metadata.state) countDelta else -countDelta } private def write(edge: Edge): Future[Unit] = { @@ -506,7 +514,8 @@ extends Shard { private def write(edge: Edge, tries: Int, predictExistence: Boolean): Future[Unit] = { try { atomically(edge.sourceId) { (transaction, metadata) => - val countDelta = writeEdge(transaction, metadata, edge, predictExistence) + val preAndPostStates = writeEdge(transaction, metadata, edge, predictExistence) + val countDelta = countDeltaFor(preAndPostStates, metadata.state) if (countDelta != 0) { transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " + "WHERE source_id = ?", countDelta, edge.sourceId) @@ -549,6 +558,15 @@ extends Shard { } } + private def countDeltaFor(oldAndNewEdgeState: EdgeStateChange, metadataState: State): Int = + oldAndNewEdgeState match { + case (None, `metadataState`) => 1 + case (Some(o), n) if o == n => 0 + case (Some(_), `metadataState`) => 1 + case (Some(`metadataState`), _) => -1 + case (_, _) => 0 + } + private def updateCount(transaction: Transaction, sourceId: Long, countDelta: Int) = { transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = count + ? " + "WHERE source_id = ?", countDelta, sourceId) @@ -591,7 +609,9 @@ extends Shard { currentSourceId = edge.sourceId countDelta = 0 } - countDelta += writeEdge(transaction, metadataById(edge.sourceId), edge, false) + val metadataForId = metadataById(edge.sourceId) + val preAndPostStates = writeEdge(transaction, metadataForId, edge, false) + countDelta += countDeltaFor(preAndPostStates, metadataForId.state) } updateCount(transaction, currentSourceId, countDelta) } From 1c3d2285d9ac12ba640d8a42afb994c1e3debca3 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Fri, 2 Mar 2012 19:39:36 -0800 Subject: [PATCH 2/5] Oops, forgot to remove a param --- src/main/scala/com/twitter/flockdb/shards/SqlShard.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala index 044bbe4d..87e84cba 100644 --- a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala +++ b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala @@ -476,7 +476,7 @@ extends Shard { // returns the old and new edge states. 
`predictExistence`=true for normal // operations, false for copy/migrate - private def writeEdge(transaction: Transaction, metadata: Metadata, edge: Edge, + private def writeEdge(transaction: Transaction, edge: Edge, predictExistence: Boolean): EdgeStateChange = { if (predictExistence) { transaction.selectOne(SelectModify, @@ -514,7 +514,7 @@ extends Shard { private def write(edge: Edge, tries: Int, predictExistence: Boolean): Future[Unit] = { try { atomically(edge.sourceId) { (transaction, metadata) => - val preAndPostStates = writeEdge(transaction, metadata, edge, predictExistence) + val preAndPostStates = writeEdge(transaction, edge, predictExistence) val countDelta = countDeltaFor(preAndPostStates, metadata.state) if (countDelta != 0) { transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " + @@ -609,9 +609,8 @@ extends Shard { currentSourceId = edge.sourceId countDelta = 0 } - val metadataForId = metadataById(edge.sourceId) - val preAndPostStates = writeEdge(transaction, metadataForId, edge, false) - countDelta += countDeltaFor(preAndPostStates, metadataForId.state) + val preAndPostStates = writeEdge(transaction, edge, false) + countDelta += countDeltaFor(preAndPostStates, metadataById(edge.sourceId).state) } updateCount(transaction, currentSourceId, countDelta) } From 95d446f9e201fbbae4d24a4501deb72604409c2a Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Fri, 2 Mar 2012 21:22:39 -0800 Subject: [PATCH 3/5] Split atomically() to allow it to receive a transaction, rather than creating one. --- .../com/twitter/flockdb/shards/Optimism.scala | 1 + .../com/twitter/flockdb/shards/SqlShard.scala | 124 ++++++++++-------- 2 files changed, 67 insertions(+), 58 deletions(-) diff --git a/src/main/scala/com/twitter/flockdb/shards/Optimism.scala b/src/main/scala/com/twitter/flockdb/shards/Optimism.scala index c1bd5c08..7dc19046 100644 --- a/src/main/scala/com/twitter/flockdb/shards/Optimism.scala +++ b/src/main/scala/com/twitter/flockdb/shards/Optimism.scala @@ -118,6 +118,7 @@ object LockingNodeSet { implicit def asLockingNodeSet(n: NodeSet[Shard]) = new LockingNodeSet(n) } +// TODO: metadataForWrite does not lock the metadata? class LockingNodeSet(node: NodeSet[Shard]) extends OptimisticStateMonitor { def getMetadatas(id: Long) = node.all { _.getMetadataForWrite(id)() } } diff --git a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala index 87e84cba..f661c8f3 100644 --- a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala +++ b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala @@ -137,14 +137,13 @@ extends Shard { } } - def getMetadata(sourceId: Long) = { - lowLatencyQueryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row => - new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at"))) - } - } + def getMetadata(sourceId: Long) = getMetadata(lowLatencyQueryEvaluator, sourceId) + + def getMetadataForWrite(sourceId: Long) = getMetadata(queryEvaluator, sourceId) - def getMetadataForWrite(sourceId: Long) = { - queryEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row => + /** TODO: separate effectively-static methods like this into a companion object. 
*/ + private def getMetadata(localEvaluator: AsyncQueryEvaluator, sourceId: Long) = { + localEvaluator.selectOne(SelectMetadata, "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row => new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at"))) } } @@ -181,29 +180,30 @@ extends Shard { } f flatMap { - _ map (Future.value(_)) getOrElse { - populateMetadata(sourceId, Normal) - count(sourceId, states) - } + case Some(count) => + Future.value(count) + case None => + // insert metadata, and directly return the computed count + queryEvaluator.transaction { txn => + populateMetadata(txn, sourceId, Normal) + }.map(_.count).rescue { + case e: SQLIntegrityConstraintViolationException => + // lost a race: recurse to use the newly inserted value + count(sourceId, states) + } } } - private def populateMetadata(sourceId: Long, state: State): Future[Unit] = - populateMetadata(sourceId, state, Time.epoch) - /** TODO: bulk insert? */ - private def populateMetadata(sourceId: Long, state: State, updatedAt: Time): Future[Unit] = { - val f = computeCount(sourceId, state) flatMap { count => - queryEvaluator.execute( - "INSERT INTO " + tablePrefix + "_metadata (source_id, count, state, updated_at) VALUES (?, ?, ?, ?)", - sourceId, - count, - state.id, - updatedAt.inSeconds) - } - f.unit handle { - case e: SQLIntegrityConstraintViolationException => () - } + private def populateMetadata(transaction: Transaction, sourceId: Long, state: State, updatedAt: Time = Time.epoch): Metadata = { + val count = computeCount(transaction, sourceId, state) + transaction.execute( + "INSERT INTO " + tablePrefix + "_metadata (source_id, count, state, updated_at) VALUES (?, ?, ?, ?)", + sourceId, + count, + state.id, + updatedAt.inSeconds) + new Metadata(sourceId, state, count, updatedAt) } private def computeCount(transaction: Transaction, sourceId: Long, state: State): Int = { @@ -621,45 +621,53 @@ extends Shard { } } - private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): Future[A] = { + private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): Future[A] = atomically(Seq(sourceId)) { (t, map) => f(t, map(sourceId)) } - } + /** + * Acquire the given metadata sourceIds FOR UPDATE if they exist, and create them + * if they do not exist. + */ private def atomically[A](sourceIds: Seq[Long])(f: (Transaction, Map[Long, Metadata]) => A): Future[A] = { - queryEvaluator.transaction { transaction => - - val mdMapBuilder = Map.newBuilder[Long, Metadata] - - transaction.select( - SelectModify, - "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) 
FOR UPDATE", - sourceIds - ) { row => - val md = new Metadata( - row.getLong("source_id"), - State(row.getInt("state")), - row.getInt("count"), - Time.fromSeconds(row.getInt("updated_at")) - ) - - mdMapBuilder += (row.getLong("source_id") -> md) + queryEvaluator.transaction { txn => + atomically(txn, sourceIds) { partialMd => + val fullMd = + if (partialMd.size == sourceIds.length) { + partialMd + } else { + val missingIds = sourceIds.filterNot(partialMd.contains _) + // TODO: populate should definitely be bulk for this usecase + partialMd ++ missingIds.map { id => (id, populateMetadata(txn, id, Normal)) } + } + f(txn, fullMd) } + } + } - val mdMap = mdMapBuilder.result + private def atomically[A](transaction: Transaction, sourceId: Long)(f: Option[Metadata] => A): A = + atomically(transaction, Seq(sourceId)) { md => f(md.get(sourceId)) } - if (mdMap.size < sourceIds.length) { - Left(sourceIds filterNot { mdMap contains _ }) - } else { - Right(f(transaction, mdMap)) - } - } flatMap { - case Left(missingMeta) => - // insert metadata in parallel, then recurse to retry TODO: termination? - Future.join(missingMeta map { populateMetadata(_, Normal) }) flatMap { _ => - atomically(sourceIds)(f) - } - case Right(rv) => Future.value(rv) + /** + * Acquire the given metadata sourceIds FOR UPDATE if they exist: if they do not exist, + * they will be missing from the output map. + */ + private def atomically[A](transaction: Transaction, sourceIds: Seq[Long])(f: Map[Long, Metadata] => A): A = { + val mdMapBuilder = Map.newBuilder[Long, Metadata] + transaction.select( + SelectModify, + "SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) FOR UPDATE", + sourceIds + ) { row => + val sourceId = row.getLong("source_id") + val md = new Metadata( + sourceId, + State(row.getInt("state")), + row.getInt("count"), + Time.fromSeconds(row.getInt("updated_at")) + ) + mdMapBuilder += (sourceId -> md) } + f(mdMapBuilder.result) } def writeMetadata(metadata: Metadata): Future[Unit] = { From 0d56728db919383928528578751eef52518a4bd0 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Sat, 3 Mar 2012 00:54:19 -0800 Subject: [PATCH 4/5] Atomically choose between incrementing and populating in write(edge). 
--- .../com/twitter/flockdb/shards/SqlShard.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala index f661c8f3..9cf04675 100644 --- a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala +++ b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala @@ -512,16 +512,22 @@ extends Shard { } private def write(edge: Edge, tries: Int, predictExistence: Boolean): Future[Unit] = { - try { - atomically(edge.sourceId) { (transaction, metadata) => - val preAndPostStates = writeEdge(transaction, edge, predictExistence) - val countDelta = countDeltaFor(preAndPostStates, metadata.state) - if (countDelta != 0) { - transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " + - "WHERE source_id = ?", countDelta, edge.sourceId) + queryEvaluator.transaction { transaction => + // insert the edge, and then acquire the metadata to update/populate its count + val preAndPostStates = writeEdge(transaction, edge, predictExistence) + atomically(transaction, edge.sourceId) { metadataOption => + metadataOption.map { metadata => + // metadata already existed: update its count + val countDelta = countDeltaFor(preAndPostStates, metadata.state) + if (countDelta != 0) { + updateCount(transaction, edge.sourceId, countDelta) + } + }.getOrElse { + // metadata doesn't exist: populate it from scratch (post-edge-insert) + populateMetadata(transaction, edge.sourceId, Normal) } } - } catch { + }.unit.rescue { case e: MySQLTransactionRollbackException if (tries > 0) => write(edge, tries - 1, predictExistence) case e: SQLIntegrityConstraintViolationException if (tries > 0) => @@ -568,7 +574,7 @@ extends Shard { } private def updateCount(transaction: Transaction, sourceId: Long, countDelta: Int) = { - transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = count + ? " + + transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " + "WHERE source_id = ?", countDelta, sourceId) } From 643ad09f1d439e680cfb45ee5452ef0e9842c3f8 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Wed, 7 Mar 2012 19:33:37 -0800 Subject: [PATCH 5/5] Remove LIMIT as redundant. --- .../scala/com/twitter/flockdb/shards/SqlShard.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala index 9cf04675..0dfbf858 100644 --- a/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala +++ b/src/main/scala/com/twitter/flockdb/shards/SqlShard.scala @@ -440,15 +440,14 @@ extends Shard { ) { transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " + "position = ?, count = 0, state = ? " + - "WHERE source_id = ? AND destination_id = ? AND " + - "updated_at <= ? LIMIT 1", + "WHERE source_id = ? AND destination_id = ? AND updated_at <= ?", edge.updatedAt.inSeconds, edge.position, edge.state.id, edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds) } else { try { transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " + "count = 0, state = ? " + - "WHERE source_id = ? AND destination_id = ? AND updated_at <= ? LIMIT 1", + "WHERE source_id = ? AND destination_id = ? AND updated_at <= ?", edge.updatedAt.inSeconds, edge.state.id, edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds) } catch { @@ -457,7 +456,7 @@ extends Shard { // FIXME: hacky. 
remove with the new schema. transaction.execute("UPDATE " + tablePrefix + "_edges SET updated_at = ?, " + "count = 0, state = ?, position = position + ? " + - "WHERE source_id = ? AND destination_id = ? AND updated_at <= ? LIMIT 1", + "WHERE source_id = ? AND destination_id = ? AND updated_at <= ?", edge.updatedAt.inSeconds, edge.state.id, (randomGenerator.nextInt() % 999) + 1, edge.sourceId, edge.destinationId, edge.updatedAt.inSeconds) @@ -469,7 +468,9 @@ extends Shard { case 1 => edge.state case 0 => oldEdge.state case x => - throw new AssertionError("Invalid update count " + x + ": LIMIT statement not applied?") + throw new AssertionError( + "Invalid update count " + x + ": querying by primary key should make this impossible?" + ) } (Some(oldEdge.state), newEdgeState) }
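
The counting rule introduced in PATCH 1/5 is easiest to read in isolation. Below is a minimal standalone Scala sketch, not the FlockDB source: ToyState and the example transitions are invented stand-ins for flockdb's State hierarchy, but the pattern match mirrors countDeltaFor as it appears in the diff above.

// Simplified stand-in for com.twitter.flockdb.State; only the identity of the
// state matters for the counting rule.
sealed abstract class ToyState(val id: Int)
case object Normal extends ToyState(0)
case object Removed extends ToyState(1)
case object Archived extends ToyState(2)

object CountDeltaSketch {
  // (old state if the edge already existed, state after the write)
  type EdgeStateChange = (Option[ToyState], ToyState)

  // +1 when an edge enters the state counted by the metadata row, -1 when it
  // leaves that state, 0 otherwise.
  def countDeltaFor(change: EdgeStateChange, metadataState: ToyState): Int =
    change match {
      case (None, `metadataState`)    => 1  // fresh insert in the counted state
      case (Some(o), n) if o == n     => 0  // nothing changed
      case (Some(_), `metadataState`) => 1  // moved into the counted state
      case (Some(`metadataState`), _) => -1 // moved out of the counted state
      case _                          => 0  // moved between uncounted states
    }

  def main(args: Array[String]): Unit = {
    // For a Normal node, whose count tracks Normal edges:
    println(countDeltaFor((None, Normal), Normal))            // 1
    println(countDeltaFor((Some(Normal), Removed), Normal))   // -1
    println(countDeltaFor((Some(Removed), Archived), Normal)) // 0
  }
}

Note that the second case has to precede the third, so that a write which leaves the edge in its existing (counted) state produces a delta of 0 rather than 1.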
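
PATCH 4/5 reorders write(edge) so that the edge row is written first and the metadata row is only locked afterwards, either to apply a count delta or to be populated from a recount. A rough control-flow sketch of that ordering follows, reusing ToyState and countDeltaFor from the previous sketch; Edge, Metadata, Txn and its methods are hypothetical stand-ins for the querulous Transaction calls in SqlShard, not the real API.

object WritePathSketch {
  import CountDeltaSketch.{EdgeStateChange, countDeltaFor}

  // Hypothetical row types; only the fields the control flow touches.
  case class Edge(sourceId: Long, destinationId: Long, state: ToyState)
  case class Metadata(sourceId: Long, state: ToyState, count: Int)

  // Hypothetical stand-in for the subset of the transaction used here.
  trait Txn {
    def writeEdge(edge: Edge): EdgeStateChange                  // INSERT, or UPDATE of the existing row
    def selectMetadataForUpdate(id: Long): Option[Metadata]     // SELECT ... FOR UPDATE
    def bumpCount(id: Long, delta: Int): Unit                   // count = GREATEST(count + ?, 0)
    def populateMetadata(id: Long, state: ToyState): Metadata   // recount edges, then INSERT
  }

  def write(txn: Txn, edge: Edge): Unit = {
    // 1. Write the edge row inside the transaction.
    val change = txn.writeEdge(edge)
    // 2. Lock the metadata row and choose between incrementing and populating.
    txn.selectMetadataForUpdate(edge.sourceId) match {
      case Some(md) =>
        val delta = countDeltaFor(change, md.state)
        if (delta != 0) txn.bumpCount(edge.sourceId, delta)
      case None =>
        // The recount runs after step 1, so the edge just written is already
        // included and no separate delta is applied.
        txn.populateMetadata(edge.sourceId, Normal)
    }
  }
}

This is the same shape as the patched write(edge, tries, predictExistence): a lost race to insert the metadata row surfaces as a duplicate-key or rollback error, which the surrounding rescue handles by retrying.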
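
PATCH 3/5 gives the bulk atomically() the same lock-then-populate structure for several source ids at once: metadata rows that exist are acquired FOR UPDATE, and missing ones are created inside the same transaction, instead of being inserted outside it and then re-read via recursion as the earlier code did. A standalone outline under the same assumptions as the sketches above (BulkTxn and its methods are invented; the real code issues the SELECT ... FOR UPDATE and the populate through the querulous transaction):

object AtomicallySketch {
  import WritePathSketch.Metadata

  // Hypothetical bulk interface; only what the outline needs.
  trait BulkTxn {
    def selectMetadataForUpdate(ids: Seq[Long]): Map[Long, Metadata] // rows that exist, locked
    def populateMetadata(id: Long, state: ToyState): Metadata        // recount edges, then INSERT
  }

  // Lock every requested metadata row, creating any that are missing, then run
  // the body against the complete map.
  def atomically[A](txn: BulkTxn, sourceIds: Seq[Long])(f: Map[Long, Metadata] => A): A = {
    val locked = txn.selectMetadataForUpdate(sourceIds)
    val all =
      if (locked.size == sourceIds.size) locked
      else locked ++ sourceIds.filterNot(locked.contains).map { id =>
        id -> txn.populateMetadata(id, Normal)
      }
    f(all)
  }
}

The TODO in the patch about bulk population still applies: like the patched code, this outline creates the missing rows one at a time.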