Skip to content

Commit

Permalink
finagle-core: Track offload task delays at a given rate
Browse files Browse the repository at this point in the history
Problem

There is no visibility into how long tasks have been sitting in the offload queue. Knowing this could be
useful for tuning and troubleshooting.

Solution

Add a new stat `finagle/offload_pool/delay_ms` that records a delay between when the task is
submitted and when it's executed. For efficiency reasons, we're not instrumenting every task but
do sample the offload queue at the given interval (1 second by default).

The sampling interval can be set via the `com.twitter.finagle.offload.delaySampleInterval` flag.

JIRA Issues: CSL-10315

Differential Revision: https://phabricator.twitter.biz/D571980
  • Loading branch information
vkostyukov authored and jenkins committed Nov 7, 2020
1 parent af228ca commit a7ebf2e
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ Unreleased
New Features
~~~~~~~~~~~~

* finagle-core: Add a new stat (histogram) that reports how long a task has been sitting in the
offload queue. This instrumentation is sampled at the given interval (100ms by default) that
can be overridden with a global flag `com.twitter.finagle.offload.delaySampleInterval`.
``PHAB_ID=D571980``

* finagle-core: Add a new experimental flag `com.twitter.finagle.offload.queueSize` that allows to
put bounds on the offload queue. Any excess work that can't be offloaded due to a queue overflow
is run on IO (Netty) thread instead. Put this way, this flag enables the simplest form of
Expand Down
4 changes: 4 additions & 0 deletions doc/src/sphinx/Flags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ Common
is run on IO (Netty) threads instead. Thus, when set, this flag enforces the backpressure on the
link between "Netty (producer) and your application (consumer).

**com.twitter.finagle.offload.delaySampleInterval** `duration`
When offload filter is enabled, sample offloading delay (how long a task has been sitting in the
offload queue before it gets executed) at this interval (default: `100.milliseconds`).

Netty 4
-------

Expand Down
5 changes: 5 additions & 0 deletions doc/src/sphinx/metrics/Finagle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ These metrics correspond to the state of the offload filter thread pool when con
**finagle/offload_pool/queue_depth**
A Gauge of the number of tasks that are waiting to be executed.

**finagle/offload_pool/delay_ms**
A histogram reporting offloading delay - how long a task has been sitting in the offload queue
before it gets executed. For efficiency reasons, this stat is sampled each
`com.twitter.finagle.offload.delaySampleInterval` interval.

Timer
<<<<<

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package com.twitter.finagle.filter

import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.finagle.offload.{queueSize, numWorkers}
import com.twitter.finagle.stats.{Counter, FinagleStatsReceiver, StatsReceiver}
import com.twitter.finagle.stats.{Counter, FinagleStatsReceiver, Stat, StatsReceiver}
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.{Service, ServiceFactory, SimpleFilter, Stack, Stackable}
import com.twitter.util.{
ExecutorServiceFuturePool,
Future,
FutureNonLocalReturnControl,
FuturePool,
Promise
Promise,
Stopwatch,
Time,
Timer
}
import scala.com.twitter.finagle.offload.delaySampleInterval
import java.util.concurrent.{
ExecutorService,
LinkedBlockingQueue,
Expand All @@ -29,6 +34,30 @@ import scala.runtime.NonLocalReturnControl
*/
object OffloadFilter {

private[finagle] final class SampleDelay(pool: FuturePool, stat: Stat, timer: Timer)
extends (() => Unit) {
private class Task extends (() => Unit) {
private val submitted = Stopwatch.start()

def apply(): Unit = {
val delay = submitted()
stat.add(delay.inMilliseconds)

val nextAt = Time.now + delaySampleInterval() - delay
// NOTE: if the delay happened to be longer than the sampling interval, the nextAt would be
// negative. Scheduling a task under a negative time would force the Timer to treat it as
// "run now". Thus the offloading delay is sampled at either 'sampleInterval' or 'delay',
// whichever is longer.
timer.schedule(nextAt)(SampleDelay.this())
}
}

def apply(): Unit = {
val task = new Task
pool(task())
}
}

/**
* This handler is run when the submitted work is rejected from the ThreadPool, usually because
* its work queue has reached the proposed limit. When that happens, we simply run the work on
Expand Down Expand Up @@ -73,14 +102,18 @@ object OffloadFilter {
private[this] val ClientAnnotationKey = "clnt/finagle.offload_pool_size"
private[this] val ServerAnnotationKey = "srv/finagle.offload_pool_size"

private[this] lazy val defaultPool = {
numWorkers.get match {
case None => None
case Some(threads) =>
val stats = FinagleStatsReceiver.scope("offload_pool")
val pool = new OffloadFuturePool(new OffloadThreadPool(threads, queueSize(), stats), stats)
private[this] lazy val global: Option[FuturePool] = {
numWorkers.get.map { threads =>
val stats = FinagleStatsReceiver.scope("offload_pool")
val pool = new OffloadFuturePool(new OffloadThreadPool(threads, queueSize(), stats), stats)

// Start sampling the offload delay if the interval isn't Duration.Top.
if (delaySampleInterval().isFinite && !delaySampleInterval().isZero) {
val sampleDelay = new SampleDelay(pool, stats.stat("delay_ms"), DefaultTimer)
sampleDelay()
}

Some(pool)
pool
}
}

Expand All @@ -94,7 +127,7 @@ object OffloadFilter {
final case object Disabled extends Param

implicit val param: Stack.Param[Param] = new Stack.Param[Param] {
lazy val default: Param = defaultPool.map(Enabled(_)).getOrElse(Disabled)
lazy val default: Param = global.map(Enabled.apply).getOrElse(Disabled)

override def show(p: Param): Seq[(String, () => String)] = {
val enabledStr = p match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package scala.com.twitter.finagle.offload

import com.twitter.app.GlobalFlag
import com.twitter.util.Duration

object delaySampleInterval
extends GlobalFlag[Duration](
Duration.fromMilliseconds(100),
"Sample offload filter queue at this interval and record task delay (in milliseconds) under " +
"offload_pool/delay_ms stat (default: 100ms). Setting this flag to either 'duration.top', " +
"'duration.bottom', or zero disables both the stat and sampling."
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package com.twitter.finagle.filter
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.Service
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.filter.OffloadFilter.OffloadThreadPool
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.{Await, Awaitable, Future, FuturePool, Promise}
import com.twitter.util.{Await, Awaitable, Future, FuturePool, MockTimer, Promise, Time}
import com.twitter.finagle.util.DefaultTimer.Implicit
import java.util.concurrent.{CountDownLatch, Executors}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.collection.mutable.ArrayBuffer

class OffloadFilterTest extends FunSuite with BeforeAndAfterAll {
private class ExpectedException extends Exception("boom")
Expand Down Expand Up @@ -157,9 +157,50 @@ class OffloadFilterTest extends FunSuite with BeforeAndAfterAll {
assert(await(Contexts.local.let(key, 4) { s() }) == Some(4))
}

class MockFuturePool extends FuturePool {
private val queue = ArrayBuffer.empty[() => Any]
def apply[T](f: => T): Future[T] = {
queue += { () => f }
Future.never
}

def runAll(): Unit = {
queue.foreach(f => f())
queue.clear()
}

def isEmpty: Boolean = queue.isEmpty
}

test("sample delay should sample the delay") {
val stats = new InMemoryStatsReceiver
val pool = new MockFuturePool
val timer = new MockTimer
val sampleDelay = new OffloadFilter.SampleDelay(pool, stats.stat("delay"), timer)
Time.withCurrentTimeFrozen { ctrl =>
sampleDelay()

ctrl.advance(50.milliseconds)
pool.runAll()
assert(stats.stats(Seq("delay")) == Seq(50))
assert(timer.tasks.nonEmpty)
assert(pool.isEmpty)

ctrl.advance(50.milliseconds)
timer.tick()
assert(timer.tasks.isEmpty)
assert(!pool.isEmpty)

ctrl.advance(200.milliseconds)
pool.runAll()

assert(stats.stats(Seq("delay")) == Seq(50, 200))
}
}

test("rejection handler does what it's supposed to do") {
val stats = new InMemoryStatsReceiver
val pool = new OffloadThreadPool(1, 1, stats)
val pool = new OffloadFilter.OffloadThreadPool(1, 1, stats)
val blockOnMe = new Promise[Unit] with Runnable {
def run(): Unit = Await.result(this)
}
Expand Down

0 comments on commit a7ebf2e

Please sign in to comment.