finagle-core: Allow bounding the offload filter queue
Problem

There is no backpressure in the link between Netty (producer) and the OffloadFilter (consumer), which can
result in service degradation under high load.

Solution

Allow users to put a bound on the offload queue (the default is unbounded). Any excess work that can't be
offloaded is run directly on Netty threads, resembling the behavior before the OffloadFilter was introduced.
This simple machinery, along with a carefully picked queue-size value, achieves backpressure in services
with the offload filter enabled.
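
A condensed, standalone sketch of that machinery, mirroring the change below (the object and method names
are illustrative, not Finagle's): a fixed-size ThreadPoolExecutor fed by a bounded LinkedBlockingQueue,
with a RejectedExecutionHandler that runs overflow work on the submitting thread.

import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit}

object BoundedOffloadSketch {

  // Overflow policy: instead of dropping work, run it on the thread that tried
  // to submit it (in Finagle's case, a Netty IO worker).
  final class RunOnCaller extends RejectedExecutionHandler {
    def rejectedExecution(r: Runnable, e: ThreadPoolExecutor): Unit =
      if (!e.isShutdown) r.run()
  }

  // A fixed-size pool whose work queue holds at most `queueSize` pending tasks.
  def mkPool(workers: Int, queueSize: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      workers, // corePoolSize
      workers, // maximumPoolSize
      0L, // keepAliveTime: no excess threads to reap
      TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable](queueSize), // the bound that creates backpressure
      new RunOnCaller
    )

  def main(args: Array[String]): Unit = {
    val pool = mkPool(workers = 2, queueSize = 128)
    pool.execute(() => println(s"offloaded to ${Thread.currentThread.getName}"))
    pool.shutdown()
  }
}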

h/t @FBrasil for the implementation idea.

JIRA Issues: CSL-10316

Differential Revision: https://phabricator.twitter.biz/D573328
vkostyukov authored and jenkins committed Nov 6, 2020
1 parent 0c7890c commit af228ca
Showing 6 changed files with 111 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -10,6 +10,11 @@ Unreleased
New Features
~~~~~~~~~~~~

* finagle-core: Add a new experimental flag `com.twitter.finagle.offload.queueSize` that allows putting
a bound on the offload queue. Any excess work that can't be offloaded due to queue overflow is run
on an IO (Netty) thread instead. Put differently, this flag enables the simplest form of backpressure
on the link between Netty and the OffloadFilter. ``PHAB_ID=D573328``

* finagle-netty4: Add `ExternalClientEngineFactory` to the open source version of Finagle. This
`SslClientEngineFactory` acts as a better example of how to build custom client and server engine
factories in order to reuse SSL contexts for performance concerns. ``PHAB_ID=D572567``
6 changes: 6 additions & 0 deletions doc/src/sphinx/Flags.rst
@@ -65,6 +65,12 @@ Common
When this flag is greater than zero, the execution of application code happens in an isolated pool and the Netty threads are used only to handle the network channels. This behavior changes the assumptions regarding the scheduling of tasks in Finagle applications. Traditionally, the recommendation is to execute CPU-intensive tasks using a `FuturePool` but, when this flag is enabled, CPU-intensive tasks don't require a `FuturePool`. Important: Blocking tasks should still use a `FuturePool`.
It's important to review the allocation of thread pools when this flag is enabled; otherwise, the application might create too many threads, which leads to more GC pressure and increases the risk of CPU throttling.

**com.twitter.finagle.offload.queueSize** `int`
Experimental flag. When the offload filter is enabled, its queue is bounded by this value (the default,
`Int.MaxValue`, is effectively unbounded). Any excess work that can't be offloaded due to queue overflow
is run on IO (Netty) threads instead. Thus, when set, this flag enforces backpressure on the link between
Netty (producer) and your application (consumer).
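
For illustration only, assuming the usual single-dash syntax for com.twitter.app global flags, a service
owner might pair the two offload flags like this (the values are made up and should be tuned per service):

  -com.twitter.finagle.offload.numWorkers=14 -com.twitter.finagle.offload.queueSize=1000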

Netty 4
-------

5 changes: 5 additions & 0 deletions doc/src/sphinx/metrics/Finagle.rst
@@ -68,6 +68,11 @@ These metrics correspond to the state of the offload filter thread pool when con
**finagle/offload_pool/completed_tasks**
A gauge of the total number of tasks that have completed execution.

**finagle/offload_pool/not_offloaded_tasks**
A counter of how many tasks weren't offloaded because the queue has grown over the configured limit
(set via the flag `com.twitter.finagle.offload.queueSize`). If a task can't be offloaded, it is run on
the caller thread, which is commonly a Netty IO worker.

**finagle/offload_pool/queue_depth**
A gauge of the number of tasks that are waiting to be executed.

@@ -1,8 +1,8 @@
package com.twitter.finagle.filter

import com.twitter.concurrent.NamedPoolThreadFactory
import com.twitter.finagle.offload.numWorkers
import com.twitter.finagle.stats.FinagleStatsReceiver
import com.twitter.finagle.offload.{queueSize, numWorkers}
import com.twitter.finagle.stats.{Counter, FinagleStatsReceiver, StatsReceiver}
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.{Service, ServiceFactory, SimpleFilter, Stack, Stackable}
import com.twitter.util.{
@@ -12,7 +12,13 @@ import com.twitter.util.{
FuturePool,
Promise
}
import java.util.concurrent.{ExecutorService, Executors, ThreadPoolExecutor}
import java.util.concurrent.{
ExecutorService,
LinkedBlockingQueue,
RejectedExecutionHandler,
ThreadPoolExecutor,
TimeUnit
}
import scala.runtime.NonLocalReturnControl

/**
@@ -23,33 +29,58 @@ import scala.runtime.NonLocalReturnControl
*/
object OffloadFilter {

/**
* This handler is run when the submitted work is rejected from the ThreadPool, usually because
* its work queue has reached the proposed limit. When that happens, we simply run the work on
* the current thread (a thread that was trying to offload), which is most commonly a Netty IO
* worker.
*/
private[finagle] final class RunsOnNettyThread(rejections: Counter)
extends RejectedExecutionHandler {
def rejectedExecution(r: Runnable, e: ThreadPoolExecutor): Unit = {
if (!e.isShutdown) {
rejections.incr()
r.run()
}
}
}

private[finagle] final class OffloadThreadPool(
poolSize: Int,
queueSize: Int,
stats: StatsReceiver)
extends ThreadPoolExecutor(
poolSize /*corePoolSize*/,
poolSize /*maximumPoolSize*/,
0L /*keepAliveTime*/,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](queueSize) /*workQueue*/,
new NamedPoolThreadFactory("finagle/offload", makeDaemons = true) /*threadFactory*/,
new RunsOnNettyThread(stats.counter("not_offloaded_tasks")))

private final class OffloadFuturePool(executor: ThreadPoolExecutor, stats: StatsReceiver)
extends ExecutorServiceFuturePool(executor) {
private val gauges = Seq(
stats.addGauge("pool_size") { poolSize },
stats.addGauge("active_tasks") { numActiveTasks },
stats.addGauge("completed_tasks") { numCompletedTasks },
stats.addGauge("queue_depth") { executor.getQueue.size }
)
}

private[this] val Role = Stack.Role("OffloadWorkFromIO")
private[this] val Description = "Offloading computations from IO threads"
private[this] val ClientAnnotationKey = "clnt/finagle.offload_pool_size"
private[this] val ServerAnnotationKey = "srv/finagle.offload_pool_size"

private[this] lazy val (defaultPool, defaultPoolStats) = {
private[this] lazy val defaultPool = {
numWorkers.get match {
case None =>
(None, Seq.empty)
case None => None
case Some(threads) =>
val factory = new NamedPoolThreadFactory("finagle/offload", makeDaemons = true)
val threadPool = Executors.newFixedThreadPool(threads, factory)
val pool = new ExecutorServiceFuturePool(threadPool)

val stats = FinagleStatsReceiver.scope("offload_pool")
val gauges = Seq(
stats.addGauge("pool_size") { pool.poolSize },
stats.addGauge("active_tasks") { pool.numActiveTasks },
stats.addGauge("completed_tasks") { pool.numCompletedTasks },
stats.addGauge("queue_depth") {
threadPool match {
case executor: ThreadPoolExecutor => executor.getQueue.size
case _ => -1
}
}
)
(Some(pool), gauges)
val pool = new OffloadFuturePool(new OffloadThreadPool(threads, queueSize(), stats), stats)

Some(pool)
}
}

@@ -0,0 +1,12 @@
package com.twitter.finagle.offload

import com.twitter.app.GlobalFlag

object queueSize
extends GlobalFlag[Int](
Int.MaxValue,
"Experimental flag. When offload filter is enabled, its queue is bounded by this value (default is" +
"Int.MaxValue or unbounded). Any excess work that can't be offloaded due to the queue overflow is run" +
"on IO (Netty) threads instead. Thus, when set, this flag enforces the backpressure on the link between" +
"Netty (producer) and your application (consumer)."
)
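
As a usage note, a hedged sketch of overriding this flag in a test, assuming the `let` helper that
com.twitter.app flags expose (the value here is illustrative):

import com.twitter.finagle.offload.queueSize

// Bound the offload queue to 100 items only within this block; outside of it
// the flag falls back to its default (Int.MaxValue, i.e. effectively unbounded).
queueSize.let(100) {
  // build and exercise the client or server under test here
}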
@@ -3,6 +3,8 @@ package com.twitter.finagle.filter
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.Service
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.filter.OffloadFilter.OffloadThreadPool
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.{Await, Awaitable, Future, FuturePool, Promise}
import com.twitter.finagle.util.DefaultTimer.Implicit
import java.util.concurrent.{CountDownLatch, Executors}
@@ -154,4 +156,32 @@ class OffloadFilterTest extends FunSuite with BeforeAndAfterAll {
assert(await(s()) == None)
assert(await(Contexts.local.let(key, 4) { s() }) == Some(4))
}

test("rejection handler does what it's supposed to do") {
val stats = new InMemoryStatsReceiver
val pool = new OffloadThreadPool(1, 1, stats)
val blockOnMe = new Promise[Unit] with Runnable {
def run(): Unit = Await.result(this)
}

// block the only worker we have
pool.submit(blockOnMe)

// Take up a slot in the queue
pool.submit(new Runnable {
def run(): Unit = ()
})
assert(stats.counters(Seq("not_offloaded_tasks")) == 0)

// Submit a task for rejection
var caller: Thread = null
pool.submit(new Runnable {
def run(): Unit = caller = Thread.currentThread()
})

assert(caller eq Thread.currentThread())
assert(stats.counters(Seq("not_offloaded_tasks")) == 1)
blockOnMe.setDone()
pool.shutdown()
}
}
