Skip to content

Commit

Permalink
finagle-core: Introduce .withDeadlines API for admission control
Browse files Browse the repository at this point in the history
Problem:

We would like deadline admission control to be turned on by default.

Solution:

Introduce new APIs so that servers can be configured with
`Server.withAdmissionControl.deadlines` to turn deadlines on,
`.darkModeDeadlines` to turn deadlines on in dark mode, and `.noDeadlines` so
that `DeadlineFilter` can be turned on off. We also introduce more params for
configuring `DeadlineFilter`'s window for when `DeadlineFilter` is enabled or in
dark mode.

JIRA Issues: CSL-6304

Differential Revision: https://phabricator.twitter.biz/D172402
  • Loading branch information
isabelmartin authored and jenkins committed May 24, 2018
1 parent 2c89cb9 commit cb6975a
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 34 deletions.
8 changes: 8 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ New Features
* finagle-core: Make `Filter.TypeAgnostic` an abstract class for Java usability.
``PHAB_ID=D172716``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~
* finagle-core: Rename `DeadlineFilter.Param(maxRejectFraction)` to
`DeadlineFilter.MaxRejectFraction(maxRejectFraction)` to reduce confusion
when adding additional params.
``PHAB_ID=D172402``


Bug Fixes
~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.twitter.finagle.param
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle.Stack
import com.twitter.finagle.filter.RequestSemaphoreFilter
import com.twitter.finagle.service.DeadlineFilter

/**
* A collection of methods for configuring the server-side admission control modules
Expand Down Expand Up @@ -30,4 +31,25 @@ class ServerAdmissionControlParams[A <: Stack.Parameterized[A]](self: Stack.Para

self.configured(RequestSemaphoreFilter.Param(semaphore))
}

/**
* Configures mode for `DeadlineFilter` to `Enabled`. (default: `Disabled`)
*/
def deadlines: A = {
self.configured(DeadlineFilter.Mode(DeadlineFilter.Mode.Enabled))
}

/**
* Configures mode for `DeadlineFilter` to `DarkMode`. (default: `Disabled`)
*/
def darkModeDeadlines: A = {
self.configured(DeadlineFilter.Mode(DeadlineFilter.Mode.DarkMode))
}

/**
* Configures mode for `DeadlineFilter` to `Disabled`. (default: `Disabled`)
*/
def noDeadlines: A = {
self.configured(DeadlineFilter.Mode(DeadlineFilter.Mode.Disabled))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.twitter.finagle._
import com.twitter.finagle.context.Deadline
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.{HasLogLevel, Level}
import com.twitter.util.{Future, Duration, Stopwatch, Time, TokenBucket}
import com.twitter.util.{Duration, Future, Stopwatch, Time, TokenBucket}

/**
* DeadlineFilter provides an admission control module that can be pushed onto the stack to
Expand All @@ -31,60 +31,128 @@ object DeadlineFilter {
// usage with the Double `maxRejectFraction`
private[service] val RejectBucketScaleFactor = 1000.0

/**
* A class eligible for configuring a [[com.twitter.finagle.Stackable]]
* [[com.twitter.finagle.service.DeadlineFilter]] module.
*
* @param rejectPeriod No more than `maxRejectFraction` of requests will be
* discarded over the `rejectPeriod`. Must be `>= 1 second` and `<= 60 seconds`.
*/
case class RejectPeriod(rejectPeriod: Duration) {
require(
rejectPeriod.inSeconds >= 1 && rejectPeriod.inSeconds <= 60,
s"rejectPeriod must be [1 second, 60 seconds]: $rejectPeriod"
)

def mk(): (RejectPeriod, Stack.Param[RejectPeriod]) =
(this, RejectPeriod.param)
}

object RejectPeriod {
implicit val param: Stack.Param[DeadlineFilter.RejectPeriod] =
Stack.Param(RejectPeriod(DefaultRejectPeriod))
}

/**
* A class eligible for configuring a [[com.twitter.finagle.Stackable]]
* [[com.twitter.finagle.service.DeadlineFilter]] module.
*
* @param maxRejectFraction Maximum fraction of requests that can be
* rejected over `rejectPeriod`. Must be between 0.0 and 1.0.
*/
case class Param(maxRejectFraction: Double) {
case class MaxRejectFraction(maxRejectFraction: Double) {
require(
maxRejectFraction >= 0.0 && maxRejectFraction <= 1.0,
s"maxRejectFraction must be between 0.0 and 1.0: $maxRejectFraction"
)

def mk(): (Param, Stack.Param[Param]) =
(this, Param.param)
def mk(): (MaxRejectFraction, Stack.Param[MaxRejectFraction]) =
(this, MaxRejectFraction.param)
}
object MaxRejectFraction {
implicit val param: Stack.Param[DeadlineFilter.MaxRejectFraction] =
Stack.Param(MaxRejectFraction(DefaultMaxRejectFraction))
}

/**
* A class eligible for configuring a [[com.twitter.finagle.Stackable]]
* [[com.twitter.finagle.service.DeadlineFilter]] module.
*
* @param param `Disabled` will omit `DeadlineFilter` from the server
* stack. `DarkMode` will collect stats about deadlines but not reject requests.
* `Enabled` turns `DeadlineFilter` on.
*/
private[finagle] case class Mode(param: Mode.FilterMode) {

def mk(): (Mode, Stack.Param[Mode]) =
(this, Mode.param)
}
object Param {
implicit val param: Stack.Param[DeadlineFilter.Param] =
Stack.Param(Param(DefaultMaxRejectFraction))

private[finagle] case object Mode {

sealed trait FilterMode
case object Disabled extends FilterMode
case object DarkMode extends FilterMode
case object Enabled extends FilterMode

val Default: FilterMode = Disabled

implicit val param: Stack.Param[Mode] = Stack.Param(Mode(Default))
}

/**
* Creates a [[com.twitter.finagle.Stackable]]
* [[com.twitter.finagle.service.DeadlineFilter]].
*/
def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module2[param.Stats, DeadlineFilter.Param, ServiceFactory[Req, Rep]] {
new Stack.Module4[
param.Stats,
DeadlineFilter.RejectPeriod,
DeadlineFilter.MaxRejectFraction,
DeadlineFilter.Mode,
ServiceFactory[Req, Rep]]
{
val role = new Stack.Role("DeadlineFilter")
val description = "Reject requests when their deadline has passed"

def make(
_stats: param.Stats,
_param: DeadlineFilter.Param,
_rejectPeriod: DeadlineFilter.RejectPeriod,
_maxRejectFraction: DeadlineFilter.MaxRejectFraction,
mode: DeadlineFilter.Mode,
next: ServiceFactory[Req, Rep]
) = {
val Param(maxRejectFraction) = _param
) =
mode match {
case Mode(Mode.DarkMode) | Mode(Mode.Enabled) =>
val rejectPeriod = _rejectPeriod.rejectPeriod
val maxRejectFraction = _maxRejectFraction.maxRejectFraction

if (maxRejectFraction <= 0.0) next
else {
val param.Stats(statsReceiver) = _stats
val scopedStatsReceiver = statsReceiver.scope("admission_control", "deadline")
if (maxRejectFraction <= 0.0) next
else {
val param.Stats(statsReceiver) = _stats
val scopedStatsReceiver = statsReceiver.scope("admission_control", "deadline")
val darkMode = mode == Mode(Mode.DarkMode)

new ServiceFactoryProxy[Req, Rep](next) {
new ServiceFactoryProxy[Req, Rep](next) {

private[this] val newDeadlineFilter: Service[Req, Rep] => Service[Req, Rep] = service =>
new DeadlineFilter(DefaultRejectPeriod, maxRejectFraction, scopedStatsReceiver)
.andThen(service)
private[this] val newDeadlineFilter: Service[Req, Rep] => Service[Req, Rep] =
service =>
new DeadlineFilter(
rejectPeriod = rejectPeriod,
maxRejectFraction = maxRejectFraction,
statsReceiver = scopedStatsReceiver,
isDarkMode = darkMode)
.andThen(service)

override def apply(conn: ClientConnection): Future[Service[Req, Rep]] =
// Create a DeadlineFilter per connection, so we don't share the state of the token
// bucket for rejecting requests.
next(conn).map(newDeadlineFilter)
}
}
override def apply(conn: ClientConnection): Future[Service[Req, Rep]] =
// Create a DeadlineFilter per connection, so we don't share the state of the token
// bucket for rejecting requests.
next(conn).map(newDeadlineFilter)
}
}

case _ =>
next
}
}

Expand Down Expand Up @@ -119,15 +187,31 @@ object DeadlineFilter {
* @param statsReceiver for stats reporting, typically scoped to
* ".../admission_control/deadline/"
* @param nowMillis current time in milliseconds
* @param isDarkMode DarkMode will collect stats but not reject requests
* @see The [[https://twitter.github.io/finagle/guide/Servers.html#request-deadline user guide]]
* for more details.
*/
class DeadlineFilter[Req, Rep](
rejectPeriod: Duration = DeadlineFilter.DefaultRejectPeriod,
maxRejectFraction: Double = DeadlineFilter.DefaultMaxRejectFraction,
statsReceiver: StatsReceiver,
nowMillis: () => Long = Stopwatch.systemMillis
nowMillis: () => Long = Stopwatch.systemMillis,
isDarkMode: Boolean
) extends SimpleFilter[Req, Rep] {

def this(
rejectPeriod: Duration,
maxRejectFraction: Double,
statsReceiver: StatsReceiver,
nowMillis: () => Long
) =
this(
rejectPeriod,
maxRejectFraction,
statsReceiver,
nowMillis,
false)

import DeadlineFilter.DeadlineExceededException

require(
Expand Down Expand Up @@ -167,9 +251,12 @@ class DeadlineFilter[Req, Rep](
// There are enough tokens to reject the request
if (rejectBucket.tryGet(rejectWithdrawal)) {
rejectedStat.incr()
Future.exception(
new DeadlineExceededException(timestamp, deadline, exceeded, now)
)
if (isDarkMode)
service(request)
else
Future.exception(
new DeadlineExceededException(timestamp, deadline, exceeded, now)
)
} else {
rejectBucket.put(serviceDeposit)
service(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class DeadlineFilterTest extends FunSuite with MockitoSugar with OneInstancePerT
rejectPeriod = 10.seconds,
maxRejectFraction = 0.2,
statsReceiver = statsReceiver,
nowMillis = Stopwatch.timeMillis
nowMillis = Stopwatch.timeMillis,
isDarkMode = false
)

val deadlineService = deadlineFilter.andThen(service)
Expand Down Expand Up @@ -92,6 +93,32 @@ class DeadlineFilterTest extends FunSuite with MockitoSugar with OneInstancePerT
}
}

test(
"When the deadline is exceeded and the reject token bucket contains sufficient tokens but we " +
"are in dark mode, DeadlineFilter should service the request and increment the exceeded " +
"and rejected stats"
) {
val darkModeDeadlineFilter = new DeadlineFilter[String, String](
rejectPeriod = 10.seconds,
maxRejectFraction = 0.2,
statsReceiver = statsReceiver,
nowMillis = Stopwatch.timeMillis,
isDarkMode = true
)
val darkModeService = darkModeDeadlineFilter.andThen(service)

Time.withCurrentTimeFrozen { tc =>
Contexts.broadcast.let(Deadline, Deadline.ofTimeout(1.seconds)) {
for (i <- 0 until 5) Await.result(darkModeService("marco"), 1.second)
tc.advance(2.seconds)
assert(Await.result(darkModeService("marco"), 1.second) == "polo")
assert(statsReceiver.counters.get(List("exceeded")) == Some(1))
assert(statsReceiver.counters.get(List("exceeded_beyond_tolerance")) == None)
assert(statsReceiver.counters.get(List("rejected")) == Some(1))
}
}
}

test("tokens added to reject bucket on request without deadline") {
for (i <- 0 until 5) Await.result(deadlineService("marco"), 1.second)

Expand Down Expand Up @@ -166,13 +193,23 @@ class DeadlineFilterTest extends FunSuite with MockitoSugar with OneInstancePerT
}
}

test("param") {
test("MaxRejectFraction param") {
import DeadlineFilter._

val p: MaxRejectFraction = MaxRejectFraction(0.5)

val ps: Stack.Params = Stack.Params.empty + p
assert(ps.contains[MaxRejectFraction])
assert((ps[MaxRejectFraction] match { case MaxRejectFraction(d) => d }) == 0.5)
}

test("RejectPeriod param") {
import DeadlineFilter._

val p: Param = Param(0.5)
val p: RejectPeriod = RejectPeriod(5.seconds)

val ps: Stack.Params = Stack.Params.empty + p
assert(ps.contains[Param])
assert((ps[Param] match { case Param(d) => (d) }) == 0.5)
assert(ps.contains[RejectPeriod])
assert((ps[RejectPeriod] match { case RejectPeriod(d) => d }) == 5.seconds)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public static void main(String[] args) {
Server<Request, Response> newStyleServer =
com.twitter.finagle.Http
.server()
.withAdmissionControl().deadlines()
.withAdmissionControl().noDeadlines()
.withAdmissionControl().darkModeDeadlines()
.withCompressionLevel(2)
.configured(new Label("test").mk())
.withDecompression(true)
Expand Down

0 comments on commit cb6975a

Please sign in to comment.