Skip to content

Commit

Permalink
finagle-core: Add annotations to filters that might throttle requests
Browse files Browse the repository at this point in the history
Problem:
We would like to have the ability to analyze traces to
understand how rate-limiting affects latencies. To do so,
we need instrumentation in the various filters that might
throttle requests

Solution:
Add `rejected` binary annotation to `RequestSemophoreFilter`
and `NackAdmissionFilter`

JIRA Issues: CSL-7684

Differential Revision: https://phabricator.twitter.biz/D597875
  • Loading branch information
joybestourous authored and jenkins committed Jan 14, 2021
1 parent e9e9e0d commit 6685768
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Unreleased

New Features
~~~~~~~~~~~~
* finagle-core: Add `clnt/<FilterName>_rejected` annotation to filters that may throttle requests,
including `c.t.finagle.filter.NackAdmissionFilter` and `c.t.finagle.filter.RequestSemaphoreFilter`.
``PHAB_ID=D597875``

* finagle-http: Record http-specific annotations including `http.status_code` and
`http.method`. See details at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.twitter.conversions.DurationOps._
import com.twitter.finagle._
import com.twitter.finagle.client.useNackAdmissionFilter
import com.twitter.finagle.stats.{Counter, Gauge, StatsReceiver, Verbosity}
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.util.{LossyEma, Rng}
import com.twitter.util._

Expand Down Expand Up @@ -240,6 +241,12 @@ class NackAdmissionFilter[Req, Rep] private[filter] (
def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
rpsCounter.incr()
if (enabled && shouldDropRequest()) {
val tracing = Trace()
if (tracing.isActivelyTracing)
tracing.recordBinary(
"clnt/NackAdmissionFilter_rejected",
s"probabilistically dropped because nackRate ${1d - emaValue} over window $window exceeds nackRateThreshold $acceptRateThreshold"
)
droppedRequestCounter.incr()
OverloadFailure
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.twitter.finagle.filter
import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle._
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.tracing.Trace
import com.twitter.util.{Future, Return, Throw}

object RequestSemaphoreFilter {
Expand Down Expand Up @@ -42,6 +43,11 @@ class RequestSemaphoreFilter[Req, Rep](sem: AsyncSemaphore, stats: StatsReceiver
def apply(req: Req, service: Service[Req, Rep]): Future[Rep] =
sem.acquire().transform {
case Return(permit) => service(req).ensure { permit.release() }
case Throw(noPermit) => Future.exception(Failure.rejected(noPermit))
case Throw(noPermit) => {
val tracing = Trace()
if (tracing.isActivelyTracing)
tracing.recordBinary("clnt/RequestSemaphoreFilter_rejected", noPermit.getMessage)
Future.exception(Failure.rejected(noPermit))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package com.twitter.finagle.filter
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.service.FailedService
import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver}
import com.twitter.finagle.tracing.Annotation.BinaryAnnotation
import com.twitter.finagle.tracing.{BufferingTracer, Record, Trace}
import com.twitter.finagle.util.{DefaultLogger, Ema, LossyEma, Rng}
import com.twitter.finagle.{Failure, FailureFlags, Service, ServiceFactory, Stack, param}
import com.twitter.util._
Expand Down Expand Up @@ -215,6 +217,31 @@ class NackAdmissionFilterTest extends FunSuite {
testDropsRequest()
}

testEnabled("annotates dropped requests") { ctl =>
val lowRng: CustomRng = new CustomRng(0)
val ctx = new Ctx(lowRng)
import ctx._
val tracer = new BufferingTracer()
Trace.letTracer(tracer) {

while (filter.emaValue > DefaultAcceptRateThreshold) {
ctl.advance(10.milliseconds)
nackWithoutTest()
}

nackWithoutTest()
val expected = Seq(
(
"clnt/NackAdmissionFilter_rejected",
"probabilistically dropped because " +
"nackRate 0.5000930677240232 over window 3.seconds exceeds nackRateThreshold 0.5"))
val actual = tracer.iterator.toList collect {
case Record(_, _, BinaryAnnotation(k, v), _) => k -> v
}
assert(expected == actual)
}
}

testEnabled("doesn't drop requests after accept rate drops below threshold") { ctl =>
/**
* We change customRng so it will always drop or always send requests,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ package com.twitter.finagle.filter

import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle._
import com.twitter.finagle.tracing.Annotation.BinaryAnnotation
import com.twitter.finagle.tracing.{BufferingTracer, Record, Trace}
import com.twitter.util.{Await, Future}
import org.scalatest.FunSuite

class RequestSemaphoreFilterTest extends FunSuite {
def tracingAnnotations(tracer: BufferingTracer): Seq[(String, Any)] = {
tracer.iterator.toList collect {
case Record(_, _, BinaryAnnotation(k, v), _) => k -> v
}
}

test("mark dropped requests as rejected") {
val neverSvc = new Service[Int, Int] {
Expand All @@ -29,4 +36,20 @@ class RequestSemaphoreFilterTest extends FunSuite {
val e = intercept[Exception] { Await.result(svc(1)) }
assert(e == exc)
}

test("annotates dropped requests") {
val tracer = new BufferingTracer()
Trace.letTracer(tracer) {
val neverSvc = new Service[Int, Int] {
def apply(req: Int) = Future.never
}
val q = new AsyncSemaphore(1, 0)
val svc = new RequestSemaphoreFilter(q) andThen neverSvc
svc(1)
val f = intercept[Failure] { Await.result(svc(1)) }
}

val expected = Seq(("clnt/RequestSemaphoreFilter_rejected", "Max waiters exceeded"))
assert(tracingAnnotations(tracer) == expected)
}
}

0 comments on commit 6685768

Please sign in to comment.