Permalink
Browse files

Merge branch 'twoeight' of github.com:twitter/flockdb into twoeight

  • Loading branch information...
2 parents 637cb51 + 19309cd commit 1038cfb7d9aebe0979d2963024f7b00d28e59221 @freels freels committed Feb 25, 2011
@@ -20,8 +20,18 @@ import com.twitter.util.Time
import com.twitter.flockdb.jobs.single._
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, JsonJob, Repairable}
-case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time, count: Int,
+object Edge {
+ def apply(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time, count: Int, state: State) = new Edge(sourceId, destinationId, position, updatedAt, count, state)
+}
+
+case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAtSeconds: Int, count: Int,
state: State) extends Ordered[Edge] with Repairable[Edge]{
+
+ def this(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time, count: Int, state: State) =
+ this(sourceId, destinationId, position, updatedAt.inSeconds, count, state)
+
+ val updatedAt = Time.fromSeconds(updatedAtSeconds)
+
def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) = {
scheduler.put(priority, toJob(tableId, forwardingManager))
}
@@ -20,7 +20,22 @@ import com.twitter.util.Time
import com.twitter.gizzard.scheduler._
import jobs.multi._
-case class Metadata(sourceId: Long, state: State, count: Int, updatedAt: Time) extends Ordered[Metadata] with Repairable[Metadata] {
+object Metadata {
+ def apply(sourceId: Long, state: State, count: Int, updatedAt: Time) = new Metadata(sourceId, state, count, updatedAt)
+ def apply(sourceId: Long, state: State, updatedAt: Time) = new Metadata(sourceId, state, updatedAt)
+}
+
+case class Metadata(sourceId: Long, state: State, count: Int, updatedAtSeconds: Int) extends Ordered[Metadata] with Repairable[Metadata] {
+
+ def this(sourceId: Long, state: State, count: Int, updatedAt: Time) =
+ this(sourceId, state, count, updatedAt.inSeconds)
+
+ def this(sourceId: Long, state: State, updatedAt: Time) =
+ this(sourceId, state, 0, updatedAt.inSeconds)
+
+ val updatedAt = Time.fromSeconds(updatedAtSeconds)
+
+
def compare(other: Metadata) = {
val out = updatedAt.compare(other.updatedAt)
if (out == 0) {
@@ -109,7 +109,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
def getMetadata(sourceId: Long): Option[Metadata] = {
queryEvaluator.selectOne("SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ?", sourceId) { row =>
- Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
+ new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at")))
}
}
@@ -124,7 +124,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
queryEvaluator.select(SelectCopy, query, cursor.position, count + 1) { row =>
if (i < count) {
val sourceId = row.getLong("source_id")
- metadatas += Metadata(sourceId, State(row.getInt("state")), row.getInt("count"),
+ metadatas += new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"),
Time.fromSeconds(row.getInt("updated_at")))
nextCursor = Cursor(sourceId)
i += 1
@@ -384,7 +384,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
private def updateEdge(transaction: Transaction, metadata: Metadata, edge: Edge,
oldEdge: Edge): Int = {
- if ((oldEdge.updatedAt == edge.updatedAt) && (oldEdge.state max edge.state) != edge.state) return 0
+ if ((oldEdge.updatedAtSeconds == edge.updatedAtSeconds) && (oldEdge.state max edge.state) != edge.state) return 0
val updatedRows = if (
oldEdge.state != Archived && // Only update position when coming from removed or negated into normal
@@ -563,7 +563,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
queryEvaluator.transaction { transaction =>
transaction.selectOne(SelectModify,
"SELECT * FROM " + tablePrefix + "_metadata WHERE source_id = ? FOR UPDATE", sourceId) { row =>
- f(transaction, Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at"))))
+ f(transaction, new Metadata(sourceId, State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at"))))
} getOrElse(throw new MissingMetadataRow)
}
} catch {
@@ -579,7 +579,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
queryEvaluator.transaction { transaction =>
transaction.select(SelectModify,
"SELECT * FROM " + tablePrefix + "_metadata WHERE source_id in (?) FOR UPDATE", sourceIds) { row =>
- metadataMap.put(row.getLong("source_id"), Metadata(row.getLong("source_id"), State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at"))))
+ metadataMap.put(row.getLong("source_id"), new Metadata(row.getLong("source_id"), State(row.getInt("state")), row.getInt("count"), Time.fromSeconds(row.getInt("updated_at"))))
}
if (metadataMap.size < sourceIds.length)
throw new MissingMetadataRow
@@ -632,7 +632,7 @@ class SqlShard(val queryEvaluator: QueryEvaluator, val shardInfo: shards.ShardIn
// FIXME: computeCount could be really expensive. :(
def updateMetadata(sourceId: Long, state: State, updatedAt: Time) {
atomically(sourceId) { (transaction, metadata) =>
- if ((updatedAt != metadata.updatedAt) || ((metadata.state max state) == state)) {
+ if ((updatedAt.inSeconds != metadata.updatedAtSeconds) || ((metadata.state max state) == state)) {
transaction.execute("UPDATE " + tablePrefix + "_metadata SET state = ?, updated_at = ?, count = ? WHERE source_id = ? AND updated_at <= ?",
state.id, updatedAt.inSeconds, computeCount(sourceId, state), sourceId, updatedAt.inSeconds)
}
@@ -20,6 +20,7 @@ import java.io.File
import org.specs.Specification
import com.twitter.gizzard.shards.{Busy, ShardId, ShardInfo}
import com.twitter.gizzard.nameserver.Forwarding
+import com.twitter.gizzard.scheduler._
import com.twitter.gizzard.test.NameServerDatabase
import com.twitter.util.Eval
import com.twitter.querulous.evaluator.QueryEvaluatorFactory
@@ -46,6 +47,10 @@ abstract class ConfiguredSpecification extends Specification {
}
}
}
+
+ def jsonMatching(list1: Iterable[JsonJob], list2: Iterable[JsonJob]) = {
+ list1 must eventually(verify(l1 => { l1.map(_.toJson).sameElements(list2.map(_.toJson))}))
+ }
}
abstract class IntegrationSpecification extends ConfiguredSpecification with NameServerDatabase {
@@ -47,42 +47,37 @@ class EdgesSpec extends IntegrationSpecification {
"get_metadata" in {
reset(config)
- Time.withCurrentTimeFrozen { time =>
- flock.contains_metadata(alice, FOLLOWS) must eventually(be_==(false))
- flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
- flock.contains_metadata(alice, FOLLOWS) must eventually(be_==(true))
-
- // updated_at should not be confused with created_at. Flock rows are commonly inserted with updated_at t=0.
- // This is done to make their sort order low, and prevents a race condition in the case where in an empty db:
- //
- // 1. Mark alice archived.
- // 2. Wait 1 second
- // 3. Insert edge between alice and bob
- // 4. Play those two operations in the db out of order.
- // 5. Observe that alice is unfortunately still in the normal state.
- //
- flock.get_metadata(alice, FOLLOWS) must eventually (be_==(flockdb.Metadata(alice, State.Normal, 1, Time.epoch).toThrift))
+ flock.contains_metadata(alice, FOLLOWS) must eventually(be_==(false))
+ flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
+ flock.contains_metadata(alice, FOLLOWS) must eventually(be_==(true))
- }
+ // updated_at should not be confused with created_at. Flock rows are commonly inserted with updated_at t=0.
+ // This is done to make their sort order low, and prevents a race condition in the case where in an empty db:
+ //
+ // 1. Mark alice archived.
+ // 2. Wait 1 second
+ // 3. Insert edge between alice and bob
+ // 4. Play those two operations in the db out of order.
+ // 5. Observe that alice is unfortunately still in the normal state.
+ //
+ flock.get_metadata(alice, FOLLOWS) must eventually (be_==(new flockdb.Metadata(alice, State.Normal, 1, Time.epoch).toThrift))
}
"add" in {
"existing graph" in {
reset(config)
- Time.withCurrentTimeFrozen { time =>
- flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
- val term = new QueryTerm(alice, FOLLOWS, true)
- term.setDestination_ids(List[Long](bob).pack)
- term.setState_ids(List[Int](State.Normal.id).toJavaList)
- val op = new SelectOperation(SelectOperationType.SimpleQuery)
- op.setTerm(term)
- val page = new Page(1, Cursor.Start.position)
- flock.select(List(op).toJavaList, page).ids.array.size must eventually(be_>(0))
- time.advance(1.second)
- flock.execute(Select(alice, FOLLOWS, bob).remove.toThrift)
- flock.select(List(op).toJavaList, page).ids.array.size must eventually(be_==(0))
- flock.count(Select(alice, FOLLOWS, Nil).toThrift) mustEqual 0
- }
+ flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
+ val term = new QueryTerm(alice, FOLLOWS, true)
+ term.setDestination_ids(List[Long](bob).pack)
+ term.setState_ids(List[Int](State.Normal.id).toJavaList)
+ val op = new SelectOperation(SelectOperationType.SimpleQuery)
+ op.setTerm(term)
+ val page = new Page(1, Cursor.Start.position)
+ flock.select(List(op).toJavaList, page).ids.array.size must eventually(be_>(0))
+ Thread.sleep(1000)
+ flock.execute(Select(alice, FOLLOWS, bob).remove.toThrift)
+ flock.select(List(op).toJavaList, page).ids.array.size must eventually(be_==(0))
+ flock.count(Select(alice, FOLLOWS, Nil).toThrift) mustEqual 0
}
"nonexistent graph" in {
@@ -94,43 +89,41 @@ class EdgesSpec extends IntegrationSpecification {
"remove" in {
reset(config)
flock.execute(Select(bob, FOLLOWS, alice).remove.toThrift)
- (!flock.contains(bob, FOLLOWS, alice) &&
- flock.count(Select(alice, FOLLOWS, Nil).toThrift) == 0 &&
- flock.count(Select(Nil, FOLLOWS, alice).toThrift) == 0) must eventually(beTrue)
+ flock.contains(bob, FOLLOWS, alice) must eventually(beFalse)
+ flock.count(Select(alice, FOLLOWS, Nil).toThrift) must eventually(be_==(0))
+ flock.count(Select(Nil, FOLLOWS, alice).toThrift) must eventually(be_==(0))
}
"archive" in {
reset(config)
- Time.withCurrentTimeFrozen { time =>
- flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
- flock.execute(Select(alice, FOLLOWS, carl).add.toThrift)
- flock.execute(Select(alice, FOLLOWS, darcy).add.toThrift)
- flock.execute(Select(darcy, FOLLOWS, alice).add.toThrift)
- (flock.count(Select(alice, FOLLOWS, ()).toThrift) == 3 &&
- flock.count(Select((), FOLLOWS, alice).toThrift) == 1) must eventually(beTrue)
- for (destinationId <- List(bob, carl, darcy)) {
- flock.count(Select((), FOLLOWS, destinationId).toThrift) must eventually(be_==(1))
- }
- flock.count(Select(darcy, FOLLOWS, ()).toThrift) must eventually(be_==(1))
-
- time.advance(1.second)
- flock.execute((Select(alice, FOLLOWS, ()).archive + Select((), FOLLOWS, alice).archive).toThrift)
- (flock.count(Select(alice, FOLLOWS, ()).toThrift) == 0 &&
- flock.count(Select((), FOLLOWS, alice).toThrift) == 0) must eventually(beTrue)
- for (destinationId <- List(bob, carl, darcy)) {
- flock.count(Select((), FOLLOWS, destinationId).toThrift) must eventually(be_==(0))
- }
- flock.count(Select(darcy, FOLLOWS, ()).toThrift) must eventually(be_==(0))
-
- time.advance(1.seconds)
- flock.execute((Select(alice, FOLLOWS, ()).add + Select((), FOLLOWS, alice).add).toThrift)
- (flock.count(Select(alice, FOLLOWS, ()).toThrift) == 3 &&
- flock.count(Select((), FOLLOWS, alice).toThrift) == 1) must eventually(beTrue)
- for (destinationId <- List(bob, carl, darcy)) {
- flock.count(Select((), FOLLOWS, destinationId).toThrift) must eventually(be_==(1))
- }
- flock.count(Select(darcy, FOLLOWS, ()).toThrift) must eventually(be_==(1))
+ flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
+ flock.execute(Select(alice, FOLLOWS, carl).add.toThrift)
+ flock.execute(Select(alice, FOLLOWS, darcy).add.toThrift)
+ flock.execute(Select(darcy, FOLLOWS, alice).add.toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(3))
+ flock.count(Select((), FOLLOWS, alice).toThrift) must eventually(be_==(1))
+ for (destinationId <- List(bob, carl, darcy)) {
+ flock.count(Select((), FOLLOWS, destinationId).toThrift) must eventually(be_==(1))
+ }
+ flock.count(Select(darcy, FOLLOWS, ()).toThrift) must eventually(be_==(1))
+
+ Thread.sleep(1000)
+ flock.execute((Select(alice, FOLLOWS, ()).archive + Select((), FOLLOWS, alice).archive).toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(0))
+ flock.count(Select((), FOLLOWS, alice).toThrift) must eventually(be_==(0))
+ for (destinationId <- List(bob, carl, darcy)) {
+ flock.count(Select((), FOLLOWS, destinationId).toThrift) must eventually(be_==(0))
+ }
+ flock.count(Select(darcy, FOLLOWS, ()).toThrift) must eventually(be_==(0))
+
+ Thread.sleep(1000)
+ flock.execute((Select(alice, FOLLOWS, ()).add + Select((), FOLLOWS, alice).add).toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(3))
+ flock.count(Select((), FOLLOWS, alice).toThrift) must eventually(be_==(1))
+ for (destinationId <- List(bob, carl, darcy)) {
+ flock.count(Select((), FOLLOWS, destinationId).toThrift) must eventually(be_==(1))
}
+ flock.count(Select(darcy, FOLLOWS, ()).toThrift) must eventually(be_==(1))
}
"archive & unarchive concurrently" in {
@@ -158,20 +151,18 @@ class EdgesSpec extends IntegrationSpecification {
"toggle polarity" in {
reset(config)
- Time.withCurrentTimeFrozen { time =>
- flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
- flock.execute(Select(alice, FOLLOWS, carl).add.toThrift)
- flock.execute(Select(alice, FOLLOWS, darcy).add.toThrift)
- flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(3))
- time.advance(1.second)
- flock.execute(Select(alice, FOLLOWS, ()).negate.toThrift)
- flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(0))
- flock.count(Select(alice, FOLLOWS, ()).negative.toThrift) must eventually(be_==(3))
- time.advance(1.second)
- flock.execute(Select(alice, FOLLOWS, ()).add.toThrift)
- flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(3))
- flock.count(Select(alice, FOLLOWS, ()).negative.toThrift) must eventually(be_==(0))
- }
+ flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
+ flock.execute(Select(alice, FOLLOWS, carl).add.toThrift)
+ flock.execute(Select(alice, FOLLOWS, darcy).add.toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(3))
+ Thread.sleep(1000)
+ flock.execute(Select(alice, FOLLOWS, ()).negate.toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(0))
+ flock.count(Select(alice, FOLLOWS, ()).negative.toThrift) must eventually(be_==(3))
+ Thread.sleep(1000)
+ flock.execute(Select(alice, FOLLOWS, ()).add.toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(3))
+ flock.count(Select(alice, FOLLOWS, ()).negative.toThrift) must eventually(be_==(0))
}
"counts" in {
@@ -188,22 +179,20 @@ class EdgesSpec extends IntegrationSpecification {
"select_edges" in {
"simple query" in {
reset(config)
- Time.withCurrentTimeFrozen { time =>
- flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
- time.advance(1.second)
- flock.execute(Select(alice, FOLLOWS, carl).add.toThrift)
- flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(2))
-
- val term = new QueryTerm(alice, FOLLOWS, true)
- term.setState_ids(List[Int](State.Normal.id).toJavaList)
- val query = new EdgeQuery(term, new Page(10, Cursor.Start.position))
- val resultsList = flock.select_edges(List[EdgeQuery](query).toJavaList).toList
- resultsList.size mustEqual 1
- val results = resultsList(0)
- results.next_cursor mustEqual Cursor.End.position
- results.prev_cursor mustEqual Cursor.End.position
- results.edges.toList.map { _.destination_id }.toList mustEqual List[Long](carl, bob)
- }
+ flock.execute(Select(alice, FOLLOWS, bob).add.toThrift)
+ Thread.sleep(1000)
+ flock.execute(Select(alice, FOLLOWS, carl).add.toThrift)
+ flock.count(Select(alice, FOLLOWS, ()).toThrift) must eventually(be_==(2))
+
+ val term = new QueryTerm(alice, FOLLOWS, true)
+ term.setState_ids(List[Int](State.Normal.id).toJavaList)
+ val query = new EdgeQuery(term, new Page(10, Cursor.Start.position))
+ val resultsList = flock.select_edges(List[EdgeQuery](query).toJavaList).toList
+ resultsList.size mustEqual 1
+ val results = resultsList(0)
+ results.next_cursor mustEqual Cursor.End.position
+ results.prev_cursor mustEqual Cursor.End.position
+ results.edges.toList.map { _.destination_id }.toList mustEqual List[Long](carl, bob)
}
"intersection" in {
@@ -32,21 +32,21 @@ object EdgeSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"Edge" should {
"normal becomes single.Add" in {
- val edge = Edge(source, dest, pos, now, count, State.Normal)
+ val edge = new Edge(source, dest, pos, now, count, State.Normal)
edge.toJob(graph, forwardingManager) mustEqual new Add(source, graph, dest, pos, now, forwardingManager, OrderedUuidGenerator)
}
"removed becomes single.Remove" in {
- val edge = Edge(source, dest, pos, now, count, State.Removed)
+ val edge = new Edge(source, dest, pos, now, count, State.Removed)
edge.toJob(graph, forwardingManager) mustEqual new Remove(source, graph, dest, pos, now, forwardingManager, OrderedUuidGenerator)
}
"archived becomes single.Archive" in {
- val edge = Edge(source, dest, pos, now, count, State.Archived)
+ val edge = new Edge(source, dest, pos, now, count, State.Archived)
edge.toJob(graph, forwardingManager) mustEqual new Archive(source, graph, dest, pos, now, forwardingManager, OrderedUuidGenerator)
}
"negative becomes single.Negate" in {
- val edge = Edge(source, dest, pos, now, count, State.Negative)
+ val edge = new Edge(source, dest, pos, now, count, State.Negative)
edge.toJob(graph, forwardingManager) mustEqual new Negate(source, graph, dest, pos, now, forwardingManager, OrderedUuidGenerator)
}
Oops, something went wrong.

0 comments on commit 1038cfb

Please sign in to comment.