Permalink
Browse files

move repair to gizzard

  • Loading branch information...
1 parent e148412 commit 1ff2e3799660390244f40d9bae1a2310c7c90755 Josh Hull committed Jan 19, 2011
@@ -18,7 +18,8 @@ package com.twitter.flockdb
import com.twitter.util.Time
import com.twitter.flockdb.jobs.single._
-import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, JsonJob, Repairable}
+import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, JsonJob}
+import com.twitter.flockdb.jobs.Repairable
case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time, count: Int,
state: State) extends Repairable[Edge] {
@@ -68,7 +68,7 @@ class FlockDB(config: flockdb.config.FlockDB, w3c: W3CStats) extends GizzardServ
val copyPriority = Priority.Medium.id
val copyFactory = new jobs.CopyFactory(nameServer, jobScheduler(Priority.Medium.id))
- val repairFactory = new jobs.RepairFactory(nameServer, jobScheduler)
+ override val repairFactory = new jobs.RepairFactory(nameServer, jobScheduler)
val dbQueryEvaluatorFactory = config.edgesQueryEvaluator(stats)
val materializingQueryEvaluatorFactory = config.materializingQueryEvaluator(stats)
@@ -134,12 +134,6 @@ class FlockDBThriftAdapter(val edges: EdgesService, val scheduler: PrioritizingJ
edges.get(source_id, graph_id, destination_id).toThrift
}
- def repair_shard(sourceId: thrift.ShardId, destinationId: thrift.ShardId) = {
- scheduler.put(Priority.Medium.id, repairs(
- ShardId(sourceId.hostname, sourceId.table_prefix),
- ShardId(destinationId.hostname, destinationId.table_prefix)))
- }
-
@deprecated
def select(operations: JList[thrift.SelectOperation], page: thrift.Page): thrift.Results = {
edges.select(new SelectQuery(operations.toSeq.map { _.fromThrift }, page.fromThrift)).toThrift
@@ -28,6 +28,10 @@ import com.twitter.gizzard.nameserver.{NameServer, NonExistentShard}
import com.twitter.gizzard.shards.{ShardDatabaseTimeoutException, ShardTimeoutException}
import collection.mutable.ListBuffer
+trait Repairable[T] { // Contract for items that TwoCursorRepair.resolve compares pairwise while reconciling two shards.
+ def similar(other: T): Int // resolve handles the receiver first when this is < 0, `other` first when > 0, and compares the two with == when it is 0.
+}
+
object Repair {
type RepairCursor = (Cursor, Cursor)
val START = (Cursor.Start, Cursor.Start)
@@ -37,14 +41,14 @@ object Repair {
}
class RepairFactory(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
- extends RepairJobFactory[Shard, Metadata] {
+ extends RepairJobFactory[Shard] {
def apply(sourceShardId: ShardId, destShardId: ShardId) = { // Builds the first job of a repair between the two given shards.
new MetadataRepair(sourceShardId, destShardId, MetadataRepair.START, MetadataRepair.START, MetadataRepair.COUNT, nameServer, scheduler) // A repair always begins as a MetadataRepair with both cursors at START.
}
}
class RepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
- extends RepairJobParser[Shard, Edge] {
+ extends RepairJobParser[Shard] {
def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
val srcCursor = (Cursor(attributes("src_cursor1").asInstanceOf[AnyVal].toLong),
Cursor(attributes("src_cursor2").asInstanceOf[AnyVal].toLong))
@@ -54,10 +58,54 @@ class RepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobSche
}
}
+abstract class TwoCursorRepair[R <: Repairable[R]](sourceShardId: ShardId, destinationShardId: ShardId, count: Int, nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) extends RepairJob[Shard](sourceShardId, destinationShardId, count, nameServer, scheduler, priority) { // Base for repair jobs; resolve() reconciles one batch of source items against one batch of destination items.
+
+ def enqueueFirst(tableId: Int, list: ListBuffer[R]) // Abstract hook. NOTE(review): resolve's loop only terminates if this removes the head of `list` -- confirm in every subclass.
+
+ def resolve(tableId: Int, srcSeq: Seq[R], srcCursorAtEnd: Boolean, destSeq: Seq[R], destCursorAtEnd: Boolean) = { // Consumes both batches from the front; returns the pair of heads left unprocessed (None for a drained side).
+ val srcItems = new ListBuffer[R]() // mutable working copy of the source batch
+ srcItems ++= srcSeq
+ val destItems = new ListBuffer[R]() // mutable working copy of the destination batch
+ destItems ++= destSeq
+ var running = !(srcItems.isEmpty && destItems.isEmpty) // skip the loop entirely when both batches are empty
+ while (running) {
+ val srcItem = srcItems.firstOption
+ val destItem = destItems.firstOption
+ (srcCursorAtEnd, destCursorAtEnd, srcItem, destItem) match {
+ case (true, true, None, None) => running = false // defensive: the loop condition already excludes both buffers being empty
+ case (true, true, _, None) => enqueueFirst(tableId, srcItems) // both cursors at end and only source items remain: hand the source head to enqueueFirst
+ case (true, true, None, _) => enqueueFirst(tableId, destItems) // both cursors at end and only destination items remain: hand the destination head to enqueueFirst
+ case (_, _, _, _) => // at least one cursor is not at its end
+ (srcItem, destItem) match {
+ case (None, None) => running = false // defensive, see above
+ case (_, None) => running = false // destination batch drained first: stop, the remaining source head is returned to the caller
+ case (None, _) => running = false // source batch drained first: stop, the remaining destination head is returned to the caller
+ case (_, _) => // both heads present: compare them
+ srcItem.get.similar(destItem.get) match {
+ case x if x < 0 => enqueueFirst(tableId, srcItems) // similar < 0: only the source head is handed to enqueueFirst
+ case x if x > 0 => enqueueFirst(tableId, destItems) // similar > 0: only the destination head is handed to enqueueFirst
+ case _ => // similar == 0
+ if (srcItem != destItem) { // similar says 0 but the items are not equal: hand both heads to enqueueFirst
+ enqueueFirst(tableId, srcItems)
+ enqueueFirst(tableId, destItems)
+ } else { // equal on both sides: drop both heads without enqueueing
+ srcItems.remove(0)
+ destItems.remove(0)
+ }
+ }
+ }
+ }
+ running &&= !(srcItems.isEmpty && destItems.isEmpty) // also stop once both buffers are drained
+ }
+ (srcItems.firstOption, destItems.firstOption) // heads that were not processed in this pass, if any
+ }
+
+}
+
class Repair(sourceShardId: ShardId, destinationShardId: ShardId, srcCursor: Repair.RepairCursor,
destCursor: Repair.RepairCursor, count: Int, nameServer: NameServer[Shard],
scheduler: PrioritizingJobScheduler[JsonJob])
- extends RepairJob[Shard, Edge](sourceShardId, destinationShardId, count, nameServer, scheduler, Repair.PRIORITY) {
+ extends TwoCursorRepair[Edge](sourceShardId, destinationShardId, count, nameServer, scheduler, Repair.PRIORITY) {
private val log = Logger.get(getClass.getName)
@@ -68,9 +116,7 @@ class Repair(sourceShardId: ShardId, destinationShardId: ShardId, srcCursor: Rep
def repair(sourceShard: Shard, destinationShard: Shard) = {
val (srcSeq, newSrcCursor) = sourceShard.selectAll(srcCursor, count)
val (destSeq, newDestCursor) = destinationShard.selectAll(destCursor, count)
- println("getting forwarding!")
val sourceTableId = nameServer.getRootForwardings(sourceShard.shardInfo.id)(0).tableId
- println("getting forwarding!")
val destinationTableId = nameServer.getRootForwardings(destinationShard.shardInfo.id)(0).tableId
if (sourceTableId != destinationTableId) {
throw new RuntimeException(sourceShard+" tableId did not match "+destinationShard);
@@ -118,7 +164,7 @@ object MetadataRepair {
}
class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
- extends RepairJobParser[Shard, Metadata] {
+ extends RepairJobParser[Shard] {
def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int) = {
val srcCursor = Cursor(attributes("src_cursor").asInstanceOf[AnyVal].toLong)
val destCursor = Cursor(attributes("dest_cursor").asInstanceOf[AnyVal].toLong)
@@ -128,19 +174,16 @@ class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: Prioritizin
class MetadataRepair(sourceShardId: ShardId, destinationShardId: ShardId, srcCursor: MetadataRepair.RepairCursor,
destCursor: MetadataRepair.RepairCursor, count: Int, nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
- extends RepairJob[Shard, Metadata](sourceShardId, destinationShardId, count, nameServer, scheduler, MetadataRepair.PRIORITY) {
+ extends TwoCursorRepair[Metadata](sourceShardId, destinationShardId, count, nameServer, scheduler, MetadataRepair.PRIORITY) {
private val log = Logger.get(getClass.getName)
def scheduleNextRepair(srcEdge: Option[Metadata], newSrcCursor: MetadataRepair.RepairCursor, destEdge: Option[Metadata], newDestCursor: MetadataRepair.RepairCursor) = {
- println("scheduleNextRepair")
scheduler.put(MetadataRepair.PRIORITY, (newSrcCursor, newDestCursor) match {
case (MetadataRepair.END, MetadataRepair.END) =>
- println("making a repair")
new Repair(sourceShardId, destinationShardId, Repair.START, Repair.START, Repair.COUNT, nameServer, scheduler)
case (_, _) =>
incrGauge
- println("doing metadata work")
(srcEdge, destEdge) match {
case (None, None) =>
new MetadataRepair(sourceShardId, destinationShardId, newSrcCursor, newDestCursor, count, nameServer, scheduler)
@@ -167,18 +210,13 @@ class MetadataRepair(sourceShardId: ShardId, destinationShardId: ShardId, srcCur
}
def repair(sourceShard: Shard, destinationShard: Shard) = {
- println("repair: source:"+sourceShard+" dest:"+destinationShard)
val (srcSeq, newSrcCursor) = sourceShard.selectAllMetadata(srcCursor, count)
val (destSeq, newDestCursor) = destinationShard.selectAllMetadata(destCursor, count)
- println("getting forwarding!")
val sourceTableId = nameServer.getRootForwardings(sourceShard.shardInfo.id)(0).tableId
- println("getting forwarding!")
val destinationTableId = nameServer.getRootForwardings(destinationShard.shardInfo.id)(0).tableId
if (sourceTableId != destinationTableId) {
- println("no match!")
throw new RuntimeException(sourceShard+" tableId did not match "+destinationShard);
} else {
- println("resolving!")
val (srcMetadata, destMetadata) = resolve(sourceTableId, srcSeq, newSrcCursor == MetadataRepair.END, destSeq, newDestCursor == MetadataRepair.END)
scheduleNextRepair(srcMetadata, newSrcCursor, destMetadata, newDestCursor)
}
@@ -22,6 +22,7 @@ import com.twitter.util.Time
import com.twitter.util.TimeConversions._
import flockdb.jobs.multi._
import com.twitter.gizzard.scheduler._
+import com.twitter.flockdb.jobs.Repairable
case class Metadata(sourceId: Long, state: State, count: Int, updatedAt: Time) extends Repairable[Metadata] {
def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) = {
@@ -128,11 +128,6 @@ struct EdgeResults {
3: i64 prev_cursor
}
-struct ShardId {
- 1: string hostname
- 2: string table_prefix
-}
-
service FlockDB {
# return true if the edge exists.
bool contains(1: i64 source_id, 2: i32 graph_id, 3: i64 destination_id) throws(1: FlockException ex)
@@ -153,8 +148,6 @@ service FlockDB {
void execute(1: ExecuteOperations operations) throws(1: FlockException ex)
- void repair_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: FlockException ex)
-
# deprecated:
i32 count(1: list<SelectOperation> operations) throws(1: FlockException ex)
Results select(1: list<SelectOperation> operations, 2: Page page) throws(1: FlockException ex)
@@ -20,9 +20,9 @@ import com.twitter.gizzard.thrift.conversions.Sequences._
import com.twitter.ostrich.Stats
import com.twitter.util.Time
import com.twitter.util.TimeConversions._
-import thrift._
import conversions.ExecuteOperations._
import conversions.SelectOperation._
+import com.twitter.gizzard.thrift._
import com.twitter.gizzard.shards.{Busy, ShardId, ShardInfo}
import com.twitter.gizzard.nameserver.Forwarding
import com.twitter.flockdb.shards.{SqlShard}
@@ -51,7 +51,10 @@ class RepairSpec extends IntegrationSpecification {
shard2.add(1L, 4L, 4L, Time.now) // only on two shard
- flock.repair_shard(new thrift.ShardId(shard1id.hostname, shard1id.tablePrefix), new thrift.ShardId(shard2id.hostname, shard2id.tablePrefix))
+ manager.repair_shard(
+ new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix),
+ new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix)
+ )
shard1.selectAll(Repair.START, Repair.COUNT)._1 must eventually(verify(s => s sameElements shard2.selectAll(Repair.START, Repair.COUNT)._1))
}
}
Oops, something went wrong.

0 comments on commit 1ff2e37

Please sign in to comment.