Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Merge branch 'multi_cluster' into copy_refactor2
Browse files Browse the repository at this point in the history
Conflicts:
	src/main/scala/com/twitter/gizzard/thrift/ManagerService.scala
  • Loading branch information
freels committed Dec 22, 2010
2 parents d12034b + 14696a3 commit 88e6189
Show file tree
Hide file tree
Showing 25 changed files with 252 additions and 60 deletions.
4 changes: 2 additions & 2 deletions config/test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ object TestQueryEvaluator extends QueryEvaluator {

class TestScheduler(val name: String) extends Scheduler {
val schedulerType = new KestrelScheduler {
val queuePath = "/tmp"
override val keepJournal = false
path = "/tmp"
keepJournal = false
}
errorLimit = 25
badJobQueue = new JsonJobLogger { name = "bad_jobs" }
Expand Down
10 changes: 6 additions & 4 deletions project/build/GizzardProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ class GizzardProject(info: ProjectInfo) extends StandardProject(info) with Subve
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.7.7"

inline("com.twitter" %% "querulous" % "1.4.3-config-SNAPSHOT")
inline("com.twitter" %% "querulous" % "1.5.0")
inline("net.lag" % "configgy" % "1.6.8")
inline("net.lag" % "kestrel" % "1.3-config-SNAPSHOT")
inline("net.lag" % "kestrel" % "1.2.7")
inline("com.twitter" % "ostrich" % "1.2.10")
inline("com.twitter" % "util" % "1.1.2-SNAPSHOT")
inline("com.twitter" % "util" % "1.1.2")

val rpcclient = "com.twitter" %% "rpcclient" % "1.1.2-SNAPSHOT"
val rpcclient = "com.twitter" %% "rpcclient" % "1.2.0-SNAPSHOT"
val slf4j = "org.slf4j" % "slf4j-jdk14" % "1.5.2"
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.5.2"
val thrift = "thrift" % "libthrift" % "0.5.0"
val json = "com.twitter" %% "json" % "2.1.5"
val jackson = "org.codehaus.jackson" % "jackson-core-asl" % "1.6.1"
val jacksonMap = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.6.1"

// test jars
val specs = "org.scala-tools.testing" % "specs" % "1.6.2.1" % "test"
Expand Down
17 changes: 12 additions & 5 deletions src/main/scala/com/twitter/gizzard/config/JobScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ package com.twitter.gizzard.config
import com.twitter.util.Duration
import com.twitter.util.TimeConversions._
import net.lag.logging.Logger
import net.lag.kestrel.config.PersistentQueue
import net.lag.kestrel.{PersistentQueue, PersistentQueueConfig}
import gizzard.scheduler.{JsonJob, Codec, MemoryJobQueue, KestrelJobQueue, JobConsumer}

trait SchedulerType
trait KestrelScheduler extends SchedulerType with PersistentQueue {
def queuePath: String
trait KestrelScheduler extends PersistentQueueConfig with SchedulerType with Cloneable {
/**
 * Builds a PersistentQueue from this configuration under a different queue name.
 *
 * The config's `name` is set to `newName` only while the no-argument `apply()` runs and is
 * put back afterwards, so one config object can be used to create several named queues.
 *
 * @param newName the name the created queue should carry
 * @return the queue produced by `apply()` while `name` was `newName`
 */
def apply(newName: String): PersistentQueue = {
  val savedName = name
  name = newName
  try {
    apply()
  } finally {
    // Restore the original name even when queue construction throws; otherwise this
    // config would stay renamed for every later caller.
    name = savedName
  }
}
}
class MemoryScheduler extends SchedulerType {
var sizeLimit = 0
Expand Down Expand Up @@ -46,9 +53,9 @@ trait Scheduler {
def apply(codec: Codec[JsonJob]): gizzard.scheduler.JobScheduler[JsonJob] = {
val (jobQueue, errorQueue) = schedulerType match {
case kestrel: KestrelScheduler => {
val persistentJobQueue = kestrel(kestrel.queuePath, jobQueueName)
val persistentJobQueue = kestrel(jobQueueName)
val jobQueue = new KestrelJobQueue[JsonJob](jobQueueName, persistentJobQueue, codec)
val persistentErrorQueue = kestrel(kestrel.queuePath, errorQueueName)
val persistentErrorQueue = kestrel(errorQueueName)
val errorQueue = new KestrelJobQueue[JsonJob](errorQueueName, persistentErrorQueue, codec)

(jobQueue, errorQueue)
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/com/twitter/gizzard/config/NameServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class JobRelay {
var priority: Int = 0
var framed: Boolean = true
var timeout: Duration = 1.seconds
var retries: Int = 3

def apply() = new JobRelayFactory(priority, framed, timeout)
def apply() = new JobRelayFactory(priority, framed, timeout, retries)
}

object NoJobRelay extends JobRelay {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/gizzard/config/TServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait TServer extends (thrift.TProcessor => thrift.server.TServer) {
var idleTimeout = 60.seconds
var threadPool = new ThreadPool

def getPool = threadPool(name + " ThreadPool")
def getPool = threadPool(name + "_thread_pool")

def apply(processor: thrift.TProcessor): thrift.server.TServer
}
Expand Down
24 changes: 15 additions & 9 deletions src/main/scala/com/twitter/gizzard/nameserver/JobRelay.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@ extends Exception("Job replication to cluster '" + cluster + "' is blocked.", ca
class JobRelayFactory(
priority: Int,
framed: Boolean,
timeout: Duration)
timeout: Duration,
retries: Int)
extends (Map[String, Seq[Host]] => JobRelay) {

def this(priority: Int, framed: Boolean, timeout: Duration) = this(priority, framed, timeout, 0)

def apply(hostMap: Map[String, Seq[Host]]) =
new JobRelay(hostMap, priority, framed, timeout)
new JobRelay(hostMap, priority, framed, timeout, retries)
}

class JobRelay(
hostMap: Map[String, Seq[Host]],
priority: Int,
framed: Boolean,
timeout: Duration)
timeout: Duration,
retries: Int)
extends (String => JobRelayCluster) {

private val clients = Map(hostMap.flatMap { case (c, hs) =>
Expand All @@ -39,7 +44,7 @@ extends (String => JobRelayCluster) {
if (onlineHosts.isEmpty) {
if (blocked) Seq(c -> new BlockedJobRelayCluster(c)) else Seq()
} else {
Seq(c -> new JobRelayCluster(onlineHosts, priority, framed, timeout))
Seq(c -> new JobRelayCluster(onlineHosts, priority, framed, timeout, retries))
}
}.toSeq: _*)

Expand All @@ -52,9 +57,10 @@ class JobRelayCluster(
hosts: Seq[Host],
priority: Int,
framed: Boolean,
timeout: Duration)
timeout: Duration,
retries: Int)
extends (Iterable[String] => Unit) {
val client = new LoadBalancingChannel(hosts.map(h => new JobInjectorClient(h.hostname, h.port, framed, timeout)))
val client = new LoadBalancingChannel(hosts.map(h => new JobInjectorClient(h.hostname, h.port, framed, timeout, retries)))

def apply(jobs: Iterable[String]) {
val jobList = new JLinkedList[thrift.Job]()
Expand All @@ -73,14 +79,14 @@ object NullJobRelayFactory extends JobRelayFactory(0, false, new Duration(0)) {
override def apply(h: Map[String, Seq[Host]]) = NullJobRelay
}

object NullJobRelay extends JobRelay(Map(), 0, false, new Duration(0))
object NullJobRelay extends JobRelay(Map(), 0, false, new Duration(0), 0)

object NullJobRelayCluster extends JobRelayCluster(Seq(), 0, false, new Duration(0)) {
object NullJobRelayCluster extends JobRelayCluster(Seq(), 0, false, new Duration(0), 0) {
override val client = null
override def apply(jobs: Iterable[String]) = ()
}

class BlockedJobRelayCluster(cluster: String) extends JobRelayCluster(Seq(), 0, false, new Duration(0)) {
class BlockedJobRelayCluster(cluster: String) extends JobRelayCluster(Seq(), 0, false, new Duration(0), 0) {
override val client = null
override def apply(jobs: Iterable[String]) { throw new ClusterBlockedException(cluster) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class MemoryShard extends Shard {
}
}

/**
 * Copies the current shard, parent and forwarding tables into lists and wraps them in a
 * NameserverState for the given table id.
 */
def dumpStructure(tableId: Int) = {
  val shardSnapshot = shardTable.toList
  val linkSnapshot = parentTable.toList
  val forwardingSnapshot = forwardingTable.toList
  new NameserverState(shardSnapshot, linkSnapshot, forwardingSnapshot, tableId)
}

private def find(shardId: ShardId): Option[ShardInfo] = {
shardTable.find { _.id == shardId }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ extends Shard {
familyTree.getOrElse(id, new mutable.ArrayBuffer[LinkInfo])
}

/** Delegates the structure dump for `tableId` to the wrapped name server shard. */
def dumpStructure(tableId: Int) = nameServerShard.dumpStructure(tableId)

def reload() {
log.info("Loading name server configuration...")
nameServerShard.reload()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.twitter.gizzard.nameserver

import gizzard.shards.{ShardId, ShardInfo, LinkInfo}
import thrift.conversions.ShardInfo._
import thrift.conversions.LinkInfo._
import thrift.conversions.Forwarding._
import thrift.conversions.Sequences._
import scala.collection.mutable.ListBuffer

/**
 * The slice of name server topology that belongs to a single table.
 *
 * Construction keeps only the forwardings whose `tableId` matches, then walks the link graph
 * downward from each forwarding's shard, collecting every shard and link reached into
 * `shards` and `links`.
 *
 * @param initialShards      all known shards; only those reachable from a kept forwarding are retained
 * @param initialLinks       all known parent -> child links
 * @param initialForwardings all known forwardings, across every table
 * @param tableId            the table whose forwardings (and their subtrees) are kept
 */
class NameserverState(initialShards: List[gizzard.shards.ShardInfo],
                      initialLinks: List[gizzard.shards.LinkInfo],
                      initialForwardings: List[Forwarding], tableId: Int) {

  // Every supplied shard, indexed by its id.
  private val shardsById = Map(initialShards.map(s => s.id -> s): _*)

  // Links grouped by their parent (upId) shard.
  private val linksByParent = initialLinks.foldLeft(Map[ShardId, List[LinkInfo]]()) { (map, link) =>
    val key = link.upId
    val entry = map.get(key).map(link :: _).getOrElse(List(link))

    map + (key -> entry)
  }

  val forwardings = initialForwardings.filter(_.tableId == tableId)

  var links = new ListBuffer[gizzard.shards.LinkInfo]
  var shards = new ListBuffer[gizzard.shards.ShardInfo]

  // Ids whose subtree has already been walked. Without this, a shard reachable through two
  // parents (or two forwardings) would be emitted repeatedly, and a cyclic link would
  // recurse until the stack overflows. Must be initialised before the walk below runs.
  private var visited = Set[ShardId]()

  // Records the shard `id` (when known) and all links beneath it, each at most once.
  private def computeSubtree(id: ShardId): Unit = {
    if (!visited.contains(id)) {
      visited = visited + id

      shardsById.get(id).foreach { shardInfo =>
        shards += shardInfo
      }
      linksByParent.get(id).foreach { childLinks =>
        childLinks.foreach { link =>
          // The link itself is always recorded; only the descent is skipped for seen shards.
          links += link
          computeSubtree(link.downId)
        }
      }
    }
  }

  forwardings.foreach { forwarding => computeSubtree(forwarding.shardId) }

  /** Converts the collected forwardings, links and shards into a thrift.NameserverState. */
  def toThrift = {
    val thriftForwardings = forwardings.map(_.toThrift).toJavaList
    val thriftLinks = links.map(_.toThrift).toJavaList
    val thriftShards = shards.map(_.toThrift).toJavaList
    new thrift.NameserverState(thriftShards, thriftLinks, thriftForwardings, tableId)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class ReadWriteShardAdapter(shard: ReadWriteShard[Shard]) extends shards.ReadWri
def listShards() = shard.readOperation(_.listShards())
def shardsForHostname(hostname: String) = shard.readOperation(_.shardsForHostname(hostname))
def listHostnames() = shard.readOperation(_.listHostnames)
def dumpStructure(tableId: Int) = shard.readOperation(_.dumpStructure(tableId))

def createShard[S <: shards.Shard](shardInfo: ShardInfo, repository: ShardRepository[S]) = shard.writeOperation(_.createShard(shardInfo, repository))
def deleteShard(id: ShardId) = shard.writeOperation(_.deleteShard(id))
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/com/twitter/gizzard/nameserver/Shard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ trait Shard extends shards.Shard {
@throws(classOf[shards.ShardException]) def getBusyShards(): Seq[ShardInfo]
@throws(classOf[shards.ShardException]) def rebuildSchema()
@throws(classOf[shards.ShardException]) def reload()
@throws(classOf[shards.ShardException]) def dumpStructure(tableId: Int): NameserverState
@throws(classOf[shards.ShardException]) def listHostnames(): Seq[String]
@throws(classOf[shards.ShardException]) def removeForwarding(forwarding: Forwarding)

Expand Down
9 changes: 7 additions & 2 deletions src/main/scala/com/twitter/gizzard/nameserver/SqlShard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CREATE TABLE IF NOT EXISTS shards (
destination_type VARCHAR(125),
busy TINYINT NOT NULL DEFAULT 0,
PRIMARY KEY primary_key_table_prefix_hostname (hostname, table_prefix)
PRIMARY KEY (hostname, table_prefix)
) ENGINE=INNODB
"""

Expand All @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS shard_children (
child_table_prefix VARCHAR(125) NOT NULL,
weight INT NOT NULL DEFAULT 1,
PRIMARY KEY primary_key_family (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
PRIMARY KEY (parent_hostname, parent_table_prefix, child_hostname, child_table_prefix),
INDEX child (child_hostname, child_table_prefix)
) ENGINE=INNODB
"""
Expand Down Expand Up @@ -85,6 +85,11 @@ class SqlShard(queryEvaluator: QueryEvaluator) extends nameserver.Shard {
}


/**
 * Selects every row of the `shards` table, converts each with rowToShardInfo, and combines
 * the result with listLinks() and getForwardings() into a NameserverState for `tableId`.
 */
def dumpStructure(tableId: Int) = {
  val allShards = queryEvaluator.select("SELECT * FROM shards") { shardRow =>
    rowToShardInfo(shardRow)
  }.toList
  new NameserverState(allShards, listLinks(), getForwardings(), tableId)
}

def createShard[S <: shards.Shard](shardInfo: ShardInfo, repository: ShardRepository[S]) {
queryEvaluator.transaction { transaction =>
try {
Expand Down
34 changes: 29 additions & 5 deletions src/main/scala/com/twitter/gizzard/scheduler/JsonCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import scala.collection.mutable
import scala.util.matching.Regex
import com.twitter.json.Json
import net.lag.logging.Logger
import org.codehaus.jackson.map.ObjectMapper
import java.util.{Map => JMap, List => JList}
import scala.collection.jcl

/**
* Codec for json-encoded jobs.
Expand All @@ -18,12 +21,15 @@ import net.lag.logging.Logger
* Jobs that can't be parsed by the json library are handed to 'unparsableJobHandler'.
*/
class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) extends Codec[JsonJob] {
private val mapper = new ObjectMapper

protected val log = Logger.get(getClass.getName)
protected val processors = {
val p = mutable.Map.empty[Regex, JsonJobParser]
p += (("JsonNestedJob".r, new JsonNestedJobParser(this)))
// for backward compat:
p += (("JobWithTasks".r, new JsonNestedJobParser(this)))
p += (("SchedulableWithTasks".r, new JsonNestedJobParser(this)))
p
}

Expand All @@ -34,11 +40,9 @@ class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) extends Codec[JsonJob

def inflate(data: Array[Byte]): JsonJob = {
try {
Json.parse(new String(data)) match {
case json: Map[_, _] =>
assert(json.size == 1)
inflate(json.asInstanceOf[Map[String, Any]])
}
val javaMap: JMap[String, Any] = mapper.readValue(new String(data), classOf[JMap[String, Any]])
val scalaMap = deepConvert(javaMap)
inflate(scalaMap)
} catch {
case e =>
log.error(e, "Unparsable JsonJob; dropping: " + e.toString)
Expand All @@ -61,4 +65,24 @@ class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) extends Codec[JsonJob
throw new UnparsableJsonException("Processor '%s' blew up: %s".format(jobType, e.toString), e)
}
}

/** Converts a Java list into a Scala List, converting nested Java maps and lists too. */
private def deepConvert(javaList: JList[Any]): List[Any] = {
  val converted = jcl.Buffer(javaList).map { element => convertNested(element) }
  converted.toList
}

/** Converts a Java map into a Scala Map, converting nested Java maps and lists too. */
private def deepConvert(javaMap: JMap[String, Any]): Map[String, Any] = {
  val entries = jcl.Map(javaMap).toSeq.map { case (key, value) =>
    (key, convertNested(value))
  }
  Map(entries: _*)
}

/**
 * Converts one value: Java maps and lists are converted recursively, anything else is
 * returned untouched. The type arguments in these patterns are erased at runtime, so only
 * the raw container type is actually tested.
 */
private def convertNested(value: Any): Any = {
  value match {
    case jm: JMap[String, Any] => deepConvert(jm)
    case jl: JList[Any] => deepConvert(jl)
    case other => other
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class KestrelJobQueue[J <: Job](queueName: String, val queue: PersistentQueue, c
}

def drainTo(otherQueue: JobQueue[J], delay: Duration) {
queue.expiredQueue = Some(otherQueue.asInstanceOf[KestrelJobQueue[J]].queue)
queue.maxAge = delay.inMilliseconds.toInt
queue.expiredQueue set Some(Some(otherQueue.asInstanceOf[KestrelJobQueue[J]].queue))
queue.maxAge set Some(delay.inMilliseconds.toInt)
}

def checkExpiration(flushLimit: Int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package com.twitter.gizzard.thrift
import com.twitter.rpcclient.{PooledClient, ThriftConnection}
import com.twitter.util.Duration

class JobInjectorClient(host: String, port: Int, framed: Boolean, soTimeout: Duration)
class JobInjectorClient(host: String, port: Int, framed: Boolean, soTimeout: Duration, retryCount: Int)
extends PooledClient[JobInjector.Iface] {

def this(host: String, port: Int, framed: Boolean, soTimeout: Duration) =
this(host, port, framed, soTimeout, 0)

val name = "JobManagerClient"

def createConnection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ class ManagerService[S <: shards.Shard, J <: JsonJob](
wrapEx(nameServer.getForwardings().map(_.toThrift).toJavaList)
}


def list_hostnames() = wrapEx(nameServer.listHostnames.toJavaList)

def mark_shard_busy(id: ShardId, busy: Int) = {
Expand All @@ -116,6 +115,8 @@ class ManagerService[S <: shards.Shard, J <: JsonJob](
}
}

/** Dumps the name server structure for `tableId` in its thrift form, evaluated inside wrapEx. */
def dump_nameserver(tableId: Int) = wrapEx {
  nameServer.dumpStructure(tableId).toThrift
}


// Job Scheduler Management

Expand Down
Loading

0 comments on commit 88e6189

Please sign in to comment.