Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

wip

  • Loading branch information...
commit a4e1c1704df051c0ec30d3f0234616866df95590 1 parent 88e6189
@freels freels authored
View
10 src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
@@ -33,6 +33,10 @@ class JobRelay(
retries: Int)
extends (String => JobRelayCluster) {
+ private def createClient(hosts: Seq[Host]) = {
+ new JobRelayCluster(hosts, priority, framed, timeout, retries)
+ }
+
private val clients = Map(hostMap.flatMap { case (c, hs) =>
var blocked = false
val onlineHosts = hs.filter(_.status match {
@@ -44,13 +48,15 @@ extends (String => JobRelayCluster) {
if (onlineHosts.isEmpty) {
if (blocked) Seq(c -> new BlockedJobRelayCluster(c)) else Seq()
} else {
- Seq(c -> new JobRelayCluster(onlineHosts, priority, framed, timeout, retries))
+ Seq(c -> createClient(onlineHosts))
}
}.toSeq: _*)
val clusters = clients.keySet
- def apply(cluster: String) = clients.getOrElse(cluster, NullJobRelayCluster)
+ def apply(c: String) = clients.getOrElse(c, NullJobRelayCluster)
+
+ def getOnlineOrOfflineCluster(c: String) = clients.getOrElse(c, createClient(hostMap(c)))
}
class JobRelayCluster(
View
9 src/main/scala/com/twitter/gizzard/scheduler/CopyJob.scala
@@ -124,13 +124,15 @@ class BasicCopyJobFactory[S <: Shard](
extends CopyJobFactory[S] {
def apply(sourceId: ShardId, destId: ShardId) = {
+ println("creating a new job")
new BasicCopyJob(sourceId, destId, None, defaultCount, ns, s, copyAdapter)
}
def parser = new CopyJobParser[S] {
def deserialize(attrs: Map[String, Any], sourceId: ShardId, destId: ShardId, count: Int) = {
- val cursor = attrs("cursor").asInstanceOf[Map[String,Any]]
- new BasicCopyJob(sourceId, destId, Some(cursor), count, ns, s, copyAdapter)
+ println("parsing a new job")
+ val cursor = attrs.get("cursor").map(_.asInstanceOf[Map[String,Any]])
+ new BasicCopyJob(sourceId, destId, cursor, count, ns, s, copyAdapter)
}
}
}
@@ -145,9 +147,10 @@ class BasicCopyJob[S <: Shard](
copyAdapter: ShardCopyAdapter[S])
extends CopyJob[S](sourceId, destId, count, nameServer, scheduler) {
- def serialize = Map("cursor" -> cursor)
+ def serialize = cursor.map(c => Map("cursor" -> c)).getOrElse(Map())
def copyPage(source: S, dest: S, count: Int) = {
+ println("here")
copyAdapter.copyPage(source, dest, cursor, count).map { nextCursor =>
new BasicCopyJob(sourceId, destId, Some(nextCursor), count, nameServer, scheduler, copyAdapter)
}
View
2  src/main/scala/com/twitter/gizzard/scheduler/CrossClusterCopyJob.scala
@@ -180,7 +180,7 @@ extends CrossClusterCopyJob[S](sourceId, destId, count, factory) {
def applyPage() {
try {
- nameServer.jobRelay(destId.cluster)(List(writeJob.toJson))
+ nameServer.jobRelay.getOnlineOrOfflineCluster(destId.cluster)(List(writeJob.toJson))
} catch {
case e: ShardTimeoutException if (count > CopyJob.MIN_COPY) =>
log.warning("Shard block " + phase + " timed out; trying a smaller block size.")
View
107 src/test/scala/com/twitter/gizzard/scheduler_new/BasicCopyJobSpec.scala
@@ -0,0 +1,107 @@
+package com.twitter.gizzard.scheduler
+
+import com.twitter.util.TimeConversions._
+import org.specs.Specification
+import org.specs.mock.{ClassMocker, JMocker}
+import shards._
+
+class FakeCopyAdapter(maxCount: Int) extends ShardCopyAdapter[Shard] {
+ var pagesWritten = 0
+
+ def readPage(source: Shard, cursor: Option[Map[String,Any]], count: Int) = {
+ val lastCount = cursor.flatMap(_.get("last_count")).map(_.asInstanceOf[{ def toInt: Int}].toInt) getOrElse 0
+ val nextCursor = if (lastCount < maxCount) Some(Map("last_count" -> (lastCount + count))) else None
+ println(nextCursor)
+ println(count)
+ (Map("count" -> (lastCount + count)), nextCursor)
+ }
+
+ def writePage(dest: Shard, data: Map[String,Any]) {
+ pagesWritten += 1
+ }
+
+ def copyPage(source: Shard, dest: Shard, cursor: Option[Map[String,Any]], count: Int) = {
+ val (data, nextCursor) = readPage(source, cursor, count)
+ writePage(dest, data)
+ nextCursor
+ }
+}
+
+object BasicCopyJobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
+ "BasicCopyJob" should {
+ val sourceId = ShardId("testhost", "1")
+ val destinationId = ShardId("testhost", "2")
+ val count = 1
+ val nameServer = mock[nameserver.NameServer[Shard]]
+ val jobScheduler = mock[JobScheduler[JsonJob]]
+ val shardCopyAdapter = new FakeCopyAdapter(10)
+ val copyFactory = new BasicCopyJobFactory(nameServer, jobScheduler, shardCopyAdapter, count)
+ val codec = new JsonCodec({ a => error(new String(a, "UTF-8")) })
+ codec += ("BasicCopyJob".r -> copyFactory.parser)
+ val source = mock[Shard]
+ val destination = mock[Shard]
+
+ "toMap" in {
+ val copy = copyFactory(sourceId, destinationId)
+ copy.toMap mustEqual Map(
+ "source_shard_hostname" -> sourceId.hostname,
+ "source_shard_table_prefix" -> sourceId.tablePrefix,
+ "destination_shard_hostname" -> destinationId.hostname,
+ "destination_shard_table_prefix" -> destinationId.tablePrefix,
+ "count" -> count
+ ) ++ copy.serialize
+ }
+
+ "toJson" in {
+ val copy = copyFactory(sourceId, destinationId)
+ val json = copy.toJson
+ json mustMatch "Copy"
+ json mustMatch "\"source_shard_hostname\":\"%s\"".format(sourceId.hostname)
+ json mustMatch "\"source_shard_table_prefix\":\"%s\"".format(sourceId.tablePrefix)
+ json mustMatch "\"destination_shard_hostname\":\"%s\"".format(destinationId.hostname)
+ json mustMatch "\"destination_shard_table_prefix\":\"%s\"".format(destinationId.tablePrefix)
+ json mustMatch "\"count\":" + count
+ }
+
+ "parse" in {
+ val copy = copyFactory(sourceId, destinationId)
+ val json = copy.toJson
+ val parsedCopy = codec.inflate(json.getBytes("UTF-8"))
+
+ parsedCopy.toJson mustEqual json
+ }
+
+ "multiple page apply" in {
+ val copy = copyFactory(sourceId, destinationId)
+
+ expect {
+ one(nameServer).markShardBusy(destinationId, Busy.Busy)
+ one(nameServer).findShardById(sourceId) willReturn source
+ one(nameServer).findShardById(destinationId) willReturn destination
+ one(nameServer).markShardBusy(destinationId, Busy.Normal)
+ }
+
+ copy.apply()
+
+ println(shardCopyAdapter.pagesWritten)
+ }
+
+ "single page apply" in {
+ val shardCopyAdapter = new FakeCopyAdapter(1)
+ val copyFactory = new BasicCopyJobFactory(nameServer, jobScheduler, shardCopyAdapter, count)
+ val copy = copyFactory(sourceId, destinationId)
+
+ expect {
+ one(nameServer).markShardBusy(destinationId, Busy.Busy)
+ one(nameServer).findShardById(sourceId) willReturn source
+ one(nameServer).findShardById(destinationId) willReturn destination
+ val nextJobP = capturingParam[JsonJob]
+ one(jobScheduler).put(nextJobP.capture)
+ }
+
+ copy.apply()
+
+ println(shardCopyAdapter.pagesWritten)
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.