diff --git a/CHANGES b/CHANGES index c840befbf0..bf94191dc4 100644 --- a/CHANGES +++ b/CHANGES @@ -10,7 +10,15 @@ Note that ``RB_ID=#`` correspond to associated messages in commits. New Features ~~~~~~~~~~~~ - * finagle-core: Introduce `c.t.f.service.ResponseClassifier` which allows allows developers to + * finagle-core: Introduce the `c.t.f.filter.ServerAdmissionControl` module in the server Stack, + which is enabled through the param `c.t.f.param.EnableServerAdmissionControl`. Users can define + their own admission control filters, which reject requests when the server operates beyond + its capacity. These rejections apply backpressure and allow clients to retry requests on + servers that may not be over capacity. The filter implementation should define its own logic + to determine over capacity. One or more admission control filters can be installed through + the `ServerAdmissionControl.register` method. ``RB_ID=776385`` + + * finagle-core: Introduce `c.t.f.service.ResponseClassifier` which allows developers to give Finagle the additional application specific knowledge necessary in order to properly classify them. Without this, Finagle can only safely make judgements about transport level failures. This is now used by `StatsFilter` and `FailureAccrualFactory` so that diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala index e9e81f108d..fceb04d124 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala @@ -1,20 +1,18 @@ package com.twitter.finagle.builder +import com.twitter.util import com.twitter.concurrent.AsyncSemaphore -import com.twitter.finagle.filter.{MaskCancelFilter, RequestSemaphoreFilter} -import com.twitter.finagle.netty3.channel.IdleConnectionFilter -import com.twitter.finagle.netty3.channel.OpenConnectionsThresholds +import com.twitter.finagle.{Server => FinagleServer, _} +import com.twitter.finagle.filter.{MaskCancelFilter, RequestSemaphoreFilter, ServerAdmissionControl} import com.twitter.finagle.netty3.Netty3Listener -import com.twitter.finagle.param.ProtocolLibrary -import com.twitter.finagle.server.{StackBasedServer, Listener, StackServer, StdStackServer} +import com.twitter.finagle.netty3.channel.{IdleConnectionFilter, OpenConnectionsThresholds} +import com.twitter.finagle.server.{Listener, StackBasedServer, StackServer, StdStackServer} import com.twitter.finagle.service.{ExpiringService, TimeoutFilter} -import com.twitter.finagle.ssl.{Ssl, Engine} +import com.twitter.finagle.ssl.{Engine, Ssl} import com.twitter.finagle.stats.StatsReceiver import com.twitter.finagle.tracing.TraceInitializerFilter import com.twitter.finagle.transport.Transport import com.twitter.finagle.util._ -import com.twitter.finagle.{Server => FinagleServer, _} -import com.twitter.util import com.twitter.util.{CloseAwaitably, Duration, Future, NullMonitor, Time} import java.net.SocketAddress import javax.net.ssl.SSLEngine @@ -355,6 +353,14 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder]( configured(RequestSemaphoreFilter.Param(sem)) } + /** + * Configure admission control filters in the server Stack. + * + * @see [[com.twitter.finagle.filter.ServerAdmissionControl]] + */ + def enableAdmissionControl(enable: Boolean): This = + configured(ServerAdmissionControl.Param(enable)) + def requestTimeout(howlong: Duration): This = configured(TimeoutFilter.Param(howlong)) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/filter/ServerAdmissionControl.scala b/finagle-core/src/main/scala/com/twitter/finagle/filter/ServerAdmissionControl.scala new file mode 100644 index 0000000000..b249103b20 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/filter/ServerAdmissionControl.scala @@ -0,0 +1,94 @@ +package com.twitter.finagle.filter + +import com.twitter.finagle._ +import com.twitter.finagle.Filter.TypeAgnostic +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import scala.collection.JavaConverters._ + +/** + * Register and install admission control filters in the server Stack. + * + * Users can define their own admission control filters, which reject requests + * when the server operates beyond its capacity. These rejections apply backpressure + * and allow clients to retry requests on servers that may not be over capacity. + * The filter implementation should define its own logic to determine over capacity. + * + * One or more admission control filters can be installed through the ``register`` method. + * The filters are installed in a specific spot in the server Stack, but their internal + * order does not matter. Admission control is enabled through + * [[ServerAdmissionControl.Param]]. Each filter should provide its own mechanism + * for enabling, disabling and configuration. + */ +private[twitter] object ServerAdmissionControl { + // a map of admission control filters, key by name + private[this] val acs: ConcurrentMap[String, TypeAgnostic] = new ConcurrentHashMap() + + val role = new Stack.Role("Server Admission Controller") + + /** + * A class eligible for enabling admission control filters in the server Stack. + * + * @see [[com.twitter.finagle.filter.ServerAdmissionControl]] + */ + case class Param(enabled: Boolean) + object Param { + implicit val param = new Stack.Param[Param] { + lazy val default = Param(true) + } + } + + /** + * Add a filter to the list of admission control filters. If a controller + * with the same name already exists in the map, it's a no-op. It must + * be called before the server construction to take effect. + */ + def register(name: String, filter: TypeAgnostic): Unit = + acs.putIfAbsent(name, filter) + + /** + * Add multiple filters to the list of admission control filters. If a controller + * with the same name already exists in the map, it's a no-op. It must + * be called before the server construction to take effect. + */ + def register(pairs: (String, TypeAgnostic)*): Unit = + pairs.foreach { case (name, filter) => + acs.putIfAbsent(name, filter) + } + + + /** + * Remove a filter from the list of admission control filters. If the map + * does not contain a controller with the name, it's a no-op. It must + * be called before the server construction to take effect. + */ + def unregister(name: String): Unit = acs.remove(name) + + /** + * Clear all filters from the list of admission control filters. + */ + def unregisterAll(): Unit = acs.clear() + + def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] = { + new Stack.Module1[Param, ServiceFactory[Req, Rep]] { + val role = ServerAdmissionControl.role + val description = "Proactively reject requests when the server operates beyond its capacity" + def make( + _enabled: Param, + next: ServiceFactory[Req, Rep] + ): ServiceFactory[Req, Rep] = { + val Param(enabled) = _enabled + + if (!enabled || acs.isEmpty) { + next + } else { + // assume the order of filters doesn't matter + val typeAgnosticFilters = + acs.values.asScala.foldLeft(Filter.TypeAgnostic.Identity){ case (sum, f) => + f.andThen(sum) + } + typeAgnosticFilters.toFilter.andThen(next) + } + } + } + } +} \ No newline at end of file diff --git a/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala b/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala index 57015a0c33..1725dad956 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala @@ -193,4 +193,4 @@ object ExceptionStatsHandler { // static initialization. lazy val default = ExceptionStatsHandler(StatsFilter.DefaultExceptions) } -} +} \ No newline at end of file diff --git a/finagle-core/src/main/scala/com/twitter/finagle/server/StackServer.scala b/finagle-core/src/main/scala/com/twitter/finagle/server/StackServer.scala index 9e8542f0d9..9f5b5f5b5c 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/server/StackServer.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/server/StackServer.scala @@ -63,6 +63,10 @@ object StackServer { // DeadlineFilter. Thus, DeadlineFilter is pushed before TimeoutFilter. stk.push(DeadlineFilter.module) stk.push(DtabStatsFilter.module) + // Admission Control filters are inserted after `StatsFilter` so that rejected + // requests are counted. We may need to adjust how latency are recorded + // to exclude Nack response from latency stats, CSL-2306. + stk.push(ServerAdmissionControl.module) stk.push(StatsFilter.module) stk.push(RequestSemaphoreFilter.module) stk.push(MaskCancelFilter.module) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/util/parsers.scala b/finagle-core/src/main/scala/com/twitter/finagle/util/parsers.scala index 45569f17c3..fc23ff5410 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/util/parsers.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/util/parsers.scala @@ -15,7 +15,7 @@ import com.twitter.util.Duration * dbl: Double = 123.32 * }}} */ -private[finagle] object parsers { +private[twitter] object parsers { object list { def unapplySeq(s: String): Option[List[String]] = diff --git a/finagle-core/src/test/scala/com/twitter/finagle/filter/ServerAdmissionControlTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/filter/ServerAdmissionControlTest.scala new file mode 100644 index 0000000000..2ba5d7205f --- /dev/null +++ b/finagle-core/src/test/scala/com/twitter/finagle/filter/ServerAdmissionControlTest.scala @@ -0,0 +1,125 @@ +package com.twitter.finagle.filter + +import com.twitter.conversions.time._ +import com.twitter.finagle.Filter.TypeAgnostic +import com.twitter.finagle._ +import com.twitter.finagle.server.StackServer +import com.twitter.finagle.stack.Endpoint +import com.twitter.util.{Await, Future} +import java.util.concurrent.atomic.AtomicInteger +import org.junit.runner.RunWith +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner +import org.scalatest.mock.MockitoSugar + +@RunWith(classOf[JUnitRunner]) +class ServerAdmissionControlTest extends FunSuite with MockitoSugar { + class Ctx { + val a = new AtomicInteger(1) + + class AdditionFilter[Req, Rep](delta: Int) extends SimpleFilter[Req, Rep] { + def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = { + a.addAndGet(delta) + service(req) + } + } + + object Addition2Filter { + val name = "multiple 2" + + val typeAgnostic: TypeAgnostic = + new TypeAgnostic { + override def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = + new AdditionFilter(2) + } + } + + object Addition3Filter { + val name = "multiple 3" + + val typeAgnostic: TypeAgnostic = + new TypeAgnostic { + override def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = + new AdditionFilter(3) + } + } + + ServerAdmissionControl.unregisterAll() + + val echo = ServiceFactory.const(Service.mk[Int, Int](v => Future.value(v))) + val stack = StackServer.newStack[Int, Int] ++ Stack.Leaf(Endpoint, echo) + + ServerAdmissionControl.register( + Addition2Filter.name, + Addition2Filter.typeAgnostic + ) + } + + test("register a controller") { + val ctx = new Ctx + import ctx._ + + val factory = stack.make(StackServer.defaultParams) + val svc = Await.result(factory(), 5.seconds) + + assert(Await.result(svc(1), 5.seconds) == 1) + assert(a.get == 3) + } + + test("disabled by param") { + val ctx = new Ctx + import ctx._ + + val factory = stack.make( + StackServer.defaultParams + + ServerAdmissionControl.Param(false)) + val svc = Await.result(factory(), 5.seconds) + assert(Await.result(svc(1), 5.seconds) == 1) + assert(a.get == 1) + } + + test("unregister a controller") { + val ctx = new Ctx + import ctx._ + + ServerAdmissionControl.unregister(Addition2Filter.name) + + val factory = stack.make(StackServer.defaultParams) + val svc = Await.result(factory(), 5.seconds) + + assert(Await.result(svc(1), 5.seconds) == 1) + assert(a.get == 1) + } + + test("register multiple controller") { + val ctx = new Ctx + import ctx._ + + ServerAdmissionControl.register( + (Addition2Filter.name, Addition2Filter.typeAgnostic), + (Addition3Filter.name, Addition3Filter.typeAgnostic)) + + val factory = stack.make(StackServer.defaultParams) + val svc = Await.result(factory(), 5.seconds) + + assert(Await.result(svc(1), 5.seconds) == 1) + assert(a.get == 6) + } + + test("duplicated registration is ignored") { + val ctx = new Ctx + import ctx._ + + ServerAdmissionControl.register( + Addition2Filter.name, + Addition2Filter.typeAgnostic + ) + + val factory = stack.make(StackServer.defaultParams) + val svc = Await.result(factory(), 5.seconds) + assert(Await.result(svc(1), 5.seconds) == 1) + assert(a.get == 3) + } +} + +