Permalink
Browse files

Merge branch 'fix_metadata_updated_at_zero' into node_query

  • Loading branch information...
2 parents 6960bcc + 96f5304 commit 5a9cf872f5d52a19bd89e6afb3a590150c266bb0 Kyle Maxwell committed Jan 11, 2011
@@ -74,9 +74,9 @@ abstract class Single(sourceId: Long, graphId: Int, destinationId: Long, positio
(forwardShard, backwardShard)
}
- private def withLock(state: State, shard: Shard, id: Long)(f: (State, Option[Shard]) => Unit) {
+ private def withLock(state: State, shard: Shard, id: Long, updatedAt: Time)(f: (State, Option[Shard]) => Unit) {
try {
- shard.withLock(id) { (newShard, metadata) =>
+ shard.withLock(id, updatedAt) { (newShard, metadata) =>
f(metadata.state max state, Some(newShard))
}
} catch {
@@ -88,8 +88,8 @@ abstract class Single(sourceId: Long, graphId: Int, destinationId: Long, positio
def apply() {
val (forwardShard, backwardShard) = shards()
val uuid = uuidGenerator(position)
- withLock(preferredState, forwardShard, sourceId) { (state, forwardShard) =>
- withLock(state, backwardShard, destinationId) { (state, backwardShard) =>
+ withLock(preferredState, forwardShard, sourceId, updatedAt) { (state, forwardShard) =>
+ withLock(state, backwardShard, destinationId, updatedAt) { (state, backwardShard) =>
state match {
case State.Normal =>
forwardShard.foreach { _.add(sourceId, destinationId, uuid, updatedAt) }
@@ -52,16 +52,16 @@ class ReadWriteShardAdapter(shard: shards.ReadWriteShard[Shard])
def archive(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time) = shard.writeOperation(_.archive(sourceId, destinationId, position, updatedAt))
def archive(sourceId: Long, updatedAt: Time) = shard.writeOperation(_.archive(sourceId, updatedAt))
- def withLock[A](sourceId: Long)(f: (Shard, Metadata) => A) = {
+ def withLock[A](sourceId: Long, updatedAt: Time)(f: (Shard, Metadata) => A) = {
if (shard.isInstanceOf[shards.ReplicatingShard[_]]) {
val replicatingShard = shard.asInstanceOf[shards.ReplicatingShard[Shard]]
val lockServer = children.first.asInstanceOf[Shard]
val rest = children.drop(1).asInstanceOf[Seq[Shard]]
- lockServer.withLock(sourceId) { (lock, metadata) =>
+ lockServer.withLock(sourceId, updatedAt) { (lock, metadata) =>
f(new ReadWriteShardAdapter(new shards.ReplicatingShard(shardInfo, weight, List(lock) ++ rest, replicatingShard.loadBalancer, replicatingShard.future)), metadata)
}
} else {
- shard.writeOperation(_.withLock(sourceId)(f))
+ shard.writeOperation(_.withLock(sourceId, updatedAt)(f))
}
}
}
@@ -24,7 +24,7 @@ import com.twitter.util.TimeConversions._
trait Shard extends shards.Shard {
@throws(classOf[shards.ShardException]) def get(sourceId: Long, destinationId: Long): Option[Edge]
@throws(classOf[shards.ShardException]) def getMetadata(sourceId: Long): Option[Metadata]
- @throws(classOf[shards.ShardException]) def withLock[A](sourceId: Long)(f: (Shard, Metadata) => A): A
+ @throws(classOf[shards.ShardException]) def withLock[A](sourceId: Long, updatedAt: Time)(f: (Shard, Metadata) => A): A
@throws(classOf[shards.ShardException]) def count(sourceId: Long, states: Seq[State]): Int
@throws(classOf[shards.ShardException]) def counts(sourceIds: Seq[Long], results: mutable.Map[Long, Int])
@@ -138,8 +138,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
result + (if (state == State(row.getInt("state"))) row.getInt("count") else 0)
}
} getOrElse {
- populateMetadata(sourceId, Normal)
- count(sourceId, states)
+ 0
}
}
@@ -149,8 +148,6 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
}
}
- private def populateMetadata(sourceId: Long, state: State) { populateMetadata(sourceId, state, Time(0.seconds)) }
-
private def populateMetadata(sourceId: Long, state: State, updatedAt: Time) {
try {
queryEvaluator.execute(
@@ -451,7 +448,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
private def write(edge: Edge, tries: Int, predictExistence: Boolean) {
try {
- atomically(edge.sourceId) { (transaction, metadata) =>
+ atomically(edge.sourceId, edge.updatedAt) { (transaction, metadata) =>
val countDelta = writeEdge(transaction, metadata, edge, predictExistence)
if (countDelta != 0) {
transaction.execute("UPDATE " + tablePrefix + "_metadata SET count = GREATEST(count + ?, 0) " +
@@ -508,7 +505,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
edges.foreach { edge => sourceIdsSet += edge.sourceId }
val sourceIds = sourceIdsSet.toSeq
- atomically(sourceIds) { (transaction, metadataById) =>
+ atomically(sourceIds, edges.map(_.updatedAt)) { (transaction, metadataById) =>
val result = writeBurst(transaction, edges)
if (result.completed.size > 0) {
var currentSourceId = -1L
@@ -545,13 +542,13 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
}
}
- def withLock[A](sourceId: Long)(f: (Shard, Metadata) => A) = {
- atomically(sourceId) { (transaction, metadata) =>
+ def withLock[A](sourceId: Long, updatedAt: Time)(f: (Shard, Metadata) => A) = {
+ atomically(sourceId, updatedAt) { (transaction, metadata) =>
f(new SqlShard(transaction, shardInfo, weight, children, deadlockRetries), metadata)
}
}
- private def atomically[A](sourceId: Long)(f: (Transaction, Metadata) => A): A = {
+ private def atomically[A](sourceId: Long, timestamp: Time)(f: (Transaction, Metadata) => A): A = {
try {
queryEvaluator.transaction { transaction =>
transaction.selectOne(SelectModify,
@@ -561,12 +558,16 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
}
} catch {
case e: MissingMetadataRow =>
- populateMetadata(sourceId, Normal)
- atomically(sourceId)(f)
+ populateMetadata(sourceId, Normal, timestamp)
+ atomically(sourceId, timestamp)(f)
}
}
+
+ private def atomically[A](sourceIds: Seq[Long], timestamp: Time)(f: (Transaction, collection.Map[Long, Metadata]) => A): A = {
+ atomically(sourceIds, sourceIds.map{ id => timestamp })(f)
+ }
- private def atomically[A](sourceIds: Seq[Long])(f: (Transaction, collection.Map[Long, Metadata]) => A): A = {
+ private def atomically[A](sourceIds: Seq[Long], timestamps: Seq[Time])(f: (Transaction, collection.Map[Long, Metadata]) => A): A = {
try {
val metadataMap = mutable.Map[Long, Metadata]()
queryEvaluator.transaction { transaction =>
@@ -579,9 +580,10 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
f(transaction, metadataMap)
}
} catch {
- case e: MissingMetadataRow =>
- sourceIds.foreach { sourceId => populateMetadata(sourceId, Normal) }
- atomically(sourceIds)(f)
+ case e: MissingMetadataRow => {
+ for(sourceId <- sourceIds; timestamp <- timestamps) { populateMetadata(sourceId, Normal, timestamp) }
+ atomically(sourceIds, timestamps)(f)
+ }
}
}
@@ -592,7 +594,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
metadata.sourceId, 0, metadata.state.id, metadata.updatedAt.inSeconds)
} catch {
case e: SQLIntegrityConstraintViolationException =>
- atomically(metadata.sourceId) { (transaction, oldMetadata) =>
+ atomically(metadata.sourceId, metadata.updatedAt) { (transaction, oldMetadata) =>
transaction.execute("UPDATE " + tablePrefix + "_metadata SET state = ?, updated_at = ? " +
"WHERE source_id = ? AND updated_at <= ?",
metadata.state.id, metadata.updatedAt.inSeconds, metadata.sourceId,
@@ -624,7 +626,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
// FIXME: computeCount could be really expensive. :(
def updateMetadata(sourceId: Long, state: State, updatedAt: Time) {
- atomically(sourceId) { (transaction, metadata) =>
+ atomically(sourceId, updatedAt) { (transaction, metadata) =>
if ((updatedAt != metadata.updatedAt) || ((metadata.state max state) == state)) {
transaction.execute("UPDATE " + tablePrefix + "_metadata SET state = ?, updated_at = ?, count = ? WHERE source_id = ? AND updated_at <= ?",
state.id, updatedAt.inSeconds, computeCount(sourceId, state), sourceId, updatedAt.inSeconds)
@@ -29,7 +29,7 @@ import thrift.Edge
// Test double that bypasses real row locking: it hands the callback the wrapped
// shard and that shard's current metadata directly. (jMock is not up to the task.)
class FakeLockingShard(shard: Shard) extends SqlShard(null, new ShardInfo("a", "b", "c"), 1, Nil, 0) {
  override def withLock[A](sourceId: Long, updatedAt: Time)(f: (Shard, Metadata) => A) = {
    val metadata = shard.getMetadata(sourceId).get
    f(shard, metadata)
  }
}
class JobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
@@ -50,7 +50,7 @@ object ReadWriteShardAdapterSpec extends ConfiguredSpecification with JMocker wi
val fake2 = new FakeLockingShard(shard2)
val fake1 = new FakeReadWriteShard[Shard](fake2, null, 1, Nil)
val shard = new ReadWriteShardAdapter(fake1)
- shard.withLock(sourceId) { (innerShard, metadata) =>
+ shard.withLock(sourceId, Time.now) { (innerShard, metadata) =>
innerShard.add(sourceId, Time.now)
}
}
@@ -113,8 +113,6 @@ class SqlShardSpec extends IntegrationSpecification with JMocker {
"when the state is given" >> {
"when no edges have been added beforehand and a non-normal state is given" >> {
shard.count(alice, List(State.Archived)) mustEqual 0
- val metadata = shard.getMetadata(alice).get
- metadata.state mustEqual State.Normal
}
"when edges have been added beforehand" >> {
@@ -324,6 +322,14 @@ class SqlShardSpec extends IntegrationSpecification with JMocker {
}
}
+ "getMetadata" in {
+ "side-effect rows should be created with a timestamp" >> {
+ shard.add(alice, bob, 1, now)
+ shard.getMetadata(alice).isDefined must eventually(be_==(true))
+ shard.getMetadata(alice).get.updatedAt mustEqual now
+ }
+ }
+
"get" in {
shard.add(alice, bob, 1, now)
shard.archive(carl, darcy, 2, now)

0 comments on commit 5a9cf87

Please sign in to comment.