Permalink
Browse files

Merge branch 'tbird' of github.com:twitter/gizzard into tbird

  • Loading branch information...
2 parents: 6c40a93 + 6df36cb
Commit: ae5a9195ff7375cf38bb9a357e4ab3c741c5b8ed
@freels committed Nov 4, 2010
Showing with 1,448 additions and 1,442 deletions.
  1. +1 −1 project/build.properties
  2. +1 −2 src/main/scala/com/twitter/gizzard/Process.scala
  3. +0 −20 src/main/scala/com/twitter/gizzard/jobs/BoundJob.scala
  4. +0 −85 src/main/scala/com/twitter/gizzard/jobs/Copy.scala
  5. +0 −51 src/main/scala/com/twitter/gizzard/jobs/ErrorHandlingJob.scala
  6. +0 −11 src/main/scala/com/twitter/gizzard/jobs/Job.scala
  7. +0 −24 src/main/scala/com/twitter/gizzard/jobs/JobParser.scala
  8. +0 −31 src/main/scala/com/twitter/gizzard/jobs/JobWithTasks.scala
  9. +0 −21 src/main/scala/com/twitter/gizzard/jobs/JournaledJob.scala
  10. +0 −14 src/main/scala/com/twitter/gizzard/jobs/LoggingJob.scala
  11. +0 −28 src/main/scala/com/twitter/gizzard/jobs/PolymorphicJobParser.scala
  12. +0 −20 src/main/scala/com/twitter/gizzard/jobs/Schedulable.scala
  13. +0 −20 src/main/scala/com/twitter/gizzard/jobs/SchedulableWithTasks.scala
  14. +0 −2 src/main/scala/com/twitter/gizzard/nameserver/MemoryShard.scala
  15. +4 −6 src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala
  16. +0 −3 src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
  17. +0 −96 src/main/scala/com/twitter/gizzard/scheduler/ErrorHandlingJobQueue.scala
  18. +0 −53 src/main/scala/com/twitter/gizzard/scheduler/JobEvaluatorThread.scala
  19. +0 −8 src/main/scala/com/twitter/gizzard/scheduler/JobQueue.scala
  20. +0 −85 src/main/scala/com/twitter/gizzard/scheduler/JobScheduler.scala
  21. +0 −83 src/main/scala/com/twitter/gizzard/scheduler/KestrelMessageQueue.scala
  22. +0 −8 src/main/scala/com/twitter/gizzard/scheduler/LoggerScheduler.scala
  23. +0 −8 src/main/scala/com/twitter/gizzard/scheduler/MessageQueue.scala
  24. +0 −37 src/main/scala/com/twitter/gizzard/scheduler/PrioritizingJobScheduler.scala
  25. +0 −9 src/main/scala/com/twitter/gizzard/scheduler/Scheduler.scala
  26. +113 −0 src/main/scala/com/twitter/gizzard/scheduler_new/CopyJob.scala
  27. +22 −0 src/main/scala/com/twitter/gizzard/scheduler_new/Job.scala
  28. +36 −0 src/main/scala/com/twitter/gizzard/scheduler_new/JobQueue.scala
  29. +172 −0 src/main/scala/com/twitter/gizzard/scheduler_new/JobScheduler.scala
  30. +21 −0 src/main/scala/com/twitter/gizzard/scheduler_new/JournaledJob.scala
  31. +58 −0 src/main/scala/com/twitter/gizzard/scheduler_new/JsonCodec.scala
  32. +60 −0 src/main/scala/com/twitter/gizzard/scheduler_new/JsonJob.scala
  33. +88 −0 src/main/scala/com/twitter/gizzard/scheduler_new/KestrelJobQueue.scala
  34. +90 −0 src/main/scala/com/twitter/gizzard/scheduler_new/MemoryJobQueue.scala
  35. +36 −0 src/main/scala/com/twitter/gizzard/scheduler_new/NestedJob.scala
  36. +47 −0 src/main/scala/com/twitter/gizzard/scheduler_new/PrioritizingJobScheduler.scala
  37. +7 −6 src/main/scala/com/twitter/gizzard/thrift/GizzardServices.scala
  38. +1 −11 src/main/scala/com/twitter/gizzard/thrift/JobManagerService.scala
  39. +3 −4 src/main/scala/com/twitter/gizzard/thrift/ShardManagerService.scala
  40. +0 −1 src/main/thrift/JobManager.thrift
  41. +0 −17 src/test/scala/com/twitter/gizzard/fake/Job.scala
  42. +0 −9 src/test/scala/com/twitter/gizzard/fake/JobParser.scala
  43. +0 −41 src/test/scala/com/twitter/gizzard/fake/MessageQueue.scala
  44. +0 −24 src/test/scala/com/twitter/gizzard/jobs/BoundJobSpec.scala
  45. +0 −57 src/test/scala/com/twitter/gizzard/jobs/ErrorHandlingJobParserSpec.scala
  46. +0 −61 src/test/scala/com/twitter/gizzard/jobs/ErrorHandlingJobSpec.scala
  47. +0 −22 src/test/scala/com/twitter/gizzard/jobs/JobParserSpec.scala
  48. +0 −63 src/test/scala/com/twitter/gizzard/jobs/JobWithTasksSpec.scala
  49. +0 −25 src/test/scala/com/twitter/gizzard/jobs/PolymorphicJobParserSpec.scala
  50. +0 −39 src/test/scala/com/twitter/gizzard/jobs/SchedulableWithTasksSpec.scala
  51. +0 −109 src/test/scala/com/twitter/gizzard/scheduler/ErrorHandlingJobQueueSpec.scala
  52. +0 −32 src/test/scala/com/twitter/gizzard/scheduler/JobEvaluatorThreadSpec.scala
  53. +0 −45 src/test/scala/com/twitter/gizzard/scheduler/JobSchedulerSpec.scala
  54. +0 −63 src/test/scala/com/twitter/gizzard/scheduler/KestrelMessageQueueSpec.scala
  55. +40 −41 src/test/scala/com/twitter/gizzard/{jobs/CopySpec.scala → scheduler_new/CopyJobSpec.scala}
  56. +202 −0 src/test/scala/com/twitter/gizzard/scheduler_new/JobSchedulerSpec.scala
  57. +12 −10 src/test/scala/com/twitter/gizzard/{jobs → scheduler_new}/JournaledJobSpec.scala
  58. +37 −0 src/test/scala/com/twitter/gizzard/scheduler_new/JsonCodecSpec.scala
  59. +73 −0 src/test/scala/com/twitter/gizzard/scheduler_new/JsonJobParserSpec.scala
  60. +157 −0 src/test/scala/com/twitter/gizzard/scheduler_new/KestrelJobQueueSpec.scala
  61. +79 −0 src/test/scala/com/twitter/gizzard/scheduler_new/MemoryJobQueueSpec.scala
  62. +63 −0 src/test/scala/com/twitter/gizzard/scheduler_new/NestedJobSpec.scala
  63. +14 −12 src/test/scala/com/twitter/gizzard/{scheduler → scheduler_new}/PrioritizingJobSchedulerSpec.scala
  64. +3 −15 src/test/scala/com/twitter/gizzard/thrift/JobManagerServiceSpec.scala
  65. +8 −9 src/test/scala/com/twitter/gizzard/thrift/ShardManagerServiceSpec.scala
@@ -3,7 +3,7 @@
project.organization=com.twitter
project.name=gizzard
sbt.version=0.7.4
-project.version=1.4.4-tbird-SNAPSHOT
+project.version=1.5.0-tbird-SNAPSHOT
def.scala.version=2.7.7
build.scala.versions=2.7.7
project.initialize=false
@@ -1,10 +1,9 @@
package com.twitter.gizzard
-
trait Process {
def start()
def pause()
def resume()
def shutdown()
def isShutdown: Boolean
-}
+}
@@ -1,20 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import scala.reflect.Manifest
-
-trait UnboundJobParser[E] extends (Map[String, Any] => UnboundJob[E])
-
-trait UnboundJob[E] extends Schedulable {
- def apply(environment: E)
-}
-
-class BoundJobParser[E](unboundJobParser: UnboundJobParser[E], bindingEnvironment: E) extends JobParser {
- def apply(json: Map[String, Map[String, Any]]) = {
- val attributes = json.toList.first._2
- new BoundJob(unboundJobParser(attributes), bindingEnvironment)
- }
-}
-
-case class BoundJob[E](unboundJob: UnboundJob[E], environment: E) extends SchedulableProxy(unboundJob) with Job {
- def apply() { unboundJob(environment) }
-}
@@ -1,85 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.xrayspecs.TimeConversions._
-import net.lag.logging.Logger
-import com.twitter.ostrich.Stats
-
-import scheduler.JobScheduler
-import nameserver._
-import shards._
-
-
-object Copy {
- val MIN_COPY = 500
-}
-
-trait CopyFactory[S <: shards.Shard] extends ((ShardId, ShardId) => Copy[S])
-
-trait CopyParser[S <: shards.Shard] extends jobs.UnboundJobParser[(NameServer[S], JobScheduler)] {
- def apply(attributes: Map[String, Any]): Copy[S]
-}
-
-abstract case class Copy[S <: shards.Shard](sourceId: ShardId, destinationId: ShardId, var count: Int) extends UnboundJob[(NameServer[S], JobScheduler)] {
- private val log = Logger.get(getClass.getName)
-
- def toMap = {
- Map("source_shard_hostname" -> sourceId.hostname,
- "source_shard_table_prefix" -> sourceId.tablePrefix,
- "destination_shard_hostname" -> destinationId.hostname,
- "destination_shard_table_prefix" -> destinationId.tablePrefix,
- "count" -> count
- ) ++ serialize
- }
-
- def finish(nameServer: NameServer[S], scheduler: JobScheduler) {
- nameServer.markShardBusy(destinationId, Busy.Normal)
- log.info("Copying finished for (type %s) from %s to %s",
- getClass.getName.split("\\.").last, sourceId, destinationId)
- Stats.clearGauge(gaugeName)
- }
-
- def apply(environment: (NameServer[S], JobScheduler)) {
- val (nameServer, scheduler) = environment
- try {
- log.info("Copying shard block (type %s) from %s to %s: state=%s",
- getClass.getName.split("\\.").last, sourceId, destinationId, toMap)
- val sourceShard = nameServer.findShardById(sourceId)
- val destinationShard = nameServer.findShardById(destinationId)
- // do this on each iteration, so it happens in the queue and can be retried if the db is busy:
- nameServer.markShardBusy(destinationId, Busy.Busy)
-
- val nextJob = copyPage(sourceShard, destinationShard, count)
- nextJob match {
- case Some(job) => {
- incrGauge
- scheduler(job)
- }
- case None => finish(nameServer, scheduler)
- }
- } catch {
- case e: NonExistentShard =>
- log.error("Shard block copy failed because one of the shards doesn't exist. Terminating the copy.")
- case e: ShardTimeoutException if (count > Copy.MIN_COPY) =>
- log.warning("Shard block copy timed out; trying a smaller block size.")
- count = (count * 0.9).toInt
- scheduler(this)
- case e: ShardDatabaseTimeoutException =>
- log.warning("Shard block copy failed to get a database connection; retrying.")
- scheduler(this)
- case e: Throwable =>
- log.warning("Shard block copy stopped due to exception: %s", e)
- throw e
- }
- }
-
- def copyPage(sourceShard: S, destinationShard: S, count: Int): Option[Copy[S]]
- def serialize: Map[String, Any]
-
- private def incrGauge = {
- Stats.setGauge(gaugeName, Stats.getGauge(gaugeName).getOrElse(0.0) + count)
- }
-
- private def gaugeName = {
- "x-copying-" + sourceId + "-" + destinationId
- }
-}
@@ -1,51 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.ostrich.Stats
-import net.lag.logging.Logger
-import shards.ShardRejectedOperationException
-import scheduler.{ErrorHandlingConfig, MessageQueue, Scheduler}
-
-
-class ErrorHandlingJobParser(config: ErrorHandlingConfig, errorJobQueue: Scheduler[Schedulable])
- extends JobParser {
-
- def apply(json: Map[String, Map[String, Any]]) = {
- val (_, attributes) = json.toList.first
- val job = config.jobParser(json)
- val errorCount = attributes.getOrElse("error_count", 0).asInstanceOf[Int]
- val errorMessage = attributes.getOrElse("error_message", "(none)").asInstanceOf[String]
- new ErrorHandlingJob(job, errorCount, errorMessage, errorJobQueue, config)
- }
-}
-
-class ErrorHandlingJob(job: Job, var errorCount: Int, var errorMessage: String,
- errorJobQueue: Scheduler[Schedulable], config: ErrorHandlingConfig)
- extends JobProxy(job) {
-
- private val log = Logger.get(getClass.getName)
- val (errorLimit, badJobQueue) = (config.errorLimit, config.badJobQueue)
-
- def apply() {
- try {
- job()
- Stats.incr("job-success-count")
- } catch {
- case e: ShardRejectedOperationException =>
- Stats.incr("job-darkmoded-count")
- errorJobQueue.put(this)
- case e =>
- Stats.incr("job-error-count")
- log.error(e, "Error in Job: " + e)
- errorCount += 1
- errorMessage = e.toString
- if (errorCount > errorLimit) {
- badJobQueue.put(this)
- } else {
- errorJobQueue.put(this)
- }
- }
- }
-
- override def toMap = job.toMap ++ Map("error_count" -> errorCount, "error_message" -> errorMessage)
- override def toString = "ErrorHandlingJob(%s, %s, %d)".format(job, errorJobQueue, errorCount)
-}
@@ -1,11 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.json.Json
-import net.lag.configgy.Configgy
-
-
-trait Job extends Schedulable {
- @throws(classOf[Exception]) def apply()
-}
-
-abstract class JobProxy(job: Job) extends SchedulableProxy(job) with Job
@@ -1,24 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.json.{Json, JsonException}
-
-
-class UnparsableJobException(s: String, cause: Throwable) extends Exception(s, cause)
-class BadJsonException(e: JsonException) extends UnparsableJobException(e.toString, e)
-
-trait JobParser extends (String => Job) {
- @throws(classOf[UnparsableJobException])
- def apply(data: String) = {
- try {
- Json.parse(data) match {
- case job: Map[_, _] =>
- assert(job.size == 1)
- apply(job.asInstanceOf[Map[String, Map[String, Any]]])
- }
- } catch {
- case e: JsonException => throw new BadJsonException(e)
- }
- }
-
- def apply(json: Map[String, Map[String, Any]]): Job
-}
@@ -1,31 +0,0 @@
-package com.twitter.gizzard.jobs
-import scala.collection.mutable.Queue
-
-class JobWithTasksParser(jobParser: JobParser) extends JobParser {
- def apply(json: Map[String, Map[String, Any]]) = {
- val (_, attributes) = json.toList.first
- attributes.get("tasks").map { taskJsons =>
- val tasks = taskJsons.asInstanceOf[Iterable[Map[String, Map[String, Any]]]].map(this(_))
- new JobWithTasks(tasks)
- } getOrElse {
- jobParser(json)
- }
- }
-}
-
-case class JobWithTasks(override val tasks: Iterable[Job]) extends SchedulableWithTasks(tasks) with Job {
- private val taskQueue = {
- val q = new Queue[Job]()
- q ++= tasks
- q
- }
-
- override val remainingTasks = taskQueue
-
- def apply() = {
- while (!taskQueue.isEmpty) {
- taskQueue.first.apply()
- taskQueue.dequeue()
- }
- }
-}
@@ -1,21 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import net.lag.logging.Logger
-
-
-class JournaledJobParser(jobParser: JobParser, journaller: String => Unit) extends JobParser {
- def apply(json: Map[String, Map[String, Any]]) = new JournaledJob(jobParser(json), journaller)
-}
-
-class JournaledJob(job: Job, journaller: String => Unit) extends JobProxy(job) {
- def apply() {
- job()
- try {
- journaller(job.toJson)
- } catch {
- case e: Exception =>
- val log = Logger.get(getClass.getName)
- log.warning(e, "Failed to journal job: %s", job.toJson)
- }
- }
-}
@@ -1,14 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.ostrich.W3CStats
-import com.twitter.gizzard.proxy.LoggingProxy
-import com.twitter.ostrich.StatsProvider
-
-
-class LoggingJobParser(stats: StatsProvider, w3cStats: W3CStats, jobParser: JobParser) extends JobParser {
- def apply(json: Map[String, Map[String, Any]]) = new LoggingJob(stats, w3cStats, jobParser(json))
-}
-
-class LoggingJob(stats: StatsProvider, w3cStats: W3CStats, job: Job) extends JobProxy(job) {
- def apply() { LoggingProxy(stats, w3cStats, job.loggingName, job).apply() }
-}
@@ -1,28 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import scala.util.matching.Regex
-import scala.collection.mutable
-
-
-class PolymorphicJobParser extends JobParser {
- private val processors = mutable.Map.empty[Regex, JobParser]
-
- def +=(item: (Regex, JobParser)) = processors += item
- def +=(r: Regex, p: JobParser) = processors += ((r, p))
-
- override def apply(json: Map[String, Map[String, Any]]) = {
- val (jobType, attributes) = json.toList.first
- val regexpAndProcessor = processors find { p =>
- val (processorRegex, _) = p
- processorRegex.findFirstIn(jobType).isDefined
- } getOrElse {
- throw new UnparsableJobException("Can't find matching processor for '%s' in %s".format(jobType, processors), null)
- }
- try {
- val (_, processor) = regexpAndProcessor
- processor(json)
- } catch {
- case e => throw new UnparsableJobException("Processor blew up: " + e.toString, e)
- }
- }
-}
@@ -1,20 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.json.Json
-
-
-trait Schedulable {
- def toMap: Map[String, Any]
- def className = getClass.getName
- def toJson = Json.build(Map(className -> toMap)).toString
- def loggingName = className.lastIndexOf('.') match {
- case -1 => className
- case n => className.substring(n + 1)
- }
-}
-
-abstract class SchedulableProxy(schedulable: Schedulable) extends Schedulable {
- def toMap = schedulable.toMap
- override def className = schedulable.className
- override def loggingName = schedulable.loggingName
-}
@@ -1,20 +0,0 @@
-package com.twitter.gizzard.jobs
-
-import com.twitter.json.Json
-
-
-class SchedulableWithTasks(protected val tasks: Iterable[Schedulable]) extends Schedulable {
- override def loggingName = tasks.map(_.loggingName).mkString(",")
-
- def remainingTasks = tasks
-
- def toMap = Map("tasks" -> remainingTasks.map { task => (Map(task.className -> task.toMap)) })
- override def equals(other: Any) = {
- other match {
- case other: SchedulableWithTasks =>
- remainingTasks.toList == other.remainingTasks.toList
- case _ =>
- false
- }
- }
-}
@@ -2,10 +2,8 @@ package com.twitter.gizzard.nameserver
import scala.collection.Map
import scala.collection.mutable
-import scheduler.JobScheduler
import shards._
-
/**
* NameServer implementation that doesn't actually store anything anywhere.
* Useful for tests or stubbing out the partitioning scheme.
@@ -51,14 +51,12 @@ object NameServer {
val shard = new ReadWriteShardAdapter(
new ReplicatingShard(shardInfo, 0, replicas, loadBalancer, replicationFuture, writeTimeout))
- val mappingFunction: (Long => Long) = config.getString("mapping") match {
- case None =>
+ val mappingFunction: (Long => Long) = config.getString("mapping", "identity") match {
+ case "identity" =>
{ n => n }
- case Some("byte_swapper") =>
+ case "byte_swapper" =>
ByteSwapper
- case Some("identity") =>
- { n => n }
- case Some("fnv1a-64") =>
+ case "fnv1a-64" =>
FnvHasher
}
new NameServer(shard, shardRepository, mappingFunction)
[The diffs for the remaining files listed above were not loaded; the page reported: "Oops, something went wrong. Retry."]

0 comments on commit ae5a919

Please sign in to comment.