Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

wip

  • Loading branch information...
commit d8f40355062958f3c45096560b40ee765706ef60 1 parent a92bdbd
Kyle Maxwell authored
View
5 src/main/scala/com/twitter/gizzard/GizzardServer.scala
@@ -4,14 +4,15 @@ import com.twitter.util.Duration
import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
import nameserver.{NameServer, BasicShardRepository}
-import scheduler.{CopyJobFactory, JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec}
+import scheduler.{JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec}
+import com.twitter.gizzard.scheduler.cursor.CursorJobFactory
import shards.{Shard, ReadWriteShard}
abstract class GizzardServer[S <: Shard, J <: JsonJob](config: gizzard.config.GizzardServer) {
def readWriteShardAdapter: ReadWriteShard[S] => S
- def copyFactory: CopyJobFactory[S]
+ def copyFactory: CursorJobFactory[S]
def jobPriorities: Seq[Int]
def copyPriority: Int
def start(): Unit
View
7 src/main/scala/com/twitter/gizzard/Jsonable.scala
@@ -0,0 +1,7 @@
+package com.twitter.gizzard
+
+import scala.collection._
+
+trait Jsonable {
+ def toMap: Map[String, Any]
+}
View
162 src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala
@@ -1,162 +0,0 @@
-package com.twitter.gizzard.scheduler
-
-import com.twitter.ostrich.Stats
-import com.twitter.util.TimeConversions._
-import net.lag.logging.Logger
-import collection.mutable.ListBuffer
-import collection.mutable
-import nameserver.{NameServer, NonExistentShard}
-import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException}
-
-object CopyJob {
- val MIN_COPY = 500
-}
-
-/**
- * A factory for creating a new copy job (with default count and a starting cursor) from a source
- * and destination shard ID.
- */
-trait CopyJobFactory[S <: Shard] extends ((ShardId, List[CopyDestination]) => CopyJob[S])
-
-case class CopyDestination(shardId: ShardId, baseId: Option[Long])
-case class CopyDestinationShard[S](shard: S, baseId: Option[Long])
-
-/**
- * A parser that creates a copy job out of json. The basic attributes (source shard ID, destination)
- * shard ID, and count) are parsed out first, and the remaining attributes are passed to
- * 'deserialize' to decode any shard-specific data (like a cursor).
- */
-trait CopyJobParser[S <: Shard] extends JsonJobParser {
- def deserialize(attributes: Map[String, Any], sourceId: ShardId,
- destinations: List[CopyDestination], count: Int): CopyJob[S]
-
- def apply(attributes: Map[String, Any]): JsonJob = {
- val sourceId = ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString)
-
- deserialize(attributes,
- sourceId,
- parseDestinations(attributes).toList,
- attributes("count").asInstanceOf[{def toInt: Int}].toInt)
- }
-
- private def parseDestinations(attributes: Map[String, Any]) = {
- val destinations = new ListBuffer[CopyDestination]
- var i = 0
- while(attributes.contains("destination_" + i + "_hostname")) {
- val prefix = "destination_" + i
- val baseKey = prefix + "_base_id"
- val baseId = if (attributes.contains(baseKey)) {
- Some(attributes(baseKey).asInstanceOf[{def toLong: Long}].toLong)
- } else {
- None
- }
- val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
- destinations += CopyDestination(shardId, baseId)
- i += 1
- }
-
- destinations.toList
- }
-}
-
-/**
- * A json-encodable job that represents the state of a copy from one shard to another.
- *
- * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items.
- * Other shard-specific data (like the cursor) can be encoded in 'serialize'.
- *
- * 'copyPage' is called to do the actual data copying. It should return a new CopyJob representing
- * the next chunk of work to do, or None if the entire copying job is complete.
- */
-abstract case class CopyJob[S <: Shard](sourceId: ShardId,
- destinations: List[CopyDestination],
- var count: Int,
- nameServer: NameServer[S],
- scheduler: JobScheduler[JsonJob])
- extends JsonJob {
- private val log = Logger.get(getClass.getName)
-
- override def shouldReplicate = false
-
- def toMap = {
- Map("source_shard_hostname" -> sourceId.hostname,
- "source_shard_table_prefix" -> sourceId.tablePrefix,
- "count" -> count
- ) ++ serialize ++ destinationsToMap
- }
-
- private def destinationsToMap = {
- var i = 0
- val map = mutable.Map[String, Any]()
- destinations.foreach { destination =>
- map("destination_" + i + "_hostname") = destination.shardId.hostname
- map("destination_" + i + "_table_prefix") = destination.shardId.tablePrefix
- destination.baseId.foreach { id =>
- map("destination_" + i + "_base_id") = id
- }
- i += 1
- }
- map
- }
-
- def finish() {
- destinations.foreach { dest =>
- nameServer.markShardBusy(dest.shardId, shards.Busy.Normal)
- }
- log.info("Copying finished for (type %s) from %s to %s",
- getClass.getName.split("\\.").last, sourceId, destinations)
- Stats.clearGauge(gaugeName)
- }
-
- def apply() {
- try {
- log.info("Copying shard block (type %s) from %s to %s: state=%s",
- getClass.getName.split("\\.").last, sourceId, destinations, toMap)
- val sourceShard = nameServer.findShardById(sourceId)
-
- val destinationShards = destinations.map { dest =>
- CopyDestinationShard[S](nameServer.findShardById(dest.shardId), dest.baseId)
- }
-
- // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
- destinations.foreach { dest =>
- nameServer.markShardBusy(dest.shardId, shards.Busy.Busy)
- }
-
- val nextJob = copyPage(sourceShard, destinationShards, count)
-
- nextJob match {
- case Some(job) =>
- incrGauge
- scheduler.put(job)
- case None =>
- finish()
- }
- } catch {
- case e: NonExistentShard =>
- log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.")
- case e: ShardTimeoutException if (count > CopyJob.MIN_COPY) =>
- log.warning("Shard block copy timed out; trying a smaller block size.")
- count = (count * 0.9).toInt
- scheduler.put(this)
- case e: ShardDatabaseTimeoutException =>
- log.warning("Shard block copy failed to get a database connection; retrying.")
- scheduler.put(this)
- case e: Throwable =>
- log.warning("Shard block copy stopped due to exception: %s", e)
- throw e
- }
- }
-
- private def incrGauge = {
- Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count)
- }
-
- private def gaugeName = {
- "x-copying-" + sourceId + "-" + destinations
- }
-
- def copyPage(sourceShard: S, destinationShards: List[CopyDestinationShard[S]], count: Int): Option[CopyJob[S]]
-
- def serialize: Map[String, Any]
-}
View
94 src/main/scala/com/twitter/gizzard/scheduler/cursor/CursorJob.scala
@@ -0,0 +1,94 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import net.lag.logging.Logger
+import com.twitter.gizzard.shards._
+import com.twitter.gizzard.nameserver.{NameServer, NonExistentShard, InvalidShard, NameserverUninitialized}
+
+import com.twitter.ostrich.Stats
+import scala.collection.mutable.Map
+
+object CursorJob {
+ val MIN_PAGE = 500
+}
+
+/**
+ * A json-encodable job that represents the state of a copy-like operation from one shard to another.
+ *
+ * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items.
+ * Other shard-specific data (like the cursor) can be encoded in 'serialize'.
+ *
+ * 'cursorPage' is called to do the actual data cursoring. It should return a new CursorJob representing
+ * the next chunk of work to do, or None if the entire job is complete.
+ */
+abstract case class CursorJob[S <: Shard](source: Source,
+ destinations: DestinationList,
+ var count: Int,
+ nameServer: NameServer[S],
+ scheduler: JobScheduler[JsonJob])
+ extends JsonJob {
+ private val log = Logger.get(getClass.getName)
+
+ override def shouldReplicate = false
+
+ def name: String
+
+ def toMap = {
+ source.toMap ++ serialize ++ destinations.toMap
+ }
+
+ def finish() {
+ destinations.foreach { dest =>
+ nameServer.markShardBusy(dest.shardId, shards.Busy.Normal)
+ }
+ log.info("Finished for (type %s) from %s to %s",
+ getClass.getName.split("\\.").last, source, destinations)
+ Stats.clearGauge(gaugeName)
+ }
+
+ def apply() {
+ try {
+ log.info("Shard block (type %s) from %s to %s: state=%s",
+ getClass.getName.split("\\.").last, source, destinations, toMap)
+
+ // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
+ destinations.foreach { dest =>
+ nameServer.markShardBusy(dest.shardId, shards.Busy.Busy)
+ }
+
+ val nextJob = cursorPage(source, destinations, count)
+
+ nextJob match {
+ case Some(job) =>
+ incrGauge
+ scheduler.put(job)
+ case None =>
+ finish()
+ }
+ } catch {
+ case e: NonExistentShard =>
+ log.error("Shard block "+name+" failed because one of the shards doesn't exist. Terminating...")
+ case e: ShardTimeoutException if (count > CursorJob.MIN_PAGE) =>
+ log.warning("Shard block "+name+" timed out; trying a smaller block size.")
+ count = (count * 0.9).toInt
+ scheduler.put(this)
+ case e: ShardDatabaseTimeoutException =>
+ log.warning("Shard block "+name+" failed to get a database connection; retrying.")
+ scheduler.put(this)
+ case e: Throwable =>
+ log.warning("Shard block "+name+" stopped due to exception: %s", e)
+ throw e
+ }
+ }
+
+ private def incrGauge = {
+ Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count)
+ }
+
+ private def gaugeName = {
+ "x-"+name+"-" + source + "-" + destinations
+ }
+
+ def cursorPage(source: Source, destinations: DestinationList, count: Int): Option[CursorJob[S]]
+
+ def serialize: collection.Map[String, Any] = Map.empty[String, Any]
+}
View
5 src/main/scala/com/twitter/gizzard/scheduler/cursor/CursorJobFactory.scala
@@ -0,0 +1,5 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import com.twitter.gizzard.shards._
+
+trait CursorJobFactory[S <: Shard] extends ((Source[S], DestinationList[S]) => CursorJob[S])
View
17 src/main/scala/com/twitter/gizzard/scheduler/cursor/CursorJobParser.scala
@@ -0,0 +1,17 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import com.twitter.gizzard.shards._
+import scala.collection._
+
+trait CursorJobParser[S <: Shard] extends JsonJobParser {
+ def deserialize(attributes: Map[String, Any], source: Source,
+ destinations: DestinationList, count: Int): CursorJob[S]
+
+ def apply(attributes: Map[String, Any]): JsonJob = {
+ deserialize(attributes,
+ Source(attributes),
+ DestinationList(attributes),
+ attributes("count").asInstanceOf[{def toInt: Int}].toInt)
+ }
+
+}
View
24 src/main/scala/com/twitter/gizzard/scheduler/cursor/Destination.scala
@@ -0,0 +1,24 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import scala.collection._
+import com.twitter.gizzard.Jsonable
+import com.twitter.gizzard.shards._
+
+
+trait Destination extends Endpoint {
+ val serializeName = "destination"
+}
+
+class DestinationImpl(var shardId: ShardId) extends Destination
+
+object Destination extends EndpointUtils {
+ val serializeName = "source"
+
+ def apply(map: Map[String, Any]) = {
+ new DestinationImpl(shardIdFromMap(map))
+ }
+
+ def apply(shardId: ShardId) = {
+ new DestinationImpl(shardId)
+ }
+}
View
52 src/main/scala/com/twitter/gizzard/scheduler/cursor/DestinationList.scala
@@ -0,0 +1,52 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.immutable._
+import com.twitter.gizzard.Jsonable
+import com.twitter.gizzard.shards._
+
+object DestinationList {
+ def apply(map: collection.Map[String, Any]): DestinationList = {
+ val list = new DestinationList
+ // FIXME
+ list
+ }
+
+ def apply(shardId: ShardId): DestinationList = apply(Seq(shardId))
+ def apply(shardIds: Collection[ShardId]): DestinationList = {
+ val list = new DestinationList
+ shardIds.foreach { id => list + Destination(id) }
+ list
+ }
+}
+
+class DestinationList extends ArrayBuffer[Destination] with Jsonable {
+ def toMap: Map[String, Any] = {
+ var map = Map.empty[String, Any]
+ var i = 0
+ this.foreach { dest =>
+ map ++= dest.toMap(i)
+ i += 1
+ }
+ map
+ }
+}
+//
+// private def parseDestinations(attributes: Map[String, Any]) = {
+// val destinations = DestinationLi
+// var i = 0
+// while(attributes.contains("destination_" + i + "_hostname")) {
+// val prefix = "destination_" + i
+// val baseKey = prefix + "_base_id"
+// val baseId = if (attributes.contains(baseKey)) {
+// Some(attributes(baseKey).asInstanceOf[{def toLong: Long}].toLong)
+// } else {
+// None
+// }
+// val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
+// destinations += CopyDestination(shardId, baseId)
+// i += 1
+// }
+//
+// destinations.toList
+// }
View
29 src/main/scala/com/twitter/gizzard/scheduler/cursor/Endpoint.scala
@@ -0,0 +1,29 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import com.twitter.gizzard.shards.ShardId
+import scala.collection.immutable._
+
+trait EndpointUtils {
+ val serializeName: String
+ def shardIdFromMap(map: collection.Map[String, Any]): ShardId = shardIdFromMap(map, 0)
+ def shardIdFromMap(map: collection.Map[String, Any], i: Int): ShardId = {
+ ShardId(map(serializeName + "_" + i + "_hostname").toString, map(serializeName + "_" + i + "_tableprefix").toString)
+ }
+}
+
+trait Endpoint[S <: shards.Shard] {
+ val serializeName: String
+
+ def toMap: Map[String, Any] = toMap(0)
+
+ def shardId: ShardId
+
+ def shard: S
+
+ def toMap(i: Int): Map[String, Any] = {
+ Map(
+ serializeName + "_" + i + "_hostname" -> shardId.hostname,
+ serializeName + "_" + i + "_table_prefix" -> shardId.tablePrefix
+ )
+ }
+}
View
24 src/main/scala/com/twitter/gizzard/scheduler/cursor/Source.scala
@@ -0,0 +1,24 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import scala.collection.immutable._
+import com.twitter.gizzard.Jsonable
+import com.twitter.gizzard.shards._
+
+
+trait Source extends Endpoint {
+ val serializeName = "source"
+}
+
+object Source extends EndpointUtils {
+ val serializeName = "source"
+ def apply(map: collection.Map[String, Any]) = {
+ new SourceImpl(shardIdFromMap(map))
+ }
+
+ def apply(shardId: ShardId) = {
+ new SourceImpl(shardId)
+ }
+
+}
+
+class SourceImpl(var shardId: ShardId) extends Source
View
8 src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
@@ -9,13 +9,14 @@ import com.twitter.gizzard.thrift.conversions.ShardInfo._
import com.twitter.gizzard.thrift.conversions.Forwarding._
import com.twitter.gizzard.thrift.conversions.Host._
import com.twitter.gizzard.shards._
-import com.twitter.gizzard.scheduler.{CopyDestination, CopyJob, CopyJobFactory, JsonJob, JobScheduler, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler.{JsonJob, JobScheduler, PrioritizingJobScheduler}
+import com.twitter.gizzard.scheduler.cursor._
import com.twitter.gizzard.nameserver._
import net.lag.logging.Logger
import java.util.{List => JList}
-class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S], copier: CopyJobFactory[S], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob]) extends Manager.Iface {
+class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S], copier: CursorJobFactory[S], scheduler: PrioritizingJobScheduler[J], copyScheduler: JobScheduler[JsonJob]) extends Manager.Iface {
val log = Logger.get(getClass.getName)
def wrapEx[A](f: => A): A = try { f } catch {
@@ -94,8 +95,9 @@ class ManagerService[S <: shards.Shard, J <: JsonJob](nameServer: NameServer[S],
def mark_shard_busy(id: ShardId, busy: Int) = {
wrapEx(nameServer.markShardBusy(id.fromThrift, busy.fromThrift))
}
+
def copy_shard(sourceId: ShardId, destinationId: ShardId) = {
- wrapEx(copyScheduler.put(copier(sourceId.fromThrift, List(CopyDestination(destinationId.fromThrift, None)))))
+ wrapEx(copyScheduler.put(copier(Source(sourceId.fromThrift), DestinationList(destinationId.fromThrift))))
}
def dump_nameserver(tableId: Int) = wrapEx(nameServer.dumpStructure(tableId).toThrift)
View
190 src/test/scala/com/twitter/gizzard/integration/MulticopyIntegrationSpec.scala
@@ -1,95 +1,95 @@
-package com.twitter.gizzard.integration
-
-import com.twitter.gizzard.thrift.conversions.Sequences._
-import testserver._
-import testserver.config.TestServerConfig
-import testserver.thrift.TestResult
-import java.io.File
-import scheduler.CopyDestination
-import java.util.concurrent.atomic.AtomicInteger
-import org.specs.mock.{ClassMocker, JMocker}
-import net.lag.configgy.{Config => CConfig}
-import com.twitter.util.TimeConversions._
-import com.twitter.gizzard.thrift.{JobInjectorService, TThreadServer, JobInjector}
-import nameserver.{Host, HostStatus, JobRelay}
-
-class MulticopyIntegrationSpec extends IntegrationSpecification with ConfiguredSpecification {
- override def testServer(i: Int) = {
- val port = 8000 + (i - 1) * 3
- val name = "testserver" + i
- new TestServer(TestServerConfig(name, port)) with TestServerFacts {
- val enum = i
- val nsDatabaseName = "gizzard_test_"+name+"_ns"
- val databaseName = "gizzard_test_"+name
- val basePort = port
- val injectorPort = port + 1
- val managerPort = port + 2
- val sqlShardInfos = List(
- shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
- "TestShard", "int", "int", shards.Busy.Normal),
- shards.ShardInfo(shards.ShardId("localhost", "t0_1"),
- "TestShard", "int", "int", shards.Busy.Normal),
- shards.ShardInfo(shards.ShardId("localhost", "t0_2"),
- "TestShard", "int", "int", shards.Busy.Normal)
- )
- val forwardings = List(
- nameserver.Forwarding(0, 0, sqlShardInfos.first.id)
- )
- val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue",
- "gizzard_test_"+name+"_high_queue_errors",
- "gizzard_test_"+name+"_low_queue",
- "gizzard_test_"+name+"_low_queue_errors")
- }
- }
-
- "Multicopy" should {
- val server = testServer(1)
- val client = testServerClient(server)
-
- doBefore {
- resetTestServerDBs(server)
- setupServers(server)
- server.nameServer.reload()
- }
-
- doAfter { stopServers(server) }
-
- "copy to multiple locations" in {
- startServers(server)
- val nameserver = server.nameServer
-
- val sourceInfo :: dest0info :: dest2info :: _ = server.sqlShardInfos
-
- client.put(0, "foo")
- client.put(1, "bar")
- client.put(2, "baz")
- client.put(3, "bonk")
- client.get(3) must eventually(be_==(List(new TestResult(3, "bonk", 1)).toJavaList))
-
- val dest0 = CopyDestination(dest0info.id, Some(0))
- val dest2 = CopyDestination(dest2info.id, Some(2))
-
- val copy = new TestSplitFactory(server.nameServer, server.jobScheduler(Priority.Low.id))(sourceInfo.id, List(dest0, dest2))
-
- copy()
-
- val sourceShard = nameserver.findShardById(sourceInfo.id)
- val dest0Shard = nameserver.findShardById(dest0info.id)
- val dest2Shard = nameserver.findShardById(dest2info.id)
-
- dest0Shard.get(0) must eventually(be_==(Some((0, "foo", 1))))
- dest0Shard.get(1) must eventually(be_==(Some((1, "bar", 1))))
- dest0Shard.get(2) must eventually(be_==(None))
- dest0Shard.get(3) must eventually(be_==(None))
-
- dest2Shard.get(0) must eventually(be_==(None))
- dest2Shard.get(1) must eventually(be_==(None))
- dest2Shard.get(2) must eventually(be_==(Some((2, "baz", 1))))
- dest2Shard.get(3) must eventually(be_==(Some((3, "bonk", 1))))
- }
-
-
-
- }
-}
-
+// package com.twitter.gizzard.integration
+//
+// import com.twitter.gizzard.thrift.conversions.Sequences._
+// import testserver._
+// import testserver.config.TestServerConfig
+// import testserver.thrift.TestResult
+// import java.io.File
+// import scheduler.cursor._
+// import java.util.concurrent.atomic.AtomicInteger
+// import org.specs.mock.{ClassMocker, JMocker}
+// import net.lag.configgy.{Config => CConfig}
+// import com.twitter.util.TimeConversions._
+// import com.twitter.gizzard.thrift.{JobInjectorService, TThreadServer, JobInjector}
+// import nameserver.{Host, HostStatus, JobRelay}
+//
+// class MulticopyIntegrationSpec extends IntegrationSpecification with ConfiguredSpecification {
+// override def testServer(i: Int) = {
+// val port = 8000 + (i - 1) * 3
+// val name = "testserver" + i
+// new TestServer(TestServerConfig(name, port)) with TestServerFacts {
+// val enum = i
+// val nsDatabaseName = "gizzard_test_"+name+"_ns"
+// val databaseName = "gizzard_test_"+name
+// val basePort = port
+// val injectorPort = port + 1
+// val managerPort = port + 2
+// val sqlShardInfos = List(
+// shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
+// "TestShard", "int", "int", shards.Busy.Normal),
+// shards.ShardInfo(shards.ShardId("localhost", "t0_1"),
+// "TestShard", "int", "int", shards.Busy.Normal),
+// shards.ShardInfo(shards.ShardId("localhost", "t0_2"),
+// "TestShard", "int", "int", shards.Busy.Normal)
+// )
+// val forwardings = List(
+// nameserver.Forwarding(0, 0, sqlShardInfos.first.id)
+// )
+// val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue",
+// "gizzard_test_"+name+"_high_queue_errors",
+// "gizzard_test_"+name+"_low_queue",
+// "gizzard_test_"+name+"_low_queue_errors")
+// }
+// }
+//
+// "Multicopy" should {
+// val server = testServer(1)
+// val client = testServerClient(server)
+//
+// doBefore {
+// resetTestServerDBs(server)
+// setupServers(server)
+// server.nameServer.reload()
+// }
+//
+// doAfter { stopServers(server) }
+//
+// "copy to multiple locations" in {
+// startServers(server)
+// val nameserver = server.nameServer
+//
+// val sourceInfo :: dest0info :: dest2info :: _ = server.sqlShardInfos
+//
+// client.put(0, "foo")
+// client.put(1, "bar")
+// client.put(2, "baz")
+// client.put(3, "bonk")
+// client.get(3) must eventually(be_==(List(new TestResult(3, "bonk", 1)).toJavaList))
+//
+// val dest0 = CopyDestination(dest0info.id, Some(0))
+// val dest2 = CopyDestination(dest2info.id, Some(2))
+//
+// val copy = new TestSplitFactory(server.nameServer, server.jobScheduler(Priority.Low.id))(sourceInfo.id, List(dest0, dest2))
+//
+// copy()
+//
+// val sourceShard = nameserver.findShardById(sourceInfo.id)
+// val dest0Shard = nameserver.findShardById(dest0info.id)
+// val dest2Shard = nameserver.findShardById(dest2info.id)
+//
+// dest0Shard.get(0) must eventually(be_==(Some((0, "foo", 1))))
+// dest0Shard.get(1) must eventually(be_==(Some((1, "bar", 1))))
+// dest0Shard.get(2) must eventually(be_==(None))
+// dest0Shard.get(3) must eventually(be_==(None))
+//
+// dest2Shard.get(0) must eventually(be_==(None))
+// dest2Shard.get(1) must eventually(be_==(None))
+// dest2Shard.get(2) must eventually(be_==(Some((2, "baz", 1))))
+// dest2Shard.get(3) must eventually(be_==(Some((3, "bonk", 1))))
+// }
+//
+//
+//
+// }
+// }
+//
View
4 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
@@ -6,10 +6,10 @@ import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import com.twitter.querulous.config.Connection
import com.twitter.querulous.query.SqlQueryTimeoutException
import gizzard.GizzardServer
-import com.twitter.gizzard.scheduler.{CopyDestination, CopyDestinationShard}
+import com.twitter.gizzard.scheduler.cursor._
import nameserver.NameServer
import shards.{ShardId, ShardInfo, ShardException, ShardTimeoutException}
-import scheduler.{JobScheduler, JsonJob, CopyJob, CopyJobParser, CopyJobFactory, JsonJobParser, PrioritizingJobScheduler}
+import scheduler.{JobScheduler, JsonJob, JsonJobParser, PrioritizingJobScheduler}
object config {
import com.twitter.gizzard.config._
View
40 src/test/scala/com/twitter/gizzard/scheduler_new/CopyJobSpec.scala
@@ -3,11 +3,11 @@ package com.twitter.gizzard.scheduler
import com.twitter.util.TimeConversions._
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
+import cursor._
-
-class FakeCopy(val sourceShardId: shards.ShardId, val dests: List[CopyDestination], count: Int,
+class FakeCopy(val source: Source, val dests: DestinationList, count: Int,
nameServer: nameserver.NameServer[shards.Shard], scheduler: JobScheduler[JsonJob])(nextJob: => Option[FakeCopy])
- extends CopyJob[shards.Shard](sourceShardId, dests, count, nameServer, scheduler) {
+ extends CursorJob[shards.Shard](source, dests, count, nameServer, scheduler) {
def serialize = Map("cursor" -> 1)
@throws(classOf[Exception])
@@ -17,29 +17,29 @@ class FakeCopy(val sourceShardId: shards.ShardId, val dests: List[CopyDestinatio
override def equals(that: Any) = that match {
case that: FakeCopy =>
- this.sourceShardId == that.sourceShardId
+ this.source == that.source
case _ => false
}
}
object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"CopyJob" should {
- val sourceShardId = shards.ShardId("testhost", "1")
+ val source = shards.ShardId("testhost", "1")
val destinationShardId = shards.ShardId("testhost", "2")
val destinations = List(CopyDestination(destinationShardId, None))
val count = CopyJob.MIN_COPY + 1
val nextCopy = mock[FakeCopy]
val nameServer = mock[nameserver.NameServer[shards.Shard]]
val jobScheduler = mock[JobScheduler[JsonJob]]
- val makeCopy = new FakeCopy(sourceShardId, destinations, count, nameServer, jobScheduler)(_)
+ val makeCopy = new FakeCopy(source, destinations, count, nameServer, jobScheduler)(_)
val shard1 = mock[shards.Shard]
val shard2 = mock[shards.Shard]
"toMap" in {
val copy = makeCopy(Some(nextCopy))
copy.toMap mustEqual Map(
- "source_shard_hostname" -> sourceShardId.hostname,
- "source_shard_table_prefix" -> sourceShardId.tablePrefix,
+ "source_shard_hostname" -> source.hostname,
+ "source_shard_table_prefix" -> source.tablePrefix,
"destination_0_hostname" -> destinationShardId.hostname,
"destination_0_table_prefix" -> destinationShardId.tablePrefix,
"count" -> count
@@ -50,8 +50,8 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
val copy = makeCopy(Some(nextCopy))
val json = copy.toJson
json mustMatch "Copy"
- json mustMatch "\"source_shard_hostname\":\"%s\"".format(sourceShardId.hostname)
- json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(sourceShardId.tablePrefix)
+ json mustMatch "\"source_shard_hostname\":\"%s\"".format(source.hostname)
+ json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(source.tablePrefix)
json mustMatch "\"destination_0_hostname\":\"%s\"".format(destinationShardId.hostname)
json mustMatch "\"destination_0_table_prefix\":\"%s\"".format(destinationShardId.tablePrefix)
json mustMatch "\"count\":" + count
@@ -61,7 +61,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"normally" in {
val copy = makeCopy(Some(nextCopy))
expect {
- one(nameServer).findShardById(sourceShardId) willReturn shard1
+ one(nameServer).findShardById(source) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(jobScheduler).put(nextCopy)
@@ -73,7 +73,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"no shard" in {
val copy = makeCopy(Some(nextCopy))
expect {
- one(nameServer).findShardById(sourceShardId) willThrow new nameserver.NonExistentShard("foo")
+ one(nameServer).findShardById(source) willThrow new nameserver.NonExistentShard("foo")
never(jobScheduler).put(nextCopy)
}
@@ -81,9 +81,9 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
}
"with a database connection timeout" in {
- val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, sourceShardId))
+ val copy = makeCopy(throw new shards.ShardDatabaseTimeoutException(100.milliseconds, source))
expect {
- one(nameServer).findShardById(sourceShardId) willReturn shard1
+ one(nameServer).findShardById(source) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(jobScheduler).put(copy)
@@ -96,7 +96,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"with a random exception" in {
val copy = makeCopy(throw new Exception("boo"))
expect {
- one(nameServer).findShardById(sourceShardId) willReturn shard1
+ one(nameServer).findShardById(source) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
never(jobScheduler).put(nextCopy)
@@ -107,9 +107,9 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"with a shard timeout" in {
"early on" in {
- val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId))
+ val copy = makeCopy(throw new shards.ShardTimeoutException(100.milliseconds, source))
expect {
- one(nameServer).findShardById(sourceShardId) willReturn shard1
+ one(nameServer).findShardById(source) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(jobScheduler).put(copy)
@@ -120,10 +120,10 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
"after too many retries" in {
val count = CopyJob.MIN_COPY - 1
- val copy = new FakeCopy(sourceShardId, destinations, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, sourceShardId))
+ val copy = new FakeCopy(source, destinations, count, nameServer, jobScheduler)(throw new shards.ShardTimeoutException(100.milliseconds, source))
expect {
- one(nameServer).findShardById(sourceShardId) willReturn shard1
+ one(nameServer).findShardById(source) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
never(jobScheduler).put(nextCopy)
@@ -137,7 +137,7 @@ object CopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker
val copy = makeCopy(None)
expect {
- one(nameServer).findShardById(sourceShardId) willReturn shard1
+ one(nameServer).findShardById(source) willReturn shard1
one(nameServer).findShardById(destinationShardId) willReturn shard2
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Busy)
one(nameServer).markShardBusy(destinationShardId, shards.Busy.Normal)
Please sign in to comment.
Something went wrong with that request. Please try again.