Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
use bulk repair
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Hull committed Mar 3, 2011
1 parent 25e594e commit 26165b7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
2 changes: 1 addition & 1 deletion project/build/FlockDBProject.scala
Expand Up @@ -8,7 +8,7 @@ with SubversionPublisher with DefaultRepos {
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1"

val gizzard = "com.twitter" % "gizzard" % "2.1.3"
val gizzard = "com.twitter" % "gizzard" % "2.1.4-SNAPSHOT"
val asm = "asm" % "asm" % "1.5.3" % "test"
val cglib = "cglib" % "cglib" % "2.1_3" % "test"
val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test"
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/twitter/flockdb/jobs/Repair.scala
Expand Up @@ -84,6 +84,10 @@ class Repair(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,

def forwardingManager = new ForwardingManager(nameServer)

def scheduleBulk(otherShards: Seq[Shard], items: Seq[Edge]) = {
otherShards.foreach(_.writeCopies(items))
}

def scheduleDifferent(list: (Shard, ListBuffer[Edge], Repair.RepairCursor), tableId: Int, item: Edge) = {
item.schedule(tableId, forwardingManager, scheduler, priority)
}
Expand Down Expand Up @@ -158,6 +162,10 @@ class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor
if (item.state != State.Normal || item.count != 0) item.schedule(tableId, forwardingManager, scheduler, priority)
}

def scheduleBulk(otherShards: Seq[Shard], items: Seq[Metadata]) = {
otherShards.foreach(_.writeMetadata(items))
}

def forwardingManager = new ForwardingManager(nameServer)

def cursorAtEnd(c: MetadataRepair.RepairCursor) = c == MetadataRepair.END
Expand Down
10 changes: 10 additions & 0 deletions src/test/scala/com/twitter/flockdb/integration/RepairSpec.scala
Expand Up @@ -57,6 +57,16 @@ class RepairSpec extends IntegrationSpecification {

shard3.negate(3L, 1L, 5L, Time.now) // only on two shard

// bulk
shard1.add(5L, 2L, 1L, Time.now) // same
shard1.add(6L, 2L, 1L, Time.now) // same
shard1.add(7L, 2L, 1L, Time.now) // same
shard1.add(8L, 2L, 1L, Time.now) // same
shard1.add(9L, 2L, 1L, Time.now) // same
shard1.add(10L, 2L, 1L, Time.now) // same



val list = new java.util.ArrayList[com.twitter.gizzard.thrift.ShardId]
list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix))
list.add(new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix))
Expand Down

0 comments on commit 26165b7

Please sign in to comment.