
Merge remote-tracking branch 'origin/master' into bad_jobs_q_standalone

Conflicts:
	src/main/scala/com/twitter/gizzard/config/JobScheduler.scala
Alan Liang committed Mar 9, 2012
2 parents f0d8b60 + ede4682, commit 8ee20c2bc4c93848a59973eb10387d17c6bd72f3
@@ -13,18 +13,9 @@ trait Credentials extends Connection {
val password = ""
}
-object GizzardMemoization {
- var nsQueryEvaluator: QueryEvaluatorFactory = null
-}
-
object TestQueryEvaluator extends QueryEvaluator {
- override def apply() = {
- if (GizzardMemoization.nsQueryEvaluator == null) {
- GizzardMemoization.nsQueryEvaluator = super.apply()
- }
-
- GizzardMemoization.nsQueryEvaluator
- }
+ singletonFactory = true
+ database.memoize = true
}
class TestScheduler(val name: String) extends Scheduler {
@@ -46,6 +37,7 @@ new GizzardServer {
jobRelay.priority = Priority.High.id
nameServerReplicas = Seq(new Mysql {
+ queryEvaluator = TestQueryEvaluator
val connection = new Connection with Credentials {
val hostnames = Seq("localhost")
val database = "gizzard_test"
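The hunk above replaces a hand-rolled memoizer with querulous configuration flags: the deleted GizzardMemoization object cached a single QueryEvaluatorFactory behind a null-checked mutable var, while singletonFactory = true plus database.memoize = true ask querulous (bumped to 2.7.1 below) to do the same caching internally. For comparison, the deleted pattern is roughly a lazy val; this sketch is illustrative only and not part of the commit:

object TestQueryEvaluator extends QueryEvaluator {
  // computed once on first use, then reused: the thread-safe equivalent of
  // the deleted null check against GizzardMemoization.nsQueryEvaluator
  private lazy val memoizedFactory = super.apply()
  override def apply() = memoizedFactory
}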
@@ -1,8 +1,8 @@
#Project properties
-#Wed Mar 07 17:45:29 PST 2012
+#Thu Mar 08 18:41:09 PST 2012
project.organization=com.twitter
project.name=gizzard
sbt.version=0.7.4
-project.version=3.0.1-beta30-SNAPSHOT
+project.version=3.0.2-beta32-SNAPSHOT
build.scala.versions=2.8.1
project.initialize=false
@@ -6,19 +6,19 @@ with CompileThriftFinagle
with DefaultRepos
with SubversionPublisher {
- def finagleVersion = "1.9.10"
+ def finagleVersion = "1.11.0"
override def filterScalaJars = false
val scalaTools = "org.scala-lang" % "scala-compiler" % "2.8.1"
- val querulous = "com.twitter" % "querulous" % "2.6.5"
+ val querulous = "com.twitter" % "querulous" % "2.7.1"
//val kestrel = "net.lag" % "kestrel" % "1.2.7"
// remove when moved to libkestrel
val twitterActors = "com.twitter" % "twitteractors_2.8.0" % "2.0.1"
- val finagleThrift = "com.twitter" % "finagle-thrift" % "1.9.10"
- val finagleOstrich4 = "com.twitter" % "finagle-ostrich4" % "1.9.10"
+ val finagleThrift = "com.twitter" % "finagle-thrift" % finagleVersion
+ val finagleOstrich4 = "com.twitter" % "finagle-ostrich4" % finagleVersion
val jackson = "org.codehaus.jackson" % "jackson-core-asl" % "1.9.2"
val jacksonMap = "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.2"
@@ -1,4 +1,4 @@
#Automatically generated by ReleaseManagement
-#Wed Mar 07 17:45:29 PST 2012
-version=3.0.0-beta30
-sha1=8bd5b70ca612f46d55d6a3d8f560d52ac8a10e99
+#Thu Mar 08 18:41:09 PST 2012
+version=3.0.1-beta30
+sha1=85d2654b75bbc5b5ecba4b35cf3ca5203ee3383f
@@ -36,13 +36,15 @@ abstract class GizzardServer(config: ServerConfig) {
}
lazy val jobCodec = new LoggingJsonCodec(
- new ReplicatingJsonCodec(remoteClusterManager.jobRelay, logUnparsableJob),
+ new JsonCodec(logUnparsableJob),
config.jobStats,
logUnparsableJob
)
+ lazy val jobAsyncReplicator = config.jobAsyncReplicator(remoteClusterManager.jobRelay)
+
lazy val jobScheduler = new PrioritizingJobScheduler(jobPriorities map { p =>
- p -> config.jobQueues(p)(jobCodec)
+ p -> config.jobQueues(p)(jobCodec, jobAsyncReplicator)
} toMap)
// service wiring
@@ -57,6 +59,7 @@ abstract class GizzardServer(config: ServerConfig) {
nameServer.reload()
remoteClusterManager.reload()
jobScheduler.start()
+ jobAsyncReplicator.start()
new Thread(new Runnable { def run() { managerThriftServer.serve() } }, "GizzardManagerThread").start()
new Thread(new Runnable { def run() { jobInjectorThriftServer.serve() } }, "JobInjectorThread").start()
@@ -69,5 +72,6 @@ abstract class GizzardServer(config: ServerConfig) {
while (quiesce && jobScheduler.size > 0) Thread.sleep(100)
jobScheduler.shutdown()
+ jobAsyncReplicator.shutdown()
}
}
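Two related changes land here: replication moves out of the codec (ReplicatingJsonCodec is replaced by a plain JsonCodec that only parses) and into the standalone jobAsyncReplicator, which is now passed to every priority's job queue. The start and shutdown ordering is deliberate: the replicator starts with the scheduler so mirrored jobs have somewhere to go, and shuts down only after the scheduler drains, so retried jobs can still be enqueued for replication. A condensed sketch of that ordering, using only names from this commit:

def start() {
  jobScheduler.start()
  jobAsyncReplicator.start()  // watchdog thread begins mirroring enqueued jobs to remote clusters
  // thrift servers start after this point
}

def shutdown(quiesce: Boolean) {
  while (quiesce && jobScheduler.size > 0) Thread.sleep(100)  // drain local work first
  jobScheduler.shutdown()
  jobAsyncReplicator.shutdown()                               // then close the per-cluster queues
}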
@@ -9,6 +9,9 @@ import com.twitter.conversions.time._
import com.twitter.gizzard.shards
import com.twitter.gizzard.nameserver
import com.twitter.gizzard.proxy
+import com.twitter.util.StorageUnit
+import com.twitter.conversions.storage._
+import net.lag.kestrel.config.QueueConfig
trait GizzardServer {
def jobQueues: Map[Int, Scheduler]
@@ -19,6 +22,7 @@ trait GizzardServer {
var jobRelay: JobRelay = new JobRelay
var manager: Manager = new Manager with TThreadServer
var jobInjector: JobInjector = new JobInjector with THsHaServer
+ var jobAsyncReplicator: JobAsyncReplicator = new JobAsyncReplicator
var queryStats: StatsCollection = new StatsCollection { }
var jobStats: StatsCollection = new StatsCollection {
@@ -65,6 +69,43 @@ trait JobInjector extends TServer {
var port = 7921
}
+class JobAsyncReplicator {
+ var path = "/tmp"
+ var maxItems: Int = Int.MaxValue
+ var maxSize: StorageUnit = Long.MaxValue.bytes
+ var maxItemSize: StorageUnit = Long.MaxValue.bytes
+ var maxAge: Option[Duration] = None
+ var maxJournalSize: StorageUnit = 16.megabytes
+ var maxMemorySize: StorageUnit = 128.megabytes
+ var maxJournalOverflow: Int = 10
+ var discardOldWhenFull: Boolean = false
+ var keepJournal: Boolean = true
+ var syncJournal: Boolean = false
+ var multifileJournal: Boolean = false
+ var expireToQueue: Option[String] = None
+ var maxExpireSweep: Int = Int.MaxValue
+ var fanoutOnly: Boolean = false
+ var threadsPerCluster: Int = 4
+
+ def aConfig = QueueConfig(
+ maxItems = maxItems,
+ maxSize = maxSize,
+ maxItemSize = maxItemSize,
+ maxAge = maxAge,
+ maxJournalSize = maxJournalSize,
+ maxMemorySize = maxMemorySize,
+ maxJournalOverflow = maxJournalOverflow,
+ discardOldWhenFull = discardOldWhenFull,
+ keepJournal = keepJournal,
+ syncJournal = syncJournal,
+ multifileJournal = multifileJournal,
+ expireToQueue = expireToQueue,
+ maxExpireSweep = maxExpireSweep,
+ fanoutOnly = fanoutOnly
+ )
+
+ def apply(jobRelay: => nameserver.JobRelay) = new scheduler.JobAsyncReplicator(jobRelay, aConfig, path, threadsPerCluster)
+}
// XXX: move StatsCollection, etc. to separate file
trait TransactionalStatsConsumer {
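The new JobAsyncReplicator config class is mostly a pass-through to kestrel's QueueConfig (built by aConfig), plus the journal directory and per-cluster sender concurrency. Overriding it from a server config might look like the following sketch; the values are illustrative, not defaults from the commit:

// inside a `new GizzardServer { ... }` configuration block
jobAsyncReplicator = new JobAsyncReplicator {
  path = "/var/spool/gizzard"    // root directory for the per-cluster journaled queues
  threadsPerCluster = 8          // concurrent relay threads per remote cluster
  maxJournalSize = 64.megabytes  // roll the kestrel journal less often than the 16 MB default
  syncJournal = true             // fsync each journal write: slower, but survives a crash
}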
@@ -4,13 +4,13 @@ import com.twitter.util.{Duration, StorageUnit}
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
-import net.lag.kestrel.{QueueCollection, PersistentQueue}
-import net.lag.kestrel.config.QueueConfig
-
import com.twitter.gizzard
import com.twitter.gizzard.scheduler
import com.twitter.gizzard.scheduler.{JsonJob, JsonCodec, MemoryJobQueue, KestrelJobQueue, JobConsumer}
+import net.lag.kestrel.{QueueCollection, PersistentQueue}
+import net.lag.kestrel.config.QueueConfig
+
trait SchedulerType
trait KestrelScheduler extends SchedulerType {
var path = "/tmp"
@@ -107,6 +107,7 @@ trait Scheduler {
var errorRetryDelay = 900.seconds
var perFlushItemLimit = 1000
var jitterRate = 0.0f
+ var isReplicated: Boolean = true
var _jobQueueName: Option[String] = None
def jobQueueName_=(s: String) { _jobQueueName = Some(s) }
@@ -118,7 +119,7 @@ trait Scheduler {
def badJobQueueName_=(s: String) { _badJobQueueName = Some(s) }
def badJobQueueName: String = _badJobQueueName.getOrElse(name + "_bad_jobs")
- def apply(codec: JsonCodec): gizzard.scheduler.JobScheduler = {
+ def apply(codec: JsonCodec, jobAsyncReplicator: scheduler.JobAsyncReplicator): gizzard.scheduler.JobScheduler = {
val (jobQueue, errorQueue, badJobQueue) = schedulerType match {
case kestrel: KestrelScheduler => {
val persistentJobQueue = kestrel(jobQueueName)
@@ -155,6 +156,8 @@ trait Scheduler {
errorLimit,
perFlushItemLimit,
jitterRate,
+ isReplicated,
+ jobAsyncReplicator,
jobQueue,
errorQueue,
badJobQueue
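isReplicated defaults to true, so every scheduler participates in async replication unless its config opts out. Because TestScheduler (in the first file above) extends Scheduler, a per-queue opt-out is a one-line override, sketched here:

new TestScheduler("low_priority") {
  isReplicated = false  // jobs on this queue run locally and are never handed to the replicator
}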
@@ -0,0 +1,127 @@
+package com.twitter.gizzard.scheduler
+
+import scala.annotation.tailrec
+import com.twitter.util.Time
+import com.twitter.gizzard.nameserver.JobRelay
+import net.lag.kestrel.config.QueueConfig
+import net.lag.kestrel.PersistentQueue
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.Executors
+import com.twitter.logging.Logger
+
+
+class JobAsyncReplicator(jobRelay: => JobRelay, queueConfig: QueueConfig, queueRootDir: String, threadsPerCluster: Int) {
+
+ // TODO make configurable
+ private val WatchdogPollInterval = 100 // 100 millis
+ private val QueuePollTimeout = 1000 // 1 second
+
+ @volatile private var queueMap: Map[String, PersistentQueue] = Map()
+ @volatile private var watchdogThread: Option[Thread] = None
+
+ private val log = Logger.get(getClass)
+ private val exceptionLog = Logger.get("exception")
+
+ private val threadpool = Executors.newCachedThreadPool()
+
+ def clusters = queueMap.keySet
+ def queues = queueMap.values.toSeq
+
+ def enqueue(job: Array[Byte]) {
+ queues foreach { _.add(job) }
+ }
+
+ def start() {
+ watchdogThread = Some(newWatchdogThread)
+ watchdogThread foreach { _.start() }
+ }
+
+ def shutdown() {
+ watchdogThread foreach { _.interrupt() }
+ queues foreach { _.close() }
+ if (!threadpool.isShutdown) threadpool.shutdown()
+ }
+
+ private final def process(cluster: String) {
+ try {
+ ignoreInterrupt {
+ queueMap.get(cluster) foreach { queue =>
+
+ while (!queue.isClosed) {
+ queue.removeReceive(removeTimeout, true) foreach { item =>
+ try {
+ jobRelay(cluster)(Seq(item.data))
+ queue.confirmRemove(item.xid)
+ } catch { case e =>
+ exceptionLog.error(e, "Exception in job replication for cluster %s: %s", cluster, e.toString)
+ queue.unremove(item.xid)
+ }
+ }
+ }
+ }
+ }
+ } catch {
+ case e =>
+ exceptionLog.error(e, "Uncaught exception in job replication for cluster %s: %s", cluster, e.toString)
+ }
+ }
+
+ def reconfigure() {
+ // save the relay so we work on a single instance in case it is reconfigured again.
+ val r = jobRelay
+
+ if (r.clusters != clusters) {
+ queues foreach { _.close() }
+
+ val qs = (r.clusters foldLeft Map[String, PersistentQueue]()) { (m, c) =>
+ m + (c -> newQueue(c))
+ }
+
+ queueMap = qs
+
+ qs.values foreach { _.setup }
+
+ for (c <- qs.keys; i <- 0 until threadsPerCluster) {
+ threadpool.submit(newProcessor(c))
+ }
+ }
+
+ // wait before returning
+ Thread.sleep(WatchdogPollInterval)
+ }
+
+
+ // Helpers
+
+ private def removeTimeout = {
+ Some(Time.fromMilliseconds(System.currentTimeMillis + QueuePollTimeout))
+ }
+
+ private def newQueue(cluster: String) = {
+ new PersistentQueue("replicating_" + cluster, queueRootDir, queueConfig)
+ }
+
+ private def newProcessor(cluster: String) = {
+ new Runnable { def run() { process(cluster) } }
+ }
+
+ private def ignoreInterrupt[A](f: => A) = try {
+ f
+ } catch {
+ case e: InterruptedException => ()
+ case e: ClosedByInterruptException => ()
+ }
+
+ private def newWatchdogThread = new Thread {
+ override def run() {
+ try {
+ ignoreInterrupt {
+ while (!isInterrupted) reconfigure()
+ }
+ } catch {
+ case e =>
+ exceptionLog.error(e, "Uncaught exception in watchdog thread")
+ }
+ }
+ }
+}
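To summarize the new class: the watchdog thread polls jobRelay for cluster-set changes and maintains one journaled PersistentQueue per remote cluster (named replicating_<cluster>); enqueue fans a serialized job out to every queue; and threadsPerCluster worker threads drain each queue with kestrel's two-phase remove, confirming the xid after a successful relay and unremoving it for retry on failure. A minimal standalone usage sketch; myRelay and myQueueConfig are assumed to exist and are not defined in this commit:

val replicator = new JobAsyncReplicator(myRelay, myQueueConfig, "/tmp/replication", threadsPerCluster = 4)
replicator.start()                                 // spawn the watchdog; queues are built on the first reconfigure()
replicator.enqueue("""{"SomeJob":{}}""".getBytes)  // fan one serialized job out to every cluster queue
replicator.shutdown()                              // interrupt the watchdog and close the queues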
@@ -32,6 +32,8 @@ class JobScheduler(
val errorLimit: Int,
val flushLimit: Int,
val jitterRate: Float,
+ val isReplicated: Boolean,
+ jobAsyncReplicator: JobAsyncReplicator,
val queue: JobQueue,
val errorQueue: JobQueue,
val badJobQueue: JobQueue)
@@ -149,6 +151,10 @@ extends Process with JobConsumer {
try {
val job = ticket.job
try {
+ if (isReplicated && job.shouldReplicate && !job.wasReplicated) {
+ jobAsyncReplicator.enqueue(job.toJsonBytes)
+ job.setReplicated()
+ }
job()
Stats.incr("job-success-count")
} catch {
@@ -159,7 +165,6 @@ extends Process with JobConsumer {
case e =>
Stats.incr("job-error-count")
exceptionLog.error(e, "Job: %s", job)
-// log.error(e, "Error in Job: %s - %s", job, e)
job.errorCount += 1
job.errorMessage = e.toString
if (job.errorCount > errorLimit) {
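The new guard enqueues each job's JSON to the replicator at most once: setReplicated() flips a flag that wasReplicated reports, so a job that errors and is retried is not mirrored a second time, and shouldReplicate lets individual job types opt out. Those three members live on JsonJob and are not part of this diff; a plausible shape, assuming a plain mutable flag carried with the job, would be:

// hypothetical sketch of the flags assumed on JsonJob (their definition is not in this commit)
trait ReplicationFlags {
  def shouldReplicate = true           // job types may override this to opt out entirely
  private var replicated = false
  def wasReplicated = replicated
  def setReplicated() { replicated = true }
}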
@@ -26,6 +26,8 @@ class JsonCodec(unparsableJobHandler: Array[Byte] => Unit) {
protected val processors = {
val p = mutable.Map.empty[Regex, JsonJobParser]
p += (("JsonNestedJob".r, new JsonNestedJobParser(this)))
+ p += (("ReplicatedJob".r, new ReplicatedJobParser(this)))
+
// for backward compat:
p += (("JobWithTasks".r, new JsonNestedJobParser(this)))
p += (("SchedulableWithTasks".r, new JsonNestedJobParser(this)))
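Parsers are keyed by regex over the serialized job's type name, so registering ReplicatedJobParser lets this plain JsonCodec still decode jobs that an older ReplicatingJsonCodec wrapped for relay, in the same spirit as the backward-compat entries alongside it. Registering another legacy alias would follow the same pattern (sketch; the type name here is hypothetical):

p += (("MyLegacyNestedJob".r, new JsonNestedJobParser(this)))  // decode a renamed job type as a nested job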