Permalink
Browse files

util-core: Move slow task detection into a ProxyTimer

Summary: Problem / Solution

We can make the tools for detecting slow timer tasks to a ProxyTimer
to make them more generic. They can now be enabled and turned in
the DefaultTimer via global flags.

JIRA Issues: CSL-4838

Differential Revision: https://phabricator.twitter.biz/D70279
  • Loading branch information...
bryce-anderson authored and jenkins committed Jul 17, 2017
1 parent 4225006 commit 7c8425d95e45771ff88e8d23857f9f8026bb09aa
View
@@ -8,6 +8,9 @@ Next Version
New Features:
* util-core: Added `c.t.util.SlowProbeProxyTimer` for monitoring the duration
of execution for timer tasks. ``PHAB_ID=D70279``
* util-core: Introduced RootMonitor#set to set custom Monitor to RootMonitor.
``PHAB_ID=D70876``
@@ -0,0 +1,74 @@
package com.twitter.util
/**
* An abstract [[ProxyTimer]] that provides callback methods which are called when
* a task takes longer than the specified maximum runtime or if a task is observed
* to be taking longer than the specified maximum runtime.
*
* @note Observation of slow task execution is performed when scheduling more work to
* avoid the overhead of another thread or timer checking on tasks. This results
* in lower overhead but means that slow running tasks may not be observed while
* executing. However, they will trigger a callback to the `slowTaskCompleted`
* regardless of whether additional work is scheduled.
*
* @note This makes assumptions that the underlying `Timer` will execute tasks
* sequentially in order to catch slow running tasks during execution. If the
* underlying `Timer` executes tasks in parallel the callback `slowTaskExecuting`
* will become unreliable. However, the `slowTaskCompleted` callback will remain
* reliable but must be a thread-safe implementation.
*/
abstract class SlowProbeProxyTimer(maxRuntime: Duration) extends ProxyTimer {
/**
* Called when a task takes longer than the specified maximum duration
*/
protected def slowTaskCompleted(elapsed: Duration): Unit
/**
* Called when a task is observed to be executing longer than the specified
* maximum duration
*/
protected def slowTaskExecuting(elapsed: Duration): Unit
@volatile
private[this] var lastStartAt = Time.Top
// let another thread check if the timer thread has been slow.
// while this could be the timer thread scheduling more work,
// we expect that at some point another thread will schedule something.
// while this relies on application's doing scheduling to trigger
// the findings, we expect that to be common. the alternative would've
// been to use a separate dedicated thread, but the cost didn't seem
// worth the benefits to me.
override protected def scheduleOnce(when: Time)(f: => Unit): TimerTask = {
checkSlowTask()
self.schedule(when)(meterTask(f))
}
override protected def schedulePeriodically(
when: Time,
period: Duration
)(f: => Unit): TimerTask = {
checkSlowTask()
self.schedule(when, period)(meterTask(f))
}
private[this] def checkSlowTask(): Unit = {
val elapsed = Time.now - lastStartAt
if (elapsed > maxRuntime) slowTaskExecuting(elapsed)
}
private[this] def meterTask(f: => Unit): Unit = {
// mark this task as started, then finished in a finally block.
val started = Time.now
lastStartAt = started
try f
finally {
val elapsed = Time.now - started
lastStartAt = Time.Top
if (elapsed > maxRuntime) slowTaskCompleted(elapsed)
}
}
override def toString: String = s"${getClass.getSimpleName}($self)"
}
@@ -0,0 +1,101 @@
package com.twitter.util
import com.twitter.conversions.time._
import org.scalatest.FunSuite
import scala.collection.mutable
class SlowProbeProxyTimerTest extends FunSuite {
private type Task = () => Unit
private val NullTask = new TimerTask { def cancel(): Unit = () }
private val maxRuntime = 20.milliseconds
private class TestSlowProbeProxyTimer extends SlowProbeProxyTimer(maxRuntime) {
val scheduledTasks: mutable.Queue[Task] = new mutable.Queue[Task]()
var slowTaskDuration: Option[Duration] = None
var slowTaskExecuting: Option[Duration] = None
protected def slowTaskCompleted(elapsed: Duration): Unit = { slowTaskDuration = Some(elapsed) }
protected def slowTaskExecuting(elapsed: Duration): Unit = { slowTaskExecuting = Some(elapsed) }
protected val self: Timer = new Timer {
protected def scheduleOnce(when: Time)(f: => Unit): TimerTask = {
scheduledTasks.enqueue(() => f)
NullTask
}
protected def schedulePeriodically(when: Time, period: Duration)(f: => Unit): TimerTask =
schedule(when)(f)
def stop(): Unit = ()
}
}
test("tasks that don't exceed the deadline are not counted and the slow-task hook is not fired") {
val meteredTimer = new TestSlowProbeProxyTimer
val now = Time.now
Time.withTimeFunction(now) { control =>
meteredTimer.schedule(Time.now) {
control.advance(maxRuntime)
}
assert(meteredTimer.slowTaskDuration.isEmpty)
assert(meteredTimer.slowTaskExecuting.isEmpty)
val task = meteredTimer.scheduledTasks.dequeue()
task() // execute the task
assert(meteredTimer.slowTaskDuration.isEmpty)
assert(meteredTimer.slowTaskExecuting.isEmpty) // no work was scheduled
}
}
test("slow tasks are counted even if other work is not scheduled") {
val meteredTimer = new TestSlowProbeProxyTimer
val now = Time.now
val taskDuration = maxRuntime + 1.millisecond
Time.withTimeFunction(now) { control =>
meteredTimer.schedule(Time.now) {
control.advance(taskDuration)
}
assert(meteredTimer.slowTaskDuration.isEmpty)
assert(meteredTimer.slowTaskExecuting.isEmpty)
val task = meteredTimer.scheduledTasks.dequeue()
task() // execute the task
assert(meteredTimer.slowTaskDuration == Some(taskDuration))
assert(meteredTimer.slowTaskExecuting.isEmpty) // no work was scheduled
}
}
test("scheduling work during a slow task fires the slow-tast hook") {
val meteredTimer = new TestSlowProbeProxyTimer
val now = Time.now
val taskDuration = maxRuntime + 1.millisecond
Time.withTimeFunction(now) { control =>
meteredTimer.schedule(Time.now) {
// A task that takes 21 milliseconds to schedule more work.
control.advance(taskDuration)
meteredTimer.schedule(Time.now) { () /* Boring task :/ */ }
}
assert(meteredTimer.slowTaskDuration.isEmpty)
assert(meteredTimer.slowTaskExecuting.isEmpty)
val task = meteredTimer.scheduledTasks.dequeue()
task() // execute the task
assert(meteredTimer.slowTaskDuration == Some(taskDuration))
assert(meteredTimer.slowTaskExecuting == Some(taskDuration))
}
}
}

0 comments on commit 7c8425d

Please sign in to comment.