Skip to content

Commit

Permalink
finagle-core: Introduce local tracing and fix trace timing operations
Browse files Browse the repository at this point in the history
Problem

Finagle does not have a nice API for generating local spans. In addition,
several existing APIs that approximate simple use cases, and have been used, for
local spans do not work as advertised and have caused confusion when expected
information does not appear in the resulting traces. Specifically, the
`Trace#time` and the `Trace#timeFuture` APIs silently discard all timing
information.

Solution/Result

Introduce a new `Trace#localSpan` API to accurately annotate spans with a
beginning and ending annotation/timestamp. In addition, fix the `Trace#time` and
`Trace#timeFuture` APIs to record the measured time using a `BinaryAnnotation`.
These features can either be used together or separately to provide a richer set
of functionality than previously available.

JIRA Issues: CSL-7502

Differential Revision: https://phabricator.twitter.biz/D404869
  • Loading branch information
David Rusek authored and jenkins committed Dec 11, 2019
1 parent 3007e5d commit 1c6d5d2
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 24 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ New Features
* finagle-core, finagle-exp: Add annotations to ``DarkTrafficFilter`` to identify which span
is dark, as well as which light span it correlates with. ``PHAB_ID=D402864``

* finagle-core: Introduce `Trace#traceLocal` for creating local spans within a trace context.
``PHAB_ID=D404869``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

Expand All @@ -37,6 +40,11 @@ Runtime Behavior Changes
* finagle-zipkin-core: Tracing produces microsecond resolution timestamps in JDK9 or later.
``PHAB_ID=D400661``

* finagle-core: `Trace#time` and `Trace#timeFuture` no longer generate timestamped annotations or
silently discard timing information. They now instead generate a `BinaryAnnotation` containing
the timing information. In order to also get timestamped `Annotations` for when the operation
began and ended, use in conjunction with `Trace#traceLocal`. ``PHAB_ID=D404869``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand All @@ -47,6 +55,14 @@ Breaking API Changes
deprecated, so to construct it, you must call one of the RichClientParam.apply
methods. ``PHAB_ID=D400382``

Deprecations
~~~~~~~~~~~~

* finagle-core: Deprecate `Tracing#record(message, duration)` as it does not have the intended
effect and silently discards any duration information in the resulting trace. Instead you should
use either `Tracing#recordBinary` or a combination of `Trace#traceLocal` and `Trace#time`.
``PHAB_ID=D404869``

Bug Fixes
~~~~~~~~~

Expand Down
36 changes: 36 additions & 0 deletions doc/src/sphinx/Tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Finagle generates distributed trace data for each request. Traces allow one to u

Finagle tracing represents pieces of a request/response path as spans. Each span shares a single 64-bit traceid which allows them to later be aggregated and analyzed as a complete trace. Likewise span ids are shared between hops, between a client and a server, in a trace. Systems such as `Zipkin <http://zipkin.io>`_, which we use at Twitter, can be used to collect, correlate, and view this data.

Configuration
-------------

By default Finagle uses a `DefaultTracer` which will load implementations from included artifacts using the Java `ServiceLoader <https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html>`_. Out of the box, adding the finagle-zipkin-scribe package to your classpath will enable sending tracing spans to zipkin via the `scribe <http://go/scribe>`_ protocol.

You can replicate this functionality and globally install a tracer by including an artifact with a service loaded `Tracer`. Your implementation must implement the `Tracer <https://github.com/twitter/finagle/blob/develop/finagle-core/src/main/scala/com/twitter/finagle/tracing/Tracer.scala>`_ API and follow the requirements necessary to be loaded by the `ServiceLoader <https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html>`_.
Expand Down Expand Up @@ -90,6 +93,39 @@ Trace System Initialization

The tracing system is initialized by the `TraceInitializationFilter`. This filter is present in the default Finagle stack for both the client and the server. The role of this filter is to set up the tracing subsystem and wrap incoming/outgoing requests with the correct tracing context, this includes either using the incoming trace id or generating a new one, the effect being a fully enabled tracing system in the current context.

Tracing
-------

In general you can use any method from the `Trace` API to add annotations to the current tracing context. This allows you to annotate events or information happening at the time a request is being made or processed.

Each request operates within the scope of a span, which is created when the request is received or an initial request is made. In some cases it may be advantageous to segment parts of processing a request into their own discrete events. You can do this by generating local spans; these spans exist completely within a single process. This is useful when understanding the relationship between local computation and external requests as well as relative timing and duration of these operations to each other. Local spans can be created with the `Trace#traceLocal` methods.

.. code-block:: scala
import com.twitter.finagle.tracing.Trace
def chainOfEvents(): Int = ???
Trace.traceLocal("important_work") {
// perform within the context of a new SpanId
chainOfEvents()
}
Furthermore, operations can be timed and the result recorded within the trace context:

.. code-block:: scala
import com.twitter.finagle.tracing.Trace
def complexComputation(): Int = ???
val result = Trace.time("complexComputation_ns") {
// record how long the computation took
complexComputation()
}
Combining a local span with timing information allows for comparing the performance of a local computation to other distributed computations.

Standard Annotations
--------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,61 @@ object Trace extends Tracing {
} else f
}

private[this] def serviceName: String = {
TraceServiceName() match {
case Some(name) => name
case None => "local"
}
}

/**
* Create a span that begins right before the function is called
* and ends immediately after the function completes. This
* span will never have a corresponding remote component and is contained
* completely within the process it is created.
*/
def traceLocal[T](name: String)(f: => T): T = {
letId(Trace.nextId) {
val trace = Trace()

// these annotations are necessary to get the
// zipkin ui to properly display the span.
trace.recordRpc(name)
trace.recordServiceName(serviceName)
trace.recordBinary("lc", name)

trace.record("local/begin")
val result = f
trace.record("local/end")
result
}
}

/**
* Time an operation and add an annotation with that duration on it
* Create a span that begins right before the function is called
* and ends immediately after the async operation completes. This span will
* never have a corresponding remote component and is contained
* completely within the process it is created.
*/
def traceLocalFuture[T](name: String)(f: => Future[T]): Future[T] = {
letId(Trace.nextId) {
val trace = Trace()

// these annotations are necessary to get the
// zipkin ui to properly display the span.
trace.recordRpc(name)
trace.recordServiceName(serviceName)
trace.recordBinary("lc", name)

trace.record("local/begin")
f.ensure(trace.record("local/end"))
}
}

/**
* Time an operation and add a binary annotation to the current span
* with the duration.
*
* @param message The message describing the operation
* @param f operation to perform
* @tparam T return type
Expand All @@ -218,19 +271,20 @@ object Trace extends Tracing {
if (trace.isActivelyTracing) {
val elapsed = Stopwatch.start()
val rv = f
trace.record(message, elapsed())
trace.recordBinary(message, elapsed())
rv
} else f
}

/**
* Runs the function f and logs that duration until the future is satisfied with the given name.
* Time an async operation and add a binary annotation to the current span
* with the duration.
*/
def timeFuture[T](message: String)(f: Future[T]): Future[T] = {
val trace = Trace()
if (trace.isActivelyTracing) {
val start = Time.now
f.ensure(trace.record(message, start.untilNow))
val elapsed = Stopwatch.start()
f.ensure(trace.recordBinary(message, elapsed()))
} else f
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ abstract class Tracing {
final def record(message: String): Unit =
record(Annotation.Message(message))

// NOTE: This API is broken and silently discards the duration
@deprecated("Use Trace#traceLocal instead", "2019-20-10")
final def record(message: String, duration: Duration): Unit =
record(Annotation.Message(message), duration)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.twitter.finagle.tracing

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.tracing.Annotation.BinaryAnnotation
import com.twitter.io.Buf
import com.twitter.util.Time
import com.twitter.util.{Return, Throw}
import com.twitter.util.{Await, Future, MockTimer, Return, Throw, Time}
import org.mockito.Matchers.any
import org.mockito.Mockito.{never, times, verify, when, atLeast}
import org.scalatest.{OneInstancePerTest, BeforeAndAfter, FunSuite}
import org.mockito.Mockito.{atLeast, never, times, verify, when}
import org.scalatest.{BeforeAndAfter, FunSuite, OneInstancePerTest}
import org.scalatestplus.mockito.MockitoSugar
import scala.util.Random

Expand Down Expand Up @@ -166,21 +166,6 @@ class TraceTest extends FunSuite with MockitoSugar with BeforeAndAfter with OneI
}
}

/* TODO temporarily disabled until we can mock stopwatches
"Trace.time" in Time.withCurrentTimeFrozen { tc =>
val tracer = new BufferingTracer()
val duration = 1.second
Trace.pushTracer(tracer)
Trace.time("msg") {
tc.advance(duration)
}
tracer.iterator foreach { r =>
r.annotation mustEqual Annotation.Message("msg")
r.duration mustEqual Some(duration)
}
}
*/

test("pass flags to next id") {
val flags = Flags().setDebug
val id = TraceId(Some(SpanId(1L)), Some(SpanId(2L)), SpanId(3L), None, flags)
Expand Down Expand Up @@ -389,4 +374,91 @@ class TraceTest extends FunSuite with MockitoSugar with BeforeAndAfter with OneI
case rv => fail(s"Got $rv")
}
}

test("trace local span") {
val startTime = Time.now
Time.withTimeAt(startTime) { ctrl =>
val tracer = new BufferingTracer()
val parentTraceId = Trace.id
Trace.letTracerAndId(tracer, parentTraceId) {
val childTraceId = Trace.traceLocal("work") {
ctrl.advance(1.second)
Trace.id
}

assert(
tracer.toSeq.contains(Record(childTraceId, startTime, Annotation.Message("local/begin"))))
assert(
tracer.toSeq.contains(
Record(childTraceId, startTime.plus(1.second), Annotation.Message("local/end"))))
assert(parentTraceId != childTraceId)
}
}
}

test("trace async local span") {
val mockTimer = new MockTimer()
val startTime = Time.now
Time.withTimeAt(startTime) { ctrl =>
val tracer = new BufferingTracer()
val parentTraceId = Trace.nextId
Trace.letTracerAndId(tracer, parentTraceId) {
val childTraceIdFuture = Trace.traceLocalFuture("work") {
Future.Done.delayed(1.second)(mockTimer).map(_ => Trace.id)
}

ctrl.advance(1.second)
mockTimer.tick()

val childTraceId = Await.result(childTraceIdFuture)

assert(
tracer.toSeq.contains(Record(childTraceId, startTime, Annotation.Message("local/begin"))))
assert(
tracer.toSeq.contains(
Record(childTraceId, startTime.plus(1.second), Annotation.Message("local/end"))))
assert(parentTraceId != childTraceId)
}
}
}

test("time a computation and trace it") {
val startTime = Time.now
Time.withTimeAt(startTime) { ctrl =>
val tracer = new BufferingTracer()
val traceId = Trace.nextId
Trace.letTracerAndId(tracer, traceId) {
Trace.time("duration") {
ctrl.advance(1.second)
}

assert(
tracer.toSeq.contains(
Record(traceId, startTime.plus(1.second), BinaryAnnotation("duration", 1.second))))
}
}
}

test("time an async computation and trace it") {
val mockTimer = new MockTimer()
val startTime = Time.now
Time.withTimeAt(startTime) { ctrl =>
val tracer = new BufferingTracer()
val traceId = Trace.nextId
val result = Trace.letTracerAndId(tracer, traceId) {
Trace.timeFuture("duration") {
Future.Done.delayed(1.second)(mockTimer)
}
}

ctrl.advance(1.second)
mockTimer.tick()

Await.ready(result)

assert(
tracer.toSeq.contains(
Record(traceId, startTime.plus(1.second), BinaryAnnotation("duration", 1.second))))
}
}
}

0 comments on commit 1c6d5d2

Please sign in to comment.