Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

removed cursorable

  • Loading branch information...
commit 5dddf087e45781f4be5c0680a62d68a5a6a7bc97 1 parent a8849d3
Josh Hull authored
View
10 src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala
@@ -6,7 +6,7 @@ import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
import nameserver.{NameServer, NonExistentShard}
import collection.mutable.ListBuffer
-import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException, Cursorable}
+import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException}
trait Entity[T] {
def similar(other: T): Int
@@ -109,11 +109,13 @@ abstract case class CopyJob[S <: Shard](shardIds: Seq[ShardId],
def serialize: Map[String, Any]
}
-abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Cursorable[C]](shardIds: Seq[ShardId], cursor: C, count: Int,
+abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Ordered[C]](shardIds: Seq[ShardId], cursor: C, count: Int,
nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, priority: Int) extends CopyJob(shardIds, count, nameServer, scheduler, priority) {
private val log = Logger.get(getClass.getName)
+ def cursorAtEnd(cursor :C): Boolean
+
def nextCopy(lowestCursor: C): Option[CopyJob[S]]
def scheduleItem(missing: Boolean, list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit
@@ -144,10 +146,10 @@ abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Cursorable[C]](sh
} else if (nameServer.getCommonShardId(shardIds) == None) {
throw new RuntimeException("these shardIds don't have a common ancestor")
} else {
- while (listCursors.forall(lc => !lc._2.isEmpty || lc._3.atEnd) && listCursors.exists(lc => !lc._2.isEmpty)) {
+ while (listCursors.forall(lc => !lc._2.isEmpty || cursorAtEnd(lc._3)) && listCursors.exists(lc => !lc._2.isEmpty)) {
val tableId = tableIds(0)
val firstList = smallestList(listCursors)
- val finishedLists = listCursors.filter(lc => lc._3.atEnd && lc._2.isEmpty)
+ val finishedLists = listCursors.filter(lc => cursorAtEnd(lc._3) && lc._2.isEmpty)
if (finishedLists.size == listCursors.size - 1) {
scheduleBulk(finishedLists.map(_._1), firstList._2)
firstList._2.clear
View
7 src/main/scala/com/twitter/gizzard/shards/Cursorable.scala
@@ -1,7 +0,0 @@
-package com.twitter.gizzard.shards
-
-trait Cursorable[T] extends Ordered[T] {
- def atEnd: Boolean
- def atStart: Boolean
-}
-
View
6 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
@@ -10,7 +10,7 @@ import collection.mutable.ListBuffer
import com.twitter.gizzard
import nameserver.NameServer
-import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException, Cursorable}
+import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException}
import scheduler.{JobScheduler, JsonJob, JsonJobParser, PrioritizingJobScheduler, Entity, MultiShardCopy, CopyJobFactory, CopyJobParser}
package object config {
@@ -161,7 +161,7 @@ object TestCursor {
val End = new TestCursor(EndPosition)
}
-case class TestCursor(position: Int) extends Cursorable[TestCursor] {
+case class TestCursor(position: Int) extends Ordered[TestCursor] {
def atStart = position == TestCursor.StartPosition
def atEnd = position == TestCursor.EndPosition
def compare(other: TestCursor) = {
@@ -269,6 +269,8 @@ extends CopyJobFactory[TestShard] {
class TestCopy(shardIds: Seq[ShardId], cursor: TestCursor, count: Int,
nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardCopy[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.High.id) {
+ def cursorAtEnd(c: TestCursor) = c.atEnd
+
def select(shard: TestShard, cursor: TestCursor, count: Int) = shard.getAll(cursor, count)
def scheduleBulk(otherShards: Seq[TestShard], items: Seq[TestResult]) = {
otherShards.foreach(_.putAll(items.map{i => (i.id, i.value)}))
Please sign in to comment.
Something went wrong with that request. Please try again.