Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Add a top-level exception handler to the gizzard scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
John Corwin committed Mar 22, 2012
1 parent 38f0710 commit dd690db
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 35 deletions.
76 changes: 41 additions & 35 deletions src/main/scala/com/twitter/gizzard/scheduler/JobScheduler.scala
Expand Up @@ -142,50 +142,56 @@ extends Process with JobConsumer {

// hook to let unit tests stub out threads.
protected def processWork() {
process()
process()
}

def process() {
queue.get.foreach { ticket =>
_activeThreads.incrementAndGet()
try {
val job = ticket.job
try {
queue.get.foreach { ticket =>
_activeThreads.incrementAndGet()
try {
if (isReplicated && job.shouldReplicate && !job.wasReplicated) {
jobAsyncReplicator.enqueue(job.toJsonBytes)
job.setReplicated()
}
job()
Stats.incr("job-success-count")
} catch {
case e: ShardBlackHoleException => Stats.incr("job-blackholed-count")
case e: ShardOfflineException =>
Stats.incr("job-blocked-count")
errorQueue.put(job)
case e: BadJsonJobException =>
badJobQueue.put(job)
Logger.get("bad_jobs").error(job.toString)
Stats.incr("job-bad-count")
case e =>
Stats.incr("job-error-count")
exceptionLog.error(e, "Job: %s", job)
job.errorCount += 1
job.errorMessage = e.toString
if (job.errorCount > errorLimit) {
val job = ticket.job
try {
if (isReplicated && job.shouldReplicate && !job.wasReplicated) {
jobAsyncReplicator.enqueue(job.toJsonBytes)
job.setReplicated()
}
job()
Stats.incr("job-success-count")
} catch {
case e: ShardBlackHoleException => Stats.incr("job-blackholed-count")
case e: ShardOfflineException =>
Stats.incr("job-blocked-count")
errorQueue.put(job)
case e: BadJsonJobException =>
badJobQueue.put(job)
Logger.get("bad_jobs").error(job.toString)
Stats.incr("job-bad-count")
} else {
errorQueue.put(job)
}
}
job.nextJob match {
case None => ticket.ack()
case _ => ticket.continue(job.nextJob.get)
case e =>
Stats.incr("job-error-count")
exceptionLog.error(e, "Job: %s", job)
job.errorCount += 1
job.errorMessage = e.toString
if (job.errorCount > errorLimit) {
badJobQueue.put(job)
Logger.get("bad_jobs").error(job.toString)
Stats.incr("job-bad-count")
} else {
errorQueue.put(job)
}
}
job.nextJob match {
case None => ticket.ack()
case _ => ticket.continue(job.nextJob.get)
}
} finally {
_activeThreads.decrementAndGet()
}
} finally {
_activeThreads.decrementAndGet()
}
} catch {
case e =>
log.error(e, "Uncaught exception in gizzard worker thread")
Stats.incr("uncaught-exceptions")
}
}
}
Expand Up @@ -7,6 +7,7 @@ import org.specs.mock.{ClassMocker, JMocker}
import com.twitter.gizzard.shards._
import com.twitter.gizzard.ConfiguredSpecification
import com.twitter.gizzard.nameserver.JobRelay
import com.twitter.gizzard.Stats


class JobSchedulerSpec extends ConfiguredSpecification with JMocker with ClassMocker {
Expand Down Expand Up @@ -182,6 +183,16 @@ class JobSchedulerSpec extends ConfiguredSpecification with JMocker with ClassMo
jobScheduler.process()
}

// Exercises the scheduler's top-level exception handler: when fetching from
// the queue itself fails, process() must return normally and the failure must
// be counted under "uncaught-exceptions".
"handle queue errors" in {
// Presumably resets all counters so the exact-value assertion at the end is
// not affected by earlier examples -- TODO confirm against Stats.clearAll.
Stats.clearAll()
expect {
// Mock expectation: queue.get() is invoked once and throws.
one(queue).get() willThrow new Exception("bad queue")
}

// No exception may propagate out of this call; if one did, the example would
// fail here, before the counter is checked.
jobScheduler.process()
// Exactly one failure should have been recorded by the handler.
Stats.getCounter("uncaught-exceptions")() mustEqual 1
}

"too many errors" in {
expect {
one(queue).get() willReturn Some(ticket1)
Expand Down

0 comments on commit dd690db

Please sign in to comment.