Skip to content

Commit

Permalink
finagle: Add tracing annotations to backup requests
Browse files Browse the repository at this point in the history
Problem

There are no tracing annotations for backup requests.  Adding them is
not particularly difficult, however this runs into a problem with how
MethodBuilder uses them as the backup requests happen before the
client's trace initialization.

Solution

Have MethodBuilder take over placement of trace initialization and add
anotations to `BackupRequestFilter`.

Result

Timestamped annotations for:

 - "Client Backup Request Issued"
 - "Client Backup Request Won" or "Client Backup Request Lost"

Binary annotations for

 - "clnt/backup_request_threshold_ms" with the current value of the latency threshold, in milliseconds
 - "clnt/backup_request_span_id" with the span id of the backup request

JIRA Issues: CSL-7731, CSL-7593

Differential Revision: https://phabricator.twitter.biz/D280998
  • Loading branch information
kevinoliver authored and jenkins committed Mar 6, 2019
1 parent 1867976 commit 5201f62
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 55 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -7,6 +7,16 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

New Features
~~~~~~~~~~~~

* finagle-core: Added tracing annotations to backup requests. ``PHAB_ID=D280998``

- Timestamped annotation "Client Backup Request Issued"
- Timestamped annotation "Client Backup Request Won" or "Client Backup Request Lost"
- Binary annotation "clnt/backup_request_threshold_ms", with the current value of the latency threshold, in milliseconds
- Binary annotation "clnt/backup_request_span_id", with the span id of the backup request

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand Down
@@ -1,9 +1,25 @@
package com.twitter.finagle.http

import com.twitter.finagle
import com.twitter.finagle.tracing.{Trace, TraceInitializerFilter}
import com.twitter.finagle.tracing.{Trace, TraceInitializerFilter, Tracer}
import com.twitter.finagle.{Filter, ServiceFactory, Stack}

private[finagle] object HttpClientTraceInitializer {

def apply[Req, Rep](tracer: Tracer): Filter[Req, Rep, Req, Rep] =
Filter.mk[Req, Rep, Req, Rep] { (req, svc) =>
Trace.letTracerAndNextId(tracer) {
TraceInfo.setClientRequestHeaders(req.asInstanceOf[Request])
svc(req)
}
}

def typeAgnostic(tracer: Tracer): Filter.TypeAgnostic = new Filter.TypeAgnostic {
def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = apply(tracer)
}

}

private[finagle] class HttpClientTraceInitializer[Req <: Request, Rep]
extends Stack.Module1[finagle.param.Tracer, ServiceFactory[Req, Rep]] {
val role: Stack.Role = TraceInitializerFilter.role
Expand All @@ -13,13 +29,7 @@ private[finagle] class HttpClientTraceInitializer[Req <: Request, Rep]
_tracer: finagle.param.Tracer,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
val finagle.param.Tracer(tracer) = _tracer
val traceInitializer = Filter.mk[Req, Rep, Req, Rep] { (req, svc) =>
Trace.letTracerAndNextId(tracer) {
TraceInfo.setClientRequestHeaders(req)
svc(req)
}
}
val traceInitializer = HttpClientTraceInitializer[Req, Rep](_tracer.tracer)
traceInitializer.andThen(next)
}
}
Expand Up @@ -6,6 +6,7 @@ import com.twitter.finagle._
import com.twitter.finagle.context.BackupRequest
import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier, Retries, RetryBudget}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.{Annotation, Trace, TraceId, Tracing}
import com.twitter.finagle.util.WindowedPercentileHistogram
import com.twitter.logging.Logger
import com.twitter.util._
Expand All @@ -20,6 +21,15 @@ object BackupRequestFilter {
*/
private val OrigRequestTimeout = Failure("Original request did not complete in time")

private val IssuedAnnotation =
Annotation.Message("Client Backup Request Issued")

private val WonAnnotation =
Annotation.Message("Client Backup Request Won")

private val LostAnnotation =
Annotation.Message("Client Backup Request Lost")

/**
* Use a minimum non-zero delay to prevent sending unnecessary backup requests
* immediately for services where the latency at the percentile where a backup will be sent is
Expand All @@ -42,7 +52,7 @@ object BackupRequestFilter {

private def getAndValidateMaxExtraLoad(maxExtraLoad: Tunable[Double]): Double =
maxExtraLoad() match {
case Some(maxExtraLoad) if (maxExtraLoad >= 0.0 && maxExtraLoad < 1.0) => maxExtraLoad
case Some(mel) if mel >= 0.0 && mel < 1.0 => mel
case Some(invalidMaxExtraLoad) =>
log.error(s"maxExtraLoad must be between 0.0 and 1.0, was $invalidMaxExtraLoad. Using 0.0")
0.0
Expand Down Expand Up @@ -156,8 +166,8 @@ object BackupRequestFilter {

def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.ModuleParams[ServiceFactory[Req, Rep]] {
val role = BackupRequestFilter.role
val description = BackupRequestFilter.description
val role: Stack.Role = BackupRequestFilter.role
val description: String = BackupRequestFilter.description

override def parameters: Seq[Stack.Param[_]] = Seq(
implicitly[Stack.Param[param.Stats]],
Expand Down Expand Up @@ -297,11 +307,19 @@ private[finagle] class BackupRequestFilter[Req, Rep](
windowedPercentileHistogramFac()

// Prevent sending a backup on the first request
@volatile private[this] var sendBackupAfter: Int = Int.MaxValue
@volatile private[this] var sendBackupAfterMillis: Int = Int.MaxValue

// For testing
private[client] def sendBackupAfterDuration: Duration =
Duration.fromMilliseconds(sendBackupAfter)
Duration.fromMilliseconds(sendBackupAfterMillis)

private[this] val sendAfterStat = statsReceiver.stat("send_backup_after_ms")
private[this] val backupsSent = statsReceiver.counter("backups_sent")

// Indicates that the backup request returned first, regardless of whether it succeeded.
private[this] val backupsWon = statsReceiver.counter("backups_won")

private[this] val budgetExhausted = statsReceiver.counter("budget_exhausted")

// schedule timer to refresh `sendBackupAfter`, and refresh `backupRequestRetryBudget` in response
// to changes to the value of `maxExtraLoadTunable`,
Expand All @@ -315,19 +333,12 @@ private[finagle] class BackupRequestFilter[Req, Rep](
percentile = percentileFromMaxExtraLoad(curMaxExtraLoad)
backupRequestRetryBudget = newRetryBudget(curMaxExtraLoad, nowMs)
}
sendBackupAfter = Math.max(MinSendBackupAfterMs, windowedPercentile.percentile(percentile))
sendAfterStat.add(sendBackupAfter)
sendBackupAfterMillis =
Math.max(MinSendBackupAfterMs, windowedPercentile.percentile(percentile))
sendAfterStat.add(sendBackupAfterMillis)
}
}

private[this] val sendAfterStat = statsReceiver.stat("send_backup_after_ms")
private[this] val backupsSent = statsReceiver.counter("backups_sent")

// Indicates that the backup request returned first, regardless of whether it succeeded.
private[this] val backupsWon = statsReceiver.counter("backups_won")

private[this] val budgetExhausted = statsReceiver.counter("budget_exhausted")

private[this] def isSuccess(reqRep: ReqRep): Boolean =
responseClassifier.applyOrElse(reqRep, ResponseClassifier.Default) match {
case ResponseClass.Successful(_) => true
Expand All @@ -344,9 +355,9 @@ private[finagle] class BackupRequestFilter[Req, Rep](
case _ => false
}

private[this] def record(req: Req, f: Future[Rep]): Future[Rep] = {
private[this] def record(req: Req, rep: Future[Rep]): Future[Rep] = {
val start = nowMs()
f.respond { response =>
rep.respond { response =>
if (shouldRecord(response)) {
val latency = (nowMs() - start).toInt
windowedPercentile.add(latency)
Expand All @@ -361,10 +372,57 @@ private[finagle] class BackupRequestFilter[Req, Rep](
private[this] def canIssueBackup(): Boolean =
backupRequestRetryBudget.tryWithdraw() && clientRetryBudget.tryWithdraw()

private[this] def issueBackup(
req: Req,
service: Service[Req, Rep],
tracing: Tracing,
backupTraceId: TraceId
): Future[Rep] = {
if (tracing.isActivelyTracing) {
tracing.record(IssuedAnnotation)
tracing.recordBinary("clnt/backup_request_threshold_ms", sendBackupAfterMillis)
tracing.recordBinary("clnt/backup_request_span_id", backupTraceId.spanId)
}
val rep =
Trace.letId(backupTraceId) {
BackupRequest.let {
service(req)
}
}
record(req, rep)
}

private[this] def pickWinner(
req: Req,
orig: Future[Rep],
backup: Future[Rep],
trace: Tracing
): Future[Rep] = {
val (winner, loser) = if (orig.isDefined) (orig, backup) else (backup, orig)
winner.transform { response =>
val wasSuccess = isSuccess(ReqRep(req, response))
val backupWon = wasSuccess && (backup eq winner)
if (backupWon) backupsWon.incr()

if (trace.isActivelyTracing) {
val annotation = if (backupWon) WonAnnotation else LostAnnotation
trace.record(annotation)
}
if (wasSuccess) {
if (sendInterrupts) {
loser.raise(SupersededRequestFailure)
}
winner
} else {
loser
}
}
}

def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
backupRequestRetryBudget.deposit()
val orig = record(req, service(req))
val howLong = sendBackupAfter
val howLong = sendBackupAfterMillis

// once our percentile exceeds how high we can track, we should stop sending backups.
if (howLong >= windowedPercentile.highestTrackableValue) {
Expand All @@ -381,21 +439,11 @@ private[finagle] class BackupRequestFilter[Req, Rep](
// pass on the first successful result we get back.
if (canIssueBackup()) {
backupsSent.incr()
val backup = record(req, BackupRequest.let(service(req)))
val tracing = Trace()
val backupTraceId = tracing.nextId
val backup = issueBackup(req, service, tracing, backupTraceId)
orig.select(backup).transform { _ =>
val winner = if (orig.isDefined) orig else backup
val loser = if (winner eq orig) backup else orig
winner.transform { response =>
if (backup eq winner) backupsWon.incr()
if (isSuccess(ReqRep(req, response))) {
if (sendInterrupts) {
loser.raise(SupersededRequestFailure)
}
Future.const(response)
} else {
loser
}
}
pickWinner(req, orig, backup, tracing)
}
} else {
budgetExhausted.incr()
Expand Down
Expand Up @@ -2,9 +2,9 @@ package com.twitter.finagle.client

import com.twitter.finagle.Filter.TypeAgnostic
import com.twitter.finagle.client.MethodBuilderTimeout.TunableDuration
import com.twitter.finagle.param
import com.twitter.finagle.service.{Retries, TimeoutFilter, ResponseClassifier, ResponseClass}
import com.twitter.finagle.service.{ResponseClass, ResponseClassifier, Retries, TimeoutFilter}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.TraceInitializerFilter
import com.twitter.finagle.util.{Showable, StackRegistry}
import com.twitter.finagle.{Filter, Name, Service, ServiceFactory, Stack, param, _}
import com.twitter.util.tunable.Tunable
Expand Down Expand Up @@ -62,7 +62,10 @@ private[finagle] object MethodBuilder {
stack: Stack[ServiceFactory[Req, Rep]]
): Stack[ServiceFactory[Req, Rep]] = {
stack
// total timeouts are managed directly by MethodBuilder
// backup requests happen before the stack's filters so MethodBuilder
// has to place this before (outside of the stack).
.remove(TraceInitializerFilter.role)
// total timeouts are managed directly by MethodBuilder
.remove(TimeoutFilter.totalTimeoutRole)
// allow for dynamic per-request timeouts
.replace(TimeoutFilter.role, DynamicTimeout.perRequestModule[Req, Rep])
Expand All @@ -75,6 +78,7 @@ private[finagle] object MethodBuilder {
*/
def create(originalStack: Stack[_], params: Stack.Params): Config = {
Config(
TraceInitializerFilter.typeAgnostic(params[param.Tracer].tracer, true),
MethodBuilderRetry.Config(
if (params.contains[param.ResponseClassifier]) {
Some(params[param.ResponseClassifier].responseClassifier)
Expand All @@ -95,7 +99,10 @@ private[finagle] object MethodBuilder {
* @see [[MethodBuilder.Config.create]] to construct an initial instance.
* Using its `copy` method is appropriate after that.
*/
case class Config private (retry: MethodBuilderRetry.Config, timeout: MethodBuilderTimeout.Config)
case class Config private (
traceInitializer: Filter.TypeAgnostic,
retry: MethodBuilderRetry.Config,
timeout: MethodBuilderTimeout.Config)

/** Used by the `ClientRegistry` */
private[client] val RegistryKey = "methods"
Expand Down Expand Up @@ -276,7 +283,7 @@ private[finagle] final class MethodBuilder[Req, Rep](
val configClassifier = config.retry.underlyingClassifier
val idempotentedConfigClassifier = idempotentify(configClassifier, classifier)

(new MethodBuilder[Req, Rep](
new MethodBuilder[Req, Rep](
refCounted,
dest,
stack,
Expand All @@ -288,7 +295,7 @@ private[finagle] final class MethodBuilder[Req, Rep](
+ param.ResponseClassifier(idempotentedStackClassifier)
+ stackParams[Retries.Budget],
config
)).withRetry.forClassifier(idempotentedConfigClassifier)
).withRetry.forClassifier(idempotentedConfigClassifier)
}

private[this] def nonidempotentify(
Expand Down Expand Up @@ -329,6 +336,12 @@ private[finagle] final class MethodBuilder[Req, Rep](
).withRetry.forClassifier(nonidempotentedConfigClassifier)
}

/**
* Allow customizations for protocol-specific trace initialization.
*/
def withTraceInitializer(initializer: Filter.TypeAgnostic): MethodBuilder[Req, Rep] =
withConfig(config.copy(traceInitializer = initializer))

//
// Build
//
Expand Down Expand Up @@ -373,7 +386,7 @@ private[finagle] final class MethodBuilder[Req, Rep](
private[this] def statsReceiver(methodName: Option[String]): StatsReceiver = {
val clientScoped = stackParams[param.Stats].statsReceiver.scope(clientName)
methodName match {
case Some(methodName) => clientScoped.scope(methodName)
case Some(name) => clientScoped.scope(name)
case None => clientScoped
}
}
Expand All @@ -387,6 +400,7 @@ private[finagle] final class MethodBuilder[Req, Rep](
// total timeouts and before per-request timeouts so that each backup uses the per-request
// timeout.
//
// - Trace Initialization
// - Logical Stats
// - Failure logging
// - Annotate method name for a `Failure`
Expand All @@ -400,12 +414,12 @@ private[finagle] final class MethodBuilder[Req, Rep](
val timeouts = withTimeout

val failureSource = methodName match {
case Some(methodName) => addFailureSource(methodName)
case Some(name) => addFailureSource(name)
case None => TypeAgnostic.Identity
}

retries
.logicalStatsFilter(stats)
config.traceInitializer
.andThen(retries.logicalStatsFilter(stats))
.andThen(retries.logFailuresFilter(clientName, methodName))
.andThen(failureSource)
.andThen(timeouts.totalFilter)
Expand Down
Expand Up @@ -10,6 +10,7 @@ import java.nio.ByteBuffer
sealed abstract class Annotation

object Annotation {

case object WireSend extends Annotation
case object WireRecv extends Annotation
final case class WireRecvError(error: String) extends Annotation
Expand Down
Expand Up @@ -6,14 +6,24 @@ import com.twitter.util.{Future, Throw}
object TraceInitializerFilter {
val role: Stack.Role = Stack.Role("TraceInitializerFilter")

/**
* @param newId Set the next TraceId when the tracer is pushed, `true` for clients.
*/
private[finagle] def apply[Req, Rep](tracer: Tracer, newId: Boolean): Filter[Req, Rep, Req, Rep] =
new TraceInitializerFilter[Req, Rep](tracer, newId)

private[finagle] def typeAgnostic(tracer: Tracer, newId: Boolean): Filter.TypeAgnostic =
new Filter.TypeAgnostic {
def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = apply(tracer, newId)
}

private[finagle] class Module[Req, Rep](newId: Boolean)
extends Stack.Module1[param.Tracer, ServiceFactory[Req, Rep]] {
def this() = this(true)
val role: Stack.Role = TraceInitializerFilter.role
val description = "Initialize the tracing system"
def make(_tracer: param.Tracer, next: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] = {
val param.Tracer(tracer) = _tracer
new TraceInitializerFilter[Req, Rep](tracer, newId).andThen(next)
apply(_tracer.tracer, newId).andThen(next)
}
}

Expand Down

0 comments on commit 5201f62

Please sign in to comment.