Skip to content

Commit

Permalink
finagle-core: Stop recording transit latency and deadline budget for …
Browse files Browse the repository at this point in the history
…clients

Problem

Finagle records transit latency for clients, but only servers
care about it.

Solution

Move the transit latency stat out of StatsFilter and into
ServerStatsFilter. Handletime is also a server-specific stat,
so moved that into ServerStatsFilter too, and deleted
HandletimeFilter.  This has the added advantage of recording
transit latency at the same time we do handletime, which is one
of the earliest points.

This review also handles some other miscellaneous cleanup,
making no-allocation, testable, elapsed duration easier to use,
adding tests for handletime, transit latency and deadline budget.

Result

Finagle services no longer export transit_latency_ms or
deadline_budget_ms for clients.  It's not useful for clients,
so it's safe to remove it.

RB_ID=751268
  • Loading branch information
mosesn authored and jenkins committed Oct 12, 2015
1 parent a5d6bb3 commit 86d9b05
Show file tree
Hide file tree
Showing 16 changed files with 212 additions and 109 deletions.
11 changes: 10 additions & 1 deletion CHANGES
Expand Up @@ -23,8 +23,17 @@ Breaking API Changes
compatibility, `HashedWheelTimer` has additional constructors to match
those provided by `org.jboss.netty.util.HashedWheelTimer`. ``RB_ID=748514``

* finagle-core: Marked `HandletimeFilter` private[finagle], and renamed it to
`ServerStatsFilter`. ``RB_ID=75268``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle-core: The transit latency (transit_latency_ms) and deadline budget
(deadline_budget_ms) stats are now only recorded for servers, not for
clients anymore, since they're only meaningful for servers. ``RB_ID=75268``

Deprecations
~~~~~~~~~~~~

* finagle-zipkin: Drop `c.t.zipkin.thrift.Annotation.duration` and associated thrift field
`c.t.f.thrift.thrift.Annotation.duration`. ``RB_ID=751986``
Expand Down
20 changes: 10 additions & 10 deletions doc/src/sphinx/metrics/Public.rst
Expand Up @@ -34,14 +34,6 @@ StatsFilter
a counter of the number of times any SourcedException or sourced Failure has
been thrown

**transit_latency_ms**
a stat that attempts to measure (walltime) transit times between hops, e.g.,
from client to server. Not supported by all protocols.

**deadline_budget_ms**
a stat accounting for the (implied) amount of time remaining for this request,
for example from a deadline or timeout. Not supported by all protocols.

StatsFactoryWrapper
<<<<<<<<<<<<<<<<<<<

Expand All @@ -56,13 +48,21 @@ StatsFactoryWrapper
a stat of the latency to acquire a service in milliseconds
this entails establishing a connection or waiting for a connection from a pool

HandletimeFilter
<<<<<<<<<<<<<<<<
ServerStatsFilter
<<<<<<<<<<<<<<<<<

**handletime_us**
a histogram of the time it takes to handle the request in microseconds
NB: does not include the time to respond

**transit_latency_ms**
a stat that attempts to measure (walltime) transit times between hops, e.g.,
from client to server. Not supported by all protocols.

**deadline_budget_ms**
a stat accounting for the (implied) amount of time remaining for this request,
for example from a deadline or timeout. Not supported by all protocols.

DefaultServer
<<<<<<<<<<<<<

Expand Down
Expand Up @@ -2,7 +2,7 @@ package com.twitter.finagle.exp

import com.twitter.conversions.time._
import com.twitter.finagle.benchmark.StdBenchAnnotations
import com.twitter.finagle.util.WindowedAdder
import com.twitter.util.Stopwatch
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

Expand All @@ -29,7 +29,7 @@ class LatencyHistogramBench extends StdBenchAnnotations {
err,
1.minute.inMillis,
LatencyHistogram.DefaultSlices,
WindowedAdder.systemMs)
Stopwatch.systemMillis)

// give it some data to start with
0L.until(maxDurationMs).foreach(histo.add)
Expand Down

This file was deleted.

@@ -0,0 +1,67 @@
package com.twitter.finagle.filter

import com.twitter.finagle.Deadline
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.{param, Service, ServiceFactory, SimpleFilter, Stack, Stackable}
import com.twitter.util.{Future, Stopwatch, Time, Duration}
import java.util.concurrent.TimeUnit

private[finagle] object ServerStatsFilter {
val role = Stack.Role("ServerStats")

/**
* Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.filter.ServerStatsFilter]].
*/
def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module1[param.Stats, ServiceFactory[Req, Rep]] {
val role = ServerStatsFilter.role
val description = "Record elapsed execution time, transit latency, deadline budget, of underlying service"
def make(_stats: param.Stats, next: ServiceFactory[Req, Rep]) = {
val param.Stats(statsReceiver) = _stats
new ServerStatsFilter(statsReceiver).andThen(next)
}
}

/** Used as a sentinel with reference equality to indicate the absence of a deadline */
private val NoDeadline = Deadline(Time.Undefined, Time.Undefined)
private val NoDeadlineFn = () => NoDeadline
}

/**
* A [[com.twitter.finagle.Filter]] that records the elapsed execution
* times of the underlying [[com.twitter.finagle.Service]], transit
* time, and budget time.
*
* @note the stat does not include the time that it takes to satisfy
* the returned `Future`, only how long it takes for the `Service`
* to return the `Future`.
*/
private[finagle] class ServerStatsFilter[Req, Rep](statsReceiver: StatsReceiver, nowNanos: () => Long)
extends SimpleFilter[Req, Rep]
{
import ServerStatsFilter._

def this(statsReceiver: StatsReceiver) = this(statsReceiver, Stopwatch.systemNanos)

private[this] val handletime = statsReceiver.stat("handletime_us")
private[this] val transitTimeStat = statsReceiver.stat("transit_latency_ms")
private[this] val budgetTimeStat = statsReceiver.stat("deadline_budget_ms")

def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val startAt = nowNanos()

val dl = Contexts.broadcast.getOrElse(Deadline, NoDeadlineFn)
if (dl ne NoDeadline) {
val now = Time.now
transitTimeStat.add(((now-dl.timestamp) max Duration.Zero).inUnit(TimeUnit.MILLISECONDS))
budgetTimeStat.add(((dl.deadline-now) max Duration.Zero).inUnit(TimeUnit.MILLISECONDS))
}

try service(request)
finally {
val elapsedNs = nowNanos() - startAt
handletime.add(TimeUnit.MICROSECONDS.convert(elapsedNs, TimeUnit.NANOSECONDS))
}
}
}
Expand Up @@ -46,7 +46,7 @@ object StackServer {
* @see [[com.twitter.finagle.tracing.ServerTracingFilter]]
* @see [[com.twitter.finagle.tracing.TraceInitializerFilter]]
* @see [[com.twitter.finagle.filter.MonitorFilter]]
* @see [[com.twitter.finagle.filter.HandletimeFilter]]
* @see [[com.twitter.finagle.filter.ServerStatsFilter]]
*/
def newStack[Req, Rep]: Stack[ServiceFactory[Req, Rep]] = {
val stk = new StackBuilder[ServiceFactory[Req, Rep]](
Expand All @@ -62,7 +62,7 @@ object StackServer {
stk.push(ExceptionSourceFilter.module)
stk.push(Role.jvmTracing, ((next: ServiceFactory[Req, Rep]) =>
newJvmFilter[Req, Rep]() andThen next))
stk.push(HandletimeFilter.module)
stk.push(ServerStatsFilter.module)
stk.push(Role.protoTracing, identity[ServiceFactory[Req, Rep]](_))
stk.push(ServerTracingFilter.module)
stk.push(Role.preparer, identity[ServiceFactory[Req, Rep]](_))
Expand Down
Expand Up @@ -2,11 +2,10 @@ package com.twitter.finagle.service

import com.twitter.finagle.Filter.TypeAgnostic
import com.twitter.finagle._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.stats.{
MultiCategorizingExceptionStatsHandler, ExceptionStatsHandler, StatsReceiver}
import com.twitter.jsr166e.LongAdder
import com.twitter.util.{Future, Stopwatch, Throw, Return, Time, Duration}
import com.twitter.util.{Future, Stopwatch, Throw, Return}
import java.util.concurrent.TimeUnit

object StatsFilter {
Expand Down Expand Up @@ -56,20 +55,16 @@ object StatsFilter {
override def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] =
new StatsFilter[Req, Rep](statsReceiver, exceptionStatsHandler)
}

/** Used as a sentinel with reference equality to indicate the absence of a deadline */
private val NoDeadline = Deadline(Time.Undefined, Time.Undefined)
private val NoDeadlineFn = () => NoDeadline
}

/**
* StatsFilter reports request statistics to the given receiver.
*
* @param timeUnit this controls what granularity is used for
* measuring latency, transit time, and budget time. The default is milliseconds,
* measuring latency. The default is milliseconds,
* but other values are valid. The choice of this changes the name of the stat
* attached to the given [[StatsReceiver]]. For the common units,
* it will be "request_latency_ms", "transit_latency_ms" and "deadline_budget_ms".
* it will be "request_latency_ms".
*
* @note The innocent bystander may find the semantics with respect
* to backup requests a bit puzzling; they are entangled in legacy.
Expand Down Expand Up @@ -109,19 +104,10 @@ class StatsFilter[Req, Rep](
private[this] val loadGauge = statsReceiver.addGauge("load") { outstandingRequestCount.sum() }
private[this] val outstandingRequestCountGauge =
statsReceiver.addGauge("pending") { outstandingRequestCount.sum() }
private[this] val transitTimeStat = statsReceiver.stat(s"transit_latency_$latencyStatSuffix")
private[this] val budgetTimeStat = statsReceiver.stat(s"deadline_budget_$latencyStatSuffix")

def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val elapsed = Stopwatch.start()

val dl = Contexts.broadcast.getOrElse(Deadline, NoDeadlineFn)
if (dl ne NoDeadline) {
val now = Time.now
transitTimeStat.add(((now-dl.timestamp) max Duration.Zero).inUnit(timeUnit))
budgetTimeStat.add(((dl.deadline-now) max Duration.Zero).inUnit(timeUnit))
}

outstandingRequestCount.increment()
service(request).respond { response =>
outstandingRequestCount.decrement()
Expand Down
@@ -1,6 +1,6 @@
package com.twitter.finagle.util

import com.twitter.util.Duration
import com.twitter.util.{Duration, Stopwatch}

/**
* A token bucket is used to control the relative rates of two
Expand Down Expand Up @@ -73,5 +73,5 @@ private[finagle] object TokenBucket {
* the ones added to the bucket.
*/
def newLeakyBucket(ttl: Duration, reserve: Int): TokenBucket =
newLeakyBucket(ttl, reserve, WindowedAdder.systemMs)
newLeakyBucket(ttl, reserve, Stopwatch.systemMillis)
}
Expand Up @@ -27,10 +27,6 @@ private[finagle] object WindowedAdder {
require(slices > 1)
new WindowedAdder(range/slices, slices-1, now)
}

// we use nanos instead of current time millis because it increases monotonically
val systemMs: () => Long = () => System.nanoTime() / (1000 * 1000)
val timeMs: () => Long = () => Time.now.inMilliseconds
}

private[finagle] class WindowedAdder private[WindowedAdder](
Expand Down
@@ -0,0 +1,58 @@
package com.twitter.finagle.filter

import com.twitter.finagle.{Deadline, Service}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.{Stopwatch, Time, Future}
import com.twitter.conversions.time._
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class ServerStatsFilterTest extends FunSuite {
test("Records handletime for a service") {
Time.withCurrentTimeFrozen { ctl =>
val inMemory = new InMemoryStatsReceiver
val svc = Service.mk[Unit, Unit] { unit =>
ctl.advance(5.microseconds)
Future.never
}
val filter = new ServerStatsFilter[Unit, Unit](inMemory, Stopwatch.timeNanos)
filter.andThen(svc)(())
val expected = 5
val actual = inMemory.stats(Seq("handletime_us"))(0)
assert(actual == expected)
}
}

test("Records budget remaining for a service") {
Time.withCurrentTimeFrozen { ctl =>
val inMemory = new InMemoryStatsReceiver
Contexts.broadcast.let(Deadline, Deadline(Time.now, Time.now + 15.milliseconds)) {
ctl.advance(5.milliseconds)
val svc = Service.mk[Unit, Unit] { unit => Future.never }
val filter = new ServerStatsFilter[Unit, Unit](inMemory, Stopwatch.timeNanos)
filter.andThen(svc)(())
val expected = 10f
val actual = inMemory.stats(Seq("deadline_budget_ms"))(0)
assert(actual == expected)
}
}
}

test("Records transit time for a service") {
Time.withCurrentTimeFrozen { ctl =>
val inMemory = new InMemoryStatsReceiver
Contexts.broadcast.let(Deadline, Deadline(Time.now, Time.now + 15.milliseconds)) {
ctl.advance(5.milliseconds)
val svc = Service.mk[Unit, Unit] { unit => Future.never }
val filter = new ServerStatsFilter[Unit, Unit](inMemory, Stopwatch.timeNanos)
filter.andThen(svc)(())
val expected = 5f
val actual = inMemory.stats(Seq("transit_latency_ms"))(0)
assert(actual == expected)
}
}
}
}
@@ -0,0 +1,39 @@
package com.twitter.finagle.server

import com.twitter.conversions.time._
import com.twitter.finagle.{Service, ServiceFactory, Deadline, Stack}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.param.Stats
import com.twitter.finagle.server.StackServer
import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.stack.Endpoint
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.{Await, Future, Time}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class StackServerTest extends FunSuite {
test("Deadline isn't changed until after it's recorded") {
val echo = ServiceFactory.const(Service.mk[Unit, Deadline] { unit =>
Future.value(Contexts.broadcast(Deadline))
})
val stack = StackServer.newStack[Unit, Deadline] ++ Stack.Leaf(Endpoint, echo)
val statsReceiver = new InMemoryStatsReceiver
val factory = stack.make(StackServer.defaultParams + TimeoutFilter.Param(1.second) + Stats(statsReceiver))
val svc = Await.result(factory(), 5.seconds)
Time.withCurrentTimeFrozen { ctl =>
Contexts.broadcast.let(Deadline, Deadline.ofTimeout(5.seconds)) {
ctl.advance(1.second)
val result = svc(())

// we should be one second ahead
assert(statsReceiver.stats(Seq("transit_latency_ms"))(0) == 1.second.inMilliseconds.toFloat)

// but the deadline inside the service's closure should be updated
assert(Await.result(result) == Deadline.ofTimeout(1.second))
}
}
}
}

0 comments on commit 86d9b05

Please sign in to comment.