diff --git a/project/build/FlockDBProject.scala b/project/build/FlockDBProject.scala index c4979dcb..f6a02135 100644 --- a/project/build/FlockDBProject.scala +++ b/project/build/FlockDBProject.scala @@ -8,7 +8,7 @@ with SubversionPublisher with DefaultRepos { override def filterScalaJars = false val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1" - val gizzard = "com.twitter" % "gizzard" % "2.1.3" + val gizzard = "com.twitter" % "gizzard" % "2.1.4-SNAPSHOT" val asm = "asm" % "asm" % "1.5.3" % "test" val cglib = "cglib" % "cglib" % "2.1_3" % "test" val hamcrest = "org.hamcrest" % "hamcrest-all" % "1.1" % "test" diff --git a/src/main/scala/com/twitter/flockdb/jobs/Repair.scala b/src/main/scala/com/twitter/flockdb/jobs/Repair.scala index 959a4e38..08f5c4bd 100644 --- a/src/main/scala/com/twitter/flockdb/jobs/Repair.scala +++ b/src/main/scala/com/twitter/flockdb/jobs/Repair.scala @@ -84,6 +84,10 @@ class Repair(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int, def forwardingManager = new ForwardingManager(nameServer) + def scheduleBulk(otherShards: Seq[Shard], items: Seq[Edge]) = { + otherShards.foreach(_.writeCopies(items)) + } + def scheduleDifferent(list: (Shard, ListBuffer[Edge], Repair.RepairCursor), tableId: Int, item: Edge) = { item.schedule(tableId, forwardingManager, scheduler, priority) } @@ -158,6 +162,10 @@ class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor if (item.state != State.Normal || item.count != 0) item.schedule(tableId, forwardingManager, scheduler, priority) } + def scheduleBulk(otherShards: Seq[Shard], items: Seq[Metadata]) = { + otherShards.foreach(_.writeMetadata(items)) + } + def forwardingManager = new ForwardingManager(nameServer) def cursorAtEnd(c: MetadataRepair.RepairCursor) = c == MetadataRepair.END diff --git a/src/test/scala/com/twitter/flockdb/integration/RepairSpec.scala b/src/test/scala/com/twitter/flockdb/integration/RepairSpec.scala index dbe1357f..776a4357 100644 --- a/src/test/scala/com/twitter/flockdb/integration/RepairSpec.scala +++ b/src/test/scala/com/twitter/flockdb/integration/RepairSpec.scala @@ -57,6 +57,16 @@ class RepairSpec extends IntegrationSpecification { shard3.negate(3L, 1L, 5L, Time.now) // only on two shard + // bulk + shard1.add(5L, 2L, 1L, Time.now) // same + shard1.add(6L, 2L, 1L, Time.now) // same + shard1.add(7L, 2L, 1L, Time.now) // same + shard1.add(8L, 2L, 1L, Time.now) // same + shard1.add(9L, 2L, 1L, Time.now) // same + shard1.add(10L, 2L, 1L, Time.now) // same + + + val list = new java.util.ArrayList[com.twitter.gizzard.thrift.ShardId] list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix)) list.add(new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix))