
wip

Commit d8f40355062958f3c45096560b40ee765706ef60 (1 parent: a92bdbd), committed by Kyle Maxwell on Jan 7, 2011
@@ -4,14 +4,15 @@ import com.twitter.util.Duration
import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
import nameserver.{NameServer, BasicShardRepository}
-import scheduler.{CopyJobFactory, JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec}
+import scheduler.{JobScheduler, JsonJob, JobConsumer, PrioritizingJobScheduler, ReplicatingJsonCodec}
+import com.twitter.gizzard.scheduler.cursor.CursorJobFactory
import shards.{Shard, ReadWriteShard}
abstract class GizzardServer[S <: Shard, J <: JsonJob](config: gizzard.config.GizzardServer) {
def readWriteShardAdapter: ReadWriteShard[S] => S
- def copyFactory: CopyJobFactory[S]
+ def copyFactory: CursorJobFactory[S]
def jobPriorities: Seq[Int]
def copyPriority: Int
def start(): Unit
@@ -0,0 +1,7 @@
+package com.twitter.gizzard
+
+import scala.collection._
+
+trait Jsonable {
+ def toMap: Map[String, Any]
+}
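
For orientation, a minimal sketch of how the new trait might be used; the PageState class is invented for illustration and is not part of this commit:

    import com.twitter.gizzard.Jsonable

    // Hypothetical example, not in the commit: a piece of job state that can
    // render itself as a json-encodable map via the new trait.
    case class PageState(cursor: Long, count: Int) extends Jsonable {
      def toMap: Map[String, Any] = Map("cursor" -> cursor, "count" -> count)
    }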
@@ -1,162 +0,0 @@
-package com.twitter.gizzard.scheduler
-
-import com.twitter.ostrich.Stats
-import com.twitter.util.TimeConversions._
-import net.lag.logging.Logger
-import collection.mutable.ListBuffer
-import collection.mutable
-import nameserver.{NameServer, NonExistentShard}
-import shards.{Shard, ShardId, ShardDatabaseTimeoutException, ShardTimeoutException}
-
-object CopyJob {
- val MIN_COPY = 500
-}
-
-/**
- * A factory for creating a new copy job (with default count and a starting cursor) from a source
- * and destination shard ID.
- */
-trait CopyJobFactory[S <: Shard] extends ((ShardId, List[CopyDestination]) => CopyJob[S])
-
-case class CopyDestination(shardId: ShardId, baseId: Option[Long])
-case class CopyDestinationShard[S](shard: S, baseId: Option[Long])
-
-/**
- * A parser that creates a copy job out of json. The basic attributes (source shard ID, destination
- * shard ID, and count) are parsed out first, and the remaining attributes are passed to
- * 'deserialize' to decode any shard-specific data (like a cursor).
- */
-trait CopyJobParser[S <: Shard] extends JsonJobParser {
- def deserialize(attributes: Map[String, Any], sourceId: ShardId,
- destinations: List[CopyDestination], count: Int): CopyJob[S]
-
- def apply(attributes: Map[String, Any]): JsonJob = {
- val sourceId = ShardId(attributes("source_shard_hostname").toString, attributes("source_shard_table_prefix").toString)
-
- deserialize(attributes,
- sourceId,
- parseDestinations(attributes).toList,
- attributes("count").asInstanceOf[{def toInt: Int}].toInt)
- }
-
- private def parseDestinations(attributes: Map[String, Any]) = {
- val destinations = new ListBuffer[CopyDestination]
- var i = 0
- while(attributes.contains("destination_" + i + "_hostname")) {
- val prefix = "destination_" + i
- val baseKey = prefix + "_base_id"
- val baseId = if (attributes.contains(baseKey)) {
- Some(attributes(baseKey).asInstanceOf[{def toLong: Long}].toLong)
- } else {
- None
- }
- val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
- destinations += CopyDestination(shardId, baseId)
- i += 1
- }
-
- destinations.toList
- }
-}
-
-/**
- * A json-encodable job that represents the state of a copy from one shard to another.
- *
- * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items.
- * Other shard-specific data (like the cursor) can be encoded in 'serialize'.
- *
- * 'copyPage' is called to do the actual data copying. It should return a new CopyJob representing
- * the next chunk of work to do, or None if the entire copying job is complete.
- */
-abstract case class CopyJob[S <: Shard](sourceId: ShardId,
- destinations: List[CopyDestination],
- var count: Int,
- nameServer: NameServer[S],
- scheduler: JobScheduler[JsonJob])
- extends JsonJob {
- private val log = Logger.get(getClass.getName)
-
- override def shouldReplicate = false
-
- def toMap = {
- Map("source_shard_hostname" -> sourceId.hostname,
- "source_shard_table_prefix" -> sourceId.tablePrefix,
- "count" -> count
- ) ++ serialize ++ destinationsToMap
- }
-
- private def destinationsToMap = {
- var i = 0
- val map = mutable.Map[String, Any]()
- destinations.foreach { destination =>
- map("destination_" + i + "_hostname") = destination.shardId.hostname
- map("destination_" + i + "_table_prefix") = destination.shardId.tablePrefix
- destination.baseId.foreach { id =>
- map("destination_" + i + "_base_id") = id
- }
- i += 1
- }
- map
- }
-
- def finish() {
- destinations.foreach { dest =>
- nameServer.markShardBusy(dest.shardId, shards.Busy.Normal)
- }
- log.info("Copying finished for (type %s) from %s to %s",
- getClass.getName.split("\\.").last, sourceId, destinations)
- Stats.clearGauge(gaugeName)
- }
-
- def apply() {
- try {
- log.info("Copying shard block (type %s) from %s to %s: state=%s",
- getClass.getName.split("\\.").last, sourceId, destinations, toMap)
- val sourceShard = nameServer.findShardById(sourceId)
-
- val destinationShards = destinations.map { dest =>
- CopyDestinationShard[S](nameServer.findShardById(dest.shardId), dest.baseId)
- }
-
- // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
- destinations.foreach { dest =>
- nameServer.markShardBusy(dest.shardId, shards.Busy.Busy)
- }
-
- val nextJob = copyPage(sourceShard, destinationShards, count)
-
- nextJob match {
- case Some(job) =>
- incrGauge
- scheduler.put(job)
- case None =>
- finish()
- }
- } catch {
- case e: NonExistentShard =>
- log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.")
- case e: ShardTimeoutException if (count > CopyJob.MIN_COPY) =>
- log.warning("Shard block copy timed out; trying a smaller block size.")
- count = (count * 0.9).toInt
- scheduler.put(this)
- case e: ShardDatabaseTimeoutException =>
- log.warning("Shard block copy failed to get a database connection; retrying.")
- scheduler.put(this)
- case e: Throwable =>
- log.warning("Shard block copy stopped due to exception: %s", e)
- throw e
- }
- }
-
- private def incrGauge = {
- Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count)
- }
-
- private def gaugeName = {
- "x-copying-" + sourceId + "-" + destinations
- }
-
- def copyPage(sourceShard: S, destinationShards: List[CopyDestinationShard[S]], count: Int): Option[CopyJob[S]]
-
- def serialize: Map[String, Any]
-}
@@ -0,0 +1,94 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import net.lag.logging.Logger
+import com.twitter.gizzard.shards._
+import com.twitter.gizzard.nameserver.{NameServer, NonExistentShard, InvalidShard, NameserverUninitialized}
+
+import com.twitter.ostrich.Stats
+import scala.collection.mutable.Map
+
+object CursorJob {
+ val MIN_PAGE = 500
+}
+
+/**
+ * A json-encodable job that represents the state of a copy-like operation from one shard to another.
+ *
+ * The 'toMap' implementation encodes the source and destination shard IDs, and the count of items.
+ * Other shard-specific data (like the cursor) can be encoded in 'serialize'.
+ *
+ * 'cursorPage' is called to do the actual work of paging through the data. It should return a new
+ * CursorJob representing the next chunk of work to do, or None if the entire job is complete.
+ */
+abstract case class CursorJob[S <: Shard](source: Source,
+ destinations: DestinationList,
+ var count: Int,
+ nameServer: NameServer[S],
+ scheduler: JobScheduler[JsonJob])
+ extends JsonJob {
+ private val log = Logger.get(getClass.getName)
+
+ override def shouldReplicate = false
+
+ def name: String
+
+ def toMap = {
+ source.toMap ++ serialize ++ destinations.toMap
+ }
+
+ def finish() {
+ destinations.foreach { dest =>
+ nameServer.markShardBusy(dest.shardId, shards.Busy.Normal)
+ }
+ log.info("Finished for (type %s) from %s to %s",
+ getClass.getName.split("\\.").last, source, destinations)
+ Stats.clearGauge(gaugeName)
+ }
+
+ def apply() {
+ try {
+ log.info("Shard block (type %s) from %s to %s: state=%s",
+ getClass.getName.split("\\.").last, source, destinations, toMap)
+
+ // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
+ destinations.foreach { dest =>
+ nameServer.markShardBusy(dest.shardId, shards.Busy.Busy)
+ }
+
+ val nextJob = cursorPage(source, destinations, count)
+
+ nextJob match {
+ case Some(job) =>
+ incrGauge
+ scheduler.put(job)
+ case None =>
+ finish()
+ }
+ } catch {
+ case e: NonExistentShard =>
+ log.error("Shard block "+name+" failed because one of the shards doesn't exist. Terminating...")
+ case e: ShardTimeoutException if (count > CursorJob.MIN_PAGE) =>
+ log.warning("Shard block "+name+" timed out; trying a smaller block size.")
+ count = (count * 0.9).toInt
+ scheduler.put(this)
+ case e: ShardDatabaseTimeoutException =>
+ log.warning("Shard block "+name+" failed to get a database connection; retrying.")
+ scheduler.put(this)
+ case e: Throwable =>
+ log.warning("Shard block "+name+" stopped due to exception: %s", e)
+ throw e
+ }
+ }
+
+ private def incrGauge = {
+ Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count)
+ }
+
+ private def gaugeName = {
+ "x-"+name+"-" + source + "-" + destinations
+ }
+
+ def cursorPage(source: Source, destinations: DestinationList, count: Int): Option[CursorJob[S]]
+
+ def serialize: collection.Map[String, Any] = Map.empty[String, Any]
+}
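
To make the new contract concrete, here is a hedged sketch of a subclass, under stated assumptions: SequencedShard and its selectPage/writePage methods are invented, and Source is assumed to expose a shardId the way Destination does (the Source file itself is not shown in this diff):

    package com.twitter.gizzard.scheduler.cursor

    import com.twitter.gizzard.nameserver.NameServer
    import com.twitter.gizzard.scheduler.{JobScheduler, JsonJob}
    import com.twitter.gizzard.shards.Shard

    // Hypothetical shard interface for the sketch; not part of the commit.
    trait SequencedShard extends Shard {
      // returns one page of rows plus the cursor for the next page, if any
      def selectPage(cursor: Long, count: Int): (Seq[(Long, String)], Option[Long])
      def writePage(rows: Seq[(Long, String)]): Unit
    }

    // Hypothetical CursorJob subclass: pages through the source shard by a
    // numeric cursor, round-tripping that cursor through 'serialize'.
    class SequencedCursorJob(source: Source,
                             destinations: DestinationList,
                             count: Int,
                             cursor: Long,
                             nameServer: NameServer[SequencedShard],
                             scheduler: JobScheduler[JsonJob])
      extends CursorJob[SequencedShard](source, destinations, count, nameServer, scheduler) {

      def name = "sequenced-copy"

      def cursorPage(source: Source, destinations: DestinationList, count: Int) = {
        // assumes Source exposes a shardId, as Destination does
        val sourceShard = nameServer.findShardById(source.shardId)
        val (rows, nextCursor) = sourceShard.selectPage(cursor, count)
        destinations.foreach { dest => nameServer.findShardById(dest.shardId).writePage(rows) }
        // Some(next job) keeps the operation going; None triggers finish().
        nextCursor.map { c => new SequencedCursorJob(source, destinations, count, c, nameServer, scheduler) }
      }

      override def serialize = Map("cursor" -> cursor)
    }

Note the retry path in apply() above: on a ShardTimeoutException the job shrinks count by 10% and requeues itself, so a subclass only needs cursorPage to be restartable from its serialized cursor.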
@@ -0,0 +1,5 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import com.twitter.gizzard.shards._
+
+trait CursorJobFactory[S <: Shard] extends ((Source, DestinationList) => CursorJob[S])
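
Continuing the hypothetical SequencedCursorJob sketch above, a matching factory might look like this (the starting cursor of 0 and the MIN_PAGE default are assumptions):

    // Hypothetical factory, not part of the commit: builds the initial job
    // for a (source, destinations) pair.
    class SequencedCursorJobFactory(nameServer: NameServer[SequencedShard],
                                    scheduler: JobScheduler[JsonJob])
      extends CursorJobFactory[SequencedShard] {
      def apply(source: Source, destinations: DestinationList) =
        new SequencedCursorJob(source, destinations, CursorJob.MIN_PAGE, 0L, nameServer, scheduler)
    }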
@@ -0,0 +1,17 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import com.twitter.gizzard.shards._
+import scala.collection._
+
+trait CursorJobParser[S <: Shard] extends JsonJobParser {
+ def deserialize(attributes: Map[String, Any], source: Source,
+ destinations: DestinationList, count: Int): CursorJob[S]
+
+ def apply(attributes: Map[String, Any]): JsonJob = {
+ deserialize(attributes,
+ Source(attributes),
+ DestinationList(attributes),
+ attributes("count").asInstanceOf[{def toInt: Int}].toInt)
+ }
+
+}
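
And a matching parser, again reusing the hypothetical SequencedCursorJob; the "cursor" key is simply whatever that job's serialize wrote:

    // Hypothetical parser, not part of the commit. CursorJobParser has already
    // decoded the source, destinations, and count; only the cursor remains.
    class SequencedCursorJobParser(nameServer: NameServer[SequencedShard],
                                   scheduler: JobScheduler[JsonJob])
      extends CursorJobParser[SequencedShard] {
      def deserialize(attributes: collection.Map[String, Any], source: Source,
                      destinations: DestinationList, count: Int) =
        new SequencedCursorJob(source, destinations, count,
                               attributes("cursor").asInstanceOf[{def toLong: Long}].toLong,
                               nameServer, scheduler)
    }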
@@ -0,0 +1,24 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import scala.collection._
+import com.twitter.gizzard.Jsonable
+import com.twitter.gizzard.shards._
+
+
+trait Destination extends Endpoint {
+ val serializeName = "destination"
+}
+
+class DestinationImpl(var shardId: ShardId) extends Destination
+
+object Destination extends EndpointUtils {
+ val serializeName = "destination"
+
+ def apply(map: Map[String, Any]) = {
+ new DestinationImpl(shardIdFromMap(map))
+ }
+
+ def apply(shardId: ShardId) = {
+ new DestinationImpl(shardId)
+ }
+}
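
Usage is then a one-liner; for example (hostname and table prefix invented):

    // Hypothetical usage, not part of the commit.
    val dest = Destination(ShardId("db1.example.com", "edges_001"))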
@@ -0,0 +1,52 @@
+package com.twitter.gizzard.scheduler.cursor
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.immutable._
+import com.twitter.gizzard.Jsonable
+import com.twitter.gizzard.shards._
+
+object DestinationList {
+ def apply(map: collection.Map[String, Any]): DestinationList = {
+ val list = new DestinationList
+ // FIXME
+ list
+ }
+
+ def apply(shardId: ShardId): DestinationList = apply(Seq(shardId))
+ def apply(shardIds: Collection[ShardId]): DestinationList = {
+ val list = new DestinationList
+ shardIds.foreach { id => list += Destination(id) }
+ list
+ }
+}
+
+class DestinationList extends ArrayBuffer[Destination] with Jsonable {
+ def toMap: Map[String, Any] = {
+ var map = Map.empty[String, Any]
+ var i = 0
+ this.foreach { dest =>
+ map ++= dest.toMap(i)
+ i += 1
+ }
+ map
+ }
+}
+//
+// private def parseDestinations(attributes: Map[String, Any]) = {
+// val destinations = new DestinationList
+// var i = 0
+// while(attributes.contains("destination_" + i + "_hostname")) {
+// val prefix = "destination_" + i
+// val baseKey = prefix + "_base_id"
+// val baseId = if (attributes.contains(baseKey)) {
+// Some(attributes(baseKey).asInstanceOf[{def toLong: Long}].toLong)
+// } else {
+// None
+// }
+// val shardId = ShardId(attributes(prefix + "_shard_hostname").toString, attributes(prefix + "_shard_table_prefix").toString)
+// destinations += CopyDestination(shardId, baseId)
+// i += 1
+// }
+//
+// destinations.toList
+// }
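
The FIXME above presumably needs to invert DestinationList.toMap, along the lines of the old parseDestinations kept in the comment; a hedged sketch, assuming Endpoint.toMap(i) writes the same indexed keys the old serializer used:

    // Hypothetical fill for the FIXME, not part of the commit: walk the indexed
    // keys until one is missing, rebuilding a Destination per index.
    def apply(map: collection.Map[String, Any]): DestinationList = {
      val list = new DestinationList
      var i = 0
      while (map.contains("destination_" + i + "_hostname")) {
        val prefix = "destination_" + i
        list += Destination(ShardId(map(prefix + "_hostname").toString,
                                    map(prefix + "_table_prefix").toString))
        i += 1
      }
      list
    }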
