Permalink
Browse files

expose active threads

  • Loading branch information...
1 parent 43938d7 commit cfa3efefd1705f823ee6e62586ed497ee32f6068 Kyle Maxwell committed Jan 14, 2011
@@ -7,6 +7,7 @@ import com.twitter.util.TimeConversions._
import net.lag.configgy.ConfigMap
import net.lag.kestrel.PersistentQueue
import net.lag.logging.Logger
+import java.util.concurrent.atomic.AtomicInteger
import shards.{ShardBlackHoleException, ShardRejectedOperationException}
object JobScheduler {
@@ -85,6 +86,9 @@ class JobScheduler[J <: Job](val name: String,
private val log = Logger.get(getClass.getName)
var workerThreads: Collection[BackgroundProcess] = Nil
@volatile var running = false
+ private var _activeThreads = new AtomicInteger
+
+ def activeThreads = _activeThreads.get()
val retryTask = new BackgroundProcess("Retry process for " + name + " errors") {
def runLoop() {
@@ -133,8 +137,7 @@ class JobScheduler[J <: Job](val name: String,
log.info("Pausing work in JobScheduler: %s", queue)
queue.pause()
errorQueue.pause()
- workerThreads.foreach { _.shutdown() }
- workerThreads = Nil
+ shutdownWorkerThreads()
}
def resume() = {
@@ -146,13 +149,19 @@ class JobScheduler[J <: Job](val name: String,
}
def shutdown() {
- log.info("Shutting down JobScheduler: %s", queue)
- queue.shutdown()
- errorQueue.shutdown()
+ if(running) {
@robey
robey Jan 14, 2011

if (running) {

+ log.info("Shutting down JobScheduler: %s", queue)
+ queue.shutdown()
+ errorQueue.shutdown()
+ shutdownWorkerThreads()
+ retryTask.shutdown()
+ running = false
+ }
+ }
+
+ private def shutdownWorkerThreads() {
workerThreads.foreach { _.shutdown() }
workerThreads = Nil
- retryTask.shutdown()
- running = false
}
def isShutdown = queue.isShutdown
@@ -177,29 +186,34 @@ class JobScheduler[J <: Job](val name: String,
}
def process() {
- queue.get().foreach { ticket =>
- val job = ticket.job
+ queue.get.foreach { ticket =>
+ _activeThreads.incrementAndGet()
try {
- job()
- Stats.incr("job-success-count")
- } catch {
- case e: ShardBlackHoleException =>
- Stats.incr("job-blackholed-count")
- case e: ShardRejectedOperationException =>
- Stats.incr("job-darkmoded-count")
- errorQueue.put(job)
- case e =>
- Stats.incr("job-error-count")
- log.error(e, "Error in Job: %s - %s", job, e)
- job.errorCount += 1
- job.errorMessage = e.toString
- if (job.errorCount > errorLimit) {
- badJobQueue.foreach { _.put(job) }
- } else {
+ val job = ticket.job
+ try {
+ job()
@robey
robey Jan 14, 2011

some weird indentation here...

+ Stats.incr("job-success-count")
+ } catch {
+ case e: ShardBlackHoleException =>
+ Stats.incr("job-blackholed-count")
+ case e: ShardRejectedOperationException =>
+ Stats.incr("job-darkmoded-count")
errorQueue.put(job)
- }
+ case e =>
+ Stats.incr("job-error-count")
+ log.error(e, "Error in Job: %s - %s", job, e)
+ job.errorCount += 1
+ job.errorMessage = e.toString
+ if (job.errorCount > errorLimit) {
+ badJobQueue.foreach { _.put(job) }
+ } else {
+ errorQueue.put(job)
+ }
+ }
@robey
robey Jan 14, 2011

...and here. vi sucks! :)

+ ticket.ack()
+ } finally {
+ _activeThreads.decrementAndGet()
}
- ticket.ack()
}
}
}
@@ -45,4 +45,5 @@ class PrioritizingJobScheduler[J <: Job](val _schedulers: Map[Int, JobScheduler[
def retryErrors() = schedulers.values.foreach { _.retryErrors() }
def size = schedulers.values.foldLeft(0) { _ + _.size }
+ def activeThreads = schedulers.values.foldLeft(0) { _ + _.activeThreads }
}

0 comments on commit cfa3efe

Please sign in to comment.