Permalink
Browse files

move repair_shard to gizzard interface

  • Loading branch information...
1 parent 3a390d0 commit 24f4de9e75ac7f7c8816a8428c2e2dbb1e27e4bd Josh Hull committed Jan 19, 2011
@@ -4,16 +4,18 @@ import com.twitter.util.Duration
import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
import nameserver.{NameServer, BasicShardRepository}
-import scheduler.{CopyJobFactory, JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec}
+import scheduler.{CopyJobFactory, JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec, RepairJobFactory}
import shards.{Shard, ReadWriteShard}
abstract class GizzardServer[S <: Shard, J <: JsonJob](config: gizzard.config.GizzardServer) {
def readWriteShardAdapter: ReadWriteShard[S] => S
def copyFactory: CopyJobFactory[S]
+ def repairFactory: RepairJobFactory[S] = null
def jobPriorities: Seq[Int]
def copyPriority: Int
+ def repairPriority: Int = copyPriority
def start(): Unit
def shutdown(quiesce: Boolean): Unit
def shutdown() { shutdown(false) }
@@ -45,7 +47,7 @@ abstract class GizzardServer[S <: Shard, J <: JsonJob](config: gizzard.config.Gi
// service wiring
- lazy val managerServer = new thrift.ManagerService(nameServer, copyFactory, jobScheduler, copyScheduler)
+ lazy val managerServer = new thrift.ManagerService(nameServer, copyFactory, jobScheduler, copyScheduler, repairFactory, repairPriority)
lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer))
lazy val jobInjectorServer = new thrift.JobInjectorService(jobCodec, jobScheduler)
@@ -112,7 +112,7 @@ extends Shard {
def getRootForwardings(id: ShardId): Seq[Forwarding] = {
val ids = nameServerShard.listUpwardLinks(id)
(try {
- getForwardingForShard(id) ::: Nil
+ getForwardingForShard(id) :: Nil
} catch {
case e:ShardException => Nil
}) ++ ids.map((i) => getRootForwardings(i.upId)).flatMap((i) => i)
@@ -11,23 +11,19 @@ object RepairJob {
val MIN_COPY = 500
}
-trait Repairable[T] {
- def similar(other: T): Int
-}
-
/**
* A factory for creating a new repair job (with default count and a starting cursor) from a source
* and destination shard ID.
*/
-trait RepairJobFactory[S <: Shard, R <: Repairable[R]] extends ((ShardId, ShardId) => RepairJob[S, R])
+trait RepairJobFactory[S <: Shard] extends ((ShardId, ShardId) => RepairJob[S])
/**
* A parser that creates a repair job out of json. The basic attributes (source shard ID, destination)
* shard ID, count) are parsed out first, and the remaining attributes are passed to
* 'deserialize' to decode any shard-specific data (like a cursor).
*/
-trait RepairJobParser[S <: Shard, R <: Repairable[R]] extends JsonJobParser {
- def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int): RepairJob[S, R]
+trait RepairJobParser[S <: Shard] extends JsonJobParser {
+ def deserialize(attributes: Map[String, Any], sourceId: ShardId, destinationId: ShardId, count: Int): RepairJob[S]
def apply(attributes: Map[String, Any]): JsonJob = {
deserialize(attributes,
@@ -46,7 +42,7 @@ trait RepairJobParser[S <: Shard, R <: Repairable[R]] extends JsonJobParser {
* 'repair' is called to do the actual data repair. It should return a new Some[RepairJob] representing
* the next chunk of work to do, or None if the entire copying job is complete.
*/
-abstract case class RepairJob[S <: Shard, R <: Repairable[R]](sourceId: ShardId,
+abstract case class RepairJob[S <: Shard](sourceId: ShardId,
destinationId: ShardId,
var count: Int,
nameServer: NameServer[S],
@@ -103,44 +99,4 @@ abstract case class RepairJob[S <: Shard, R <: Repairable[R]](sourceId: ShardId,
def repair(sourceShard: S, destinationShard: S)
def serialize: Map[String, Any]
-
- def enqueueFirst(tableId: Int, list: ListBuffer[R])
-
- def resolve(tableId: Int, srcSeq: Seq[R], srcCursorAtEnd: Boolean, destSeq: Seq[R], destCursorAtEnd: Boolean) = {
- val srcItems = new ListBuffer[R]()
- srcItems ++= srcSeq
- val destItems = new ListBuffer[R]()
- destItems ++= destSeq
- var running = !(srcItems.isEmpty && destItems.isEmpty)
- while (running) {
- val srcItem = srcItems.firstOption
- val destItem = destItems.firstOption
- (srcCursorAtEnd, destCursorAtEnd, srcItem, destItem) match {
- case (true, true, None, None) => running = false
- case (true, true, _, None) => enqueueFirst(tableId, srcItems)
- case (true, true, None, _) => enqueueFirst(tableId, destItems)
- case (_, _, _, _) =>
- (srcItem, destItem) match {
- case (None, None) => running = false
- case (_, None) => running = false
- case (None, _) => running = false
- case (_, _) =>
- srcItem.get.similar(destItem.get) match {
- case x if x < 0 => enqueueFirst(tableId, srcItems)
- case x if x > 0 => enqueueFirst(tableId, destItems)
- case _ =>
- if (srcItem != destItem) {
- enqueueFirst(tableId, srcItems)
- enqueueFirst(tableId, destItems)
- } else {
- srcItems.remove(0)
- destItems.remove(0)
- }
- }
- }
- }
- running &&= !(srcItems.isEmpty && destItems.isEmpty)
- }
- (srcItems.firstOption, destItems.firstOption)
- }
}
@@ -9,13 +9,13 @@ import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.thrift.conversions.Forwarding._
import com.twitter.gizzard.thrift.conversions.Host._
import com.twitter.gizzard.shards._
-import com.twitter.gizzard.scheduler.{CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler.{CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler, RepairJobFactory}
import com.twitter.gizzard.nameserver._
import net.lag.logging.Logger
import java.util.{List => JList}
-class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S], copier: CopyJobFactory[S], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob]) extends Manager.Iface {
+class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S], copier: CopyJobFactory[S], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob], repairer: RepairJobFactory[S], repairPriority: Int) extends Manager.Iface {
val log = Logger.get(getClass.getName)
def wrapEx[A](f: => A): A = try { f } catch {
@@ -98,6 +98,10 @@ class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S],
wrapEx(copyScheduler.put(copier(sourceId.fromThrift, destinationId.fromThrift)))
}
+ def repair_shard(sourceId: ShardId, destinationId: ShardId) = {
+ wrapEx((scheduler.asInstanceOf[PrioritizingJobScheduler[JsonJob]]).put(repairPriority, repairer(sourceId.fromThrift, destinationId.fromThrift)))
+ }
+
def dump_nameserver(tableId: Int) = wrapEx(nameServer.dumpStructure(tableId).toThrift)
@@ -89,6 +89,7 @@ service Manager {
void mark_shard_busy(1: ShardId id, 2: i32 busy) throws(1: GizzardException ex)
void copy_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: GizzardException ex)
+ void repair_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: GizzardException ex)
NameserverState dump_nameserver(1: i32 table_id) throws(1: GizzardException ex)
@@ -16,7 +16,7 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas
val scheduler = mock[PrioritizingJobScheduler[JsonJob]]
val subScheduler = mock[JobScheduler[JsonJob]]
val copyScheduler = mock[JobScheduler[JsonJob]]
- val manager = new ManagerService(nameServer, copier, scheduler, copyScheduler)
+ val manager = new ManagerService(nameServer, copier, scheduler, copyScheduler, null, 0)
val shard = mock[Shard]
val thriftShardInfo1 = new thrift.ShardInfo(new thrift.ShardId("hostname", "table_prefix"),

0 comments on commit 24f4de9

Please sign in to comment.