Permalink
Browse files

add bulk scheduling

  • Loading branch information...
Josh Hull
Josh Hull committed Mar 3, 2011
1 parent e8a45a8 commit 8a4d903332e5d8c59c03f440b8e3c187c24417f2
Showing with 22 additions and 14 deletions.
  1. +22 −14 src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala
@@ -116,6 +116,8 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Any](shardI
def scheduleMissing(list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit
+ def scheduleBulk(otherShards: Seq[S], items: Seq[R]): Unit
+
def cursorAtEnd(cursor: C): Boolean
def lowestCursor(c1: C, c2: C): C
@@ -135,21 +137,27 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Any](shardI
while (listCursors.forall(lc => !lc._2.isEmpty || cursorAtEnd(lc._3)) && listCursors.exists(lc => !lc._2.isEmpty)) {
val tableId = tableIds(0)
val firstList = smallestList(listCursors)
- val firstItem = firstList._2.remove(0)
- var firstEnqueued = false
- val similarLists = listCursors.filter(!_._2.isEmpty).filter(_._1 != firstList._1).filter(_._2(0).similar(firstItem) == 0)
- if (similarLists.size != (listCursors.size - 1) ) {
- firstEnqueued = true
- scheduleMissing(firstList, tableId, firstItem)
- }
- for (list <- similarLists) {
- val listItem = list._2.remove(0)
- if (shouldSchedule(firstItem, listItem)) {
- if (!firstEnqueued) {
- firstEnqueued = true
- scheduleDifferent(firstList, tableId, firstItem)
+ val finishedLists = listCursors.filter(lc => cursorAtEnd(lc._3) && lc._2.isEmpty)
+ if (finishedLists.size == listCursors.size - 1) {
+ scheduleBulk(finishedLists.map(_._1), firstList._2)
+ firstList._2.clear
+ } else {
+ val firstItem = firstList._2.remove(0)
+ var firstEnqueued = false
+ val similarLists = listCursors.filter(!_._2.isEmpty).filter(_._1 != firstList._1).filter(_._2(0).similar(firstItem) == 0)
+ if (similarLists.size != (listCursors.size - 1) ) {
+ firstEnqueued = true
+ scheduleMissing(firstList, tableId, firstItem)
+ }
+ for (list <- similarLists) {
+ val listItem = list._2.remove(0)
+ if (shouldSchedule(firstItem, listItem)) {
+ if (!firstEnqueued) {
+ firstEnqueued = true
+ scheduleDifferent(firstList, tableId, firstItem)
+ }
+ scheduleDifferent(list, tableId, listItem)
}
- scheduleDifferent(list, tableId, listItem)
}
}
}

0 comments on commit 8a4d903

Please sign in to comment.