Skip to content

Commit

Permalink
finagle-core: introduce Admission controller in the server stack
Browse files Browse the repository at this point in the history
Problem

The Finagle server does not have a way to dynamically
reject requests when it's overloaded. It can go into
failure spiral without a way to recover until it's
restarted.

Solution

Introduce the `c.t.f.filter.ServerAdmissionControl` module
in the server Stack, which is enabled through param
`c.t.f.param.EnableServerAdmissionControl`. There are can be
multiple implementations of admission control filters
which are registered through `ServerAdmissionControl.register`
method. It's up to each AC filter to define their own
way of detecting server over capacity and configuration.

Server admission control is on by default in the server
Stack except for TwitterServer admin server.

Result

Provide users a way to define their own admission control
logic, and hook it up in the server Stack.

RB_ID=776385
  • Loading branch information
blackicewei authored and jenkins committed Jan 11, 2016
1 parent 25aab59 commit 0f0e228
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 11 deletions.
10 changes: 9 additions & 1 deletion CHANGES
Expand Up @@ -10,7 +10,15 @@ Note that ``RB_ID=#`` correspond to associated messages in commits.
New Features
~~~~~~~~~~~~

* finagle-core: Introduce `c.t.f.service.ResponseClassifier` which allows allows developers to
* finagle-core: Introduce the `c.t.f.filter.ServerAdmissionControl` module in the server Stack,
which is enabled through the param `c.t.f.param.EnableServerAdmissionControl`. Users can define
their own admission control filters, which reject requests when the server operates beyond
its capacity. These rejections apply backpressure and allow clients to retry requests on
servers that may not be over capacity. The filter implementation should define its own logic
to determine over capacity. One or more admission control filters can be installed through
the `ServerAdmissionControl.register` method. ``RB_ID=776385``

* finagle-core: Introduce `c.t.f.service.ResponseClassifier` which allows developers to
give Finagle the additional application specific knowledge necessary in order to properly
classify them. Without this, Finagle can only safely make judgements about transport
level failures. This is now used by `StatsFilter` and `FailureAccrualFactory` so that
Expand Down
@@ -1,20 +1,18 @@
package com.twitter.finagle.builder

import com.twitter.util
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle.filter.{MaskCancelFilter, RequestSemaphoreFilter}
import com.twitter.finagle.netty3.channel.IdleConnectionFilter
import com.twitter.finagle.netty3.channel.OpenConnectionsThresholds
import com.twitter.finagle.{Server => FinagleServer, _}
import com.twitter.finagle.filter.{MaskCancelFilter, RequestSemaphoreFilter, ServerAdmissionControl}
import com.twitter.finagle.netty3.Netty3Listener
import com.twitter.finagle.param.ProtocolLibrary
import com.twitter.finagle.server.{StackBasedServer, Listener, StackServer, StdStackServer}
import com.twitter.finagle.netty3.channel.{IdleConnectionFilter, OpenConnectionsThresholds}
import com.twitter.finagle.server.{Listener, StackBasedServer, StackServer, StdStackServer}
import com.twitter.finagle.service.{ExpiringService, TimeoutFilter}
import com.twitter.finagle.ssl.{Ssl, Engine}
import com.twitter.finagle.ssl.{Engine, Ssl}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.TraceInitializerFilter
import com.twitter.finagle.transport.Transport
import com.twitter.finagle.util._
import com.twitter.finagle.{Server => FinagleServer, _}
import com.twitter.util
import com.twitter.util.{CloseAwaitably, Duration, Future, NullMonitor, Time}
import java.net.SocketAddress
import javax.net.ssl.SSLEngine
Expand Down Expand Up @@ -355,6 +353,14 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder](
configured(RequestSemaphoreFilter.Param(sem))
}

/**
* Configure admission control filters in the server Stack.
*
* @see [[com.twitter.finagle.filter.ServerAdmissionControl]]
*/
def enableAdmissionControl(enable: Boolean): This =
configured(ServerAdmissionControl.Param(enable))

def requestTimeout(howlong: Duration): This =
configured(TimeoutFilter.Param(howlong))

Expand Down
@@ -0,0 +1,94 @@
package com.twitter.finagle.filter

import com.twitter.finagle._
import com.twitter.finagle.Filter.TypeAgnostic
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import scala.collection.JavaConverters._

/**
* Register and install admission control filters in the server Stack.
*
* Users can define their own admission control filters, which reject requests
* when the server operates beyond its capacity. These rejections apply backpressure
* and allow clients to retry requests on servers that may not be over capacity.
* The filter implementation should define its own logic to determine over capacity.
*
* One or more admission control filters can be installed through the ``register`` method.
* The filters are installed in a specific spot in the server Stack, but their internal
* order does not matter. Admission control is enabled through
* [[ServerAdmissionControl.Param]]. Each filter should provide its own mechanism
* for enabling, disabling and configuration.
*/
private[twitter] object ServerAdmissionControl {
// a map of admission control filters, key by name
private[this] val acs: ConcurrentMap[String, TypeAgnostic] = new ConcurrentHashMap()

val role = new Stack.Role("Server Admission Controller")

/**
* A class eligible for enabling admission control filters in the server Stack.
*
* @see [[com.twitter.finagle.filter.ServerAdmissionControl]]
*/
case class Param(enabled: Boolean)
object Param {
implicit val param = new Stack.Param[Param] {
lazy val default = Param(true)
}
}

/**
* Add a filter to the list of admission control filters. If a controller
* with the same name already exists in the map, it's a no-op. It must
* be called before the server construction to take effect.
*/
def register(name: String, filter: TypeAgnostic): Unit =
acs.putIfAbsent(name, filter)

/**
* Add multiple filters to the list of admission control filters. If a controller
* with the same name already exists in the map, it's a no-op. It must
* be called before the server construction to take effect.
*/
def register(pairs: (String, TypeAgnostic)*): Unit =
pairs.foreach { case (name, filter) =>
acs.putIfAbsent(name, filter)
}


/**
* Remove a filter from the list of admission control filters. If the map
* does not contain a controller with the name, it's a no-op. It must
* be called before the server construction to take effect.
*/
def unregister(name: String): Unit = acs.remove(name)

/**
* Clear all filters from the list of admission control filters.
*/
def unregisterAll(): Unit = acs.clear()

def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] = {
new Stack.Module1[Param, ServiceFactory[Req, Rep]] {
val role = ServerAdmissionControl.role
val description = "Proactively reject requests when the server operates beyond its capacity"
def make(
_enabled: Param,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
val Param(enabled) = _enabled

if (!enabled || acs.isEmpty) {
next
} else {
// assume the order of filters doesn't matter
val typeAgnosticFilters =
acs.values.asScala.foldLeft(Filter.TypeAgnostic.Identity){ case (sum, f) =>
f.andThen(sum)
}
typeAgnosticFilters.toFilter.andThen(next)
}
}
}
}
}
Expand Up @@ -193,4 +193,4 @@ object ExceptionStatsHandler {
// static initialization.
lazy val default = ExceptionStatsHandler(StatsFilter.DefaultExceptions)
}
}
}
Expand Up @@ -63,6 +63,10 @@ object StackServer {
// DeadlineFilter. Thus, DeadlineFilter is pushed before TimeoutFilter.
stk.push(DeadlineFilter.module)
stk.push(DtabStatsFilter.module)
// Admission Control filters are inserted after `StatsFilter` so that rejected
// requests are counted. We may need to adjust how latency are recorded
// to exclude Nack response from latency stats, CSL-2306.
stk.push(ServerAdmissionControl.module)
stk.push(StatsFilter.module)
stk.push(RequestSemaphoreFilter.module)
stk.push(MaskCancelFilter.module)
Expand Down
Expand Up @@ -15,7 +15,7 @@ import com.twitter.util.Duration
* dbl: Double = 123.32
* }}}
*/
private[finagle] object parsers {
private[twitter] object parsers {

object list {
def unapplySeq(s: String): Option[List[String]] =
Expand Down
@@ -0,0 +1,125 @@
package com.twitter.finagle.filter

import com.twitter.conversions.time._
import com.twitter.finagle.Filter.TypeAgnostic
import com.twitter.finagle._
import com.twitter.finagle.server.StackServer
import com.twitter.finagle.stack.Endpoint
import com.twitter.util.{Await, Future}
import java.util.concurrent.atomic.AtomicInteger
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import org.scalatest.mock.MockitoSugar

@RunWith(classOf[JUnitRunner])
class ServerAdmissionControlTest extends FunSuite with MockitoSugar {
class Ctx {
val a = new AtomicInteger(1)

class AdditionFilter[Req, Rep](delta: Int) extends SimpleFilter[Req, Rep] {
def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
a.addAndGet(delta)
service(req)
}
}

object Addition2Filter {
val name = "multiple 2"

val typeAgnostic: TypeAgnostic =
new TypeAgnostic {
override def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] =
new AdditionFilter(2)
}
}

object Addition3Filter {
val name = "multiple 3"

val typeAgnostic: TypeAgnostic =
new TypeAgnostic {
override def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] =
new AdditionFilter(3)
}
}

ServerAdmissionControl.unregisterAll()

val echo = ServiceFactory.const(Service.mk[Int, Int](v => Future.value(v)))
val stack = StackServer.newStack[Int, Int] ++ Stack.Leaf(Endpoint, echo)

ServerAdmissionControl.register(
Addition2Filter.name,
Addition2Filter.typeAgnostic
)
}

test("register a controller") {
val ctx = new Ctx
import ctx._

val factory = stack.make(StackServer.defaultParams)
val svc = Await.result(factory(), 5.seconds)

assert(Await.result(svc(1), 5.seconds) == 1)
assert(a.get == 3)
}

test("disabled by param") {
val ctx = new Ctx
import ctx._

val factory = stack.make(
StackServer.defaultParams +
ServerAdmissionControl.Param(false))
val svc = Await.result(factory(), 5.seconds)
assert(Await.result(svc(1), 5.seconds) == 1)
assert(a.get == 1)
}

test("unregister a controller") {
val ctx = new Ctx
import ctx._

ServerAdmissionControl.unregister(Addition2Filter.name)

val factory = stack.make(StackServer.defaultParams)
val svc = Await.result(factory(), 5.seconds)

assert(Await.result(svc(1), 5.seconds) == 1)
assert(a.get == 1)
}

test("register multiple controller") {
val ctx = new Ctx
import ctx._

ServerAdmissionControl.register(
(Addition2Filter.name, Addition2Filter.typeAgnostic),
(Addition3Filter.name, Addition3Filter.typeAgnostic))

val factory = stack.make(StackServer.defaultParams)
val svc = Await.result(factory(), 5.seconds)

assert(Await.result(svc(1), 5.seconds) == 1)
assert(a.get == 6)
}

test("duplicated registration is ignored") {
val ctx = new Ctx
import ctx._

ServerAdmissionControl.register(
Addition2Filter.name,
Addition2Filter.typeAgnostic
)

val factory = stack.make(StackServer.defaultParams)
val svc = Await.result(factory(), 5.seconds)
assert(Await.result(svc(1), 5.seconds) == 1)
assert(a.get == 3)
}
}


0 comments on commit 0f0e228

Please sign in to comment.