Skip to content
Permalink
Browse files

finagle: Introduce request logging

Summary: Problem

There is a lack of visibility into where request/response time is
spent, both synchronously and asynchronously, within the Finagle
stack.

Solution

Introduce `c.t.f.filter.RequestLogger` which logs at trace level to
the "com.twitter.finagle.request.Logger" logger.

Result

Better visibility for users and operators.

JIRA Issues: csl-5671

Differential Revision: https://phabricator.twitter.biz/D124352
  • Loading branch information...
kevinoliver authored and jenkins committed Jan 8, 2018
1 parent 68e9d08 commit 203fed55335633173b2a36b98c30c55336baaf3a
@@ -14,6 +14,14 @@ New Features
* finagle-memcached: By default, the Memcached client now creates two connections
to each endpoint, instead of 4. ``PHAB_ID=D119619``

New Features
~~~~~~~~~~~~

* Introduce trace logging of requests as they flow through a Finagle
client or server. These logs can be turned on at runtime by setting the
"com.twitter.finagle.request.Logger" logger to trace level.
``PHAB_ID=D124352``

Bug Fixes
~~~~~~~~~

@@ -1,6 +1,6 @@
package com.twitter.finagle

import scala.annotation.tailrec
import scala.annotation.{implicitNotFound, tailrec}
import scala.collection.mutable

/**
@@ -43,7 +43,7 @@ sealed trait Stack[T] {
* the map traverses on the element produced by `fn`, not the
* original stack.
*/
protected def transform(fn: Stack[T] => Stack[T]): Stack[T] =
def transform(fn: Stack[T] => Stack[T]): Stack[T] =
fn(this) match {
case Node(hd, mk, next) => Node(hd, mk, next.transform(fn))
case leaf @ Leaf(_, _) => leaf
@@ -285,7 +285,7 @@ object Stack {
val head = new Stack.Head {
val role: Stack.Role = _role
val description: String = _role.toString
val parameters = Nil
val parameters: Seq[Stack.Param[_]] = Nil
}
Leaf(head, t)
}
@@ -620,7 +620,7 @@ trait Stackable[T] extends Stack.Head {
* [[StackBuilder]] to provide a convenient interface for constructing
* Stacks.
*/
@scala.annotation.implicitNotFound("${From} is not Stackable to ${To}")
@implicitNotFound("${From} is not Stackable to ${To}")
trait CanStackFrom[-From, To] {
def toStackable(role: Stack.Role, el: From): Stackable[To]
}
@@ -1,6 +1,7 @@
package com.twitter.finagle.client

import com.twitter.finagle._
import com.twitter.finagle.filter.RequestLogger
import com.twitter.finagle.naming.BindingFactory
import com.twitter.finagle.param._
import com.twitter.finagle.stack.nilStack
@@ -102,8 +103,8 @@ trait EndpointerStackClient[Req, Rep, This <: EndpointerStackClient[Req, Rep, Th
* If that is also an empty String, then `dest` is used.
*/
def newClient(dest: Name, label0: String): ServiceFactory[Req, Rep] = {
val Stats(stats) = params[Stats]
val Label(label1) = params[Label]
val stats = params[Stats].statsReceiver
val label1 = params[Label].label

// For historical reasons, we have two sources for identifying
// a client. The most recently set `label0` takes precedence.
@@ -113,7 +114,9 @@ trait EndpointerStackClient[Req, Rep, This <: EndpointerStackClient[Req, Rep, Th
case _ => label0
}

val clientStack = stack ++ (endpointer +: nilStack)
val tranformer = RequestLogger.newStackTransformer(clientLabel)
val clientStack = tranformer(stack ++ (endpointer +: nilStack))

val clientParams = params +
Label(clientLabel) +
Stats(stats.scope(clientLabel)) +
@@ -0,0 +1,127 @@
package com.twitter.finagle.filter

import com.twitter.finagle.{ClientConnection, Service, ServiceFactory, ServiceFactoryProxy, ServiceProxy, Stack}
import com.twitter.finagle.tracing.Trace
import com.twitter.logging.{Level, Logger}
import com.twitter.util.{Future, Stopwatch}
import java.util.concurrent.TimeUnit

object RequestLogger {

/**
* The name of the logger used.
*/
val loggerName = "com.twitter.finagle.request.Logger"

private val log = Logger.get(loggerName)

private[this] def withLogging[Req, Rep](
label: String,
nowNanos: () => Long,
role: Stack.Role,
svcFac: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] =
new ServiceFactoryProxy[Req, Rep](svcFac) {
private[this] val requestLogger = new RequestLogger(label, role.name, nowNanos)
override def apply(conn: ClientConnection): Future[Service[Req, Rep]] = {
super.apply(conn).map { svc =>
new ServiceProxy[Req, Rep](svc) {
override def apply(request: Req): Future[Rep] = {
if (!requestLogger.shouldTrace) {
super.apply(request)
} else {
val startNanos = requestLogger.start()
try requestLogger.endAsync(startNanos, super.apply(request))
finally
requestLogger.endSync(startNanos)
}
}
}
}
}
}

/**
* Used to [[Stack.transform transform]] a [[Stack]] to include request tracing.
*
* @param label the label of the client or server
*/
private[finagle] def newStackTransformer(
label: String,
nowNanos: () => Long = Stopwatch.systemNanos
): Stack.Transformer =
new Stack.Transformer {
def apply[Req, Rep](
stack: Stack[ServiceFactory[Req, Rep]]
): Stack[ServiceFactory[Req, Rep]] = {
stack.transform {
case Stack.Leaf(hd, svcFac) =>
Stack.Leaf(hd, withLogging(label, nowNanos, hd.role, svcFac))
case Stack.Node(hd, mk, next) =>
val mkWithLogging =
(ps: Stack.Params, stack: Stack[ServiceFactory[Req, Rep]]) => {
val mkStack = mk(ps, stack)
val mkSvcFac = mkStack.make(ps)
val svcFac = withLogging(label, nowNanos, hd.role, mkSvcFac)
Stack.Leaf(stack.head, svcFac)
}
Stack.Node(hd, mkWithLogging, next)
}
}
}

}

/**
* Produces fine-grained logging of requests and responses as they flow through
* the Finagle stack, typically [[com.twitter.finagle.Filter Filters]].
*
* Instances are thread-safe and safe to be used by multiple threads.
*
* @param label the label of the client or server
*
* @param name used in the logs to indicate what is starting and ending.
*
* @note logs are done at `TRACE` level in the "com.twitter.finagle.request.Logger"
* `Logger`.
*/
private class RequestLogger(
label: String,
name: String,
nowNanos: () => Long
) {
import RequestLogger._

def shouldTrace: Boolean =
log.isLoggable(Level.TRACE)

def start(): Long = {
val start = nowNanos()
val traceId = Trace.id
log.trace(s"traceId=$traceId $label $name begin")
start
}

def endAsync[T](
startNanos: Long,
future: Future[T]
): Future[T] = {
future.ensure {
val traceId = Trace.id
val elapsedUs = elapsedMicros(startNanos)
log.trace(s"traceId=$traceId $label $name end cumulative async elapsed $elapsedUs us")
}
}

def endSync(startNanos: Long): Unit = {
val traceId = Trace.id
val elapsedUs = elapsedMicros(startNanos)
log.trace(s"traceId=$traceId $label $name end cumulative sync elapsed $elapsedUs us")
}

private[this] def elapsedMicros(startNanos: Long): Long = {
val elapsedNanos = nowNanos() - startNanos
TimeUnit.MICROSECONDS.convert(elapsedNanos, TimeUnit.NANOSECONDS)
}

}
@@ -1,6 +1,7 @@
package com.twitter.finagle.server

import com.twitter.finagle.Stack.Param
import com.twitter.finagle.filter.RequestLogger
import com.twitter.finagle.param._
import com.twitter.finagle.{ClientConnection, ListeningServer, ServiceFactory, Stack}
import com.twitter.finagle.stack.Endpoint
@@ -16,6 +17,7 @@ import java.net.SocketAddress
trait ListeningStackServer[Req, Rep, This <: ListeningStackServer[Req, Rep, This]]
extends StackServer[Req, Rep]
with Stack.Parameterized[This]
with Stack.Transformable[This]
with CommonParams[This]
with WithServerTransport[This]
with WithServerSession[This]
@@ -53,14 +55,18 @@ trait ListeningStackServer[Req, Rep, This <: ListeningStackServer[Req, Rep, This
private[this] val serverParams = params +
Label(serverLabel) +
Stats(statsReceiver) +
Monitor(reporter(label, None) andThen monitor)

private[this] val serviceFactory = (stack ++ Stack.Leaf(Endpoint, factory))
.make(serverParams)
Monitor(reporter(label, None).andThen(monitor))

// We re-parameterize in case `newListeningServer` needs to access the
// finalized parameters.
private[this] val server = withParams(serverParams)
private[this] val server: This = {
val withEndpoint = withStack(stack ++ Stack.Leaf(Endpoint, factory))
withEndpoint
.transformed(RequestLogger.newStackTransformer(serverLabel))
.withParams(serverParams)
}

private[this] val serviceFactory = server.stack.make(serverParams)

// Session bookkeeping used to explicitly manage
// session resources per ListeningServer. Note, draining
@@ -148,4 +154,7 @@ trait ListeningStackServer[Req, Rep, This <: ListeningStackServer[Req, Rep, This
override def configuredParams(newParams: Stack.Params): This = {
withParams(params ++ newParams)
}

override def transformed(t: Stack.Transformer): This =
withStack(t(stack))
}
@@ -15,13 +15,13 @@ object StackServer {

private[this] class JvmTracing[Req, Rep]
extends Stack.Module1[param.Tracer, ServiceFactory[Req, Rep]] {
override def role: Role = Role.jvmTracing
override def description: String = "Server-side JVM tracing"
override def make(
def role: Role = Role.jvmTracing
def description: String = "Server-side JVM tracing"
def make(
_tracer: param.Tracer,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
val param.Tracer(tracer) = _tracer
val tracer = _tracer.tracer
if (tracer.isNull) next
else newJvmFilter[Req, Rep].andThen(next)
}
@@ -31,10 +31,10 @@ object StackServer {
* Canonical Roles for each Server-related Stack modules.
*/
object Role extends Stack.Role("StackServer") {
val serverDestTracing = Stack.Role("ServerDestTracing")
val jvmTracing = Stack.Role("JvmTracing")
val preparer = Stack.Role("preparer")
val protoTracing = Stack.Role("protoTracing")
val serverDestTracing: Stack.Role = Stack.Role("ServerDestTracing")
val jvmTracing: Stack.Role = Stack.Role("JvmTracing")
val preparer: Stack.Role = Stack.Role("preparer")
val protoTracing: Stack.Role = Stack.Role("protoTracing")
}

/**
@@ -109,7 +109,11 @@ object StackServer {
*/
trait StackServer[Req, Rep]
extends StackBasedServer[Req, Rep]
with Stack.Parameterized[StackServer[Req, Rep]] {
with Stack.Parameterized[StackServer[Req, Rep]]
with Stack.Transformable[StackServer[Req, Rep]] {

def transformed(t: Stack.Transformer): StackServer[Req, Rep] =
withStack(t(stack))

/** The current stack used in this StackServer. */
def stack: Stack[ServiceFactory[Req, Rep]]

0 comments on commit 203fed5

Please sign in to comment.
You can’t perform that action at this time.