Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
moved multishard repair to gizzard
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Hull committed Jan 26, 2011
1 parent f97ac9e commit 0b9440a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 56 deletions.
3 changes: 1 addition & 2 deletions src/main/scala/com/twitter/flockdb/Edge.scala
Expand Up @@ -18,8 +18,7 @@ package com.twitter.flockdb

import com.twitter.util.Time
import com.twitter.flockdb.jobs.single._
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, JsonJob}
import com.twitter.flockdb.jobs.Repairable
import com.twitter.gizzard.scheduler.{PrioritizingJobScheduler, JsonJob, Repairable}

case class Edge(sourceId: Long, destinationId: Long, position: Long, updatedAt: Time, count: Int,
state: State) extends Repairable[Edge] {
Expand Down
68 changes: 15 additions & 53 deletions src/main/scala/com/twitter/flockdb/jobs/Repair.scala
Expand Up @@ -22,16 +22,11 @@ import com.twitter.gizzard.nameserver.NameServer
import com.twitter.ostrich.Stats
import com.twitter.util.TimeConversions._
import conversions.Numeric._
import shards.{Shard, Metadata}
import net.lag.logging.Logger
import com.twitter.gizzard.nameserver.{NameServer, NonExistentShard}
import com.twitter.gizzard.shards.{ShardDatabaseTimeoutException, ShardTimeoutException}
import collection.mutable.ListBuffer

/**
 * Contract for items that a multi-shard repair can compare against each other and re-enqueue.
 *
 * In this file it is mixed into the item types themselves (`Edge extends Repairable[Edge]`,
 * `Metadata extends Repairable[Metadata]`), so `T` is the implementing type.
 *
 * @tparam T the type of item this one can be compared with
 */
trait Repairable[T] {
// Comparator-style result. MultiShardRepair treats a negative value as "this item sorts before
// `other`" and zero as "same logical item" (the two may still differ under `==`).
// NOTE(review): the meaning of a positive value is not exercised separately here -- presumably
// "sorts after"; confirm against the implementations.
def similar(other: T): Int
// Enqueues whatever work is needed to repair this item for the given table. The concrete jobs
// are decided by the implementing class; nothing is returned.
def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int): Unit
}
import shards.{Shard, Metadata}

object Repair {
type RepairCursor = (Cursor, Cursor)
Expand All @@ -57,54 +52,9 @@ class RepairParser(nameServer: NameServer[Shard], scheduler: PrioritizingJobSche
}
}

/**
 * Repair job that walks several item buffers in step and re-enqueues every item on which the
 * buffers disagree.
 *
 * Items are compared with `Repairable.similar`: a negative result means "sorts first", zero means
 * "same logical item". Two items that are similar but not `==` are both rescheduled; an item that
 * is missing from at least one buffer is rescheduled as well.
 *
 * NOTE(review): each buffer presumably holds the current page read from one shard, paired with
 * the cursor for that shard's next page -- confirm against the subclasses' `repair`.
 *
 * @tparam R item type being repaired
 * @tparam C cursor type used to page through the data
 */
abstract class MultiShardRepair[R <: Repairable[R], C <: Any](shardIds: Seq[ShardId], cursor: C, count: Int,
    nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob]) extends RepairJob(shardIds, count, nameServer, scheduler, Repair.PRIORITY) {

  /**
   * Called once the buffers cannot be compared any further.
   *
   * @param lowestItem the smallest item still sitting in any buffer, or `None` when every
   *                   buffer has been drained
   */
  def scheduleNextRepair(lowestItem: Option[R]): Unit

  /** Builds a new `ForwardingManager` over `nameServer` on every call. */
  def forwardingManager = new ForwardingManager(nameServer)

  /** Whether `cursor` denotes the end of the data, i.e. no further page will follow. */
  def cursorAtEnd(cursor: C): Boolean

  /**
   * Returns the non-empty buffer whose head sorts first according to `similar`.
   * On a tie the later buffer wins. Throws if every buffer is empty (`reduceLeft` on nothing).
   */
  def smallestList(listCursors: Seq[(ListBuffer[R], C)]) = {
    listCursors.map(_._1).filter(!_.isEmpty).reduceLeft { (list1, list2) =>
      if (list1(0).similar(list2(0)) < 0) list1 else list2
    }
  }

  /**
   * Consumes the buffers from the front, scheduling a repair for every item the buffers do not
   * agree on, then hands the lowest remaining item (if any) to `scheduleNextRepair`.
   *
   * The buffers are mutated in place: compared items are removed.
   *
   * @param listCursors buffers to reconcile, each paired with its paging cursor
   * @param tableIds    table ids of the sources being compared; all must be equal
   * @throws RuntimeException if `tableIds` contains differing values
   */
  def repairListCursor(listCursors: Seq[(ListBuffer[R], C)], tableIds: Seq[Int]) = {
    if (tableIds.forall((id) => id == tableIds(0))) {
      // Keep going only while every buffer either has items or is known to be finished, and at
      // least one buffer still has items.
      // NOTE(review): presumably an empty buffer with a cursor that is not at the end still has
      // unread data, so comparing against it would report false "missing" items -- confirm.
      while (listCursors.forall(lc => !lc._1.isEmpty || cursorAtEnd(lc._2)) && listCursors.exists(lc => !lc._1.isEmpty)) {
        val tableId = tableIds(0)
        val firstList = smallestList(listCursors)
        val firstItem = firstList.remove(0)
        var firstEnqueued = false
        // Exclude the buffer we just popped from by identity (`ne`), not by `!=`: ListBuffer
        // equality is element-wise, so a *different* buffer that happens to hold the same
        // elements as the now-shortened `firstList` would otherwise be dropped from the comparison.
        val similarLists = listCursors.map(_._1)
          .filter(!_.isEmpty)
          .filter(_ ne firstList)
          .filter(_(0).similar(firstItem) == 0)
        // Fewer matches than "all other buffers" means some buffer lacks this item entirely.
        if (similarLists.size != (listCursors.size - 1)) {
          firstEnqueued = true
          firstItem.schedule(tableId, forwardingManager, scheduler, priority)
        }
        for (list <- similarLists) {
          if (firstItem == list(0)) {
            // Exact duplicate: nothing to repair, just consume it.
            list.remove(0)
          } else {
            // Same logical item but different contents: reschedule both versions, taking care to
            // enqueue `firstItem` only once.
            if (!firstEnqueued) {
              firstEnqueued = true
              firstItem.schedule(tableId, forwardingManager, scheduler, priority)
            }
            list.remove(0).schedule(tableId, forwardingManager, scheduler, priority)
          }
        }
      }
      scheduleNextRepair(if (listCursors.filter(!_._1.isEmpty).size == 0) None else Some(smallestList(listCursors)(0)))
    } else {
      throw new RuntimeException("tableIds didn't match")
    }
  }
}

class Repair(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,
nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
extends MultiShardRepair[Edge, Repair.RepairCursor](shardIds, cursor, count, nameServer, scheduler) {
extends MultiShardRepair[Shard, Edge, Repair.RepairCursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) {

private val log = Logger.get(getClass.getName)

Expand All @@ -114,6 +64,12 @@ class Repair(shardIds: Seq[ShardId], cursor: Repair.RepairCursor, count: Int,

// Reports whether `c` equals the `Repair.END` cursor value.
def cursorAtEnd(c: Repair.RepairCursor) = c == Repair.END

// Builds a new ForwardingManager over this job's nameServer each time it is called (a def, not a val).
def forwardingManager = new ForwardingManager(nameServer)

// Delegates to the edge's own `schedule`, supplying this job's forwarding manager, scheduler and priority.
def schedule(tableId: Int, item: Edge) =
  item.schedule(tableId, forwardingManager, scheduler, priority)

def repair(shards: Seq[Shard]) = {
val tableIds = shards.map((shard:Shard) => nameServer.getRootForwardings(shard.shardInfo.id)(0).tableId)

Expand Down Expand Up @@ -154,7 +110,7 @@ class MetadataRepairParser(nameServer: NameServer[Shard], scheduler: Prioritizin

class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor, count: Int,
nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler[JsonJob])
extends MultiShardRepair[Metadata, MetadataRepair.RepairCursor](shardIds, cursor, count, nameServer, scheduler) {
extends MultiShardRepair[Shard, Metadata, MetadataRepair.RepairCursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) {

private val log = Logger.get(getClass.getName)

Expand All @@ -165,6 +121,12 @@ class MetadataRepair(shardIds: Seq[ShardId], cursor: MetadataRepair.RepairCursor
})
}

// Delegates to the metadata row's own `schedule`, supplying this job's forwarding manager, scheduler and priority.
def schedule(tableId: Int, item: Metadata) =
  item.schedule(tableId, forwardingManager, scheduler, priority)

// Builds a new ForwardingManager over this job's nameServer each time it is called (a def, not a val).
def forwardingManager = new ForwardingManager(nameServer)

// Reports whether `c` equals the `MetadataRepair.END` cursor value.
def cursorAtEnd(c: MetadataRepair.RepairCursor) = c == MetadataRepair.END

def generateCursor(metadata: Metadata) = {
Expand Down
1 change: 0 additions & 1 deletion src/main/scala/com/twitter/flockdb/shards/Shard.scala
Expand Up @@ -22,7 +22,6 @@ import com.twitter.util.Time
import com.twitter.util.TimeConversions._
import flockdb.jobs.multi._
import com.twitter.gizzard.scheduler._
import com.twitter.flockdb.jobs.Repairable

case class Metadata(sourceId: Long, state: State, count: Int, updatedAt: Time) extends Repairable[Metadata] {
def schedule(tableId: Int, forwardingManager: ForwardingManager, scheduler: PrioritizingJobScheduler[JsonJob], priority: Int) = {
Expand Down

0 comments on commit 0b9440a

Please sign in to comment.