Skip to content
Browse files

integration tests and merged master

  • Loading branch information...
2 parents bd1ea66 + 89c8da7 commit a92bdbd4d3d37a95e235fb14d36516dd939fc6a9 Kyle Maxwell committed Jan 3, 2011
Showing with 371 additions and 119 deletions.
  1. +2 −2 config/test.scala
  2. +7 −6 project/build/GizzardProject.scala
  3. +12 −5 src/main/scala/com/twitter/gizzard/config/JobScheduler.scala
  4. +2 −1 src/main/scala/com/twitter/gizzard/config/NameServer.scala
  5. +1 −1 src/main/scala/com/twitter/gizzard/config/TServer.scala
  6. +20 −14 src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
  7. +4 −0 src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala
  8. +14 −14 src/main/scala/com/twitter/gizzard/nameserver/NameserverState.scala
  9. +2 −2 src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
  10. +30 −6 src/main/scala/com/twitter/gizzard/scheduler/JsonCodec.scala
  11. +37 −5 src/main/scala/com/twitter/gizzard/scheduler/JsonJob.scala
  12. +3 −3 src/main/scala/com/twitter/gizzard/scheduler/KestrelJobQueue.scala
  13. +4 −4 src/main/scala/com/twitter/gizzard/scheduler/ReplicatingJob.scala
  14. +13 −6 src/main/scala/com/twitter/gizzard/shards/ReplicatingShard.scala
  15. +5 −1 src/main/scala/com/twitter/gizzard/thrift/JobInjectorClient.scala
  16. +3 −3 src/main/scala/com/twitter/gizzard/thrift/JobInjectorService.scala
  17. +9 −6 src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala
  18. +13 −12 src/test/scala/com/twitter/gizzard/FutureSpec.scala
  19. +95 −0 src/test/scala/com/twitter/gizzard/integration/MulticopyIntegrationSpec.scala
  20. +36 −4 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
  21. +15 −0 src/test/scala/com/twitter/gizzard/scheduler_new/JsonCodecSpec.scala
  22. +15 −3 src/test/scala/com/twitter/gizzard/scheduler_new/JsonJobParserSpec.scala
  23. +10 −3 src/test/scala/com/twitter/gizzard/scheduler_new/KestrelJobQueueSpec.scala
  24. +7 −5 src/test/scala/com/twitter/gizzard/scheduler_new/MemoryJobQueueSpec.scala
  25. +0 −1 src/test/scala/com/twitter/gizzard/scheduler_new/NestedJobSpec.scala
  26. +3 −3 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobIntegrationSpec.scala
  27. +6 −6 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobSpec.scala
  28. +3 −3 src/test/scala/com/twitter/gizzard/shards/ReplicatingShardSpec.scala
View
4 config/test.scala
@@ -28,8 +28,8 @@ object TestQueryEvaluator extends QueryEvaluator {
class TestScheduler(val name: String) extends Scheduler {
val schedulerType = new KestrelScheduler {
- val queuePath = "/tmp"
- override val keepJournal = false
+ path = "/tmp"
+ keepJournal = false
}
errorLimit = 25
badJobQueue = new JsonJobLogger { name = "bad_jobs" }
View
13 project/build/GizzardProject.scala
@@ -5,17 +5,18 @@ class GizzardProject(info: ProjectInfo) extends StandardProject(info) with Subve
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.7.7"
- inline("com.twitter" %% "querulous" % "1.4.3-config-SNAPSHOT")
- inline("net.lag" % "configgy" % "1.6.8")
- inline("net.lag" % "kestrel" % "1.3-config-SNAPSHOT")
+ inline("com.twitter" %% "querulous" % "1.5.1")
+ inline("net.lag" % "configgy" % "1.6.10-SNAPSHOT")
+ inline("net.lag" % "kestrel" % "1.2.7")
inline("com.twitter" % "ostrich" % "1.2.10")
- inline("com.twitter" % "util" % "1.1.2-SNAPSHOT")
+ inline("com.twitter" % "util" % "1.1.2")
- val rpcclient = "com.twitter" %% "rpcclient" % "1.1.2-SNAPSHOT"
+ val rpcclient = "com.twitter" %% "rpcclient" % "1.2.0-SNAPSHOT"
val slf4j = "org.slf4j" % "slf4j-jdk14" % "1.5.2"
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.5.2"
val thrift = "thrift" % "libthrift" % "0.5.0"
- val json = "com.twitter" %% "json" % "2.1.5"
+ val jackson = "org.codehaus.jackson" % "jackson-core-asl" % "1.6.1"
+ val jacksonMap = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.6.1"
// test jars
val specs = "org.scala-tools.testing" % "specs" % "1.6.2.1" % "test"
View
17 src/main/scala/com/twitter/gizzard/config/JobScheduler.scala
@@ -3,12 +3,19 @@ package com.twitter.gizzard.config
import com.twitter.util.Duration
import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
-import net.lag.kestrel.config.PersistentQueue
+import net.lag.kestrel.{PersistentQueue, PersistentQueueConfig}
import gizzard.scheduler.{JsonJob, Codec, MemoryJobQueue, KestrelJobQueue, JobConsumer}
trait SchedulerType
-trait KestrelScheduler extends SchedulerType with PersistentQueue {
- def queuePath: String
+trait KestrelScheduler extends PersistentQueueConfig with SchedulerType with Cloneable {
+ def apply(newName: String): PersistentQueue = {
+ // ugh
+ val oldName = name
+ name = newName
+ val q = apply()
+ name = oldName
+ q
+ }
}
class MemoryScheduler extends SchedulerType {
var sizeLimit = 0
@@ -46,9 +53,9 @@ trait Scheduler {
def apply(codec: Codec[JsonJob]): gizzard.scheduler.JobScheduler[JsonJob] = {
val (jobQueue, errorQueue) = schedulerType match {
case kestrel: KestrelScheduler => {
- val persistentJobQueue = kestrel(kestrel.queuePath, jobQueueName)
+ val persistentJobQueue = kestrel(jobQueueName)
val jobQueue = new KestrelJobQueue[JsonJob](jobQueueName, persistentJobQueue, codec)
- val persistentErrorQueue = kestrel(kestrel.queuePath, errorQueueName)
+ val persistentErrorQueue = kestrel(errorQueueName)
val errorQueue = new KestrelJobQueue[JsonJob](errorQueueName, persistentErrorQueue, codec)
(jobQueue, errorQueue)
View
3 src/main/scala/com/twitter/gizzard/config/NameServer.scala
@@ -34,8 +34,9 @@ class JobRelay {
var priority: Int = 0
var framed: Boolean = true
var timeout: Duration = 1.seconds
+ var retries: Int = 3
- def apply() = new JobRelayFactory(priority, framed, timeout)
+ def apply() = new JobRelayFactory(priority, framed, timeout, retries)
}
object NoJobRelay extends JobRelay {
View
2 src/main/scala/com/twitter/gizzard/config/TServer.scala
@@ -28,7 +28,7 @@ trait TServer extends (thrift.TProcessor => thrift.server.TServer) {
var idleTimeout = 60.seconds
var threadPool = new ThreadPool
- def getPool = threadPool(name + " ThreadPool")
+ def getPool = threadPool(name + "_thread_pool")
def apply(processor: thrift.TProcessor): thrift.server.TServer
}
View
34 src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
@@ -15,17 +15,22 @@ extends Exception("Job replication to cluster '" + cluster + "' is blocked.", ca
class JobRelayFactory(
priority: Int,
framed: Boolean,
- timeout: Duration)
+ timeout: Duration,
+ retries: Int)
extends (Map[String, Seq[Host]] => JobRelay) {
+
+ def this(priority: Int, framed: Boolean, timeout: Duration) = this(priority, framed, timeout, 0)
+
def apply(hostMap: Map[String, Seq[Host]]) =
- new JobRelay(hostMap, priority, framed, timeout)
+ new JobRelay(hostMap, priority, framed, timeout, retries)
}
class JobRelay(
hostMap: Map[String, Seq[Host]],
priority: Int,
framed: Boolean,
- timeout: Duration)
+ timeout: Duration,
+ retries: Int)
extends (String => JobRelayCluster) {
private val clients = Map(hostMap.flatMap { case (c, hs) =>
@@ -39,7 +44,7 @@ extends (String => JobRelayCluster) {
if (onlineHosts.isEmpty) {
if (blocked) Seq(c -> new BlockedJobRelayCluster(c)) else Seq()
} else {
- Seq(c -> new JobRelayCluster(onlineHosts, priority, framed, timeout))
+ Seq(c -> new JobRelayCluster(onlineHosts, priority, framed, timeout, retries))
}
}.toSeq: _*)
@@ -52,15 +57,16 @@ class JobRelayCluster(
hosts: Seq[Host],
priority: Int,
framed: Boolean,
- timeout: Duration)
-extends (Iterable[String] => Unit) {
- val client = new LoadBalancingChannel(hosts.map(h => new JobInjectorClient(h.hostname, h.port, framed, timeout)))
+ timeout: Duration,
+ retries: Int)
+extends (Iterable[Array[Byte]] => Unit) {
+ val client = new LoadBalancingChannel(hosts.map(h => new JobInjectorClient(h.hostname, h.port, framed, timeout, retries)))
- def apply(jobs: Iterable[String]) {
+ def apply(jobs: Iterable[Array[Byte]]) {
val jobList = new JLinkedList[thrift.Job]()
jobs.foreach { j =>
- val tj = new thrift.Job(priority, ByteBuffer.wrap(j.getBytes("UTF-8")))
+ val tj = new thrift.Job(priority, ByteBuffer.wrap(j))
tj.setIs_replicated(true)
jobList.add(tj)
}
@@ -73,14 +79,14 @@ object NullJobRelayFactory extends JobRelayFactory(0, false, new Duration(0)) {
override def apply(h: Map[String, Seq[Host]]) = NullJobRelay
}
-object NullJobRelay extends JobRelay(Map(), 0, false, new Duration(0))
+object NullJobRelay extends JobRelay(Map(), 0, false, new Duration(0), 0)
-object NullJobRelayCluster extends JobRelayCluster(Seq(), 0, false, new Duration(0)) {
+object NullJobRelayCluster extends JobRelayCluster(Seq(), 0, false, new Duration(0), 0) {
override val client = null
- override def apply(jobs: Iterable[String]) = ()
+ override def apply(jobs: Iterable[Array[Byte]]) = ()
}
-class BlockedJobRelayCluster(cluster: String) extends JobRelayCluster(Seq(), 0, false, new Duration(0)) {
+class BlockedJobRelayCluster(cluster: String) extends JobRelayCluster(Seq(), 0, false, new Duration(0), 0) {
override val client = null
- override def apply(jobs: Iterable[String]) { throw new ClusterBlockedException(cluster) }
+ override def apply(jobs: Iterable[Array[Byte]]) { throw new ClusterBlockedException(cluster) }
}
View
4 src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala
@@ -13,6 +13,8 @@ import shards._
class NonExistentShard(message: String) extends ShardException(message: String)
class InvalidShard(message: String) extends ShardException(message: String)
+class NameserverUninitialized extends ShardException("Please call reload() before operating on the NameServer")
+
class NameServer[S <: shards.Shard](
nameServerShard: Shard,
shardRepository: ShardRepository[S],
@@ -40,6 +42,7 @@ extends Shard {
def getShardInfo(id: ShardId) = shardInfos(id)
def getChildren(id: ShardId) = {
+ if(familyTree == null) throw new NameserverUninitialized
familyTree.getOrElse(id, new mutable.ArrayBuffer[LinkInfo])
}
@@ -90,6 +93,7 @@ extends Shard {
def findShardById(id: ShardId): S = findShardById(id, 1)
def findCurrentForwarding(tableId: Int, id: Long) = {
+ if(forwardings == null) throw new NameserverUninitialized
val shardInfo = forwardings.get(tableId).flatMap { bySourceIds =>
val item = bySourceIds.floorEntry(mappingFunction(id))
if (item != null) {
View
28 src/main/scala/com/twitter/gizzard/nameserver/NameserverState.scala
@@ -1,6 +1,6 @@
package com.twitter.gizzard.nameserver
-import gizzard.shards.ShardId
+import gizzard.shards.{ShardId, ShardInfo, LinkInfo}
import thrift.conversions.ShardInfo._
import thrift.conversions.LinkInfo._
import thrift.conversions.Forwarding._
@@ -10,36 +10,36 @@ import scala.collection.mutable.ListBuffer
class NameserverState(initialShards: List[gizzard.shards.ShardInfo],
initialLinks: List[gizzard.shards.LinkInfo],
initialForwardings: List[Forwarding], tableId: Int) {
-
- private val shardsById = initialShards.foldLeft(Map.empty[gizzard.shards.ShardId, gizzard.shards.ShardInfo]) { (map, shard) => map + ((shard.id, shard)) }
-
- private val linksByParent = initialLinks.foldLeft(Map[gizzard.shards.ShardId, List[gizzard.shards.LinkInfo]]()) { (map, link) =>
- val key = link.upId
- val entry = map.getOrElse(key, List[gizzard.shards.LinkInfo]())
-
- map + ((key, entry + link))
+
+
+ private val shardsById = Map(initialShards.map(s => s.id -> s): _*)
+
+ private val linksByParent = initialLinks.foldLeft(Map[ShardId, List[LinkInfo]]()) { (map, link) =>
+ val key = link.upId
+ val entry = map.get(key).map(link :: _).getOrElse(List(link))
+
+ map + (key -> entry)
}
-
+
val forwardings = initialForwardings.filter(_.tableId == tableId)
var links = new ListBuffer[gizzard.shards.LinkInfo]
var shards = new ListBuffer[gizzard.shards.ShardInfo]
-
+
private def computeSubtree(id: ShardId): Unit = {
shardsById.get(id).foreach { shardInfo =>
shards += shardInfo
}
linksByParent.get(id).foreach { linksOption =>
- linksOption.foreach { link =>
+ linksOption.foreach { link =>
links += link
computeSubtree(link.downId)
}
}
-
}
forwardings.foreach { forwarding => computeSubtree(forwarding.shardId) }
-
+
def toThrift = {
val thriftForwardings = forwardings.map(_.toThrift).toJavaList
val thriftLinks = links.map(_.toThrift).toJavaList
View
4 src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
@@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS shards (
destination_type VARCHAR(125),
busy TINYINT NOT NULL DEFAULT 0,
- PRIMARY KEY primary_key_table_prefix_hostname (hostname, table_prefix)
+ PRIMARY KEY (hostname, table_prefix)
) ENGINE=INNODB
"""
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS shard_children (
child_table_prefix VARCHAR(125) NOT NULL,
weight INT NOT NULL DEFAULT 1,
- PRIMARY KEY primary_key_family (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
+ PRIMARY KEY (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
INDEX child (child_hostname, child_table_prefix)
) ENGINE=INNODB
"""
View
36 src/main/scala/com/twitter/gizzard/scheduler/JsonCodec.scala
@@ -4,6 +4,9 @@ import scala.collection.mutable
import scala.util.matching.Regex
import com.twitter.json.Json
import net.lag.logging.Logger
+import org.codehaus.jackson.map.ObjectMapper
+import java.util.{Map => JMap, List => JList}
+import scala.collection.jcl
/**
* Codec for json-encoded jobs.
@@ -18,27 +21,28 @@ import net.lag.logging.Logger
* Jobs that can't be parsed by the json library are handed to 'unparsableJobHandler'.
*/
class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) extends Codec[JsonJob] {
+ private val mapper = new ObjectMapper
+
protected val log = Logger.get(getClass.getName)
protected val processors = {
val p = mutable.Map.empty[Regex, JsonJobParser]
p += (("JsonNestedJob".r, new JsonNestedJobParser(this)))
// for backward compat:
p += (("JobWithTasks".r, new JsonNestedJobParser(this)))
+ p += (("SchedulableWithTasks".r, new JsonNestedJobParser(this)))
p
}
def +=(item: (Regex, JsonJobParser)) = processors += item
def +=(r: Regex, p: JsonJobParser) = processors += ((r, p))
- def flatten(job: JsonJob): Array[Byte] = job.toJson.getBytes
+ def flatten(job: JsonJob): Array[Byte] = job.toJsonBytes
def inflate(data: Array[Byte]): JsonJob = {
try {
- Json.parse(new String(data)) match {
- case json: Map[_, _] =>
- assert(json.size == 1)
- inflate(json.asInstanceOf[Map[String, Any]])
- }
+ val javaMap: JMap[String, Any] = mapper.readValue(new String(data, "UTF-8"), classOf[JMap[String, Any]])
+ val scalaMap = deepConvert(javaMap)
+ inflate(scalaMap)
} catch {
case e =>
log.error(e, "Unparsable JsonJob; dropping: " + e.toString)
@@ -61,4 +65,24 @@ class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) extends Codec[JsonJob
throw new UnparsableJsonException("Processor '%s' blew up: %s".format(jobType, e.toString), e)
}
}
+
+ private def deepConvert(javaList: JList[Any]): List[Any] = {
+ jcl.Buffer(javaList).map { v =>
+ v match {
+ case jm: JMap[String, Any] => deepConvert(jm)
+ case jl: JList[Any] => deepConvert(jl)
+ case _ => v
+ }
+ }.toList
+ }
+
+ private def deepConvert(javaMap: JMap[String, Any]): Map[String, Any] = {
+ Map(jcl.Map(javaMap).toSeq.map { case (k,v) =>
+ v match {
+ case jm: JMap[String, Any] => (k, deepConvert(jm))
+ case jl: JList[Any] => (k, deepConvert(jl))
+ case _ => (k, v)
+ }
+ }: _*)
+ }
}
View
42 src/main/scala/com/twitter/gizzard/scheduler/JsonJob.scala
@@ -1,9 +1,10 @@
package com.twitter.gizzard.scheduler
-import com.twitter.json.{Json, JsonException}
import com.twitter.ostrich.{StatsProvider, W3CStats}
+import org.codehaus.jackson.map.ObjectMapper
import net.lag.logging.Logger
import gizzard.proxy.LoggingProxy
+import java.util.{Map => JMap, List => JList}
class UnparsableJsonException(s: String, cause: Throwable) extends Exception(s, cause)
@@ -12,15 +13,46 @@ class UnparsableJsonException(s: String, cause: Throwable) extends Exception(s,
* map containing 'className' => 'toMap', where 'toMap' should return a map of key/values from the
* job. The default 'className' is the job's java/scala class name.
*/
+object JsonJob {
+ val mapper = new ObjectMapper
+}
+
trait JsonJob extends Job {
def toMap: Map[String, Any]
def shouldReplicate = true
def className = getClass.getName
- def toJson = {
+ def toJsonBytes = {
def json = toMap ++ Map("error_count" -> errorCount, "error_message" -> errorMessage)
- Json.build(Map(className -> json)).toString
+ val javaMap = deepConvert(Map(className -> json))
+ JsonJob.mapper.writeValueAsBytes(javaMap)
+ }
+
+ def toJson = new String(toJsonBytes, "UTF-8")
+
+ private def deepConvert(scalaMap: Map[String, Any]): JMap[String, Any] = {
+ val map = new java.util.LinkedHashMap[String, Any]()
+ scalaMap.map { case (k, v) =>
+ v match {
+ case m: Map[String, Any] => map.put(k, deepConvert(m))
+ case a: Iterable[Any] => map.put(k, deepConvert(a))
+ case v => map.put(k, v)
+ }
+ }
+ map
+ }
+
+ private def deepConvert(scalaIterable: Iterable[Any]): JList[Any] = {
+ val list = new java.util.LinkedList[Any]()
+ scalaIterable.map { v =>
+ v match {
+ case m: Map[String, Any] => list.add(deepConvert(m))
+ case a: Iterable[Any] => list.add(deepConvert(a))
+ case v => list.add(v)
+ }
+ }
+ list
}
override def toString = toJson
@@ -31,14 +63,14 @@ trait JsonJob extends Job {
*/
class JsonNestedJob(jobs: Iterable[JsonJob]) extends NestedJob[JsonJob](jobs) with JsonJob {
def toMap: Map[String, Any] = Map("tasks" -> taskQueue.map { task => Map(task.className -> task.toMap) })
- override def toString = toJson
+ //override def toString = toJson
}
/**
* A JobConsumer that encodes JsonJobs into a string and logs them at error level.
*/
class JsonJobLogger(logger: Logger) extends JobConsumer[JsonJob] {
- def put(job: JsonJob) = logger.error(job.toJson)
+ def put(job: JsonJob) = logger.error(job.toString)
}
class LoggingJsonJobParser(
View
6 src/main/scala/com/twitter/gizzard/scheduler/KestrelJobQueue.scala
@@ -70,14 +70,14 @@ class KestrelJobQueue[J <: Job](queueName: String, val queue: PersistentQueue, c
}
def drainTo(otherQueue: JobQueue[J], delay: Duration) {
- queue.expiredQueue = Some(otherQueue.asInstanceOf[KestrelJobQueue[J]].queue)
- queue.maxAge = delay.inMilliseconds.toInt
+ queue.expiredQueue set Some(Some(otherQueue.asInstanceOf[KestrelJobQueue[J]].queue))
+ queue.maxAge set Some(delay.inMilliseconds.toInt)
}
def checkExpiration(flushLimit: Int) {
val count = queue.discardExpired(flushLimit)
if (count > 0) {
- log.info("Replaying %d error jobs.", count)
+ log.info("Replaying %d error jobs from %s.", count, queueName)
}
}
View
8 src/main/scala/com/twitter/gizzard/scheduler/ReplicatingJob.scala
@@ -55,11 +55,11 @@ class ReplicatingJob(
relay: JobRelay,
jobs: Iterable[JsonJob],
clusters: Iterable[String],
- serialized: Iterable[String])
+ serialized: Iterable[Array[Byte]])
extends JsonNestedJob(jobs) {
def this(relay: JobRelay, jobs: Iterable[JsonJob], clusters: Iterable[String]) =
- this(relay, jobs, clusters, jobs.map(_.toJson))
+ this(relay, jobs, clusters, jobs.map(_.toJsonBytes))
def this(relay: JobRelay, jobs: Iterable[JsonJob]) = this(relay, jobs, relay.clusters)
@@ -69,7 +69,7 @@ extends JsonNestedJob(jobs) {
override def toMap: Map[String, Any] = {
var attrs = super.toMap.toList
if (!clustersQueue.isEmpty) attrs = "dest_clusters" -> clustersQueue.toList :: attrs
- if (!serialized.isEmpty) attrs = "serialized" -> serialized :: attrs
+ if (!serialized.isEmpty) attrs = "serialized" -> serialized.map(new String(_, "UTF-8")) :: attrs
Map(attrs: _*)
}
@@ -108,7 +108,7 @@ extends JsonJobParser {
override def apply(json: Map[String, Any]): JsonJob = {
val clusters = json.get("dest_clusters").map(_.asInstanceOf[Iterable[String]]) getOrElse Nil
- val serialized = json.get("serialized").map(_.asInstanceOf[Iterable[String]]) getOrElse Nil
+ val serialized = json.get("serialized").map(_.asInstanceOf[Iterable[String]].map(_.getBytes("UTF-8"))) getOrElse Nil
val tasks = json("tasks").asInstanceOf[Tasks].map(codec.inflate)
new ReplicatingJob(relay, tasks, clusters, serialized)
View
19 src/main/scala/com/twitter/gizzard/shards/ReplicatingShard.scala
@@ -83,14 +83,21 @@ class ReplicatingShard[S <: Shard](
}
protected def fanoutWriteSerial[A](method: (S => A), replicas: Seq[S]): A = {
+ val exceptions = new mutable.ListBuffer[Throwable]
+
val results = replicas.flatMap { shard =>
try {
Some(method(shard))
} catch {
case e: ShardBlackHoleException =>
None
+ case e =>
+ exceptions += e
+ None
}
}
+
+ exceptions.map { throw _ }
if (results.size == 0) {
throw new ShardBlackHoleException(shardInfo.id)
}
@@ -112,10 +119,10 @@ class ReplicatingShard[S <: Shard](
try {
f(shard)
} catch {
+ case e: ShardRejectedOperationException =>
+ failover(f, remainder)
case e: ShardException =>
- if (!e.isInstanceOf[ShardRejectedOperationException]) {
- log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
- }
+ log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
failover(f, remainder)
}
}
@@ -141,10 +148,10 @@ class ReplicatingShard[S <: Shard](
Some(answer)
}
} catch {
+ case e: ShardRejectedOperationException =>
+ rebuildableFailover(f, rebuild, remainder, toRebuild, everSuccessful)
case e: ShardException =>
- if (!e.isInstanceOf[ShardRejectedOperationException]) {
- log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
- }
+ log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
rebuildableFailover(f, rebuild, remainder, toRebuild, everSuccessful)
}
}
View
6 src/main/scala/com/twitter/gizzard/thrift/JobInjectorClient.scala
@@ -3,8 +3,12 @@ package com.twitter.gizzard.thrift
import com.twitter.rpcclient.{PooledClient, ThriftConnection}
import com.twitter.util.Duration
-class JobInjectorClient(host: String, port: Int, framed: Boolean, soTimeout: Duration)
+class JobInjectorClient(host: String, port: Int, framed: Boolean, soTimeout: Duration, retryCount: Int)
extends PooledClient[JobInjector.Iface] {
+
+ def this(host: String, port: Int, framed: Boolean, soTimeout: Duration) =
+ this(host, port, framed, soTimeout, 0)
+
val name = "JobManagerClient"
def createConnection =
View
6 src/main/scala/com/twitter/gizzard/thrift/JobInjectorService.scala
@@ -27,11 +27,11 @@ extends JobInjector.Iface {
def apply = deserialized.apply()
def toMap = deserialized.toMap
- override def toJson = {
+ override def toJsonBytes = {
if (isDeserialized) {
- deserialized.toJson
+ deserialized.toJsonBytes
} else {
- new String(serialized, "UTF-8")
+ serialized
}
}
}
View
15 src/test/scala/com/twitter/gizzard/ConfiguredSpecification.scala
@@ -11,6 +11,7 @@ import com.twitter.util.Eval
trait ConfiguredSpecification extends Specification {
+ noDetailedDiffs()
val config = Eval[gizzard.config.GizzardServer](new File("config/test.scala"))
config.logging()
}
@@ -21,7 +22,7 @@ trait IntegrationSpecification extends Specification {
trait TestServerFacts {
def enum: Int; def nsDatabaseName: String; def databaseName: String
def basePort: Int; def injectorPort: Int; def managerPort: Int
- def sqlShardInfo: shards.ShardInfo; def forwarding: nameserver.Forwarding
+ def sqlShardInfos: List[shards.ShardInfo]; def forwardings: List[nameserver.Forwarding]
def kestrelQueues: Seq[String]
}
@@ -35,9 +36,11 @@ trait IntegrationSpecification extends Specification {
val basePort = port
val injectorPort = port + 1
val managerPort = port + 2
- val sqlShardInfo = shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
- "TestShard", "int", "int", shards.Busy.Normal)
- val forwarding = nameserver.Forwarding(0, 0, sqlShardInfo.id)
+ val sqlShardInfos = List(shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
+ "TestShard", "int", "int", shards.Busy.Normal))
+
+ val forwardings = List(nameserver.Forwarding(0, 0, sqlShardInfos.first.id))
+
val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue",
"gizzard_test_"+name+"_high_queue_errors",
"gizzard_test_"+name+"_low_queue",
@@ -80,8 +83,8 @@ trait IntegrationSpecification extends Specification {
servers.foreach { s =>
createTestServerDBs(s)
s.nameServer.rebuildSchema()
- s.nameServer.setForwarding(s.forwarding)
- s.nameServer.createShard(s.sqlShardInfo)
+ s.forwardings.foreach { f => s.nameServer.setForwarding(f) }
+ s.sqlShardInfos.foreach { ssi => s.nameServer.createShard(ssi) }
s.nameServer.reload()
}
}
View
25 src/test/scala/com/twitter/gizzard/FutureSpec.scala
@@ -35,18 +35,19 @@ object FutureSpec extends ConfiguredSpecification with JMocker with ClassMocker
}
"timeout a stuffed-up queue" in {
- future.executor.shutdown()
- future.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
- def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
- // do nothing.
- }
- })
- future.executor.awaitTermination(1, TimeUnit.MINUTES)
- Time.freeze
- val f = future { 3 * 4 }
- Time.advance(23.seconds)
- f.run()
- f.get(1, TimeUnit.MILLISECONDS) must throwA[Exception]
+ Time.withCurrentTimeFrozen { time =>
+ future.executor.shutdown()
+ future.executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
+ // do nothing.
+ }
+ })
+ future.executor.awaitTermination(1, TimeUnit.MINUTES)
+ val f = future { 3 * 4 }
+ time.advance(23.seconds)
+ f.run()
+ f.get(1, TimeUnit.MILLISECONDS) must throwA[Exception]
+ }
}
"run sequences in parallel" in {
View
95 src/test/scala/com/twitter/gizzard/integration/MulticopyIntegrationSpec.scala
@@ -0,0 +1,95 @@
+package com.twitter.gizzard.integration
+
+import com.twitter.gizzard.thrift.conversions.Sequences._
+import testserver._
+import testserver.config.TestServerConfig
+import testserver.thrift.TestResult
+import java.io.File
+import scheduler.CopyDestination
+import java.util.concurrent.atomic.AtomicInteger
+import org.specs.mock.{ClassMocker, JMocker}
+import net.lag.configgy.{Config => CConfig}
+import com.twitter.util.TimeConversions._
+import com.twitter.gizzard.thrift.{JobInjectorService, TThreadServer, JobInjector}
+import nameserver.{Host, HostStatus, JobRelay}
+
+class MulticopyIntegrationSpec extends IntegrationSpecification with ConfiguredSpecification {
+ override def testServer(i: Int) = {
+ val port = 8000 + (i - 1) * 3
+ val name = "testserver" + i
+ new TestServer(TestServerConfig(name, port)) with TestServerFacts {
+ val enum = i
+ val nsDatabaseName = "gizzard_test_"+name+"_ns"
+ val databaseName = "gizzard_test_"+name
+ val basePort = port
+ val injectorPort = port + 1
+ val managerPort = port + 2
+ val sqlShardInfos = List(
+ shards.ShardInfo(shards.ShardId("localhost", "t0_0"),
+ "TestShard", "int", "int", shards.Busy.Normal),
+ shards.ShardInfo(shards.ShardId("localhost", "t0_1"),
+ "TestShard", "int", "int", shards.Busy.Normal),
+ shards.ShardInfo(shards.ShardId("localhost", "t0_2"),
+ "TestShard", "int", "int", shards.Busy.Normal)
+ )
+ val forwardings = List(
+ nameserver.Forwarding(0, 0, sqlShardInfos.first.id)
+ )
+ val kestrelQueues = Seq("gizzard_test_"+name+"_high_queue",
+ "gizzard_test_"+name+"_high_queue_errors",
+ "gizzard_test_"+name+"_low_queue",
+ "gizzard_test_"+name+"_low_queue_errors")
+ }
+ }
+
+ "Multicopy" should {
+ val server = testServer(1)
+ val client = testServerClient(server)
+
+ doBefore {
+ resetTestServerDBs(server)
+ setupServers(server)
+ server.nameServer.reload()
+ }
+
+ doAfter { stopServers(server) }
+
+ "copy to multiple locations" in {
+ startServers(server)
+ val nameserver = server.nameServer
+
+ val sourceInfo :: dest0info :: dest2info :: _ = server.sqlShardInfos
+
+ client.put(0, "foo")
+ client.put(1, "bar")
+ client.put(2, "baz")
+ client.put(3, "bonk")
+ client.get(3) must eventually(be_==(List(new TestResult(3, "bonk", 1)).toJavaList))
+
+ val dest0 = CopyDestination(dest0info.id, Some(0))
+ val dest2 = CopyDestination(dest2info.id, Some(2))
+
+ val copy = new TestSplitFactory(server.nameServer, server.jobScheduler(Priority.Low.id))(sourceInfo.id, List(dest0, dest2))
+
+ copy()
+
+ val sourceShard = nameserver.findShardById(sourceInfo.id)
+ val dest0Shard = nameserver.findShardById(dest0info.id)
+ val dest2Shard = nameserver.findShardById(dest2info.id)
+
+ dest0Shard.get(0) must eventually(be_==(Some((0, "foo", 1))))
+ dest0Shard.get(1) must eventually(be_==(Some((1, "bar", 1))))
+ dest0Shard.get(2) must eventually(be_==(None))
+ dest0Shard.get(3) must eventually(be_==(None))
+
+ dest2Shard.get(0) must eventually(be_==(None))
+ dest2Shard.get(1) must eventually(be_==(None))
+ dest2Shard.get(2) must eventually(be_==(Some((2, "baz", 1))))
+ dest2Shard.get(3) must eventually(be_==(Some((3, "bonk", 1))))
+ }
+
+
+
+ }
+}
+
View
40 src/test/scala/com/twitter/gizzard/integration/TestServer.scala
@@ -1,6 +1,7 @@
package com.twitter.gizzard.testserver
import java.sql.{ResultSet, SQLException}
+import java.util.TreeMap
import com.twitter.querulous.evaluator.{QueryEvaluatorFactory, QueryEvaluator}
import com.twitter.querulous.config.Connection
import com.twitter.querulous.query.SqlQueryTimeoutException
@@ -41,8 +42,8 @@ object config {
trait TestJobScheduler extends Scheduler {
val schedulerType = new KestrelScheduler {
- val queuePath = "/tmp"
- override val keepJournal = false
+ path = "/tmp"
+ keepJournal = false
}
errorLimit = 25
}
@@ -199,7 +200,7 @@ extends TestShard {
}
def get(key: Int) = evaluator.selectOne(getSql, key)(asResult)
- def getAll(key: Int, count: Int) = evaluator.select(getSql, key, count)(asResult)
+ def getAll(key: Int, count: Int) = evaluator.select(getAllSql, key, count)(asResult)
}
@@ -218,7 +219,7 @@ class PutJob(key: Int, value: String, forwarding: Long => TestShard) extends Jso
class TestCopyFactory(ns: NameServer[TestShard], s: JobScheduler[JsonJob])
extends CopyJobFactory[TestShard] {
- def apply(src: ShardId, dests: List[CopyDestination]) = new TestCopy(src, dests, 0, 500, ns, s)
+ def apply(src: ShardId, dests: List[CopyDestination]) = new TestCopy(src, dests, -1, 500, ns, s)
}
class TestCopyParser(ns: NameServer[TestShard], s: JobScheduler[JsonJob])
@@ -244,3 +245,34 @@ extends CopyJob[TestShard](srcId, destinations, count, ns, s) {
def serialize = Map("cursor" -> cursor)
}
+
+class TestSplitFactory(ns: NameServer[TestShard], s: JobScheduler[JsonJob])
+extends CopyJobFactory[TestShard] {
+ def apply(src: ShardId, dests: List[CopyDestination]) = new TestSplit(src, dests, -1, 500, ns, s)
+}
+
+class TestSplit(srcId: ShardId, destinations: List[CopyDestination], cursor: Int, count: Int,
+ ns: NameServer[TestShard], s: JobScheduler[JsonJob])
+extends CopyJob[TestShard](srcId, destinations, count, ns, s) {
+ def copyPage(src: TestShard, dests: List[CopyDestinationShard[TestShard]], count: Int) = {
+ val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) }
+
+ val byBaseIds = new TreeMap[Long, TestShard]()
+ dests.foreach { d => byBaseIds.put(d.baseId.getOrElse(0), d.shard) }
+
+ rows.foreach { row =>
+ val shard = byBaseIds.floorEntry(nameServer.mappingFunction(row._1)).getValue
+ if(shard != null) {
+ shard.put(row._1, row._2)
+ } else {
+ println("wtf: " + row._1 + " mapped to " + nameServer.mappingFunction(row._1) + " returned null")
+ }
+ }
+
+ if (rows.isEmpty) None
+ else Some(new TestCopy(srcId, destinations, rows.last._1, count, ns, s))
+ }
+
+ def serialize = Map("cursor" -> cursor)
+}
+
View
15 src/test/scala/com/twitter/gizzard/scheduler_new/JsonCodecSpec.scala
@@ -10,6 +10,21 @@ class JsonCodecSpec extends ConfiguredSpecification with JMocker with ClassMocke
var unparsed = new mutable.ListBuffer[String]
val codec = new JsonCodec({ (unparsable: Array[Byte]) => unparsed += new String(unparsable) })
+ "parse some json" in {
+ val jsonMap = Map("has" -> "to", "be" -> "a", "map" -> List(3, Map("like" -> "so")))
+
+ val job = mock[JsonJob]
+ val parser = mock[JsonJobParser]
+
+ codec += ("this".r, parser)
+ expect {
+ one(parser).parse(jsonMap) willReturn job
+ }
+
+ val json = """{"this":{"has":"to","be":"a","map":[3, {"like":"so"}]}}"""
+ codec.inflate(json.getBytes()) mustEqual job
+ }
+
"fail gracefully" in {
codec.inflate("gobbledygook".getBytes) must throwA[UnparsableJsonException]
}
View
18 src/test/scala/com/twitter/gizzard/scheduler_new/JsonJobParserSpec.scala
@@ -55,20 +55,32 @@ class JsonJobParserSpec extends ConfiguredSpecification with JMocker with ClassM
}
"JsonJob" in {
- job.toJson mustEqual """{"FakeJob":{"a":1,"error_count":0,"error_message":"(none)"}}"""
+ val json = job.toJson
+ json mustMatch "\"FakeJob\""
+ json mustMatch "\"a\":1"
+ json mustMatch "\"error_count\":0"
+ json mustMatch "\"error_message\":\"\\(none\\)\""
}
"JsonNestedJob" in {
val nestedJob = new JsonNestedJob(List(job))
+ val json = nestedJob.toJson
- nestedJob.toJson mustEqual """{"com.twitter.gizzard.scheduler.JsonNestedJob":{"error_count":0,"error_message":"(none)","tasks":[{"FakeJob":{"a":1}}]}}"""
+ json mustMatch "\"com.twitter.gizzard.scheduler.JsonNestedJob\":\\{"
+ json mustMatch "\"error_count\":0"
+ json mustMatch "\"error_message\":\"\\(none\\)\""
+ json mustMatch "\"tasks\":\\[\\{\"FakeJob\":\\{\"a\":1\\}\\}\\]"
}
"errors" in {
job.errorCount = 23
job.errorMessage = "Good heavens!"
- job.toJson mustEqual """{"FakeJob":{"a":1,"error_count":23,"error_message":"Good heavens!"}}"""
+ val json = job.toJson
+ json mustMatch "\\{\"FakeJob\":\\{"
+ json mustMatch "\"a\":1"
+ json mustMatch "\"error_count\":23"
+ json mustMatch "\"error_message\":\"Good heavens!\""
}
}
}
View
13 src/test/scala/com/twitter/gizzard/scheduler_new/KestrelJobQueueSpec.scala
@@ -3,7 +3,7 @@ package com.twitter.gizzard.scheduler
import scala.collection.mutable
import com.twitter.util.Time
import com.twitter.util.TimeConversions._
-import net.lag.kestrel.{PersistentQueue, QItem}
+import net.lag.kestrel.{PersistentQueue, QItem, OverlaySetting}
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
@@ -126,10 +126,17 @@ object KestrelJobQueueSpec extends ConfiguredSpecification with JMocker with Cla
}
"drainTo" in {
+ val expiredQueueOverlay = mock[OverlaySetting[Option[PersistentQueue]]]
+ val maxAgeOverlay = mock[OverlaySetting[Int]]
+
expect {
one(destinationQueue).queue willReturn queue2
- one(queue).expiredQueue_=(Some(queue2))
- one(queue).maxAge_=(1)
+
+ one(queue).expiredQueue willReturn expiredQueueOverlay
+ one(expiredQueueOverlay).set(Some(Some(queue2)))
+
+ one(queue).maxAge willReturn maxAgeOverlay
+ one(maxAgeOverlay).set(Some(1))
}
kestrelJobQueue.drainTo(destinationQueue, 1.millisecond)
View
12 src/test/scala/com/twitter/gizzard/scheduler_new/MemoryJobQueueSpec.scala
@@ -72,11 +72,13 @@ object MemoryJobQueueSpec extends ConfiguredSpecification with JMocker with Clas
one(destinationQueue).put(job2)
}
- queue.drainTo(destinationQueue, 1.millisecond)
- queue.put(job1)
- queue.put(job2)
- Time.advance(2.milliseconds)
- queue.checkExpiration(10)
+ Time.withCurrentTimeFrozen { time =>
+ queue.drainTo(destinationQueue, 1.millisecond)
+ queue.put(job1)
+ queue.put(job2)
+ time.advance(2.milliseconds)
+ queue.checkExpiration(10)
+ }
}
}
}
View
1 src/test/scala/com/twitter/gizzard/scheduler_new/NestedJobSpec.scala
@@ -4,7 +4,6 @@ import scala.collection.mutable
import com.twitter.json.Json
import org.specs.mock.{ClassMocker, JMocker}
import org.specs.Specification
-import shards.ShardRejectedOperationException
class NestedJobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"NestedJob" should {
View
6 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobIntegrationSpec.scala
@@ -13,7 +13,7 @@ object ReplicatingJobIntegrationSpec extends ConfiguredSpecification with JMocke
// TODO: make configurable
val port = 12313
val host = Host("localhost", port, "c1", HostStatus.Normal)
- val relay = new JobRelay(Map("c1" -> List(host)), 1, false, 1.second)
+ val relay = new JobRelay(Map("c1" -> List(host)), 1, false, 1.second, 0)
val codec = new ReplicatingJsonCodec(relay, { badJob =>
println(new String(badJob, "UTF-8"))
})
@@ -32,8 +32,8 @@ object ReplicatingJobIntegrationSpec extends ConfiguredSpecification with JMocke
val schedulerConfig = new gizzard.config.Scheduler {
val name = "tbird_test_q"
val schedulerType = new gizzard.config.KestrelScheduler {
- val queuePath = "/tmp"
- override val keepJournal = false
+ path = "/tmp"
+ keepJournal = false
}
errorLimit = 10
View
12 src/test/scala/com/twitter/gizzard/scheduler_new/ReplicatingJobSpec.scala
@@ -13,9 +13,9 @@ class ReplicatingJobSpec extends ConfiguredSpecification with JMocker with Class
"toMap" in {
expect {
- one(job1).toJson willReturn """{"foo":"bar"}"""
- one(job1).className willReturn testJsonJobClass
- one(job1).toMap willReturn Map[String, Any]()
+ one(job1).toJsonBytes willReturn """{"foo":"bar"}""".getBytes("UTF-8")
+ one(job1).className willReturn testJsonJobClass
+ one(job1).toMap willReturn Map[String, Any]()
}
val job = new ReplicatingJob(relay, Array(job1), List("c1"))
@@ -28,10 +28,10 @@ class ReplicatingJobSpec extends ConfiguredSpecification with JMocker with Class
}
"replicate when list of clusters is present" in {
- val json = """{"foo":"bar"}"""
+ val json = """{"foo":"bar"}""".getBytes("UTF-8")
expect {
- one(job1).toJson willReturn json
+ one(job1).toJsonBytes willReturn json
}
val job = new ReplicatingJob(relay, List(job1), List("c1"))
@@ -48,7 +48,7 @@ class ReplicatingJobSpec extends ConfiguredSpecification with JMocker with Class
"not replicate when list of clusters is empty" in {
expect {
- one(job1).toJson willReturn """{"foo":"bar"}"""
+ one(job1).toJsonBytes willReturn """{"foo":"bar"}""".getBytes("UTF-8")
one(job1).apply()
}
View
6 src/test/scala/com/twitter/gizzard/shards/ReplicatingShardSpec.scala
@@ -101,10 +101,10 @@ object ReplicatingShardSpec extends ConfiguredSpecification with JMocker {
"with an exception" in {
expect {
- one(shard1).put("name", "carol")
- one(shard2).put("name", "carol") willThrow new ShardException("o noes")
+ one(shard1).put("name", "carol") willThrow new ShardException("o noes")
+ one(shard2).put("name", "carol")
}
- replicatingShard.put("name", "carol") must throwA[Exception]
+ replicatingShard.put("name", "carol") must throwA[ShardException]
}
"with a black hole" in {

0 comments on commit a92bdbd

Please sign in to comment.
Something went wrong with that request. Please try again.