diff --git a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala index e4a89763de..68ba3bd6ec 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala @@ -18,6 +18,7 @@ import com.twitter.util.Duration import com.twitter.conversions.time._ import com.twitter.finagle._ +import com.twitter.finagle.tracing.{TraceReceiver, TracingFilter} import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.util._ import com.twitter.finagle.util.Timer._ @@ -64,7 +65,8 @@ case class ServerBuilder[Req, Rep]( _channelFactory: Option[ReferenceCountedChannelFactory], _maxConcurrentRequests: Option[Int], _hostConnectionMaxIdleTime: Option[Duration], - _requestTimeout: Option[Duration]) + _requestTimeout: Option[Duration], + _traceReceiver: Option[TraceReceiver]) { import ServerBuilder._ @@ -81,7 +83,8 @@ case class ServerBuilder[Req, Rep]( None, // channelFactory None, // maxConcurrentRequests None, // hostConnectionMaxIdleTime - None // requestTimeout + None, // requestTimeout + None // traceReceiver ) def codec[Req1, Rep1](codec: Codec[Req1, Rep1]) = @@ -118,6 +121,9 @@ case class ServerBuilder[Req, Rep]( def requestTimeout(howlong: Duration) = copy(_requestTimeout = Some(howlong)) + def traceReceiver(receiver: TraceReceiver) = + copy(_traceReceiver = Some(receiver)) + private[this] def scopedStatsReceiver = _statsReceiver map { sr => _name map (sr.scope(_)) getOrElse sr } @@ -213,6 +219,13 @@ case class ServerBuilder[Req, Rep]( service = (new TimeoutFilter(duration)) andThen service } + // This has to go last (i.e. first in the stack) so that + // protocol-specific trace support can override our generic + // one here. 
+ _traceReceiver foreach { traceReceiver => + service = (new TracingFilter(traceReceiver)) andThen service + } + // Register the channel so we can wait for them for a // drain. We close the socket but wait for all handlers to // complete (to drain them individually.) Note: this would be diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala new file mode 100644 index 0000000000..f5976ace27 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala @@ -0,0 +1,112 @@ +package com.twitter.finagle.tracing + +/** + * Support for tracing in finagle. The main abstraction herein is the + * "Trace", which is a local that contains various metadata required + * for distributed tracing as well as references to the local traced + * events. We mimic Dapper in many ways, including borrowing its + * nomenclature. + * + * “Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”, + * Benjamin H. Sigelman, Luiz André Barroso, Mike Burrows, Pat + * Stephenson, Manoj Plakal, Donald Beaver, Saul Jaspan, Chandan + * Shanbhag, 2010. 
+ * + * http://research.google.com/pubs/archive/36356.pdf + */ + +import scala.util.Random + +import com.twitter.util.{Local, Time, TimeFormat, RichU64Long} + +case class TraceID( + var span: Long, + var parentSpan: Option[Long], + val host: Int, + val vm: String) +{ + override def toString = { + val spanHex = new RichU64Long(span).toU64HexString + val parentSpanHex = parentSpan map (new RichU64Long(_).toU64HexString) + + val spanString = parentSpanHex match { + case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex) + case None => spanHex + } + + "%s,%s".format(spanString, vm) + } +} + +object Span { + private[Span] val timeFormat = + new TimeFormat("yyyyMMdd.HHmmss") +} + +case class Span( + var traceID: TraceID, + var startTime: Time, + var endTime: Time, + var transcript: Transcript) +{ + override def toString = { + "%s: %s+%d".format( + traceID, + Span.timeFormat.format(startTime), + (endTime - startTime).inMilliseconds) + } +} + +object Trace { + private[this] val rng = new Random + private[this] val current = new Local[Span] + + private[this] def newSpan() = { + val traceID = TraceID(rng.nextLong(), None, Host(), VMID()) + Span(traceID, Time.now, Time.epoch, NullTranscript) + } + + def update(ctx: Span) { + current() = ctx + } + + def apply(): Span = { + if (!current().isDefined) + current() = newSpan() + + current().get + } + + def clear() { + current.clear() + } + + def startSpan() { + this() = newSpan() + } + + def startSpan(parentSpanID: Long) { + startSpan() + this().traceID.parentSpan = Some(parentSpanID) + } + + def endSpan(): Span = { + Trace().endTime = Time.now + val span = Trace() + clear() + span + } + + def debug(isOn: Boolean) { + if (isOn && !Trace().transcript.isRecording) + Trace().transcript = new BufferingTranscript(Trace().traceID) + else if (!isOn && Trace().transcript.isRecording) + Trace().transcript = NullTranscript + } + + def spanID = Trace().traceID.span + + def record(message: => String) { + 
Trace().transcript.record(message) + } +} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceContext.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceContext.scala deleted file mode 100644 index d81db3dafe..0000000000 --- a/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceContext.scala +++ /dev/null @@ -1,60 +0,0 @@ -package com.twitter.finagle.tracing - -/** - * Support for tracing in finagle. The main abstraction herein is the - * "TraceContext", which is a local that contains various metadata - * required for distributed tracing. - */ - -import scala.util.Random - -import com.twitter.util.{Local, RichU64Long} - -case class TraceID( - var span: Long, - var parentSpan: Option[Long], - val host: Int, - val vm: String) -{ - override def toString = { - val spanHex = new RichU64Long(span).toU64HexString - val parentSpanHex = parentSpan map (new RichU64Long(_).toU64HexString) - - val spanString = parentSpanHex match { - case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex) - case None => spanHex - } - - "%s,%s".format(spanString, vm) - } -} - -case class TraceContext( - var traceID: TraceID, - var transcript: Transcript -) - -object TraceContext { - private[this] val rng = new Random - private[this] val current = new Local[TraceContext] - - def update(ctx: TraceContext) { - current() = ctx - } - - def apply(): TraceContext = { - if (!current().isDefined) - current() = newContext() - - current().get - } - - def newContext() = { - val traceID = TraceID(rng.nextLong(), None, Host(), VMID()) - TraceContext(traceID, NullTranscript) - } - - def reset() { - this() = newContext() - } -} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceReceiver.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceReceiver.scala new file mode 100644 index 0000000000..c365d834e3 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceReceiver.scala @@ -0,0 +1,20 
@@ +package com.twitter.finagle.tracing + +/** + * Trace receivers are called after the completion of every span. These + * in turn can be used to implement trace collection à la Dapper. + */ + +trait TraceReceiver { + /** + * receiveSpan is called for every span produced (at the + * completion of a request by the server) + */ + def receiveSpan(span: Span): Unit +} + +class ConsoleTraceReceiver extends TraceReceiver { + def receiveSpan(span: Span) { + println(span) + } +} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/TracingFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TracingFilter.scala new file mode 100644 index 0000000000..b290b01a41 --- /dev/null +++ b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TracingFilter.scala @@ -0,0 +1,21 @@ +package com.twitter.finagle.tracing + +/** + * The TracingFilter takes care of span lifecycle events. It is always + * placed first in the server filter chain so that protocols with + * trace support will override the span resets, and still be properly + * reported here. + */ + +import com.twitter.finagle.{Service, SimpleFilter} + +class TracingFilter[Req, Rep](receiver: TraceReceiver) + extends SimpleFilter[Req, Rep] +{ + def apply(request: Req, service: Service[Req, Rep]) = { + Trace.startSpan() + service(request) ensure { + receiver.receiveSpan(Trace.endSpan()) + } + } +} diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala index 0fbbfc27d0..e4166e1449 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala @@ -1,8 +1,9 @@ package com.twitter.finagle.tracing /** - * Transcripts are records of events and are contained in a trace - * context. + * Transcripts are programmer-provided records of events and are + * contained in a Span. 
They are equivalent to a sequence of + * "annotations" in Dapper. */ import collection.mutable.ArrayBuffer @@ -31,10 +32,10 @@ trait Transcript extends Iterable[Record] { * context. */ object Transcript extends Transcript { - def record(message: => String) { TraceContext().transcript.record(message) } - def iterator = TraceContext().transcript.iterator - override def isRecording = TraceContext().transcript.isRecording - def merge(other: Iterator[Record]) = TraceContext().transcript.merge(other) + def record(message: => String) { Trace().transcript.record(message) } + def iterator = Trace().transcript.iterator + override def isRecording = Trace().transcript.isRecording + def merge(other: Iterator[Record]) = Trace().transcript.merge(other) } /** diff --git a/finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala b/finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala index 7f5d925483..bba0f350bc 100644 --- a/finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala +++ b/finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala @@ -9,7 +9,7 @@ import org.apache.thrift.protocol.TBinaryProtocol import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder} import com.twitter.finagle.thrift.{ThriftServerFramedCodec, ThriftClientFramedCodec} -import com.twitter.finagle.tracing.{TraceContext, BufferingTranscript} +import com.twitter.finagle.tracing.{Trace, BufferingTranscript} object Tracing1Service extends Tracing1.ServiceIface { private[this] val transport = ClientBuilder() @@ -28,8 +28,8 @@ object Tracing1Service extends Tracing1.ServiceIface { } def computeSomething(): Future[String] = { - println("T1 with trace ID", TraceContext().traceID) - TraceContext().transcript.record("hey i'm issuing a call") + println("T1 with trace ID", Trace().traceID) + Trace.record("hey i'm issuing a call") t2Client.computeSomethingElse() map { somethingElse => "t1: " + somethingElse @@ -54,15 +54,14 @@ object Tracing2Service extends 
Tracing2.ServiceIface { } def computeSomethingElse(): Future[String] = { - println("T2 with trace ID", TraceContext().traceID) - TraceContext().transcript.record("hey i'm issuing a call") - + println("T2 with trace ID", Trace().traceID) + Trace.record("hey i'm issuing a call") for { x <- t3Client.oneMoreThingToCompute() y <- t3Client.oneMoreThingToCompute() } yield { - TraceContext().transcript.record( + Trace.record( "got my results! (%s and %s), returning".format(x, y)) "t2: " + x + y } @@ -80,10 +79,10 @@ object Tracing3Service extends Tracing3.ServiceIface { } def oneMoreThingToCompute(): Future[String] = { - println("T3 with trace ID", TraceContext().traceID) + println("T3 with trace ID", Trace().traceID) val number = count.incrementAndGet() - TraceContext().transcript.record( + Trace.record( "(t3) hey i'm issuing a call %s".format(number)) Future("t3: %d".format(number)) } @@ -100,13 +99,13 @@ object Client { transport, new TBinaryProtocol.Factory()) // Turn (debug) tracing on. - TraceContext().transcript = new BufferingTranscript(TraceContext().traceID) + Trace().transcript = new BufferingTranscript(Trace().traceID) - TraceContext().transcript.record("about to start issuing the root request..") + Trace.record("about to start issuing the root request..") val result = client.computeSomething()() println("result", result) println("Trace:") - TraceContext().transcript.print() + Trace().transcript.print() } } diff --git a/finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala b/finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala index 437ce2ccd8..cc1e0f64d8 100644 --- a/finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala +++ b/finagle-stress/src/main/scala/com/twitter/finagle/stress/EndToEndStress.scala @@ -17,6 +17,7 @@ import com.twitter.finagle.builder.Http import com.twitter.finagle.util.Timer import com.twitter.finagle.Service import com.twitter.finagle.stats.OstrichStatsReceiver 
+import com.twitter.finagle.tracing.ConsoleTraceReceiver object EndToEndStress { private[this] object HttpService diff --git a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala index 1159481e1b..1633a49525 100644 --- a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala +++ b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala @@ -19,7 +19,7 @@ import com.twitter.finagle._ import com.twitter.finagle.util.{Ok, Error, Cancelled, TracingHeader} import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.channel.ChannelService -import com.twitter.finagle.tracing.{TraceID, TraceContext, Record} +import com.twitter.finagle.tracing.{TraceID, Trace, Record} /** * ThriftClientFramedCodec implements a framed thrift transport that @@ -70,7 +70,7 @@ class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]] } else { // Otherwise, apply our tracing filter first. This will read // the TraceData frames, and apply them to the current - // TraceContext. + // Trace. (new ThriftClientTracingFilter) andThen underlying } } @@ -108,9 +108,9 @@ class ThriftClientChannelBufferEncoder } /** - * ThriftClientTracingFilter implements TraceContext support for - * thrift. This is applied *after* the Channel has been upgraded (via - * negotiation). It serializes the current TraceContext into a header + * ThriftClientTracingFilter implements Trace support for thrift. This + * is applied *after* the Channel has been upgraded (via + * negotiation). It serializes the current Trace into a header * on the wire. It is applied after all framing. 
*/ @@ -120,8 +120,8 @@ class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[ service: Service[ThriftClientRequest, Array[Byte]]) = { val header = new TracedRequest - header.setParent_span_id(TraceContext().traceID.span) - header.setDebug(TraceContext().transcript.isRecording) + header.setParent_span_id(Trace().traceID.span) + header.setDebug(Trace().transcript.isRecording) val tracedRequest = request.copy( message = OutputBuffer.messageToArray(header) ++ request.message) @@ -153,7 +153,7 @@ class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[ thriftRecord.getMessage()) } - TraceContext().transcript.merge(records.iterator) + Trace().transcript.merge(records.iterator) } rest diff --git a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala index 00d27a93a3..42d4f6c84b 100644 --- a/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala +++ b/finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala @@ -14,7 +14,7 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler import com.twitter.util.Future import com.twitter.finagle._ import com.twitter.finagle.util.TracingHeader -import com.twitter.finagle.tracing.{BufferingTranscript, TraceContext} +import com.twitter.finagle.tracing.{BufferingTranscript, Trace} class ThriftServerChannelBufferEncoder extends SimpleChannelDownstreamHandler { override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) = { @@ -49,18 +49,16 @@ class ThriftServerTracingFilter val header = new TracedRequest val request_ = InputBuffer.peelMessage(request, header) - TraceContext.reset() - TraceContext().traceID.parentSpan = Some(header.getParent_span_id) - - if (header.debug && !TraceContext().transcript.isRecording) - TraceContext().transcript = new BufferingTranscript(TraceContext().traceID) 
+ Trace.startSpan(header.getParent_span_id) + if (header.debug) + Trace.debug(true) // (don't turn off when !header.debug) service(request_) map { response => // Wrap some trace data. val responseHeader = new TracedResponse if (header.debug) { - TraceContext().transcript foreach { record => + Trace().transcript foreach { record => val thriftRecord = new TranscriptRecord( record.traceID.host, record.traceID.vm, @@ -69,11 +67,13 @@ class ThriftServerTracingFilter record.timestamp.inMilliseconds, record.message ) + responseHeader.addToTranscript(thriftRecord) } } - val responseHeaderBytes = OutputBuffer.messageToArray(responseHeader) + val responseHeaderBytes = + OutputBuffer.messageToArray(responseHeader) responseHeaderBytes ++ response } } else { diff --git a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala index 87a59d6dbb..254878009d 100644 --- a/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala +++ b/finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala @@ -14,7 +14,7 @@ import org.jboss.netty.channel.local._ import org.jboss.netty.channel.socket.nio._ import com.twitter.test.{B, SomeStruct, AnException, F} -import com.twitter.finagle.tracing.TraceContext +import com.twitter.finagle.tracing.Trace import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder} import com.twitter.finagle.util.Conversions._ import com.twitter.silly.Silly @@ -28,8 +28,8 @@ object EndToEndSpec extends Specification { def add_one(a: Int, b: Int) = Future.void def multiply(a: Int, b: Int) = Future { a * b } def complex_return(someString: String) = Future { - TraceContext().transcript.record("hey it's me!") - new SomeStruct(123, TraceContext().traceID.parentSpan.get.toString) + Trace.record("hey it's me!") + new SomeStruct(123, Trace().traceID.parentSpan.get.toString) } def someway() = Future.void } @@ -54,13 +54,13 @@ object 
EndToEndSpec extends Specification { future() must be_==(300) import com.twitter.finagle.tracing.BufferingTranscript - TraceContext().transcript = new BufferingTranscript(TraceContext().traceID) + Trace().transcript = new BufferingTranscript(Trace().traceID) client.complex_return("a string")().arg_two must be_==( - "%s".format(TraceContext().traceID.span.toString)) + "%s".format(Trace().traceID.span.toString)) - TraceContext().transcript must haveSize(1) - TraceContext().transcript.head.message must be_==("hey it's me!") + Trace().transcript must haveSize(1) + Trace().transcript.head.message must be_==("hey it's me!") client.add(1, 2)() must throwA[AnException] client.add_one(1, 2)() // don't block!