From 960a28d1d2298e7e61d4034dfc0829f22977e2c0 Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Thu, 21 Apr 2011 13:35:47 -0700 Subject: [PATCH 01/11] initial cut --- .../com/twitter/gizzard/GizzardServer.scala | 11 +- .../twitter/gizzard/scheduler/CopyJob.scala | 259 ++++++------- .../twitter/gizzard/scheduler/RepairJob.scala | 61 +-- .../twitter/gizzard/shards/Cursorable.scala | 7 + .../gizzard/thrift/ManagerService.scala | 10 +- src/main/thrift/Manager.thrift | 2 +- .../gizzard/integration/TestServer.scala | 158 ++++++-- .../gizzard/scheduler_new/CopyJobSpec.scala | 363 +++++++++--------- .../thrift/ShardManagerServiceSpec.scala | 31 +- 9 files changed, 502 insertions(+), 400 deletions(-) create mode 100644 src/main/scala/com/twitter/gizzard/shards/Cursorable.scala diff --git a/src/main/scala/com/twitter/gizzard/GizzardServer.scala b/src/main/scala/com/twitter/gizzard/GizzardServer.scala index 10386062..198d9fb1 100644 --- a/src/main/scala/com/twitter/gizzard/GizzardServer.scala +++ b/src/main/scala/com/twitter/gizzard/GizzardServer.scala @@ -4,7 +4,7 @@ import com.twitter.util.Duration import com.twitter.util.TimeConversions._ import net.lag.logging.Logger import nameserver.{NameServer, BasicShardRepository} -import scheduler.{CopyJobFactory, JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec, RepairJobFactory} +import scheduler.{JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec, RepairJobFactory} import shards.{Shard, ReadWriteShard} import config.{GizzardServer => ServerConfig} @@ -12,12 +12,11 @@ import config.{GizzardServer => ServerConfig} abstract class GizzardServer[S <: Shard](config: ServerConfig) { def readWriteShardAdapter: ReadWriteShard[S] => S - def copyFactory: CopyJobFactory[S] def repairFactory: RepairJobFactory[S] = null def diffFactory: RepairJobFactory[S] = null def jobPriorities: Seq[Int] - def copyPriority: Int - def repairPriority: Int = copyPriority + //def copyPriority: Int + def repairPriority: Int def start(): Unit def shutdown(quiesce: Boolean): Unit def shutdown() { shutdown(false) } @@ -45,16 +44,12 @@ abstract class GizzardServer[S <: Shard](config: ServerConfig) { p -> config.jobQueues(p)(jobCodec) } toMap) - lazy val copyScheduler = jobScheduler(copyPriority).asInstanceOf[JobScheduler] - // service wiring lazy val managerServer = new thrift.ManagerService( nameServer, - copyFactory, jobScheduler, - copyScheduler, repairFactory, repairPriority, diffFactory) diff --git a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala index 39f548f6..06d1d6dd 100644 --- a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala +++ b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala @@ -1,129 +1,130 @@ -package com.twitter.gizzard -package scheduler - -import com.twitter.ostrich.Stats -import com.twitter.util.TimeConversions._ -import net.lag.logging.Logger -import nameserver.{NameServer, NonExistentShard} -import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException} - -object CopyJob { - val MIN_COPY = 500 -} - -/** - * A factory for creating a new copy job (with default count and a starting cursor) from a source - * and destination shard ID. - */ -trait CopyJobFactory[S <: Shard] extends ((ShardId, ShardId) => CopyJob[S]) - -/** - * A parser that creates a copy job out of json. The basic attributes (source shard ID, destination) - * shard ID, and count) are parsed out first, and the remaining attributes are passed to - * 'deserialize' to decode any shard-specific data (like a cursor). - */ -trait CopyJobParser[S <: Shard] extends JsonJobParser { - def deserialize(attributes: Map[String, Any], sourceId: ShardId, - destinationId: ShardId, count: Int): CopyJob[S] - - def apply(attributes: Map[String, Any]): JsonJob = { - deserialize(attributes, - ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString), - ShardId(attributes("destination_shard_hostname").toString, attributes("destination_shard_table_prefix").toString), - attributes("count").asInstanceOf[{def toInt: Int}].toInt) - } -} - -/** - * A json-encodable job that represents the state of a copy from one shard to another. - * - * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items. - * Other shard-specific data (like the cursor) can be encoded in 'serialize'. - * - * 'copyPage' is called to do the actual data copying. It should return a new CopyJob representing - * the next chunk of work to do, or None if the entire copying job is complete. - */ -abstract case class CopyJob[S <: Shard](sourceId: ShardId, - destinationId: ShardId, - var count: Int, - nameServer: NameServer[S], - scheduler: JobScheduler) - extends JsonJob { - private val log = Logger.get(getClass.getName) - - override def shouldReplicate = false - - def toMap = { - Map("source_shard_hostname" -> sourceId.hostname, - "source_shard_table_prefix" -> sourceId.tablePrefix, - "destination_shard_hostname" -> destinationId.hostname, - "destination_shard_table_prefix" -> destinationId.tablePrefix, - "count" -> count - ) ++ serialize - } - - def finish() { - nameServer.markShardBusy(destinationId, shards.Busy.Normal) - log.info("Copying finished for (type %s) from %s to %s", - getClass.getName.split("\\.").last, sourceId, destinationId) - Stats.clearGauge(gaugeName) - } - - def apply() { - try { - if (nameServer.getShard(destinationId).busy == shards.Busy.Cancelled) { - log.info("Copying cancelled for (type %s) from %s to %s", - getClass.getName.split("\\.").last, sourceId, destinationId) - Stats.clearGauge(gaugeName) - - } else { - - val sourceShard = nameServer.findShardById(sourceId) - val destinationShard = nameServer.findShardById(destinationId) - - log.info("Copying shard block (type %s) from %s to %s: state=%s", - getClass.getName.split("\\.").last, sourceId, destinationId, toMap) - // do this on each iteration, so it happens in the queue and can be retried if the db is busy: - nameServer.markShardBusy(destinationId, shards.Busy.Busy) - - this.nextJob = copyPage(sourceShard, destinationShard, count) - this.nextJob match { - case None => finish() - case _ => incrGauge - } - } - } catch { - case e: NonExistentShard => - log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.") - case e: ShardTimeoutException => - if (count > CopyJob.MIN_COPY) { - log.warning("Shard block copy timed out; trying a smaller block size.") - count = (count * 0.9).toInt - scheduler.put(this) - } else { - log.error("Shard block copy timed out on minimum block size.") - nameServer.markShardBusy(destinationId, shards.Busy.Error) - throw e - } - case e: ShardDatabaseTimeoutException => - log.warning("Shard block copy failed to get a database connection; retrying.") - scheduler.put(this) - case e: Throwable => - log.error(e, "Shard block copy stopped due to exception: %s", e) - nameServer.markShardBusy(destinationId, shards.Busy.Error) - throw e - } - } - - private def incrGauge = { - Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count) - } - - private def gaugeName = { - "x-copying-" + sourceId + "-" + destinationId - } - - def copyPage(sourceShard: S, destinationShard: S, count: Int): Option[CopyJob[S]] - - def serialize: Map[String, Any] -} +//package com.twitter.gizzard +//package scheduler +// +//import com.twitter.ostrich.Stats +//import com.twitter.util.TimeConversions._ +//import net.lag.logging.Logger +//import nameserver.{NameServer, NonExistentShard} +//import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException} +// +//object CopyJob { +// val MIN_COPY = 500 +//} +// +///** +// * A factory for creating a new copy job (with default count and a starting cursor) from a source +// * and destination shard ID. +// */ +//trait CopyJobFactory[S <: Shard] extends ((ShardId, ShardId) => CopyJob[S]) +// +///** +// * A parser that creates a copy job out of json. The basic attributes (source shard ID, destination) +// * shard ID, and count) are parsed out first, and the remaining attributes are passed to +// * 'deserialize' to decode any shard-specific data (like a cursor). +// */ +//trait CopyJobParser[S <: Shard] extends JsonJobParser { +// def deserialize(attributes: Map[String, Any], sourceId: ShardId, +// destinationId: ShardId, count: Int): CopyJob[S] +// +// def apply(attributes: Map[String, Any]): JsonJob = { +// deserialize(attributes, +// ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString), +// ShardId(attributes("destination_shard_hostname").toString, attributes("destination_shard_table_prefix").toString), +// attributes("count").asInstanceOf[{def toInt: Int}].toInt) +// } +//} +// +///** +// * A json-encodable job that represents the state of a copy from one shard to another. +// * +// * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items. +// * Other shard-specific data (like the cursor) can be encoded in 'serialize'. +// * +// * 'copyPage' is called to do the actual data copying. It should return a new CopyJob representing +// * the next chunk of work to do, or None if the entire copying job is complete. +// */ +//abstract case class CopyJob[S <: Shard](sourceId: ShardId, +// destinationId: ShardId, +// var count: Int, +// nameServer: NameServer[S], +// scheduler: JobScheduler) +// extends JsonJob { +// private val log = Logger.get(getClass.getName) +// +// override def shouldReplicate = false +// +// def toMap = { +// Map("source_shard_hostname" -> sourceId.hostname, +// "source_shard_table_prefix" -> sourceId.tablePrefix, +// "destination_shard_hostname" -> destinationId.hostname, +// "destination_shard_table_prefix" -> destinationId.tablePrefix, +// "count" -> count +// ) ++ serialize +// } +// +// def finish() { +// nameServer.markShardBusy(destinationId, shards.Busy.Normal) +// log.info("Copying finished for (type %s) from %s to %s", +// getClass.getName.split("\\.").last, sourceId, destinationId) +// Stats.clearGauge(gaugeName) +// } +// +// def apply() { +// try { +// if (nameServer.getShard(destinationId).busy == shards.Busy.Cancelled) { +// log.info("Copying cancelled for (type %s) from %s to %s", +// getClass.getName.split("\\.").last, sourceId, destinationId) +// Stats.clearGauge(gaugeName) +// +// } else { +// +// val sourceShard = nameServer.findShardById(sourceId) +// val destinationShard = nameServer.findShardById(destinationId) +// +// log.info("Copying shard block (type %s) from %s to %s: state=%s", +// getClass.getName.split("\\.").last, sourceId, destinationId, toMap) +// // do this on each iteration, so it happens in the queue and can be retried if the db is busy: +// nameServer.markShardBusy(destinationId, shards.Busy.Busy) +// +// this.nextJob = copyPage(sourceShard, destinationShard, count) +// this.nextJob match { +// case None => finish() +// case _ => incrGauge +// } +// } +// } catch { +// case e: NonExistentShard => +// log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.") +// case e: ShardTimeoutException => +// if (count > CopyJob.MIN_COPY) { +// log.warning("Shard block copy timed out; trying a smaller block size.") +// count = (count * 0.9).toInt +// scheduler.put(this) +// } else { +// log.error("Shard block copy timed out on minimum block size.") +// nameServer.markShardBusy(destinationId, shards.Busy.Error) +// throw e +// } +// case e: ShardDatabaseTimeoutException => +// log.warning("Shard block copy failed to get a database connection; retrying.") +// scheduler.put(this) +// case e: Throwable => +// log.error(e, "Shard block copy stopped due to exception: %s", e) +// nameServer.markShardBusy(destinationId, shards.Busy.Error) +// throw e +// } +// } +// +// private def incrGauge = { +// Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count) +// } +// +// private def gaugeName = { +// "x-copying-" + sourceId + "-" + destinationId +// } +// +// def copyPage(sourceShard: S, destinationShard: S, count: Int): Option[CopyJob[S]] +// +// def serialize: Map[String, Any] +//} +// \ No newline at end of file diff --git a/src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala b/src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala index 402d3d92..4744be72 100644 --- a/src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala +++ b/src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala @@ -6,10 +6,11 @@ import com.twitter.util.TimeConversions._ import net.lag.logging.Logger import nameserver.{NameServer, NonExistentShard} import collection.mutable.ListBuffer -import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException} +import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException, Cursorable} trait Repairable[T] { def similar(other: T): Int + def shouldRepair(other: T): Boolean } object RepairJob { @@ -56,17 +57,15 @@ abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], override def shouldReplicate = false - def label(): String - def finish() { - log.info("[%s] - finished for (type %s) for %s", label, + log.info("[Repair] - finished for (type %s) for %s", getClass.getName.split("\\.").last, shardIds.mkString(", ")) Stats.clearGauge(gaugeName) } def apply() { try { - log.info("[%s] - shard block (type %s): state=%s", label, + log.info("[Repair] - shard block (type %s): state=%s", getClass.getName.split("\\.").last, toMap) val shardObjs = shardIds.map(nameServer.findShardById(_)) shardIds.foreach(nameServer.markShardBusy(_, shards.Busy.Busy)) @@ -77,16 +76,16 @@ abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], } } catch { case e: NonExistentShard => - log.error("[%s] - failed because one of the shards doesn't exist. Terminating the repair.", label) + log.error("[Repair] - failed because one of the shards doesn't exist. Terminating the repair.") case e: ShardDatabaseTimeoutException => - log.warning("[%s] - failed to get a database connection; retrying.", label) + log.warning("[Repair] - failed to get a database connection; retrying.") scheduler.put(priority, this) case e: ShardTimeoutException if (count > RepairJob.MIN_COPY) => - log.warning("[%s] - block copy timed out; trying a smaller block size.", label) + log.warning("[Repair] - block copy timed out; trying a smaller block size.") count = (count * 0.9).toInt scheduler.put(priority, this) case e: Throwable => - log.warning(e, "[%s] - stopped due to exception: %s", label, e) + log.warning(e, "[Repair] - stopped due to exception: %s", e) throw e } } @@ -102,7 +101,7 @@ abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], } private def gaugeName = { - "x-"+label.toLowerCase+"-" + shardIds.mkString("-") + "x-repair-" + shardIds.mkString("-") } def repair(shards: Seq[S]) @@ -110,39 +109,45 @@ abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], def serialize: Map[String, Any] } -abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Any](shardIds: Seq[ShardId], cursor: C, count: Int, +abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Cursorable[C]](shardIds: Seq[ShardId], cursor: C, count: Int, nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, priority: Int) extends RepairJob(shardIds, count, nameServer, scheduler, priority) { private val log = Logger.get(getClass.getName) def nextRepair(lowestCursor: C): Option[RepairJob[S]] - def scheduleDifferent(list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit - - def scheduleMissing(list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit + def scheduleItem(missing: Boolean, list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit def scheduleBulk(otherShards: Seq[S], items: Seq[R]): Unit - def cursorAtEnd(cursor: C): Boolean - - def lowestCursor(c1: C, c2: C): C - def smallestList(listCursors: Seq[(S, ListBuffer[R], C)]) = { listCursors.filter(!_._2.isEmpty).reduceLeft((list1, list2) => if (list1._2(0).similar(list2._2(0)) < 0) list1 else list2) } - def shouldSchedule(original:R, suspect: R): Boolean + def select(shard: S, cursor: C, count: Int): (Seq[R], C) + + def repair(shards: Seq[S]) = { + val tableIds = shards.map(shard => nameServer.getRootForwardings(shard.shardInfo.id).head.tableId) + + val listCursors = shards.map( (shard) => { + val (seq, newCursor) = select(shard, cursor, count) + val list = new ListBuffer[R]() + list ++= seq + (shard, list, newCursor) + }) + repairListCursor(listCursors, tableIds) + } - def repairListCursor(listCursors: Seq[(S, ListBuffer[R], C)], tableIds: Seq[Int]) = { + private def repairListCursor(listCursors: Seq[(S, ListBuffer[R], C)], tableIds: Seq[Int]) = { if (!tableIds.forall((id) => id == tableIds(0))) { throw new RuntimeException("tableIds didn't match") } else if (nameServer.getCommonShardId(shardIds) == None) { throw new RuntimeException("these shardIds don't have a common ancestor") } else { - while (listCursors.forall(lc => !lc._2.isEmpty || cursorAtEnd(lc._3)) && listCursors.exists(lc => !lc._2.isEmpty)) { + while (listCursors.forall(lc => !lc._2.isEmpty || lc._3.atEnd) && listCursors.exists(lc => !lc._2.isEmpty)) { val tableId = tableIds(0) val firstList = smallestList(listCursors) - val finishedLists = listCursors.filter(lc => cursorAtEnd(lc._3) && lc._2.isEmpty) + val finishedLists = listCursors.filter(lc => lc._3.atEnd && lc._2.isEmpty) if (finishedLists.size == listCursors.size - 1) { scheduleBulk(finishedLists.map(_._1), firstList._2) firstList._2.clear @@ -152,21 +157,21 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Any](shardI val similarLists = listCursors.filter(!_._2.isEmpty).filter(_._1 != firstList._1).filter(_._2(0).similar(firstItem) == 0) if (similarLists.size != (listCursors.size - 1) ) { firstEnqueued = true - scheduleMissing(firstList, tableId, firstItem) + scheduleItem(true, firstList, tableId, firstItem) } for (list <- similarLists) { - val listItem = list._2.remove(0) - if (shouldSchedule(firstItem, listItem)) { + val similarItem = list._2.remove(0) + if (firstItem.shouldRepair(similarItem)) { if (!firstEnqueued) { firstEnqueued = true - scheduleDifferent(firstList, tableId, firstItem) + scheduleItem(false, firstList, tableId, firstItem) } - scheduleDifferent(list, tableId, listItem) + scheduleItem(false, list, tableId, similarItem) } } } } - val nextCursor = listCursors.map(_._3).reduceLeft((c1, c2) => lowestCursor(c1, c2)) + val nextCursor = listCursors.map(_._3).reduceLeft((c1, c2) => if (c1.compare(c2) <= 0) c1 else c2) this.nextJob = nextRepair(nextCursor) } } diff --git a/src/main/scala/com/twitter/gizzard/shards/Cursorable.scala b/src/main/scala/com/twitter/gizzard/shards/Cursorable.scala new file mode 100644 index 00000000..06adec17 --- /dev/null +++ b/src/main/scala/com/twitter/gizzard/shards/Cursorable.scala @@ -0,0 +1,7 @@ +package com.twitter.gizzard.shards + +trait Cursorable[T] extends Ordered[T] { + def atEnd: Boolean + def atStart: Boolean +} + diff --git a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala index c8a93f52..8c498bfb 100644 --- a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala +++ b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala @@ -11,13 +11,13 @@ import com.twitter.gizzard.thrift.conversions.ShardInfo._ import com.twitter.gizzard.thrift.conversions.Forwarding._ import com.twitter.gizzard.thrift.conversions.Host._ import com.twitter.gizzard.shards._ -import com.twitter.gizzard.scheduler.{CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler, RepairJobFactory} +import com.twitter.gizzard.scheduler.{JsonJob, JobScheduler, PrioritizingJobScheduler, RepairJobFactory} import com.twitter.gizzard.nameserver._ import net.lag.logging.Logger import java.util.{List => JList} -class ManagerService[S <: shards.Shard](nameServer: NameServer[S], copier: CopyJobFactory[S], scheduler: PrioritizingJobScheduler, copyScheduler: JobScheduler, repairer: RepairJobFactory[S], repairPriority: Int, differ: RepairJobFactory[S]) extends Manager.Iface { +class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, repairer: RepairJobFactory[S], repairPriority: Int, differ: RepairJobFactory[S]) extends Manager.Iface { val log = Logger.get(getClass.getName) def wrapEx[A](f: => A): A = try { f } catch { @@ -99,9 +99,9 @@ class ManagerService[S <: shards.Shard](nameServer: NameServer[S], copier: CopyJ def mark_shard_busy(id: ShardId, busy: Int) = { wrapEx(nameServer.markShardBusy(id.fromThrift, busy.fromThrift)) } - def copy_shard(sourceId: ShardId, destinationId: ShardId) = { - wrapEx(copyScheduler.put(copier(sourceId.fromThrift, destinationId.fromThrift))) - } + //def copy_shard(sourceId: ShardId, destinationId: ShardId) = { + // wrapEx(copyScheduler.put(copier(sourceId.fromThrift, destinationId.fromThrift))) + //} def list_tables(): JList[java.lang.Integer] = wrapEx(nameServer.listTables) diff --git a/src/main/thrift/Manager.thrift b/src/main/thrift/Manager.thrift index b8726c0e..d6569977 100644 --- a/src/main/thrift/Manager.thrift +++ b/src/main/thrift/Manager.thrift @@ -89,7 +89,7 @@ service Manager { list list_hostnames() throws(1: GizzardException ex) void mark_shard_busy(1: ShardId id, 2: i32 busy) throws(1: GizzardException ex) - void copy_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: GizzardException ex) + //void copy_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: GizzardException ex) void repair_shard(1: list ids) throws(1: GizzardException ex) void diff_shards(1: list ids) throws(1: GizzardException ex) diff --git a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala index 771430b8..5c70fb77 100644 --- a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala +++ b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala @@ -6,11 +6,12 @@ import com.twitter.querulous import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator} import com.twitter.querulous.config.Connection import com.twitter.querulous.query.SqlQueryTimeoutException +import collection.mutable.ListBuffer import com.twitter.gizzard import nameserver.NameServer -import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException} -import scheduler.{JobScheduler, JsonJob, CopyJob, CopyJobParser, CopyJobFactory, JsonJobParser, PrioritizingJobScheduler} +import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException, Cursorable} +import scheduler.{JobScheduler, JsonJob, JsonJobParser, PrioritizingJobScheduler, Repairable, MultiShardRepair, RepairJobFactory, RepairJobParser} package object config { import com.twitter.gizzard.config._ @@ -73,6 +74,8 @@ package object config { Priority.Low.id -> new TestJobScheduler { val name = queueBase+"_low" } ) + def repairPriority = Priority.Low.id + jobInjector.port = iPort manager.port = mPort } @@ -93,13 +96,15 @@ class TestServer(conf: config.TestServer) extends GizzardServer[TestShard](conf) val readWriteShardAdapter = new TestReadWriteAdapter(_) val jobPriorities = List(Priority.High.id, Priority.Low.id) - val copyPriority = Priority.Low.id - val copyFactory = new TestCopyFactory(nameServer, jobScheduler(Priority.Low.id)) + //val copyPriority = Priority.Low.id + + def repairPriority = Priority.Low.id + //val repairFactory = new TestRepairFactory(nameServer, jobScheduler(Priority.Low.id)) shardRepo += ("TestShard" -> new SqlShardFactory(conf.queryEvaluator(), conf.databaseConnection)) jobCodec += ("Put".r -> new PutParser(nameServer.findCurrentForwarding(0, _))) - jobCodec += ("Copy".r -> new TestCopyParser(nameServer, jobScheduler(Priority.Low.id))) + //jobCodec += ("Copy".r -> new TestCopyParser(nameServer, jobScheduler(Priority.Low.id))) // service listener @@ -136,17 +141,48 @@ extends thrift.TestServer.Iface { def get(key: Int) = forwarding(key).get(key).toList.map(asTestResult) - private def asTestResult(t: (Int, String, Int)) = new thrift.TestResult(t._1, t._2, t._3) + private def asTestResult(t: TestResult) = new thrift.TestResult(t.id, t.value, t.count) } // Shard Definitions +case class TestResult(id: Int, value: String, count: Int) extends Repairable[TestResult] { + def similar(other: TestResult) = { + id.compare(other.id) match { + case x if x != 0 => x + case _ => value.compare(other.value) + } + } + def shouldRepair(other: TestResult) = { + similar(other) == 0 && count != other.count + } +} + +object TestCursor { + val Start = new TestCursor(-1) + val End = new TestCursor(0) +} + +case class TestCursor(position: Int) extends Cursorable[TestCursor] { + def atStart = position == -1 + def atEnd = position == 0 + def compare(other: TestCursor) = { + (atEnd, other.atEnd) match { + case (true, true) => 0 + case (true, false) => 1 + case (false, true) => -1 + case _ => position.compare(other.position) + } + } +} + trait TestShard extends shards.Shard { def put(key: Int, value: String): Unit def putAll(kvs: Seq[(Int, String)]): Unit - def get(key: Int): Option[(Int, String, Int)] - def getAll(key: Int, count: Int): Seq[(Int, String, Int)] + def get(key: Int): Option[TestResult] + def getAll(key: Int, count: Int): (Seq[TestResult], TestCursor) + def getAll(key: TestCursor, count: Int): (Seq[TestResult], TestCursor) } class TestReadWriteAdapter(s: shards.ReadWriteShard[TestShard]) @@ -155,6 +191,7 @@ extends shards.ReadWriteShardAdapter(s) with TestShard { def putAll(kvs: Seq[(Int,String)]) = s.writeOperation(_.putAll(kvs)) def get(k: Int) = s.readOperation(_.get(k)) def getAll(k:Int, c: Int) = s.readOperation(_.getAll(k,c)) + def getAll(k:TestCursor, c: Int) = s.readOperation(_.getAll(k,c)) } class SqlShardFactory(qeFactory: QueryEvaluatorFactory, conn: Connection) @@ -194,7 +231,7 @@ extends TestShard { private val getSql = "select * from " + table + " where id = ?" private val getAllSql = "select * from " + table + " where id > ? limit ?" - private def asResult(r: ResultSet) = (r.getInt("id"), r.getString("value"), r.getInt("count")) + private def asResult(r: ResultSet) = new TestResult(r.getInt("id"), r.getString("value"), r.getInt("count")) def put(key: Int, value: String) { evaluator.execute(putSql, key, value) } def putAll(kvs: Seq[(Int, String)]) { @@ -202,7 +239,12 @@ extends TestShard { } def get(key: Int) = evaluator.selectOne(getSql, key)(asResult) - def getAll(key: Int, count: Int) = evaluator.select(getAllSql, key, count)(asResult) + def getAll(key: Int, count: Int) = { + val result = evaluator.select(getAllSql, key, count)(asResult) + (result, if (result.size != count) TestCursor.End else new TestCursor(key + count)) + } + + def getAll(key: TestCursor, count: Int) = getAll(key.position, count) } @@ -219,33 +261,85 @@ class PutJob(key: Int, value: String, forwarding: Long => TestShard) extends Jso def apply() { forwarding(key).put(key, value) } } -class TestCopyFactory(ns: NameServer[TestShard], s: JobScheduler) -extends CopyJobFactory[TestShard] { - def apply(src: ShardId, dest: ShardId) = new TestCopy(src, dest, 0, 500, ns, s) -} +// class MetadataRepair(shardIds: Seq[ShardId], cursor: Cursor, count: Int, +// nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler) +// extends MultiShardRepair[Shard, Metadata, Cursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) { -class TestCopyParser(ns: NameServer[TestShard], s: JobScheduler) -extends CopyJobParser[TestShard] { - def deserialize(m: Map[String, Any], src: ShardId, dest: ShardId, count: Int) = { - val cursor = m("cursor").asInstanceOf[Int] - val count = m("count").asInstanceOf[Int] - new TestCopy(src, dest, cursor, count, ns, s) +class TestRepairFactory(ns: NameServer[TestShard], s: PrioritizingJobScheduler) +extends RepairJobFactory[TestShard] { + def apply(shardIds: Seq[ShardId]) = { + new TestRepair(shardIds, TestCursor.Start, 500, ns, s) } } -class TestCopy(srcId: ShardId, destId: ShardId, cursor: Int, count: Int, - ns: NameServer[TestShard], s: JobScheduler) -extends CopyJob[TestShard](srcId, destId, count, ns, s) { - def copyPage(src: TestShard, dest: TestShard, count: Int) = { - val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) } +class TestRepair(shardIds: Seq[ShardId], cursor: TestCursor, count: Int, + nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardRepair[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.Low.id) { - if (rows.isEmpty) { - None - } else { - dest.putAll(rows) - Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s)) - } + def select(shard: TestShard, cursor: TestCursor, count: Int) = shard.getAll(cursor, count) + def scheduleBulk(otherShards: Seq[TestShard], items: Seq[TestResult]) = { + otherShards.foreach(_.putAll(items.map{i => (i.id, i.value)})) + } + def scheduleItem(missing: Boolean, list: (TestShard, ListBuffer[TestResult], TestCursor), tableId: Int, item: TestResult) = { + scheduler.put(Priority.Low.id, new PutJob(item.id, item.value, nameServer.findCurrentForwarding(0, _))) + } + def nextRepair(lowestCursor: TestCursor) = { + if (lowestCursor.atEnd) None else Some(new TestRepair(shardIds, lowestCursor, count, nameServer, scheduler)) } + def serialize = Map("cursor" -> cursor.position) +} - def serialize = Map("cursor" -> cursor) +class TestRepairParser(ns: NameServer[TestShard], s: PrioritizingJobScheduler) +extends RepairJobParser[TestShard] { + def deserialize(m: Map[String, Any], shardIds: Seq[ShardId], count: Int) = { + val cursor = new TestCursor(m("cursor").asInstanceOf[Int]) + new TestRepair(shardIds, cursor, count, ns, s) + } } +// +//class TestCopy(srcId: ShardId, destId: ShardId, cursor: Int, count: Int, +// ns: NameServer[TestShard], s: JobScheduler) +//extends CopyJob[TestShard](srcId, destId, count, ns, s) { +// def copyPage(src: TestShard, dest: TestShard, count: Int) = { +// val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) } +// +// if (rows.isEmpty) { +// None +// } else { +// dest.putAll(rows) +// Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s)) +// } +// } +// +// def serialize = Map("cursor" -> cursor) +//} + +//class TestCopyFactory(ns: NameServer[TestShard], s: JobScheduler) +//extends CopyJobFactory[TestShard] { +// def apply(src: ShardId, dest: ShardId) = new TestCopy(src, dest, 0, 500, ns, s) +//} + +//class TestCopyParser(ns: NameServer[TestShard], s: JobScheduler) +//extends CopyJobParser[TestShard] { +// def deserialize(m: Map[String, Any], src: ShardId, dest: ShardId, count: Int) = { +// val cursor = m("cursor").asInstanceOf[Int] +// val count = m("count").asInstanceOf[Int] +// new TestCopy(src, dest, cursor, count, ns, s) +// } +//} + +//class TestCopy(srcId: ShardId, destId: ShardId, cursor: Int, count: Int, +// ns: NameServer[TestShard], s: JobScheduler) +//extends CopyJob[TestShard](srcId, destId, count, ns, s) { +// def copyPage(src: TestShard, dest: TestShard, count: Int) = { +// val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) } +// +// if (rows.isEmpty) { +// None +// } else { +// dest.putAll(rows) +// Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s)) +// } +// } +// +// def serialize = Map("cursor" -> cursor) +//+++++++++++++++++++++++++++++++++++767766} diff --git a/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala b/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala index 78712f77..e97fefb8 100644 --- a/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala +++ b/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala @@ -1,181 +1,182 @@ -package com.twitter.gizzard -package scheduler - -import com.twitter.util.TimeConversions._ -import org.specs.Specification -import org.specs.mock.{ClassMocker, JMocker} - - -class FakeCopy(val sourceShardId: shards.ShardId, val destinationShardId: shards.ShardId, count: Int, - nameServer: nameserver.NameServer[shards.Shard], scheduler: JobScheduler)(nextCopy: => Option[FakeCopy]) - extends CopyJob[shards.Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) { - def serialize = Map("cursor" -> 1) - - @throws(classOf[Exception]) - def copyPage(sourceShard: shards.Shard, destinationShard: shards.Shard, count: Int) = nextCopy - - override def equals(that: Any) = that match { - case that: FakeCopy => - this.sourceShardId == that.sourceShardId && - this.destinationShardId == that.destinationShardId - case _ => false - } -} - -object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker { - "CopyJob" should { - val sourceShardId = shards.ShardId("testhost", "1") - val destinationShardId = shards.ShardId("testhost", "2") - val destinationShardInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Normal) - val count = CopyJob.MIN_COPY + 1 - val nextCopy = mock[FakeCopy] - val nameServer = mock[nameserver.NameServer[shards.Shard]] - val jobScheduler = mock[JobScheduler] - val makeCopy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(_) - val shard1 = mock[shards.Shard] - val shard2 = mock[shards.Shard] - - "toMap" in { - val copy = makeCopy(Some(nextCopy)) - copy.toMap mustEqual Map( - "source_shard_hostname" -> sourceShardId.hostname, - "source_shard_table_prefix" -> sourceShardId.tablePrefix, - "destination_shard_hostname" -> destinationShardId.hostname, - "destination_shard_table_prefix" -> destinationShardId.tablePrefix, - "count" -> count - ) ++ copy.serialize - } - - "toJson" in { - val copy = makeCopy(Some(nextCopy)) - val json = copy.toJson - json mustMatch "Copy" - json mustMatch "\"source_shard_hostname\":\"%s\"".format(sourceShardId.hostname) - json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(sourceShardId.tablePrefix) - json mustMatch "\"destination_shard_hostname\":\"%s\"".format(destinationShardId.hostname) - json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(destinationShardId.tablePrefix) - json mustMatch "\"count\":" + count - } - - "apply" in { - "normally" in { - val copy = makeCopy(Some(nextCopy)) - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willReturn shard1 - one(nameServer).findShardById(destinationShardId) willReturn shard2 - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - } - - copy.apply() - } - - "no source shard" in { - val copy = makeCopy(Some(nextCopy)) - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willThrow new nameserver.NonExistentShard("foo") - } - - copy.apply() - } - - "no destination shard" in { - val copy = makeCopy(Some(nextCopy)) - expect { - one(nameServer).getShard(destinationShardId) willThrow new nameserver.NonExistentShard("foo") - } - - copy.apply() - } - - "with a database connection timeout" in { - val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, sourceShardId)) - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willReturn shard1 - one(nameServer).findShardById(destinationShardId) willReturn shard2 - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - one(jobScheduler).put(copy) - } - - copy.apply() - copy.toMap("count") mustEqual (count * 0.9).toInt - } - - "with a random exception" in { - val copy = makeCopy(throw new Exception("boo")) - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willReturn shard1 - one(nameServer).findShardById(destinationShardId) willReturn shard2 - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) - never(jobScheduler).put(nextCopy) - } - - copy.apply() must throwA[Exception] - } - - "with a shard timeout" in { - "early on" in { - val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willReturn shard1 - one(nameServer).findShardById(destinationShardId) willReturn shard2 - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - one(jobScheduler).put(copy) - } - - copy.apply() - } - - "after too many retries" in { - val count = CopyJob.MIN_COPY - 1 - val copy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) - - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willReturn shard1 - one(nameServer).findShardById(destinationShardId) willReturn shard2 - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) - never(jobScheduler).put(nextCopy) - } - - copy.apply() must throwA[Exception] - } - } - - "when cancelled" in { - val copy = makeCopy(Some(nextCopy)) - val cancelledInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Cancelled) - - expect { - one(nameServer).getShard(destinationShardId) willReturn cancelledInfo - never(nameServer).findShardById(sourceShardId) - never(nameServer).findShardById(destinationShardId) - never(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - never(jobScheduler).put(nextCopy) - } - - copy.apply() - } - - "when finished" in { - val copy = makeCopy(None) - - expect { - one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo - one(nameServer).findShardById(sourceShardId) willReturn shard1 - one(nameServer).findShardById(destinationShardId) willReturn shard2 - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) - one(nameServer).markShardBusy(destinationShardId, shards.Busy.Normal) - } - - copy.apply() - } - } - } -} +//package com.twitter.gizzard +//package scheduler +// +//import com.twitter.util.TimeConversions._ +//import org.specs.Specification +//import org.specs.mock.{ClassMocker, JMocker} +// +// +//class FakeCopy(val sourceShardId: shards.ShardId, val destinationShardId: shards.ShardId, count: Int, +// nameServer: nameserver.NameServer[shards.Shard], scheduler: JobScheduler)(nextCopy: => Option[FakeCopy]) +// extends CopyJob[shards.Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) { +// def serialize = Map("cursor" -> 1) +// +// @throws(classOf[Exception]) +// def copyPage(sourceShard: shards.Shard, destinationShard: shards.Shard, count: Int) = nextCopy +// +// override def equals(that: Any) = that match { +// case that: FakeCopy => +// this.sourceShardId == that.sourceShardId && +// this.destinationShardId == that.destinationShardId +// case _ => false +// } +//} +// +//object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker { +// "CopyJob" should { +// val sourceShardId = shards.ShardId("testhost", "1") +// val destinationShardId = shards.ShardId("testhost", "2") +// val destinationShardInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Normal) +// val count = CopyJob.MIN_COPY + 1 +// val nextCopy = mock[FakeCopy] +// val nameServer = mock[nameserver.NameServer[shards.Shard]] +// val jobScheduler = mock[JobScheduler] +// val makeCopy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(_) +// val shard1 = mock[shards.Shard] +// val shard2 = mock[shards.Shard] +// +// "toMap" in { +// val copy = makeCopy(Some(nextCopy)) +// copy.toMap mustEqual Map( +// "source_shard_hostname" -> sourceShardId.hostname, +// "source_shard_table_prefix" -> sourceShardId.tablePrefix, +// "destination_shard_hostname" -> destinationShardId.hostname, +// "destination_shard_table_prefix" -> destinationShardId.tablePrefix, +// "count" -> count +// ) ++ copy.serialize +// } +// +// "toJson" in { +// val copy = makeCopy(Some(nextCopy)) +// val json = copy.toJson +// json mustMatch "Copy" +// json mustMatch "\"source_shard_hostname\":\"%s\"".format(sourceShardId.hostname) +// json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(sourceShardId.tablePrefix) +// json mustMatch "\"destination_shard_hostname\":\"%s\"".format(destinationShardId.hostname) +// json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(destinationShardId.tablePrefix) +// json mustMatch "\"count\":" + count +// } +// +// "apply" in { +// "normally" in { +// val copy = makeCopy(Some(nextCopy)) +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willReturn shard1 +// one(nameServer).findShardById(destinationShardId) willReturn shard2 +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// } +// +// copy.apply() +// } +// +// "no source shard" in { +// val copy = makeCopy(Some(nextCopy)) +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willThrow new nameserver.NonExistentShard("foo") +// } +// +// copy.apply() +// } +// +// "no destination shard" in { +// val copy = makeCopy(Some(nextCopy)) +// expect { +// one(nameServer).getShard(destinationShardId) willThrow new nameserver.NonExistentShard("foo") +// } +// +// copy.apply() +// } +// +// "with a database connection timeout" in { +// val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, sourceShardId)) +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willReturn shard1 +// one(nameServer).findShardById(destinationShardId) willReturn shard2 +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// one(jobScheduler).put(copy) +// } +// +// copy.apply() +// copy.toMap("count") mustEqual (count * 0.9).toInt +// } +// +// "with a random exception" in { +// val copy = makeCopy(throw new Exception("boo")) +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willReturn shard1 +// one(nameServer).findShardById(destinationShardId) willReturn shard2 +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) +// never(jobScheduler).put(nextCopy) +// } +// +// copy.apply() must throwA[Exception] +// } +// +// "with a shard timeout" in { +// "early on" in { +// val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willReturn shard1 +// one(nameServer).findShardById(destinationShardId) willReturn shard2 +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// one(jobScheduler).put(copy) +// } +// +// copy.apply() +// } +// +// "after too many retries" in { +// val count = CopyJob.MIN_COPY - 1 +// val copy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) +// +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willReturn shard1 +// one(nameServer).findShardById(destinationShardId) willReturn shard2 +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) +// never(jobScheduler).put(nextCopy) +// } +// +// copy.apply() must throwA[Exception] +// } +// } +// +// "when cancelled" in { +// val copy = makeCopy(Some(nextCopy)) +// val cancelledInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Cancelled) +// +// expect { +// one(nameServer).getShard(destinationShardId) willReturn cancelledInfo +// never(nameServer).findShardById(sourceShardId) +// never(nameServer).findShardById(destinationShardId) +// never(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// never(jobScheduler).put(nextCopy) +// } +// +// copy.apply() +// } +// +// "when finished" in { +// val copy = makeCopy(None) +// +// expect { +// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo +// one(nameServer).findShardById(sourceShardId) willReturn shard1 +// one(nameServer).findShardById(destinationShardId) willReturn shard2 +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) +// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Normal) +// } +// +// copy.apply() +// } +// } +// } +//} +// \ No newline at end of file diff --git a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala index fec206bb..3a13215e 100644 --- a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala +++ b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala @@ -8,17 +8,16 @@ import com.twitter.gizzard.thrift.conversions.Sequences._ import com.twitter.gizzard.thrift.conversions.ShardId._ import com.twitter.gizzard.thrift.conversions.ShardInfo._ import shards.{Busy, Shard} -import scheduler.{CopyJob, CopyJobFactory, JobScheduler, PrioritizingJobScheduler, JsonJob} +import scheduler.{JobScheduler, PrioritizingJobScheduler, JsonJob} object ManagerServiceSpec extends ConfiguredSpecification with JMocker with ClassMocker { val nameServer = mock[nameserver.NameServer[Shard]] - val copier = mock[CopyJobFactory[Shard]] + //val copier = mock[CopyJobFactory[Shard]] val scheduler = mock[PrioritizingJobScheduler] val subScheduler = mock[JobScheduler] - val copyScheduler = mock[JobScheduler] - val manager = new ManagerService(nameServer, copier, scheduler, copyScheduler, null, 0, null) + val manager = new ManagerService(nameServer, scheduler, null, 0, null) val shard = mock[Shard] val thriftShardInfo1 = new thrift.ShardInfo(new thrift.ShardId("hostname", "table_prefix"), @@ -109,18 +108,18 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas manager.mark_shard_busy(thriftShardInfo1.id, Busy.Busy.id) } - "copy_shard" in { - val shardId1 = new shards.ShardId("hostname1", "table1") - val shardId2 = new shards.ShardId("hostname2", "table2") - val copyJob = mock[CopyJob[Shard]] - - expect { - one(copier).apply(shardId1, shardId2) willReturn copyJob - one(copyScheduler).put(copyJob) - } - - manager.copy_shard(shardId1.toThrift, shardId2.toThrift) - } + //"copy_shard" in { + // val shardId1 = new shards.ShardId("hostname1", "table1") + // val shardId2 = new shards.ShardId("hostname2", "table2") + // val copyJob = mock[CopyJob[Shard]] + // + // expect { + // one(copier).apply(shardId1, shardId2) willReturn copyJob + // one(copyScheduler).put(copyJob) + // } + // + // manager.copy_shard(shardId1.toThrift, shardId2.toThrift) + //} "set_forwarding" in { expect { From 74f38f7cb4a165ccdf2efc0cb6135e9c307362cd Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:34:53 -0700 Subject: [PATCH 02/11] cleanup, working tests --- .../com/twitter/gizzard/GizzardServer.scala | 8 +- .../gizzard/thrift/ManagerService.scala | 8 +- src/main/thrift/Manager.thrift | 2 - .../gizzard/ConfiguredSpecification.scala | 22 ++- .../gizzard/integration/RepairSpec.scala | 111 +++++++++++ .../gizzard/integration/TestServer.scala | 35 ++-- .../gizzard/scheduler_new/CopyJobSpec.scala | 182 ------------------ .../thrift/ShardManagerServiceSpec.scala | 2 +- 8 files changed, 150 insertions(+), 220 deletions(-) create mode 100644 src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala delete mode 100644 src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala diff --git a/src/main/scala/com/twitter/gizzard/GizzardServer.scala b/src/main/scala/com/twitter/gizzard/GizzardServer.scala index 198d9fb1..07fd7a66 100644 --- a/src/main/scala/com/twitter/gizzard/GizzardServer.scala +++ b/src/main/scala/com/twitter/gizzard/GizzardServer.scala @@ -12,10 +12,8 @@ import config.{GizzardServer => ServerConfig} abstract class GizzardServer[S <: Shard](config: ServerConfig) { def readWriteShardAdapter: ReadWriteShard[S] => S - def repairFactory: RepairJobFactory[S] = null - def diffFactory: RepairJobFactory[S] = null + def repairFactory: RepairJobFactory[S] def jobPriorities: Seq[Int] - //def copyPriority: Int def repairPriority: Int def start(): Unit def shutdown(quiesce: Boolean): Unit @@ -32,7 +30,6 @@ abstract class GizzardServer[S <: Shard](config: ServerConfig) { lazy val shardRepo = new BasicShardRepository[S](readWriteShardAdapter, replicationFuture) lazy val nameServer = config.nameServer(shardRepo) - // job wiring def logUnparsableJob(j: Array[Byte]) { @@ -51,8 +48,7 @@ abstract class GizzardServer[S <: Shard](config: ServerConfig) { nameServer, jobScheduler, repairFactory, - repairPriority, - diffFactory) + repairPriority) lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer)) diff --git a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala index 8c498bfb..64fee2a2 100644 --- a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala +++ b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala @@ -17,7 +17,7 @@ import net.lag.logging.Logger import java.util.{List => JList} -class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, repairer: RepairJobFactory[S], repairPriority: Int, differ: RepairJobFactory[S]) extends Manager.Iface { +class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, repairer: RepairJobFactory[S], repairPriority: Int) extends Manager.Iface { val log = Logger.get(getClass.getName) def wrapEx[A](f: => A): A = try { f } catch { @@ -113,12 +113,6 @@ class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: Pr ))) } - def diff_shards(shardIds: JList[ShardId]) = { - wrapEx((scheduler.asInstanceOf[PrioritizingJobScheduler]).put(repairPriority, differ( - shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift) - ))) - } - // Job Scheduler Management def retry_errors() = wrapEx(scheduler.retryErrors()) diff --git a/src/main/thrift/Manager.thrift b/src/main/thrift/Manager.thrift index d6569977..98ef3a04 100644 --- a/src/main/thrift/Manager.thrift +++ b/src/main/thrift/Manager.thrift @@ -89,9 +89,7 @@ service Manager { list list_hostnames() throws(1: GizzardException ex) void mark_shard_busy(1: ShardId id, 2: i32 busy) throws(1: GizzardException ex) - //void copy_shard(1: ShardId source_id, 2: ShardId destination_id) throws(1: GizzardException ex) void repair_shard(1: list ids) throws(1: GizzardException ex) - void diff_shards(1: list ids) throws(1: GizzardException ex) list list_tables() throws(1: GizzardException ex) diff --git a/src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala b/src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala index ca94d0da..f010b5b5 100644 --- a/src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala +++ b/src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala @@ -25,7 +25,9 @@ trait IntegrationSpecification extends Specification { trait TestServerFacts { def enum: Int; def nsDatabaseName: String; def databaseName: String def basePort: Int; def injectorPort: Int; def managerPort: Int - def sqlShardInfo: shards.ShardInfo; def forwarding: nameserver.Forwarding + def sqlShardInfos: Seq[shards.ShardInfo] + def replicatingShardInfo: shards.ShardInfo + def forwarding: nameserver.Forwarding def kestrelQueues: Seq[String] } @@ -39,9 +41,17 @@ trait IntegrationSpecification extends Specification { val basePort = port val injectorPort = port + 1 val managerPort = port + 2 - val sqlShardInfo = shards.ShardInfo(shards.ShardId("localhost", "t0_0"), - "TestShard", "int", "int", shards.Busy.Normal) - val forwarding = nameserver.Forwarding(0, 0, sqlShardInfo.id) + val sqlShardInfos = shards.ShardInfo(shards.ShardId("localhost", "t0_1"), + "TestShard", "int", "int", shards.Busy.Normal) :: + shards.ShardInfo(shards.ShardId("localhost", "t0_2"), + "TestShard", "int", "int", shards.Busy.Normal) :: + shards.ShardInfo(shards.ShardId("localhost", "t0_3"), + "TestShard", "int", "int", shards.Busy.Normal) :: Nil + + val replicatingShardInfo = shards.ShardInfo(shards.ShardId("localhost", "replicating_0"), + "com.twitter.gizzard.shards.ReplicatingShard", "", "", shards.Busy.Normal) + + val forwarding = nameserver.Forwarding(0, 0, replicatingShardInfo.id) val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue", "gizzard_test_"+name+"_high_queue_errors", "gizzard_test_"+name+"_low_queue", @@ -85,7 +95,9 @@ trait IntegrationSpecification extends Specification { createTestServerDBs(s) s.nameServer.rebuildSchema() s.nameServer.setForwarding(s.forwarding) - s.nameServer.createShard(s.sqlShardInfo) + s.nameServer.createShard(s.replicatingShardInfo) + s.sqlShardInfos.foreach(s.nameServer.createShard(_)) + s.sqlShardInfos.foreach(info => s.nameServer.addLink(s.replicatingShardInfo.id, info.id, 1)) s.nameServer.reload() } } diff --git a/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala b/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala new file mode 100644 index 00000000..8cd880e2 --- /dev/null +++ b/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala @@ -0,0 +1,111 @@ +package com.twitter.gizzard +package integration + +import scala.collection.JavaConversions._ +import com.twitter.gizzard.thrift.conversions.Sequences._ +import testserver.thrift.TestResult + +class RepairSpec extends IntegrationSpecification with ConfiguredSpecification { + "Repair" should { + val servers = List(1, 2, 3).map(testServer) + val clients = servers.map(testServerClient) + + val server1 :: server2 :: server3 :: _ = servers + val client1 :: client2 :: client3 :: _ = clients + + val hostFor1 :: hostFor2 :: hostFor3 :: _ = List(server1, server2, server3).map { s => + nameserver.Host("localhost", s.injectorPort, "c" + s.enum, nameserver.HostStatus.Normal) + } + + doBefore { + resetTestServerDBs(servers: _*) + setupServers(servers: _*) + List(server1, server2).foreach(_.nameServer.addRemoteHost(hostFor3)) + List(server1, server3).foreach(_.nameServer.addRemoteHost(hostFor2)) + List(server2, server3).foreach(_.nameServer.addRemoteHost(hostFor1)) + + servers.foreach(_.nameServer.reload()) + } + + doAfter { stopServers(servers: _*) } + + "differing shards should become the same" in { + startServers(servers: _*) + val shard1id :: shard2id :: shard3id :: _ = server1.sqlShardInfos.map(_.id) + val shard1 :: shard2 :: shard3 :: _ = server1.sqlShardInfos.map(s => server1.nameServer.findShardById(s.id, 0)) + shard1.put(1, "hi") + shard2.put(2, "hi") + shard3.put(2, "hi there") + shard3.put(3, "one") + shard1.put(4, "this") + shard1.put(5, "is") + shard1.put(6, "bulk") + //server2.testService.put(1, "hi") + + val list = new java.util.ArrayList[com.twitter.gizzard.thrift.ShardId] + list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix)) + list.add(new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix)) + list.add(new com.twitter.gizzard.thrift.ShardId(shard3id.hostname, shard3id.tablePrefix)) + server1.managerServer.repair_shard(list) + + def listElemenets(list: Seq[com.twitter.gizzard.testserver.TestResult]) = { + list.map((e) => (e.id, e.value)) + } + + listElemenets(shard1.getAll(0, 100)._1) must eventually( + verify(s => s sameElements listElemenets(shard2.getAll(0, 100)._1))) + listElemenets(shard1.getAll(0, 100)._1) must eventually( + verify(s => s sameElements listElemenets(shard3.getAll(0, 100)._1))) + listElemenets(shard2.getAll(0, 100)._1) must eventually( + verify(s => s sameElements listElemenets(shard3.getAll(0, 100)._1))) + } + + //val replicatingShardId = ShardId("localhost", "replicating_forward_1") + //val (shard1id, shard2id, shard3id) = (ShardId("localhost", "forward_1_1"), ShardId("localhost", "forward_2_1"), ShardId("localhost", "forward_3_1")) + //lazy val shard1 = nameServer.findShardById(shard1id) + //lazy val shard2 = nameServer.findShardById(shard2id) + //lazy val shard3 = nameServer.findShardById(shard3id) + // + //"differing shards should become the same" in { + // shard1.add(1L, 2L, 1L, Time.now) // same + // shard2.add(1L, 2L, 1L, Time.now) + // + // shard1.archive(2L, 1L, 2L, Time.now) // one archived, one normal + // shard2.add(2L, 1L, 2L, Time.now) + // shard3.add(2L, 1L, 2L, Time.now) + // + // shard1.add(1L, 3L, 3L, Time.now) // only on one shard + // shard3.archive(1L, 3L, 3L, Time.now) + // + // shard2.add(1L, 4L, 4L, Time.now) // only on two shard + // + // shard3.negate(3L, 1L, 5L, Time.now) // only on two shard + // + // // bulk + // shard1.add(5L, 2L, 1L, Time.now) // same + // shard1.add(6L, 2L, 1L, Time.now) // same + // shard1.add(7L, 2L, 1L, Time.now) // same + // shard1.add(8L, 2L, 1L, Time.now) // same + // shard1.add(9L, 2L, 1L, Time.now) // same + // shard1.add(10L, 2L, 1L, Time.now) // same + // + // + // + // val list = new java.util.ArrayList[com.twitter.gizzard.thrift.ShardId] + // list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix)) + // list.add(new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix)) + // list.add(new com.twitter.gizzard.thrift.ShardId(shard3id.hostname, shard3id.tablePrefix)) + // manager.repair_shard(list) + // def listElemenets(list: Seq[Edge]) = { + // list.map((e) => (e.sourceId, e.destinationId, e.state)) + // } + // + // listElemenets(shard1.selectAll(EdgeCursor.Start, Repair.COUNT)._1) must eventually( + // verify(s => s sameElements listElemenets(shard2.selectAll(EdgeCursor.Start, Repair.COUNT)._1))) + // listElemenets(shard1.selectAll(EdgeCursor.Start, Repair.COUNT)._1) must eventually( + // verify(s => s sameElements listElemenets(shard3.selectAll(EdgeCursor.Start, Repair.COUNT)._1))) + // listElemenets(shard2.selectAll(EdgeCursor.Start, Repair.COUNT)._1) must eventually( + // verify(s => s sameElements listElemenets(shard3.selectAll(EdgeCursor.Start, Repair.COUNT)._1))) + //} + } +} diff --git a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala index 5c70fb77..1488d94a 100644 --- a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala +++ b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala @@ -74,7 +74,7 @@ package object config { Priority.Low.id -> new TestJobScheduler { val name = queueBase+"_low" } ) - def repairPriority = Priority.Low.id + def repairPriority = Priority.High.id jobInjector.port = iPort manager.port = mPort @@ -98,14 +98,13 @@ class TestServer(conf: config.TestServer) extends GizzardServer[TestShard](conf) val jobPriorities = List(Priority.High.id, Priority.Low.id) //val copyPriority = Priority.Low.id - def repairPriority = Priority.Low.id - //val repairFactory = new TestRepairFactory(nameServer, jobScheduler(Priority.Low.id)) + def repairPriority = Priority.High.id + val repairFactory = new TestRepairFactory(nameServer, jobScheduler) shardRepo += ("TestShard" -> new SqlShardFactory(conf.queryEvaluator(), conf.databaseConnection)) + jobCodec += ("Repair".r -> new TestRepairParser(nameServer, jobScheduler)) jobCodec += ("Put".r -> new PutParser(nameServer.findCurrentForwarding(0, _))) - //jobCodec += ("Copy".r -> new TestCopyParser(nameServer, jobScheduler(Priority.Low.id))) - // service listener @@ -149,24 +148,23 @@ extends thrift.TestServer.Iface { case class TestResult(id: Int, value: String, count: Int) extends Repairable[TestResult] { def similar(other: TestResult) = { - id.compare(other.id) match { - case x if x != 0 => x - case _ => value.compare(other.value) - } + id.compare(other.id) } def shouldRepair(other: TestResult) = { - similar(other) == 0 && count != other.count + similar(other) == 0 && value != other.value } } object TestCursor { - val Start = new TestCursor(-1) - val End = new TestCursor(0) + val StartPosition = -1 + val EndPosition = 0 + val Start = new TestCursor(StartPosition) + val End = new TestCursor(EndPosition) } case class TestCursor(position: Int) extends Cursorable[TestCursor] { - def atStart = position == -1 - def atEnd = position == 0 + def atStart = position == TestCursor.StartPosition + def atEnd = position == TestCursor.EndPosition def compare(other: TestCursor) = { (atEnd, other.atEnd) match { case (true, true) => 0 @@ -233,6 +231,7 @@ extends TestShard { private def asResult(r: ResultSet) = new TestResult(r.getInt("id"), r.getString("value"), r.getInt("count")) + override def toString = shardInfo.toString def put(key: Int, value: String) { evaluator.execute(putSql, key, value) } def putAll(kvs: Seq[(Int, String)]) { evaluator.executeBatch(putSql) { b => for ((k,v) <- kvs) b(k,v) } @@ -273,19 +272,21 @@ extends RepairJobFactory[TestShard] { } class TestRepair(shardIds: Seq[ShardId], cursor: TestCursor, count: Int, - nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardRepair[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.Low.id) { + nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardRepair[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.High.id) { def select(shard: TestShard, cursor: TestCursor, count: Int) = shard.getAll(cursor, count) def scheduleBulk(otherShards: Seq[TestShard], items: Seq[TestResult]) = { otherShards.foreach(_.putAll(items.map{i => (i.id, i.value)})) } def scheduleItem(missing: Boolean, list: (TestShard, ListBuffer[TestResult], TestCursor), tableId: Int, item: TestResult) = { - scheduler.put(Priority.Low.id, new PutJob(item.id, item.value, nameServer.findCurrentForwarding(0, _))) + scheduler.put(Priority.High.id, new PutJob(item.id, item.value, nameServer.findCurrentForwarding(0, _))) } def nextRepair(lowestCursor: TestCursor) = { if (lowestCursor.atEnd) None else Some(new TestRepair(shardIds, lowestCursor, count, nameServer, scheduler)) } - def serialize = Map("cursor" -> cursor.position) + def serialize = { + Map("cursor" -> cursor.position) + } } class TestRepairParser(ns: NameServer[TestShard], s: PrioritizingJobScheduler) diff --git a/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala b/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala deleted file mode 100644 index e97fefb8..00000000 --- a/src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala +++ /dev/null @@ -1,182 +0,0 @@ -//package com.twitter.gizzard -//package scheduler -// -//import com.twitter.util.TimeConversions._ -//import org.specs.Specification -//import org.specs.mock.{ClassMocker, JMocker} -// -// -//class FakeCopy(val sourceShardId: shards.ShardId, val destinationShardId: shards.ShardId, count: Int, -// nameServer: nameserver.NameServer[shards.Shard], scheduler: JobScheduler)(nextCopy: => Option[FakeCopy]) -// extends CopyJob[shards.Shard](sourceShardId, destinationShardId, count, nameServer, scheduler) { -// def serialize = Map("cursor" -> 1) -// -// @throws(classOf[Exception]) -// def copyPage(sourceShard: shards.Shard, destinationShard: shards.Shard, count: Int) = nextCopy -// -// override def equals(that: Any) = that match { -// case that: FakeCopy => -// this.sourceShardId == that.sourceShardId && -// this.destinationShardId == that.destinationShardId -// case _ => false -// } -//} -// -//object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker { -// "CopyJob" should { -// val sourceShardId = shards.ShardId("testhost", "1") -// val destinationShardId = shards.ShardId("testhost", "2") -// val destinationShardInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Normal) -// val count = CopyJob.MIN_COPY + 1 -// val nextCopy = mock[FakeCopy] -// val nameServer = mock[nameserver.NameServer[shards.Shard]] -// val jobScheduler = mock[JobScheduler] -// val makeCopy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(_) -// val shard1 = mock[shards.Shard] -// val shard2 = mock[shards.Shard] -// -// "toMap" in { -// val copy = makeCopy(Some(nextCopy)) -// copy.toMap mustEqual Map( -// "source_shard_hostname" -> sourceShardId.hostname, -// "source_shard_table_prefix" -> sourceShardId.tablePrefix, -// "destination_shard_hostname" -> destinationShardId.hostname, -// "destination_shard_table_prefix" -> destinationShardId.tablePrefix, -// "count" -> count -// ) ++ copy.serialize -// } -// -// "toJson" in { -// val copy = makeCopy(Some(nextCopy)) -// val json = copy.toJson -// json mustMatch "Copy" -// json mustMatch "\"source_shard_hostname\":\"%s\"".format(sourceShardId.hostname) -// json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(sourceShardId.tablePrefix) -// json mustMatch "\"destination_shard_hostname\":\"%s\"".format(destinationShardId.hostname) -// json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(destinationShardId.tablePrefix) -// json mustMatch "\"count\":" + count -// } -// -// "apply" in { -// "normally" in { -// val copy = makeCopy(Some(nextCopy)) -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willReturn shard1 -// one(nameServer).findShardById(destinationShardId) willReturn shard2 -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// } -// -// copy.apply() -// } -// -// "no source shard" in { -// val copy = makeCopy(Some(nextCopy)) -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willThrow new nameserver.NonExistentShard("foo") -// } -// -// copy.apply() -// } -// -// "no destination shard" in { -// val copy = makeCopy(Some(nextCopy)) -// expect { -// one(nameServer).getShard(destinationShardId) willThrow new nameserver.NonExistentShard("foo") -// } -// -// copy.apply() -// } -// -// "with a database connection timeout" in { -// val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, sourceShardId)) -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willReturn shard1 -// one(nameServer).findShardById(destinationShardId) willReturn shard2 -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// one(jobScheduler).put(copy) -// } -// -// copy.apply() -// copy.toMap("count") mustEqual (count * 0.9).toInt -// } -// -// "with a random exception" in { -// val copy = makeCopy(throw new Exception("boo")) -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willReturn shard1 -// one(nameServer).findShardById(destinationShardId) willReturn shard2 -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) -// never(jobScheduler).put(nextCopy) -// } -// -// copy.apply() must throwA[Exception] -// } -// -// "with a shard timeout" in { -// "early on" in { -// val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willReturn shard1 -// one(nameServer).findShardById(destinationShardId) willReturn shard2 -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// one(jobScheduler).put(copy) -// } -// -// copy.apply() -// } -// -// "after too many retries" in { -// val count = CopyJob.MIN_COPY - 1 -// val copy = new FakeCopy(sourceShardId, destinationShardId, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId)) -// -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willReturn shard1 -// one(nameServer).findShardById(destinationShardId) willReturn shard2 -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Error) -// never(jobScheduler).put(nextCopy) -// } -// -// copy.apply() must throwA[Exception] -// } -// } -// -// "when cancelled" in { -// val copy = makeCopy(Some(nextCopy)) -// val cancelledInfo = shards.ShardInfo(destinationShardId, "FakeShard", "", "", shards.Busy.Cancelled) -// -// expect { -// one(nameServer).getShard(destinationShardId) willReturn cancelledInfo -// never(nameServer).findShardById(sourceShardId) -// never(nameServer).findShardById(destinationShardId) -// never(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// never(jobScheduler).put(nextCopy) -// } -// -// copy.apply() -// } -// -// "when finished" in { -// val copy = makeCopy(None) -// -// expect { -// one(nameServer).getShard(destinationShardId) willReturn destinationShardInfo -// one(nameServer).findShardById(sourceShardId) willReturn shard1 -// one(nameServer).findShardById(destinationShardId) willReturn shard2 -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy) -// one(nameServer).markShardBusy(destinationShardId, shards.Busy.Normal) -// } -// -// copy.apply() -// } -// } -// } -//} -// \ No newline at end of file diff --git a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala index 3a13215e..2098608f 100644 --- a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala +++ b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala @@ -17,7 +17,7 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas //val copier = mock[CopyJobFactory[Shard]] val scheduler = mock[PrioritizingJobScheduler] val subScheduler = mock[JobScheduler] - val manager = new ManagerService(nameServer, scheduler, null, 0, null) + val manager = new ManagerService(nameServer, scheduler, null, 0) val shard = mock[Shard] val thriftShardInfo1 = new thrift.ShardInfo(new thrift.ShardId("hostname", "table_prefix"), From 18a5578950ef39c9315997f3f2540eb082e35a8b Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:44:12 -0700 Subject: [PATCH 03/11] removed old job --- .../twitter/gizzard/scheduler/CopyJob.scala | 130 ------------------ 1 file changed, 130 deletions(-) delete mode 100644 src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala diff --git a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala deleted file mode 100644 index 06d1d6dd..00000000 --- a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala +++ /dev/null @@ -1,130 +0,0 @@ -//package com.twitter.gizzard -//package scheduler -// -//import com.twitter.ostrich.Stats -//import com.twitter.util.TimeConversions._ -//import net.lag.logging.Logger -//import nameserver.{NameServer, NonExistentShard} -//import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException} -// -//object CopyJob { -// val MIN_COPY = 500 -//} -// -///** -// * A factory for creating a new copy job (with default count and a starting cursor) from a source -// * and destination shard ID. -// */ -//trait CopyJobFactory[S <: Shard] extends ((ShardId, ShardId) => CopyJob[S]) -// -///** -// * A parser that creates a copy job out of json. The basic attributes (source shard ID, destination) -// * shard ID, and count) are parsed out first, and the remaining attributes are passed to -// * 'deserialize' to decode any shard-specific data (like a cursor). -// */ -//trait CopyJobParser[S <: Shard] extends JsonJobParser { -// def deserialize(attributes: Map[String, Any], sourceId: ShardId, -// destinationId: ShardId, count: Int): CopyJob[S] -// -// def apply(attributes: Map[String, Any]): JsonJob = { -// deserialize(attributes, -// ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString), -// ShardId(attributes("destination_shard_hostname").toString, attributes("destination_shard_table_prefix").toString), -// attributes("count").asInstanceOf[{def toInt: Int}].toInt) -// } -//} -// -///** -// * A json-encodable job that represents the state of a copy from one shard to another. -// * -// * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items. -// * Other shard-specific data (like the cursor) can be encoded in 'serialize'. -// * -// * 'copyPage' is called to do the actual data copying. It should return a new CopyJob representing -// * the next chunk of work to do, or None if the entire copying job is complete. -// */ -//abstract case class CopyJob[S <: Shard](sourceId: ShardId, -// destinationId: ShardId, -// var count: Int, -// nameServer: NameServer[S], -// scheduler: JobScheduler) -// extends JsonJob { -// private val log = Logger.get(getClass.getName) -// -// override def shouldReplicate = false -// -// def toMap = { -// Map("source_shard_hostname" -> sourceId.hostname, -// "source_shard_table_prefix" -> sourceId.tablePrefix, -// "destination_shard_hostname" -> destinationId.hostname, -// "destination_shard_table_prefix" -> destinationId.tablePrefix, -// "count" -> count -// ) ++ serialize -// } -// -// def finish() { -// nameServer.markShardBusy(destinationId, shards.Busy.Normal) -// log.info("Copying finished for (type %s) from %s to %s", -// getClass.getName.split("\\.").last, sourceId, destinationId) -// Stats.clearGauge(gaugeName) -// } -// -// def apply() { -// try { -// if (nameServer.getShard(destinationId).busy == shards.Busy.Cancelled) { -// log.info("Copying cancelled for (type %s) from %s to %s", -// getClass.getName.split("\\.").last, sourceId, destinationId) -// Stats.clearGauge(gaugeName) -// -// } else { -// -// val sourceShard = nameServer.findShardById(sourceId) -// val destinationShard = nameServer.findShardById(destinationId) -// -// log.info("Copying shard block (type %s) from %s to %s: state=%s", -// getClass.getName.split("\\.").last, sourceId, destinationId, toMap) -// // do this on each iteration, so it happens in the queue and can be retried if the db is busy: -// nameServer.markShardBusy(destinationId, shards.Busy.Busy) -// -// this.nextJob = copyPage(sourceShard, destinationShard, count) -// this.nextJob match { -// case None => finish() -// case _ => incrGauge -// } -// } -// } catch { -// case e: NonExistentShard => -// log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.") -// case e: ShardTimeoutException => -// if (count > CopyJob.MIN_COPY) { -// log.warning("Shard block copy timed out; trying a smaller block size.") -// count = (count * 0.9).toInt -// scheduler.put(this) -// } else { -// log.error("Shard block copy timed out on minimum block size.") -// nameServer.markShardBusy(destinationId, shards.Busy.Error) -// throw e -// } -// case e: ShardDatabaseTimeoutException => -// log.warning("Shard block copy failed to get a database connection; retrying.") -// scheduler.put(this) -// case e: Throwable => -// log.error(e, "Shard block copy stopped due to exception: %s", e) -// nameServer.markShardBusy(destinationId, shards.Busy.Error) -// throw e -// } -// } -// -// private def incrGauge = { -// Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count) -// } -// -// private def gaugeName = { -// "x-copying-" + sourceId + "-" + destinationId -// } -// -// def copyPage(sourceShard: S, destinationShard: S, count: Int): Option[CopyJob[S]] -// -// def serialize: Map[String, Any] -//} -// \ No newline at end of file From a7ca1957b070a56fb49d60d6b606e0d96ffba4b1 Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:45:38 -0700 Subject: [PATCH 04/11] cleanup spec --- .../gizzard/integration/RepairSpec.scala | 49 ------------------- 1 file changed, 49 deletions(-) diff --git a/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala b/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala index 8cd880e2..7f7308c6 100644 --- a/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala +++ b/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala @@ -40,7 +40,6 @@ class RepairSpec extends IntegrationSpecification with ConfiguredSpecification { shard1.put(4, "this") shard1.put(5, "is") shard1.put(6, "bulk") - //server2.testService.put(1, "hi") val list = new java.util.ArrayList[com.twitter.gizzard.thrift.ShardId] list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix)) @@ -59,53 +58,5 @@ class RepairSpec extends IntegrationSpecification with ConfiguredSpecification { listElemenets(shard2.getAll(0, 100)._1) must eventually( verify(s => s sameElements listElemenets(shard3.getAll(0, 100)._1))) } - - //val replicatingShardId = ShardId("localhost", "replicating_forward_1") - //val (shard1id, shard2id, shard3id) = (ShardId("localhost", "forward_1_1"), ShardId("localhost", "forward_2_1"), ShardId("localhost", "forward_3_1")) - //lazy val shard1 = nameServer.findShardById(shard1id) - //lazy val shard2 = nameServer.findShardById(shard2id) - //lazy val shard3 = nameServer.findShardById(shard3id) - // - //"differing shards should become the same" in { - // shard1.add(1L, 2L, 1L, Time.now) // same - // shard2.add(1L, 2L, 1L, Time.now) - // - // shard1.archive(2L, 1L, 2L, Time.now) // one archived, one normal - // shard2.add(2L, 1L, 2L, Time.now) - // shard3.add(2L, 1L, 2L, Time.now) - // - // shard1.add(1L, 3L, 3L, Time.now) // only on one shard - // shard3.archive(1L, 3L, 3L, Time.now) - // - // shard2.add(1L, 4L, 4L, Time.now) // only on two shard - // - // shard3.negate(3L, 1L, 5L, Time.now) // only on two shard - // - // // bulk - // shard1.add(5L, 2L, 1L, Time.now) // same - // shard1.add(6L, 2L, 1L, Time.now) // same - // shard1.add(7L, 2L, 1L, Time.now) // same - // shard1.add(8L, 2L, 1L, Time.now) // same - // shard1.add(9L, 2L, 1L, Time.now) // same - // shard1.add(10L, 2L, 1L, Time.now) // same - // - // - // - // val list = new java.util.ArrayList[com.twitter.gizzard.thrift.ShardId] - // list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix)) - // list.add(new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix)) - // list.add(new com.twitter.gizzard.thrift.ShardId(shard3id.hostname, shard3id.tablePrefix)) - // manager.repair_shard(list) - // def listElemenets(list: Seq[Edge]) = { - // list.map((e) => (e.sourceId, e.destinationId, e.state)) - // } - // - // listElemenets(shard1.selectAll(EdgeCursor.Start, Repair.COUNT)._1) must eventually( - // verify(s => s sameElements listElemenets(shard2.selectAll(EdgeCursor.Start, Repair.COUNT)._1))) - // listElemenets(shard1.selectAll(EdgeCursor.Start, Repair.COUNT)._1) must eventually( - // verify(s => s sameElements listElemenets(shard3.selectAll(EdgeCursor.Start, Repair.COUNT)._1))) - // listElemenets(shard2.selectAll(EdgeCursor.Start, Repair.COUNT)._1) must eventually( - // verify(s => s sameElements listElemenets(shard3.selectAll(EdgeCursor.Start, Repair.COUNT)._1))) - //} } } From f170e9001452e6d4ac197facf9f2ac6d5c7595be Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:46:06 -0700 Subject: [PATCH 05/11] removed comment --- src/test/scala/com/twitter/gizzard/integration/TestServer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala index 1488d94a..dc3047f2 100644 --- a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala +++ b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala @@ -96,7 +96,6 @@ class TestServer(conf: config.TestServer) extends GizzardServer[TestShard](conf) val readWriteShardAdapter = new TestReadWriteAdapter(_) val jobPriorities = List(Priority.High.id, Priority.Low.id) - //val copyPriority = Priority.Low.id def repairPriority = Priority.High.id val repairFactory = new TestRepairFactory(nameServer, jobScheduler) From 3671327f74f68a2f171b89d6cfc211ceccebd087 Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:46:32 -0700 Subject: [PATCH 06/11] removed TestCopy --- .../gizzard/integration/TestServer.scala | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala index dc3047f2..b580199b 100644 --- a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala +++ b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala @@ -295,51 +295,3 @@ extends RepairJobParser[TestShard] { new TestRepair(shardIds, cursor, count, ns, s) } } -// -//class TestCopy(srcId: ShardId, destId: ShardId, cursor: Int, count: Int, -// ns: NameServer[TestShard], s: JobScheduler) -//extends CopyJob[TestShard](srcId, destId, count, ns, s) { -// def copyPage(src: TestShard, dest: TestShard, count: Int) = { -// val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) } -// -// if (rows.isEmpty) { -// None -// } else { -// dest.putAll(rows) -// Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s)) -// } -// } -// -// def serialize = Map("cursor" -> cursor) -//} - -//class TestCopyFactory(ns: NameServer[TestShard], s: JobScheduler) -//extends CopyJobFactory[TestShard] { -// def apply(src: ShardId, dest: ShardId) = new TestCopy(src, dest, 0, 500, ns, s) -//} - -//class TestCopyParser(ns: NameServer[TestShard], s: JobScheduler) -//extends CopyJobParser[TestShard] { -// def deserialize(m: Map[String, Any], src: ShardId, dest: ShardId, count: Int) = { -// val cursor = m("cursor").asInstanceOf[Int] -// val count = m("count").asInstanceOf[Int] -// new TestCopy(src, dest, cursor, count, ns, s) -// } -//} - -//class TestCopy(srcId: ShardId, destId: ShardId, cursor: Int, count: Int, -// ns: NameServer[TestShard], s: JobScheduler) -//extends CopyJob[TestShard](srcId, destId, count, ns, s) { -// def copyPage(src: TestShard, dest: TestShard, count: Int) = { -// val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) } -// -// if (rows.isEmpty) { -// None -// } else { -// dest.putAll(rows) -// Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s)) -// } -// } -// -// def serialize = Map("cursor" -> cursor) -//+++++++++++++++++++++++++++++++++++767766} From c51c96527f8662e8d198927f32ec78e8f44c3826 Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:47:43 -0700 Subject: [PATCH 07/11] removed comment --- src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala index 64fee2a2..cbcc0b70 100644 --- a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala +++ b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala @@ -99,9 +99,6 @@ class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: Pr def mark_shard_busy(id: ShardId, busy: Int) = { wrapEx(nameServer.markShardBusy(id.fromThrift, busy.fromThrift)) } - //def copy_shard(sourceId: ShardId, destinationId: ShardId) = { - // wrapEx(copyScheduler.put(copier(sourceId.fromThrift, destinationId.fromThrift))) - //} def list_tables(): JList[java.lang.Integer] = wrapEx(nameServer.listTables) From a8563d377a9895ca0a1c7f108cdf7f6e2b866954 Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Fri, 22 Apr 2011 13:49:33 -0700 Subject: [PATCH 08/11] removed old spec --- .../gizzard/thrift/ShardManagerServiceSpec.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala index 2098608f..2581038e 100644 --- a/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala +++ b/src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala @@ -108,19 +108,6 @@ object ManagerServiceSpec extends ConfiguredSpecification with JMocker with Clas manager.mark_shard_busy(thriftShardInfo1.id, Busy.Busy.id) } - //"copy_shard" in { - // val shardId1 = new shards.ShardId("hostname1", "table1") - // val shardId2 = new shards.ShardId("hostname2", "table2") - // val copyJob = mock[CopyJob[Shard]] - // - // expect { - // one(copier).apply(shardId1, shardId2) willReturn copyJob - // one(copyScheduler).put(copyJob) - // } - // - // manager.copy_shard(shardId1.toThrift, shardId2.toThrift) - //} - "set_forwarding" in { expect { one(nameServer).setForwarding(forwarding) From 78113de94f4c14ea9653fc5ae987e1e3bea2048b Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Tue, 10 May 2011 18:31:25 -0700 Subject: [PATCH 09/11] repair -> copy --- .../com/twitter/gizzard/GizzardServer.scala | 10 ++-- .../{RepairJob.scala => CopyJob.scala} | 58 +++++++++---------- .../gizzard/thrift/ManagerService.scala | 8 +-- src/main/thrift/Manager.thrift | 2 +- 4 files changed, 39 insertions(+), 39 deletions(-) rename src/main/scala/com/twitter/gizzard/scheduler/{RepairJob.scala => CopyJob.scala} (73%) diff --git a/src/main/scala/com/twitter/gizzard/GizzardServer.scala b/src/main/scala/com/twitter/gizzard/GizzardServer.scala index 07fd7a66..6d912143 100644 --- a/src/main/scala/com/twitter/gizzard/GizzardServer.scala +++ b/src/main/scala/com/twitter/gizzard/GizzardServer.scala @@ -4,7 +4,7 @@ import com.twitter.util.Duration import com.twitter.util.TimeConversions._ import net.lag.logging.Logger import nameserver.{NameServer, BasicShardRepository} -import scheduler.{JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec, RepairJobFactory} +import scheduler.{JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec, CopyJobFactory} import shards.{Shard, ReadWriteShard} import config.{GizzardServer => ServerConfig} @@ -12,9 +12,9 @@ import config.{GizzardServer => ServerConfig} abstract class GizzardServer[S <: Shard](config: ServerConfig) { def readWriteShardAdapter: ReadWriteShard[S] => S - def repairFactory: RepairJobFactory[S] + def copyFactory: CopyJobFactory[S] def jobPriorities: Seq[Int] - def repairPriority: Int + def copyPriority: Int def start(): Unit def shutdown(quiesce: Boolean): Unit def shutdown() { shutdown(false) } @@ -47,8 +47,8 @@ abstract class GizzardServer[S <: Shard](config: ServerConfig) { lazy val managerServer = new thrift.ManagerService( nameServer, jobScheduler, - repairFactory, - repairPriority) + copyFactory, + copyPriority) lazy val managerThriftServer = config.manager(new thrift.Manager.Processor(managerServer)) diff --git a/src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala similarity index 73% rename from src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala rename to src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala index 4744be72..33da87ad 100644 --- a/src/main/scala/com/twitter/gizzard/scheduler/RepairJob.scala +++ b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala @@ -8,28 +8,28 @@ import nameserver.{NameServer, NonExistentShard} import collection.mutable.ListBuffer import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException, Cursorable} -trait Repairable[T] { +trait Entity[T] { def similar(other: T): Int - def shouldRepair(other: T): Boolean + def isSchedulable(other: T): Boolean } -object RepairJob { +object CopyJob { val MIN_COPY = 500 } /** - * A factory for creating a new repair job (with default count and a starting cursor) from a source + * A factory for creating a new copy job (with default count and a starting cursor) from a source * and destination shard ID. */ -trait RepairJobFactory[S <: Shard] extends (Seq[ShardId] => RepairJob[S]) +trait CopyJobFactory[S <: Shard] extends (Seq[ShardId] => CopyJob[S]) /** - * A parser that creates a repair job out of json. The basic attributes (source shard ID, destination) + * A parser that creates a copy job out of json. The basic attributes (source shard ID, destination) * shard ID, count) are parsed out first, and the remaining attributes are passed to * 'deserialize' to decode any shard-specific data (like a cursor). */ -trait RepairJobParser[S <: Shard] extends JsonJobParser { - def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int): RepairJob[S] +trait CopyJobParser[S <: Shard] extends JsonJobParser { + def deserialize(attributes: Map[String, Any], shardIds: Seq[ShardId], count: Int): CopyJob[S] def apply(attributes: Map[String, Any]): JsonJob = { deserialize(attributes, @@ -40,15 +40,15 @@ trait RepairJobParser[S <: Shard] extends JsonJobParser { } /** - * A json-encodable job that represents the state of a repair one a shard. + * A json-encodable job that represents the state of a copy one a shard. * * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items. * Other shard-specific data (like the cursor) can be encoded in 'serialize'. * - * 'repair' is called to do the actual data repair. It should return a new Some[RepairJob] representing + * 'copy' is called to do the actual data copy. It should return a new Some[CopyJob] representing * the next chunk of work to do, or None if the entire copying job is complete. */ -abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], +abstract case class CopyJob[S <: Shard](shardIds: Seq[ShardId], var count: Int, nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, @@ -58,34 +58,34 @@ abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], override def shouldReplicate = false def finish() { - log.info("[Repair] - finished for (type %s) for %s", + log.info("[Copy] - finished for (type %s) for %s", getClass.getName.split("\\.").last, shardIds.mkString(", ")) Stats.clearGauge(gaugeName) } def apply() { try { - log.info("[Repair] - shard block (type %s): state=%s", + log.info("[Copy] - shard block (type %s): state=%s", getClass.getName.split("\\.").last, toMap) val shardObjs = shardIds.map(nameServer.findShardById(_)) shardIds.foreach(nameServer.markShardBusy(_, shards.Busy.Busy)) - repair(shardObjs) + copy(shardObjs) this.nextJob match { case None => shardIds.foreach(nameServer.markShardBusy(_, shards.Busy.Normal)) case _ => } } catch { case e: NonExistentShard => - log.error("[Repair] - failed because one of the shards doesn't exist. Terminating the repair.") + log.error("[Copy] - failed because one of the shards doesn't exist. Terminating the copy.") case e: ShardDatabaseTimeoutException => - log.warning("[Repair] - failed to get a database connection; retrying.") + log.warning("[Copy] - failed to get a database connection; retrying.") scheduler.put(priority, this) - case e: ShardTimeoutException if (count > RepairJob.MIN_COPY) => - log.warning("[Repair] - block copy timed out; trying a smaller block size.") + case e: ShardTimeoutException if (count > CopyJob.MIN_COPY) => + log.warning("[Copy] - block copy timed out; trying a smaller block size.") count = (count * 0.9).toInt scheduler.put(priority, this) case e: Throwable => - log.warning(e, "[Repair] - stopped due to exception: %s", e) + log.warning(e, "[Copy] - stopped due to exception: %s", e) throw e } } @@ -101,20 +101,20 @@ abstract case class RepairJob[S <: Shard](shardIds: Seq[ShardId], } private def gaugeName = { - "x-repair-" + shardIds.mkString("-") + "x-copy-" + shardIds.mkString("-") } - def repair(shards: Seq[S]) + def copy(shards: Seq[S]) def serialize: Map[String, Any] } -abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Cursorable[C]](shardIds: Seq[ShardId], cursor: C, count: Int, - nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, priority: Int) extends RepairJob(shardIds, count, nameServer, scheduler, priority) { +abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Cursorable[C]](shardIds: Seq[ShardId], cursor: C, count: Int, + nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, priority: Int) extends CopyJob(shardIds, count, nameServer, scheduler, priority) { private val log = Logger.get(getClass.getName) - def nextRepair(lowestCursor: C): Option[RepairJob[S]] + def nextCopy(lowestCursor: C): Option[CopyJob[S]] def scheduleItem(missing: Boolean, list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit @@ -126,7 +126,7 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Cursorable[ def select(shard: S, cursor: C, count: Int): (Seq[R], C) - def repair(shards: Seq[S]) = { + def copy(shards: Seq[S]) = { val tableIds = shards.map(shard => nameServer.getRootForwardings(shard.shardInfo.id).head.tableId) val listCursors = shards.map( (shard) => { @@ -135,10 +135,10 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Cursorable[ list ++= seq (shard, list, newCursor) }) - repairListCursor(listCursors, tableIds) + copyListCursor(listCursors, tableIds) } - private def repairListCursor(listCursors: Seq[(S, ListBuffer[R], C)], tableIds: Seq[Int]) = { + private def copyListCursor(listCursors: Seq[(S, ListBuffer[R], C)], tableIds: Seq[Int]) = { if (!tableIds.forall((id) => id == tableIds(0))) { throw new RuntimeException("tableIds didn't match") } else if (nameServer.getCommonShardId(shardIds) == None) { @@ -161,7 +161,7 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Cursorable[ } for (list <- similarLists) { val similarItem = list._2.remove(0) - if (firstItem.shouldRepair(similarItem)) { + if (firstItem.isSchedulable(similarItem)) { if (!firstEnqueued) { firstEnqueued = true scheduleItem(false, firstList, tableId, firstItem) @@ -172,7 +172,7 @@ abstract class MultiShardRepair[S <: Shard, R <: Repairable[R], C <: Cursorable[ } } val nextCursor = listCursors.map(_._3).reduceLeft((c1, c2) => if (c1.compare(c2) <= 0) c1 else c2) - this.nextJob = nextRepair(nextCursor) + this.nextJob = nextCopy(nextCursor) } } } diff --git a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala index cbcc0b70..43f9b062 100644 --- a/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala +++ b/src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala @@ -11,13 +11,13 @@ import com.twitter.gizzard.thrift.conversions.ShardInfo._ import com.twitter.gizzard.thrift.conversions.Forwarding._ import com.twitter.gizzard.thrift.conversions.Host._ import com.twitter.gizzard.shards._ -import com.twitter.gizzard.scheduler.{JsonJob, JobScheduler, PrioritizingJobScheduler, RepairJobFactory} +import com.twitter.gizzard.scheduler.{JsonJob, JobScheduler, PrioritizingJobScheduler, CopyJobFactory} import com.twitter.gizzard.nameserver._ import net.lag.logging.Logger import java.util.{List => JList} -class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, repairer: RepairJobFactory[S], repairPriority: Int) extends Manager.Iface { +class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, copier: CopyJobFactory[S], copyPriority: Int) extends Manager.Iface { val log = Logger.get(getClass.getName) def wrapEx[A](f: => A): A = try { f } catch { @@ -104,8 +104,8 @@ class ManagerService[S <: shards.Shard](nameServer: NameServer[S], scheduler: Pr def dump_nameserver(tableIds: JList[java.lang.Integer]) = wrapEx(nameServer.dumpStructure(tableIds.toList).map(_.toThrift)) - def repair_shard(shardIds: JList[ShardId]) = { - wrapEx((scheduler.asInstanceOf[PrioritizingJobScheduler]).put(repairPriority, repairer( + def copy_shard(shardIds: JList[ShardId]) = { + wrapEx((scheduler.asInstanceOf[PrioritizingJobScheduler]).put(copyPriority, copier( shardIds.toList.map(_.asInstanceOf[ShardId].fromThrift) ))) } diff --git a/src/main/thrift/Manager.thrift b/src/main/thrift/Manager.thrift index 98ef3a04..44607fe0 100644 --- a/src/main/thrift/Manager.thrift +++ b/src/main/thrift/Manager.thrift @@ -89,7 +89,7 @@ service Manager { list list_hostnames() throws(1: GizzardException ex) void mark_shard_busy(1: ShardId id, 2: i32 busy) throws(1: GizzardException ex) - void repair_shard(1: list ids) throws(1: GizzardException ex) + void copy_shard(1: list ids) throws(1: GizzardException ex) list list_tables() throws(1: GizzardException ex) From a8849d37ced5dd8285ee8c96087f950f769eef0a Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Thu, 12 May 2011 15:19:24 -0700 Subject: [PATCH 10/11] fix specs for copy --- .../{RepairSpec.scala => CopySpec.scala} | 6 +-- .../gizzard/integration/TestServer.scala | 38 +++++++++---------- 2 files changed, 20 insertions(+), 24 deletions(-) rename src/test/scala/com/twitter/gizzard/integration/{RepairSpec.scala => CopySpec.scala} (94%) diff --git a/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala b/src/test/scala/com/twitter/gizzard/integration/CopySpec.scala similarity index 94% rename from src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala rename to src/test/scala/com/twitter/gizzard/integration/CopySpec.scala index 7f7308c6..c59e46a5 100644 --- a/src/test/scala/com/twitter/gizzard/integration/RepairSpec.scala +++ b/src/test/scala/com/twitter/gizzard/integration/CopySpec.scala @@ -5,8 +5,8 @@ import scala.collection.JavaConversions._ import com.twitter.gizzard.thrift.conversions.Sequences._ import testserver.thrift.TestResult -class RepairSpec extends IntegrationSpecification with ConfiguredSpecification { - "Repair" should { +class CopySpec extends IntegrationSpecification with ConfiguredSpecification { + "Copy" should { val servers = List(1, 2, 3).map(testServer) val clients = servers.map(testServerClient) @@ -45,7 +45,7 @@ class RepairSpec extends IntegrationSpecification with ConfiguredSpecification { list.add(new com.twitter.gizzard.thrift.ShardId(shard1id.hostname, shard1id.tablePrefix)) list.add(new com.twitter.gizzard.thrift.ShardId(shard2id.hostname, shard2id.tablePrefix)) list.add(new com.twitter.gizzard.thrift.ShardId(shard3id.hostname, shard3id.tablePrefix)) - server1.managerServer.repair_shard(list) + server1.managerServer.copy_shard(list) def listElemenets(list: Seq[com.twitter.gizzard.testserver.TestResult]) = { list.map((e) => (e.id, e.value)) diff --git a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala index b580199b..30c07e7c 100644 --- a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala +++ b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala @@ -11,7 +11,7 @@ import collection.mutable.ListBuffer import com.twitter.gizzard import nameserver.NameServer import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException, Cursorable} -import scheduler.{JobScheduler, JsonJob, JsonJobParser, PrioritizingJobScheduler, Repairable, MultiShardRepair, RepairJobFactory, RepairJobParser} +import scheduler.{JobScheduler, JsonJob, JsonJobParser, PrioritizingJobScheduler, Entity, MultiShardCopy, CopyJobFactory, CopyJobParser} package object config { import com.twitter.gizzard.config._ @@ -74,7 +74,7 @@ package object config { Priority.Low.id -> new TestJobScheduler { val name = queueBase+"_low" } ) - def repairPriority = Priority.High.id + def copyPriority = Priority.High.id jobInjector.port = iPort manager.port = mPort @@ -97,12 +97,12 @@ class TestServer(conf: config.TestServer) extends GizzardServer[TestShard](conf) val readWriteShardAdapter = new TestReadWriteAdapter(_) val jobPriorities = List(Priority.High.id, Priority.Low.id) - def repairPriority = Priority.High.id - val repairFactory = new TestRepairFactory(nameServer, jobScheduler) + def copyPriority = Priority.High.id + val copyFactory = new TestCopyFactory(nameServer, jobScheduler) shardRepo += ("TestShard" -> new SqlShardFactory(conf.queryEvaluator(), conf.databaseConnection)) - jobCodec += ("Repair".r -> new TestRepairParser(nameServer, jobScheduler)) + jobCodec += ("Copy".r -> new TestCopyParser(nameServer, jobScheduler)) jobCodec += ("Put".r -> new PutParser(nameServer.findCurrentForwarding(0, _))) // service listener @@ -145,11 +145,11 @@ extends thrift.TestServer.Iface { // Shard Definitions -case class TestResult(id: Int, value: String, count: Int) extends Repairable[TestResult] { +case class TestResult(id: Int, value: String, count: Int) extends Entity[TestResult] { def similar(other: TestResult) = { id.compare(other.id) } - def shouldRepair(other: TestResult) = { + def isSchedulable(other: TestResult) = { similar(other) == 0 && value != other.value } } @@ -259,19 +259,15 @@ class PutJob(key: Int, value: String, forwarding: Long => TestShard) extends Jso def apply() { forwarding(key).put(key, value) } } -// class MetadataRepair(shardIds: Seq[ShardId], cursor: Cursor, count: Int, -// nameServer: NameServer[Shard], scheduler: PrioritizingJobScheduler) -// extends MultiShardRepair[Shard, Metadata, Cursor](shardIds, cursor, count, nameServer, scheduler, Repair.PRIORITY) { - -class TestRepairFactory(ns: NameServer[TestShard], s: PrioritizingJobScheduler) -extends RepairJobFactory[TestShard] { +class TestCopyFactory(ns: NameServer[TestShard], s: PrioritizingJobScheduler) +extends CopyJobFactory[TestShard] { def apply(shardIds: Seq[ShardId]) = { - new TestRepair(shardIds, TestCursor.Start, 500, ns, s) + new TestCopy(shardIds, TestCursor.Start, 500, ns, s) } } -class TestRepair(shardIds: Seq[ShardId], cursor: TestCursor, count: Int, - nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardRepair[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.High.id) { +class TestCopy(shardIds: Seq[ShardId], cursor: TestCursor, count: Int, + nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardCopy[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.High.id) { def select(shard: TestShard, cursor: TestCursor, count: Int) = shard.getAll(cursor, count) def scheduleBulk(otherShards: Seq[TestShard], items: Seq[TestResult]) = { @@ -280,18 +276,18 @@ class TestRepair(shardIds: Seq[ShardId], cursor: TestCursor, count: Int, def scheduleItem(missing: Boolean, list: (TestShard, ListBuffer[TestResult], TestCursor), tableId: Int, item: TestResult) = { scheduler.put(Priority.High.id, new PutJob(item.id, item.value, nameServer.findCurrentForwarding(0, _))) } - def nextRepair(lowestCursor: TestCursor) = { - if (lowestCursor.atEnd) None else Some(new TestRepair(shardIds, lowestCursor, count, nameServer, scheduler)) + def nextCopy(lowestCursor: TestCursor) = { + if (lowestCursor.atEnd) None else Some(new TestCopy(shardIds, lowestCursor, count, nameServer, scheduler)) } def serialize = { Map("cursor" -> cursor.position) } } -class TestRepairParser(ns: NameServer[TestShard], s: PrioritizingJobScheduler) -extends RepairJobParser[TestShard] { +class TestCopyParser(ns: NameServer[TestShard], s: PrioritizingJobScheduler) +extends CopyJobParser[TestShard] { def deserialize(m: Map[String, Any], shardIds: Seq[ShardId], count: Int) = { val cursor = new TestCursor(m("cursor").asInstanceOf[Int]) - new TestRepair(shardIds, cursor, count, ns, s) + new TestCopy(shardIds, cursor, count, ns, s) } } From 5dddf087e45781f4be5c0680a62d68a5a6a7bc97 Mon Sep 17 00:00:00 2001 From: Josh Hull Date: Mon, 6 Jun 2011 13:40:01 -0700 Subject: [PATCH 11/11] removed cursorable --- .../scala/com/twitter/gizzard/scheduler/CopyJob.scala | 10 ++++++---- .../scala/com/twitter/gizzard/shards/Cursorable.scala | 7 ------- .../com/twitter/gizzard/integration/TestServer.scala | 6 ++++-- 3 files changed, 10 insertions(+), 13 deletions(-) delete mode 100644 src/main/scala/com/twitter/gizzard/shards/Cursorable.scala diff --git a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala index 33da87ad..5b1b7f63 100644 --- a/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala +++ b/src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala @@ -6,7 +6,7 @@ import com.twitter.util.TimeConversions._ import net.lag.logging.Logger import nameserver.{NameServer, NonExistentShard} import collection.mutable.ListBuffer -import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException, Cursorable} +import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException} trait Entity[T] { def similar(other: T): Int @@ -109,11 +109,13 @@ abstract case class CopyJob[S <: Shard](shardIds: Seq[ShardId], def serialize: Map[String, Any] } -abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Cursorable[C]](shardIds: Seq[ShardId], cursor: C, count: Int, +abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Ordered[C]](shardIds: Seq[ShardId], cursor: C, count: Int, nameServer: NameServer[S], scheduler: PrioritizingJobScheduler, priority: Int) extends CopyJob(shardIds, count, nameServer, scheduler, priority) { private val log = Logger.get(getClass.getName) + def cursorAtEnd(cursor :C): Boolean + def nextCopy(lowestCursor: C): Option[CopyJob[S]] def scheduleItem(missing: Boolean, list: (S, ListBuffer[R], C), tableId: Int, item: R): Unit @@ -144,10 +146,10 @@ abstract class MultiShardCopy[S <: Shard, R <: Entity[R], C <: Cursorable[C]](sh } else if (nameServer.getCommonShardId(shardIds) == None) { throw new RuntimeException("these shardIds don't have a common ancestor") } else { - while (listCursors.forall(lc => !lc._2.isEmpty || lc._3.atEnd) && listCursors.exists(lc => !lc._2.isEmpty)) { + while (listCursors.forall(lc => !lc._2.isEmpty || cursorAtEnd(lc._3)) && listCursors.exists(lc => !lc._2.isEmpty)) { val tableId = tableIds(0) val firstList = smallestList(listCursors) - val finishedLists = listCursors.filter(lc => lc._3.atEnd && lc._2.isEmpty) + val finishedLists = listCursors.filter(lc => cursorAtEnd(lc._3) && lc._2.isEmpty) if (finishedLists.size == listCursors.size - 1) { scheduleBulk(finishedLists.map(_._1), firstList._2) firstList._2.clear diff --git a/src/main/scala/com/twitter/gizzard/shards/Cursorable.scala b/src/main/scala/com/twitter/gizzard/shards/Cursorable.scala deleted file mode 100644 index 06adec17..00000000 --- a/src/main/scala/com/twitter/gizzard/shards/Cursorable.scala +++ /dev/null @@ -1,7 +0,0 @@ -package com.twitter.gizzard.shards - -trait Cursorable[T] extends Ordered[T] { - def atEnd: Boolean - def atStart: Boolean -} - diff --git a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala index 30c07e7c..b29c3ea3 100644 --- a/src/test/scala/com/twitter/gizzard/integration/TestServer.scala +++ b/src/test/scala/com/twitter/gizzard/integration/TestServer.scala @@ -10,7 +10,7 @@ import collection.mutable.ListBuffer import com.twitter.gizzard import nameserver.NameServer -import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException, Cursorable} +import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException} import scheduler.{JobScheduler, JsonJob, JsonJobParser, PrioritizingJobScheduler, Entity, MultiShardCopy, CopyJobFactory, CopyJobParser} package object config { @@ -161,7 +161,7 @@ object TestCursor { val End = new TestCursor(EndPosition) } -case class TestCursor(position: Int) extends Cursorable[TestCursor] { +case class TestCursor(position: Int) extends Ordered[TestCursor] { def atStart = position == TestCursor.StartPosition def atEnd = position == TestCursor.EndPosition def compare(other: TestCursor) = { @@ -269,6 +269,8 @@ extends CopyJobFactory[TestShard] { class TestCopy(shardIds: Seq[ShardId], cursor: TestCursor, count: Int, nameServer: NameServer[TestShard], scheduler: PrioritizingJobScheduler) extends MultiShardCopy[TestShard, TestResult, TestCursor](shardIds, cursor, count, nameServer, scheduler, Priority.High.id) { + def cursorAtEnd(c: TestCursor) = c.atEnd + def select(shard: TestShard, cursor: TestCursor, count: Int) = shard.getAll(cursor, count) def scheduleBulk(otherShards: Seq[TestShard], items: Seq[TestResult]) = { otherShards.foreach(_.putAll(items.map{i => (i.id, i.value)}))