Merge branch 'master' into safe_stop
freels committed Jan 18, 2011
2 parents 3addbdf + 43938d7 commit db2ea09
Showing 14 changed files with 99 additions and 47 deletions.
project/build/GizzardProject.scala (5 changes: 2 additions & 3 deletions)
@@ -5,8 +5,8 @@ class GizzardProject(info: ProjectInfo) extends StandardProject(info) with Subve
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.7.7"

inline("com.twitter" %% "querulous" % "1.5.1")
inline("net.lag" % "configgy" % "1.6.8")
inline("com.twitter" %% "querulous" % "1.5.4")
inline("net.lag" % "configgy" % "1.6.10-SNAPSHOT")
inline("net.lag" % "kestrel" % "1.2.7")
inline("com.twitter" % "ostrich" % "1.2.10")
inline("com.twitter" % "util" % "1.1.2")
@@ -15,7 +15,6 @@ class GizzardProject(info: ProjectInfo) extends StandardProject(info) with Subve
val slf4j = "org.slf4j" % "slf4j-jdk14" % "1.5.2"
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.5.2"
val thrift = "thrift" % "libthrift" % "0.5.0"
val json = "com.twitter" %% "json" % "2.1.5"
val jackson = "org.codehaus.jackson" % "jackson-core-asl" % "1.6.1"
val jacksonMap = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.6.1"

src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala (10 changes: 5 additions & 5 deletions)
@@ -59,14 +59,14 @@ class JobRelayCluster(
framed: Boolean,
timeout: Duration,
retries: Int)
extends (Iterable[String] => Unit) {
extends (Iterable[Array[Byte]] => Unit) {
val client = new LoadBalancingChannel(hosts.map(h => new JobInjectorClient(h.hostname, h.port, framed, timeout, retries)))

def apply(jobs: Iterable[String]) {
def apply(jobs: Iterable[Array[Byte]]) {
val jobList = new JLinkedList[thrift.Job]()

jobs.foreach { j =>
val tj = new thrift.Job(priority, ByteBuffer.wrap(j.getBytes("UTF-8")))
val tj = new thrift.Job(priority, ByteBuffer.wrap(j))
tj.setIs_replicated(true)
jobList.add(tj)
}
@@ -83,10 +83,10 @@ object NullJobRelay extends JobRelay(Map(), 0, false, new Duration(0), 0)

object NullJobRelayCluster extends JobRelayCluster(Seq(), 0, false, new Duration(0), 0) {
override val client = null
override def apply(jobs: Iterable[String]) = ()
override def apply(jobs: Iterable[Array[Byte]]) = ()
}

class BlockedJobRelayCluster(cluster: String) extends JobRelayCluster(Seq(), 0, false, new Duration(0), 0) {
override val client = null
override def apply(jobs: Iterable[String]) { throw new ClusterBlockedException(cluster) }
override def apply(jobs: Iterable[Array[Byte]]) { throw new ClusterBlockedException(cluster) }
}
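
The relay now carries pre-serialized jobs as raw bytes: JobRelayCluster is an Iterable[Array[Byte]] => Unit, and each payload is wrapped straight into a thrift.Job without a UTF-8 round trip. A minimal sketch of the new call shape, using NullJobRelayCluster from this file and the toJsonBytes method added in JsonJob.scala below (the jobs value is a placeholder, not part of this diff):

val cluster: JobRelayCluster = NullJobRelayCluster            // any concrete JobRelayCluster
val jobs: Iterable[JsonJob] = Nil                             // stand-in for real jobs
val payloads: Iterable[Array[Byte]] = jobs.map(_.toJsonBytes) // serialize once, straight to bytes
cluster(payloads)                                             // apply now takes Iterable[Array[Byte]], not Iterable[String]
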
src/main/scala/com/twitter/gizzard/scheduler/JsonCodec.scala (4 changes: 2 additions & 2 deletions)
@@ -36,11 +36,11 @@ class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) extends Codec[JsonJob
def +=(item: (Regex, JsonJobParser)) = processors += item
def +=(r: Regex, p: JsonJobParser) = processors += ((r, p))

def flatten(job: JsonJob): Array[Byte] = job.toJson.getBytes
def flatten(job: JsonJob): Array[Byte] = job.toJsonBytes

def inflate(data: Array[Byte]): JsonJob = {
try {
val javaMap: JMap[String, Any] = mapper.readValue(new String(data), classOf[JMap[String, Any]])
val javaMap: JMap[String, Any] = mapper.readValue(new String(data, "UTF-8"), classOf[JMap[String, Any]])
val scalaMap = deepConvert(javaMap)
inflate(scalaMap)
} catch {
src/main/scala/com/twitter/gizzard/scheduler/JsonJob.scala (42 changes: 37 additions & 5 deletions)
@@ -1,9 +1,10 @@
package com.twitter.gizzard.scheduler

import com.twitter.json.{Json, JsonException}
import com.twitter.ostrich.{StatsProvider, W3CStats}
import org.codehaus.jackson.map.ObjectMapper
import net.lag.logging.Logger
import gizzard.proxy.LoggingProxy
import java.util.{Map => JMap, List => JList}

class UnparsableJsonException(s: String, cause: Throwable) extends Exception(s, cause)

@@ -12,15 +12,46 @@ class UnparsableJsonException(s: String, cause: Throwable) extends Exception(s,
* map containing 'className' => 'toMap', where 'toMap' should return a map of key/values from the
* job. The default 'className' is the job's java/scala class name.
*/
object JsonJob {
val mapper = new ObjectMapper
}

trait JsonJob extends Job {
def toMap: Map[String, Any]

def shouldReplicate = true
def className = getClass.getName

def toJson = {
def toJsonBytes = {
def json = toMap ++ Map("error_count" -> errorCount, "error_message" -> errorMessage)
Json.build(Map(className -> json)).toString
val javaMap = deepConvert(Map(className -> json))
JsonJob.mapper.writeValueAsBytes(javaMap)
}

def toJson = new String(toJsonBytes, "UTF-8")

private def deepConvert(scalaMap: Map[String, Any]): JMap[String, Any] = {
val map = new java.util.LinkedHashMap[String, Any]()
scalaMap.map { case (k, v) =>
v match {
case m: Map[String, Any] => map.put(k, deepConvert(m))
case a: Iterable[Any] => map.put(k, deepConvert(a))
case v => map.put(k, v)
}
}
map
}

private def deepConvert(scalaIterable: Iterable[Any]): JList[Any] = {
val list = new java.util.LinkedList[Any]()
scalaIterable.map { v =>
v match {
case m: Map[String, Any] => list.add(deepConvert(m))
case a: Iterable[Any] => list.add(deepConvert(a))
case v => list.add(v)
}
}
list
}

override def toString = toJson
@@ -31,14 +63,14 @@ trait JsonJob extends Job {
*/
class JsonNestedJob(jobs: Iterable[JsonJob]) extends NestedJob[JsonJob](jobs) with JsonJob {
def toMap: Map[String, Any] = Map("tasks" -> taskQueue.map { task => Map(task.className -> task.toMap) })
override def toString = toJson
//override def toString = toJson
}

/**
* A JobConsumer that encodes JsonJobs into a string and logs them at error level.
*/
class JsonJobLogger(logger: Logger) extends JobConsumer[JsonJob] {
def put(job: JsonJob) = logger.error(job.toJson)
def put(job: JsonJob) = logger.error(job.toString)
}

class LoggingJsonJobParser(
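
JSON serialization here moves from com.twitter.json to Jackson: toJsonBytes converts the job's Scala map to java.util collections via deepConvert and writes bytes through a shared ObjectMapper, while toJson is just the UTF-8 decode of those bytes. A standalone sketch of that pattern, with a hand-built map standing in for a job's toMap (field order may differ in practice):

import org.codehaus.jackson.map.ObjectMapper
import java.util.{LinkedHashMap => JLinkedHashMap}

val mapper = new ObjectMapper
val attrs = new JLinkedHashMap[String, Any]()
attrs.put("a", 1)
attrs.put("error_count", 0)
attrs.put("error_message", "(none)")
val envelope = new JLinkedHashMap[String, Any]()
envelope.put("FakeJob", attrs)                    // className => toMap, as described above

val bytes: Array[Byte] = mapper.writeValueAsBytes(envelope)
val json: String = new String(bytes, "UTF-8")
// json comes out as something like {"FakeJob":{"a":1,"error_count":0,"error_message":"(none)"}}
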
@@ -77,7 +77,7 @@ class KestrelJobQueue[J <: Job](queueName: String, val queue: PersistentQueue, c
def checkExpiration(flushLimit: Int) {
val count = queue.discardExpired(flushLimit)
if (count > 0) {
log.info("Replaying %d error jobs.", count)
log.info("Replaying %d error jobs from %s.", count, queueName)
}
}

@@ -55,11 +55,11 @@ class ReplicatingJob(
relay: JobRelay,
jobs: Iterable[JsonJob],
clusters: Iterable[String],
serialized: Iterable[String])
serialized: Iterable[Array[Byte]])
extends JsonNestedJob(jobs) {

def this(relay: JobRelay, jobs: Iterable[JsonJob], clusters: Iterable[String]) =
this(relay, jobs, clusters, jobs.map(_.toJson))
this(relay, jobs, clusters, jobs.map(_.toJsonBytes))

def this(relay: JobRelay, jobs: Iterable[JsonJob]) = this(relay, jobs, relay.clusters)

@@ -69,7 +69,7 @@ extends JsonNestedJob(jobs) {
override def toMap: Map[String, Any] = {
var attrs = super.toMap.toList
if (!clustersQueue.isEmpty) attrs = "dest_clusters" -> clustersQueue.toList :: attrs
if (!serialized.isEmpty) attrs = "serialized" -> serialized :: attrs
if (!serialized.isEmpty) attrs = "serialized" -> serialized.map(new String(_, "UTF-8")) :: attrs
Map(attrs: _*)
}

@@ -108,7 +108,7 @@ extends JsonJobParser {

override def apply(json: Map[String, Any]): JsonJob = {
val clusters = json.get("dest_clusters").map(_.asInstanceOf[Iterable[String]]) getOrElse Nil
val serialized = json.get("serialized").map(_.asInstanceOf[Iterable[String]]) getOrElse Nil
val serialized = json.get("serialized").map(_.asInstanceOf[Iterable[String]].map(_.getBytes("UTF-8"))) getOrElse Nil
val tasks = json("tasks").asInstanceOf[Tasks].map(codec.inflate)

new ReplicatingJob(relay, tasks, clusters, serialized)
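
ReplicatingJob now holds its pre-serialized payloads as byte arrays. They are decoded to UTF-8 strings only when embedded in the job's own JSON map, and the parser re-encodes them to bytes when reading a job back. A small sketch of that round trip (the jobs value is a placeholder):

val jobs: Iterable[JsonJob] = Nil                                        // stand-in for real jobs
val serialized: Iterable[Array[Byte]] = jobs.map(_.toJsonBytes)          // what the job keeps
val embedded: Iterable[String] = serialized.map(new String(_, "UTF-8"))  // what toMap writes out
val restored: Iterable[Array[Byte]] = embedded.map(_.getBytes("UTF-8"))  // what the parser rebuilds
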
src/main/scala/com/twitter/gizzard/shards/ReplicatingShard.scala (19 changes: 13 additions & 6 deletions)
@@ -83,14 +83,21 @@ class ReplicatingShard[S <: Shard](
}

protected def fanoutWriteSerial[A](method: (S => A), replicas: Seq[S]): A = {
val exceptions = new mutable.ListBuffer[Throwable]

val results = replicas.flatMap { shard =>
try {
Some(method(shard))
} catch {
case e: ShardBlackHoleException =>
None
case e =>
exceptions += e
None
}
}

exceptions.map { throw _ }
if (results.size == 0) {
throw new ShardBlackHoleException(shardInfo.id)
}
@@ -112,10 +119,10 @@ class ReplicatingShard[S <: Shard](
try {
f(shard)
} catch {
case e: ShardRejectedOperationException =>
failover(f, remainder)
case e: ShardException =>
if (!e.isInstanceOf[ShardRejectedOperationException]) {
log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
}
log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
failover(f, remainder)
}
}
@@ -141,10 +148,10 @@ class ReplicatingShard[S <: Shard](
Some(answer)
}
} catch {
case e: ShardRejectedOperationException =>
rebuildableFailover(f, rebuild, remainder, toRebuild, everSuccessful)
case e: ShardException =>
if (!e.isInstanceOf[ShardRejectedOperationException]) {
log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
}
log.warning(e, "Error on %s: %s", shard.shardInfo.id, e)
rebuildableFailover(f, rebuild, remainder, toRebuild, everSuccessful)
}
}
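
Two behavioral changes land in ReplicatingShard: fanoutWriteSerial now tries every replica, collects failures, and rethrows the first recorded exception only after the loop (rather than aborting on the first error), and failover/rebuildableFailover give ShardRejectedOperationException its own silent case while any other ShardException logs a warning before failing over. A standalone sketch of the collect-then-rethrow pattern (names are illustrative, not the shard API):

import scala.collection.mutable

def fanoutSerial[S, A](replicas: Seq[S])(method: S => A): Seq[A] = {
  val exceptions = new mutable.ListBuffer[Throwable]
  val results = replicas.flatMap { shard =>
    try {
      Some(method(shard))
    } catch {
      case e: Throwable => exceptions += e; None
    }
  }
  // only after every replica has been attempted does the first failure surface
  exceptions.foreach { e => throw e }
  results
}
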
@@ -27,11 +27,11 @@ extends JobInjector.Iface {
def apply = deserialized.apply()
def toMap = deserialized.toMap

override def toJson = {
override def toJsonBytes = {
if (isDeserialized) {
deserialized.toJson
deserialized.toJsonBytes
} else {
new String(serialized, "UTF-8")
serialized
}
}
}
@@ -11,6 +11,7 @@ import com.twitter.util.Eval


trait ConfiguredSpecification extends Specification {
noDetailedDiffs()
val config = Eval[gizzard.config.GizzardServer](new File("config/test.scala"))
config.logging()
}
src/test/scala/com/twitter/gizzard/integration/TestServer.scala (12 changes: 7 additions & 5 deletions)
@@ -198,7 +198,7 @@ extends TestShard {
}

def get(key: Int) = evaluator.selectOne(getSql, key)(asResult)
def getAll(key: Int, count: Int) = evaluator.select(getSql, key, count)(asResult)
def getAll(key: Int, count: Int) = evaluator.select(getAllSql, key, count)(asResult)
}


@@ -235,10 +235,12 @@ extends CopyJob[TestShard](srcId, destId, count, ns, s) {
def copyPage(src: TestShard, dest: TestShard, count: Int) = {
val rows = src.getAll(cursor, count).map { case (k,v,c) => (k,v) }

dest.putAll(rows)

if (rows.isEmpty) None
else Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s))
if (rows.isEmpty) {
None
} else {
dest.putAll(rows)
Some(new TestCopy(srcId, destId, rows.last._1, count, ns, s))
}
}

def serialize = Map("cursor" -> cursor)
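
The integration-test copy job picks up two fixes: getAll now uses getAllSql instead of the single-row getSql, and copyPage only writes a page when it actually has rows, returning None to end the copy on the first empty page. A compact sketch of that paging shape (fetchPage and writePage are stand-ins for src.getAll and dest.putAll):

def copyPage(cursor: Int, count: Int,
             fetchPage: (Int, Int) => Seq[(Int, String)],
             writePage: Seq[(Int, String)] => Unit): Option[Int] = {
  val rows = fetchPage(cursor, count)
  if (rows.isEmpty) {
    None                  // no rows left: the copy is finished
  } else {
    writePage(rows)       // only non-empty pages get written
    Some(rows.last._1)    // continue from the last key copied
  }
}
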
@@ -55,20 +55,32 @@ class JsonJobParserSpec extends ConfiguredSpecification with JMocker with ClassM
}

"JsonJob" in {
job.toJson mustEqual """{"FakeJob":{"a":1,"error_count":0,"error_message":"(none)"}}"""
val json = job.toJson
json mustMatch "\"FakeJob\""
json mustMatch "\"a\":1"
json mustMatch "\"error_count\":0"
json mustMatch "\"error_message\":\"\\(none\\)\""
}

"JsonNestedJob" in {
val nestedJob = new JsonNestedJob(List(job))
val json = nestedJob.toJson

nestedJob.toJson mustEqual """{"com.twitter.gizzard.scheduler.JsonNestedJob":{"error_count":0,"error_message":"(none)","tasks":[{"FakeJob":{"a":1}}]}}"""
json mustMatch "\"com.twitter.gizzard.scheduler.JsonNestedJob\":\\{"
json mustMatch "\"error_count\":0"
json mustMatch "\"error_message\":\"\\(none\\)\""
json mustMatch "\"tasks\":\\[\\{\"FakeJob\":\\{\"a\":1\\}\\}\\]"
}

"errors" in {
job.errorCount = 23
job.errorMessage = "Good heavens!"

job.toJson mustEqual """{"FakeJob":{"a":1,"error_count":23,"error_message":"Good heavens!"}}"""
val json = job.toJson
json mustMatch "\\{\"FakeJob\":\\{"
json mustMatch "\"a\":1"
json mustMatch "\"error_count\":23"
json mustMatch "\"error_message\":\"Good heavens!\""
}
}
}
@@ -4,7 +4,6 @@ import scala.collection.mutable
import com.twitter.json.Json
import org.specs.mock.{ClassMocker, JMocker}
import org.specs.Specification
import shards.ShardRejectedOperationException

class NestedJobSpec extends ConfiguredSpecification with JMocker with ClassMocker {
"NestedJob" should {
@@ -13,9 +13,9 @@ class ReplicatingJobSpec extends ConfiguredSpecification with JMocker with Class

"toMap" in {
expect {
one(job1).toJson willReturn """{"foo":"bar"}"""
one(job1).className willReturn testJsonJobClass
one(job1).toMap willReturn Map[String, Any]()
one(job1).toJsonBytes willReturn """{"foo":"bar"}""".getBytes("UTF-8")
one(job1).className willReturn testJsonJobClass
one(job1).toMap willReturn Map[String, Any]()
}

val job = new ReplicatingJob(relay, Array(job1), List("c1"))
Expand All @@ -28,10 +28,10 @@ class ReplicatingJobSpec extends ConfiguredSpecification with JMocker with Class
}

"replicate when list of clusters is present" in {
val json = """{"foo":"bar"}"""
val json = """{"foo":"bar"}""".getBytes("UTF-8")

expect {
one(job1).toJson willReturn json
one(job1).toJsonBytes willReturn json
}

val job = new ReplicatingJob(relay, List(job1), List("c1"))
Expand All @@ -48,7 +48,7 @@ class ReplicatingJobSpec extends ConfiguredSpecification with JMocker with Class

"not replicate when list of clusters is empty" in {
expect {
one(job1).toJson willReturn """{"foo":"bar"}"""
one(job1).toJsonBytes willReturn """{"foo":"bar"}""".getBytes("UTF-8")
one(job1).apply()
}

@@ -101,10 +101,10 @@ object ReplicatingShardSpec extends ConfiguredSpecification with JMocker {

"with an exception" in {
expect {
one(shard1).put("name", "carol")
one(shard2).put("name", "carol") willThrow new ShardException("o noes")
one(shard1).put("name", "carol") willThrow new ShardException("o noes")
one(shard2).put("name", "carol")
}
replicatingShard.put("name", "carol") must throwA[Exception]
replicatingShard.put("name", "carol") must throwA[ShardException]
}

"with a black hole" in {
