Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

define TraceReceiver & add support in the server builder.

some renaming to make things clearer.
  • Loading branch information...
commit 0face4a9eb7472e3eef85749d3afe88ac26ceb35 1 parent 97c204e
marius a. eriksen mariusae authored
17 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
View
@@ -18,6 +18,7 @@ import com.twitter.util.Duration
import com.twitter.conversions.time._
import com.twitter.finagle._
+import com.twitter.finagle.tracing.{TraceReceiver, TracingFilter}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util._
import com.twitter.finagle.util.Timer._
@@ -64,7 +65,8 @@ case class ServerBuilder[Req, Rep](
_channelFactory: Option[ReferenceCountedChannelFactory],
_maxConcurrentRequests: Option[Int],
_hostConnectionMaxIdleTime: Option[Duration],
- _requestTimeout: Option[Duration])
+ _requestTimeout: Option[Duration],
+ _traceReceiver: Option[TraceReceiver])
{
import ServerBuilder._
@@ -81,7 +83,8 @@ case class ServerBuilder[Req, Rep](
None, // channelFactory
None, // maxConcurrentRequests
None, // hostConnectionMaxIdleTime
- None // requestTimeout
+ None, // requestTimeout
+ None // traceReceiver
)
def codec[Req1, Rep1](codec: Codec[Req1, Rep1]) =
@@ -118,6 +121,9 @@ case class ServerBuilder[Req, Rep](
def requestTimeout(howlong: Duration) =
copy(_requestTimeout = Some(howlong))
+ def traceReceiver(receiver: TraceReceiver) =
+ copy(_traceReceiver = Some(receiver))
+
private[this] def scopedStatsReceiver =
_statsReceiver map { sr => _name map (sr.scope(_)) getOrElse sr }
@@ -213,6 +219,13 @@ case class ServerBuilder[Req, Rep](
service = (new TimeoutFilter(duration)) andThen service
}
+ // This has to go last (ie. first in the stack) so that
+ // protocol-specific trace support can override our generic
+ // one here.
+ _traceReceiver foreach { traceReceiver =>
+ service = (new TracingFilter(traceReceiver)) andThen service
+ }
+
// Register the channel so we can wait for them for a
// drain. We close the socket but wait for all handlers to
// complete (to drain them individually.) Note: this would be
112 finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala
View
@@ -0,0 +1,112 @@
+package com.twitter.finagle.tracing
+
+/**
+ * Support for tracing in finagle. The main abstraction herein is the
+ * "Trace", which is a local that contains various metadata required
+ * for distributed tracing as well as references to the local traced
+ * events. We mimic Dapper in many ways, including borrowing its
+ * nomenclature.
+ *
+ * “Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”,
+ * Benjamin H. Sigelman, Luiz André Barroso, Mike Burrows, Pat
+ * Stephenson, Manoj Plakal, Donald Beaver, Saul Jaspan, Chandan
+ * Shanbhag, 2010.
+ *
+ * http://research.google.com/pubs/archive/36356.pdf
+ */
+
+import scala.util.Random
+
+import com.twitter.util.{Local, Time, TimeFormat, RichU64Long}
+
+case class TraceID(
+ var span: Long,
+ var parentSpan: Option[Long],
+ val host: Int,
+ val vm: String)
+{
+ override def toString = {
+ val spanHex = new RichU64Long(span).toU64HexString
+ val parentSpanHex = parentSpan map (new RichU64Long(_).toU64HexString)
+
+ val spanString = parentSpanHex match {
+ case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex)
+ case None => spanHex
+ }
+
+ "%s,%s".format(spanString, vm)
+ }
+}
+
+object Span {
+ private[Span] val timeFormat =
+ new TimeFormat("yyyyMMdd.HHmmss")
+}
+
+case class Span(
+ var traceID: TraceID,
+ var startTime: Time,
+ var endTime: Time,
+ var transcript: Transcript)
+{
+ override def toString = {
+ "%s: %s+%d".format(
+ traceID,
+ Span.timeFormat.format(startTime),
+ (endTime - startTime).inMilliseconds)
+ }
+}
+
+object Trace {
+ private[this] val rng = new Random
+ private[this] val current = new Local[Span]
+
+ private[this] def newSpan() = {
+ val traceID = TraceID(rng.nextLong(), None, Host(), VMID())
+ Span(traceID, Time.now, Time.epoch, NullTranscript)
+ }
+
+ def update(ctx: Span) {
+ current() = ctx
+ }
+
+ def apply(): Span = {
+ if (!current().isDefined)
+ current() = newSpan()
+
+ current().get
+ }
+
+ def clear() {
+ current.clear()
+ }
+
+ def startSpan() {
+ this() = newSpan()
+ }
+
+ def startSpan(parentSpanID: Long) {
+ startSpan()
+ this().traceID.parentSpan = Some(parentSpanID)
+ }
+
+ def endSpan(): Span = {
+ Trace().endTime = Time.now
+ val span = Trace()
+ clear()
+ span
+ }
+
+ def debug(isOn: Boolean) {
+ if (isOn && !Trace().transcript.isRecording)
+ Trace().transcript = new BufferingTranscript(Trace().traceID)
+ else if (!isOn && Trace().transcript.isRecording)
+ Trace().transcript = NullTranscript
+ }
+
+ def spanID = Trace().traceID.span
+
+ def record(message: => String) {
+ Trace().transcript.record(message)
+ }
+}
60 finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceContext.scala
View
@@ -1,60 +0,0 @@
-package com.twitter.finagle.tracing
-
-/**
- * Support for tracing in finagle. The main abstraction herein is the
- * "TraceContext", which is a local that contains various metadata
- * required for distributed tracing.
- */
-
-import scala.util.Random
-
-import com.twitter.util.{Local, RichU64Long}
-
-case class TraceID(
- var span: Long,
- var parentSpan: Option[Long],
- val host: Int,
- val vm: String)
-{
- override def toString = {
- val spanHex = new RichU64Long(span).toU64HexString
- val parentSpanHex = parentSpan map (new RichU64Long(_).toU64HexString)
-
- val spanString = parentSpanHex match {
- case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex)
- case None => spanHex
- }
-
- "%s,%s".format(spanString, vm)
- }
-}
-
-case class TraceContext(
- var traceID: TraceID,
- var transcript: Transcript
-)
-
-object TraceContext {
- private[this] val rng = new Random
- private[this] val current = new Local[TraceContext]
-
- def update(ctx: TraceContext) {
- current() = ctx
- }
-
- def apply(): TraceContext = {
- if (!current().isDefined)
- current() = newContext()
-
- current().get
- }
-
- def newContext() = {
- val traceID = TraceID(rng.nextLong(), None, Host(), VMID())
- TraceContext(traceID, NullTranscript)
- }
-
- def reset() {
- this() = newContext()
- }
-}
20 finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceReceiver.scala
View
@@ -0,0 +1,20 @@
+package com.twitter.finagle.tracing
+
+/**
+ * Trace receivers are called after the completion of every span. These
+ * in turn can be used to implement trace collection ala Dapper.
+ */
+
+trait TraceReceiver {
+ /**
+ * receiveSpan is called for every span production (at the
+ * completion of a request from the server)
+ */
+ def receiveSpan(span: Span): Unit
+}
+
+class ConsoleTraceReceiver extends TraceReceiver {
+ def receiveSpan(span: Span) {
+ println(span)
+ }
+}
21 finagle-core/src/main/scala/com/twitter/finagle/tracing/TracingFilter.scala
View
@@ -0,0 +1,21 @@
+package com.twitter.finagle.tracing
+
+/**
+ * The TracingFilter takes care of span lifecycle events. It is always
+ * placed first in the server filter chain so that protocols with
+ * trace support will override the span resets, and still be properly
+ * reported here.
+ */
+
+import com.twitter.finagle.{Service, SimpleFilter}
+
+class TracingFilter[Req, Rep](receiver: TraceReceiver)
+ extends SimpleFilter[Req, Rep]
+{
+ def apply(request: Req, service: Service[Req, Rep]) = {
+ Trace.startSpan()
+ service(request) ensure {
+ receiver.receiveSpan(Trace.endSpan())
+ }
+ }
+}
13 finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala
View
@@ -1,8 +1,9 @@
package com.twitter.finagle.tracing
/**
- * Transcripts are records of events and are contained in a trace
- * context.
+ * Transcripts are programmer-provided records of events and are
+ * contained in a trace context. They are equivalent to a sequence of
+ * "annotations" in Dapper.
*/
import collection.mutable.ArrayBuffer
@@ -31,10 +32,10 @@ trait Transcript extends Iterable[Record] {
* context.
*/
object Transcript extends Transcript {
- def record(message: => String) { TraceContext().transcript.record(message) }
- def iterator = TraceContext().transcript.iterator
- override def isRecording = TraceContext().transcript.isRecording
- def merge(other: Iterator[Record]) = TraceContext().transcript.merge(other)
+ def record(message: => String) { Trace().transcript.record(message) }
+ def iterator = Trace().transcript.iterator
+ override def isRecording = Trace().transcript.isRecording
+ def merge(other: Iterator[Record]) = Trace().transcript.merge(other)
}
/**
23 finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala
View
@@ -9,7 +9,7 @@ import org.apache.thrift.protocol.TBinaryProtocol
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.finagle.thrift.{ThriftServerFramedCodec, ThriftClientFramedCodec}
-import com.twitter.finagle.tracing.{TraceContext, BufferingTranscript}
+import com.twitter.finagle.tracing.{Trace, BufferingTranscript}
object Tracing1Service extends Tracing1.ServiceIface {
private[this] val transport = ClientBuilder()
@@ -28,8 +28,8 @@ object Tracing1Service extends Tracing1.ServiceIface {
}
def computeSomething(): Future[String] = {
- println("T1 with trace ID", TraceContext().traceID)
- TraceContext().transcript.record("hey i'm issuing a call")
+ println("T1 with trace ID", Trace().traceID)
+ Trace.record("hey i'm issuing a call")
t2Client.computeSomethingElse() map { somethingElse =>
"t1: " + somethingElse
@@ -54,15 +54,14 @@ object Tracing2Service extends Tracing2.ServiceIface {
}
def computeSomethingElse(): Future[String] = {
- println("T2 with trace ID", TraceContext().traceID)
- TraceContext().transcript.record("hey i'm issuing a call")
-
+ println("T2 with trace ID", Trace().traceID)
+ Trace.record("hey i'm issuing a call")
for {
x <- t3Client.oneMoreThingToCompute()
y <- t3Client.oneMoreThingToCompute()
} yield {
- TraceContext().transcript.record(
+ Trace.record(
"got my results! (%s and %s), returning".format(x, y))
"t2: " + x + y
}
@@ -80,10 +79,10 @@ object Tracing3Service extends Tracing3.ServiceIface {
}
def oneMoreThingToCompute(): Future[String] = {
- println("T3 with trace ID", TraceContext().traceID)
+ println("T3 with trace ID", Trace().traceID)
val number = count.incrementAndGet()
- TraceContext().transcript.record(
+ Trace.record(
"(t3) hey i'm issuing a call %s".format(number))
Future("t3: %d".format(number))
}
@@ -100,13 +99,13 @@ object Client {
transport, new TBinaryProtocol.Factory())
// Turn (debug) tracing on.
- TraceContext().transcript = new BufferingTranscript(TraceContext().traceID)
+ Trace().transcript = new BufferingTranscript(Trace().traceID)
- TraceContext().transcript.record("about to start issuing the root request..")
+ Trace.record("about to start issuing the root request..")
val result = client.computeSomething()()
println("result", result)
println("Trace:")
- TraceContext().transcript.print()
+ Trace().transcript.print()
}
}
1  finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala
View
@@ -17,6 +17,7 @@ import com.twitter.finagle.builder.Http
import com.twitter.finagle.util.Timer
import com.twitter.finagle.Service
import com.twitter.finagle.stats.OstrichStatsReceiver
+import com.twitter.finagle.tracing.ConsoleTraceReceiver
object EndToEndStress {
private[this] object HttpService
16 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala
View
@@ -19,7 +19,7 @@ import com.twitter.finagle._
import com.twitter.finagle.util.{Ok, Error, Cancelled, TracingHeader}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.channel.ChannelService
-import com.twitter.finagle.tracing.{TraceID, TraceContext, Record}
+import com.twitter.finagle.tracing.{TraceID, Trace, Record}
/**
* ThriftClientFramedCodec implements a framed thrift transport that
@@ -70,7 +70,7 @@ class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]]
} else {
// Otherwise, apply our tracing filter first. This will read
// the TraceData frames, and apply them to the current
- // TraceContext.
+ // Trace.
(new ThriftClientTracingFilter) andThen underlying
}
}
@@ -108,9 +108,9 @@ class ThriftClientChannelBufferEncoder
}
/**
- * ThriftClientTracingFilter implements TraceContext support for
- * thrift. This is applied *after* the Channel has been upgraded (via
- * negotiation). It serializes the current TraceContext into a header
+ * ThriftClientTracingFilter implements Trace support for thrift. This
+ * is applied *after* the Channel has been upgraded (via
+ * negotiation). It serializes the current Trace into a header
* on the wire. It is applied after all framing.
*/
@@ -120,8 +120,8 @@ class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[
service: Service[ThriftClientRequest, Array[Byte]]) =
{
val header = new TracedRequest
- header.setParent_span_id(TraceContext().traceID.span)
- header.setDebug(TraceContext().transcript.isRecording)
+ header.setParent_span_id(Trace().traceID.span)
+ header.setDebug(Trace().transcript.isRecording)
val tracedRequest = request.copy(
message = OutputBuffer.messageToArray(header) ++ request.message)
@@ -153,7 +153,7 @@ class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[
thriftRecord.getMessage())
}
- TraceContext().transcript.merge(records.iterator)
+ Trace().transcript.merge(records.iterator)
}
rest
16 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala
View
@@ -14,7 +14,7 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler
import com.twitter.util.Future
import com.twitter.finagle._
import com.twitter.finagle.util.TracingHeader
-import com.twitter.finagle.tracing.{BufferingTranscript, TraceContext}
+import com.twitter.finagle.tracing.{BufferingTranscript, Trace}
class ThriftServerChannelBufferEncoder extends SimpleChannelDownstreamHandler {
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) = {
@@ -49,18 +49,16 @@ class ThriftServerTracingFilter
val header = new TracedRequest
val request_ = InputBuffer.peelMessage(request, header)
- TraceContext.reset()
- TraceContext().traceID.parentSpan = Some(header.getParent_span_id)
-
- if (header.debug && !TraceContext().transcript.isRecording)
- TraceContext().transcript = new BufferingTranscript(TraceContext().traceID)
+ Trace.startSpan(header.getParent_span_id)
+ if (header.debug)
+ Trace.debug(true) // (don't turn off when !header.debug)
service(request_) map { response =>
// Wrap some trace data.
val responseHeader = new TracedResponse
if (header.debug) {
- TraceContext().transcript foreach { record =>
+ Trace().transcript foreach { record =>
val thriftRecord = new TranscriptRecord(
record.traceID.host,
record.traceID.vm,
@@ -69,11 +67,13 @@ class ThriftServerTracingFilter
record.timestamp.inMilliseconds,
record.message
)
+
responseHeader.addToTranscript(thriftRecord)
}
}
- val responseHeaderBytes = OutputBuffer.messageToArray(responseHeader)
+ val responseHeaderBytes =
+ OutputBuffer.messageToArray(responseHeader)
responseHeaderBytes ++ response
}
} else {
14 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala
View
@@ -14,7 +14,7 @@ import org.jboss.netty.channel.local._
import org.jboss.netty.channel.socket.nio._
import com.twitter.test.{B, SomeStruct, AnException, F}
-import com.twitter.finagle.tracing.TraceContext
+import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.finagle.util.Conversions._
import com.twitter.silly.Silly
@@ -28,8 +28,8 @@ object EndToEndSpec extends Specification {
def add_one(a: Int, b: Int) = Future.void
def multiply(a: Int, b: Int) = Future { a * b }
def complex_return(someString: String) = Future {
- TraceContext().transcript.record("hey it's me!")
- new SomeStruct(123, TraceContext().traceID.parentSpan.get.toString)
+ Trace.record("hey it's me!")
+ new SomeStruct(123, Trace().traceID.parentSpan.get.toString)
}
def someway() = Future.void
}
@@ -54,13 +54,13 @@ object EndToEndSpec extends Specification {
future() must be_==(300)
import com.twitter.finagle.tracing.BufferingTranscript
- TraceContext().transcript = new BufferingTranscript(TraceContext().traceID)
+ Trace().transcript = new BufferingTranscript(Trace().traceID)
client.complex_return("a string")().arg_two must be_==(
- "%s".format(TraceContext().traceID.span.toString))
+ "%s".format(Trace().traceID.span.toString))
- TraceContext().transcript must haveSize(1)
- TraceContext().transcript.head.message must be_==("hey it's me!")
+ Trace().transcript must haveSize(1)
+ Trace().transcript.head.message must be_==("hey it's me!")
client.add(1, 2)() must throwA[AnException]
client.add_one(1, 2)() // don't block!
Please sign in to comment.
Something went wrong with that request. Please try again.