Skip to content

Commit

Permalink
finagle-http: improve performance of StatsFilter
Browse files Browse the repository at this point in the history
Problem

The `c.t.f.http.filters.StatsFilter` manages
metrics by converting the response status
code to a String on every invocation. There
is no need for doing this on every call.

Solution

Keep references to valid HTTP status code
ranges, where they can be referenced by
the status code's Int value, without
allocating a new String on each request.
Also change the memoization of a specific
status code to be based off of the Int value.

Additionally, the allocation of a `Duration`
is removed by changing how the elapsed time
in milliseconds is determined.

JIRA Issues: CSL-8375

Differential Revision: https://phabricator.twitter.biz/D350733
  • Loading branch information
enbnt authored and jenkins committed Aug 6, 2019
1 parent ddbd669 commit f6ce452
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 18 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -7,6 +7,19 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-http: improve performance of c.t.f.http.filter.StatsFilter. This results in two notable
API changes:
1. There is a `private[filter]` constructor which can take a `() => Long` for
determining the current time in milliseconds (the existing `StatsFilter(StatsReceiver)`
constructor defaults to using `Stopwatch.systemMillis` for determining the current time in
milliseconds.
2. The `protected count(Duration, Response)` method has been changed to
`private[this] count(Long, Response)` and is no longer part of the public API.
``PHAB_ID=D350733``

19.8.0
------

Expand Down
@@ -0,0 +1,57 @@
package com.twitter.finagle.filter

import com.twitter.finagle.Service
import com.twitter.finagle.benchmark.StdBenchAnnotations
import com.twitter.finagle.http.{Request, Response, Status}
import com.twitter.finagle.http.filter.StatsFilter
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.util.{Await, Future}
import org.openjdk.jmh.annotations.{Benchmark, Scope, State}
import scala.util.Random

// ./sbt 'project finagle-benchmark' 'jmh:run HttpStatsFilterBenchmark'
@State(Scope.Benchmark)
class HttpStatsFilterBenchmark extends StdBenchAnnotations {

private[this] val request: Request = Request("/ping")

private[this] val constResult: Future[Response] = Future.value {
val rep = Response()
rep.contentString = "pong"
rep
}

private[this] val filter = new StatsFilter[Request](
NullStatsReceiver
)

val statusCodes = Array(
Status.Accepted,
Status.BadRequest,
Status.Continue,
Status.EnhanceYourCalm,
Status.Forbidden,
Status.NoContent,
Status.Ok,
Status.InternalServerError
)

private[this] val constSvc = filter.andThen(Service.constant[Response](constResult))
private[this] val randSvc = filter.andThen(Service.mk[Request, Response](_ =>
Future.value {
Response(statusCodes(Random.nextInt(statusCodes.size)))
}))

@Benchmark
def constantStatusCode(): Response = {
val res = constSvc(request)
Await.result(res)
}

@Benchmark
def randomStatusCode(): Response = {
val res = randSvc(request)
Await.result(res)
}

}
Expand Up @@ -4,6 +4,8 @@ import com.twitter.finagle._
import com.twitter.finagle.http.{Request, Response, Status}
import com.twitter.finagle.stats.{Counter, Stat, StatsReceiver}
import com.twitter.util._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReferenceArray

object StatsFilter {
val role: Stack.Role = Stack.Role("HttpStatsFilter")
Expand All @@ -24,6 +26,29 @@ object StatsFilter {
new StatsFilter[Request](statsParam.statsReceiver.scope("http")).andThen(next)
}
}

private def statusCodeRange(code: Int): String =
if (code < 100) "UNKNOWN"
else if (code < 200) "1XX"
else if (code < 300) "2XX"
else if (code < 400) "3XX"
else if (code < 500) "4XX"
else if (code < 600) "5XX"
else "UNKNOWN"

private def getStatusRangeIndex(code: Int): Int =
if (code < 100) 0
else if (code < 200) 1
else if (code < 300) 2
else if (code < 400) 3
else if (code < 500) 4
else if (code < 600) 5
else 0

private val nowTimeInMillis: () => Long = Stopwatch.systemMillis

private case class StatusStats(count: Counter, latency: Stat)

}

/**
Expand All @@ -36,41 +61,69 @@ object StatsFilter {
* time.[code]
* time.[class]
*/
class StatsFilter[REQUEST <: Request](stats: StatsReceiver)
class StatsFilter[REQUEST <: Request] private[filter] (stats: StatsReceiver, now: () => Long)
extends SimpleFilter[REQUEST, Response] {

import StatsFilter._

def this(stats: StatsReceiver) = this(stats, StatsFilter.nowTimeInMillis)

private[this] val statusReceiver = stats.scope("status")
private[this] val timeReceiver = stats.scope("time")

private[this] val counterCache: String => Counter =
Memoize(statusReceiver.counter(_))
// Optimized for the known valid status code ranges (1xx, 2xx, 3xx, 4xx, 5xx)
// where the 0th index represents an unknown/invalid HTTP status range.
// We initialize these lazily to prevent unused metrics from being exported.
private[this] val statusRange: AtomicReferenceArray[StatusStats] =
new AtomicReferenceArray[StatusStats](6)

private[this] def getStatusRange(code: Int): StatusStats = {
val index = getStatusRangeIndex(code)
val statsPair = statusRange.get(index)
if (statsPair == null) {
val codeRange = statusCodeRange(code)
val initPair = StatusStats(statusReceiver.counter(codeRange), timeReceiver.stat(codeRange))
statusRange.compareAndSet(index, null, initPair)
initPair
} else {
statsPair
}
}

private[this] val statCache: String => Stat =
Memoize(timeReceiver.stat(_))
private[this] val statusCache = new ConcurrentHashMap[Int, StatusStats]()
private[this] val statusCacheFn = new java.util.function.Function[Int, StatusStats] {
def apply(t: Int): StatusStats = {
val statusCode = t.toString
StatusStats(statusReceiver.counter(statusCode), timeReceiver.stat(statusCode))
}
}

def apply(request: REQUEST, service: Service[REQUEST, Response]): Future[Response] = {
val elapsed = Stopwatch.start()
val start = now()

val future = service(request)
future respond {
case Return(response) =>
count(elapsed(), response)
count(now() - start, response)
case Throw(_) =>
// Treat exceptions as empty 500 errors
val response = Response(request.version, Status.InternalServerError)
count(elapsed(), response)
count(now() - start, response)
}
future
}

protected def count(duration: Duration, response: Response): Unit = {
val statusCode = response.statusCode.toString
val statusClass = (response.statusCode / 100).toString + "XX"
private[this] def count(durationMs: Long, response: Response): Unit = {
val statusCode = response.statusCode

counterCache(statusCode).incr()
counterCache(statusClass).incr()
// we don't use a named pair to avoid extra allocation here
// ex: `val (counter, stat) = getStatusRange(statusCode)`
val rangeStats = getStatusRange(statusCode)
rangeStats.count.incr()
rangeStats.latency.add(durationMs)

statCache(statusCode).add(duration.inMilliseconds)
statCache(statusClass).add(duration.inMilliseconds)
val codeStats = statusCache.computeIfAbsent(statusCode, statusCacheFn)
codeStats.count.incr()
codeStats.latency.add(durationMs)
}
}
Expand Up @@ -3,7 +3,7 @@ package com.twitter.finagle.http.filter
import com.twitter.finagle.Service
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.stats.{InMemoryStatsReceiver, Verbosity}
import com.twitter.util.{Await, Duration, Future, Time}
import com.twitter.util.{Await, Duration, Future, Stopwatch, Time}
import org.scalatest.FunSuite
import org.mockito.Mockito.{spy, verify}

Expand All @@ -21,7 +21,7 @@ class StatsFilterTest extends FunSuite {
test("increment stats") {
val receiver = spy(new InMemoryStatsReceiver)

val filter = new StatsFilter(receiver) andThen service
val filter = new StatsFilter(receiver, Stopwatch.timeMillis) andThen service

Time.withCurrentTimeFrozen { _ =>
Await.result(filter(Request()), Duration.fromSeconds(5))
Expand All @@ -36,7 +36,7 @@ class StatsFilterTest extends FunSuite {
test("status and time counters and stats are memoized") {
val receiver = spy(new InMemoryStatsReceiver)

val filter = new StatsFilter(receiver) andThen service
val filter = new StatsFilter(receiver, Stopwatch.timeMillis) andThen service

Time.withCurrentTimeFrozen { _ =>
Await.result(filter(Request()), Duration.fromSeconds(5))
Expand Down

0 comments on commit f6ce452

Please sign in to comment.