Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
Checking mergeability… Don’t worry, you can still create the pull request.
  • 18 commits
  • 18 files changed
  • 0 commit comments
  • 1 contributor
Showing with 586 additions and 216 deletions.
  1. +10 −0 finagle-core/src/main/scala/com/twitter/finagle/Protocol.scala
  2. +2 −1 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
  3. +3 −1 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
  4. +10 −0 finagle-core/src/main/scala/com/twitter/finagle/filter/CastingFilter.scala
  5. +8 −0 finagle-core/src/main/scala/com/twitter/finagle/tracing/Endpoint.scala
  6. +147 −0 finagle-core/src/main/scala/com/twitter/finagle/tracing/Span.scala
  7. +100 −75 finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala
  8. +0 −1 finagle-core/src/main/scala/com/twitter/finagle/tracing/TracingFilter.scala
  9. +37 −32 finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala
  10. +14 −16 finagle-core/src/test/scala/com/twitter/finagle/tracing/TranscriptSpec.scala
  11. +14 −15 finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala
  12. +9 −3 ...t/src/main/scala/com/twitter/finagle/thrift/{ThriftBufferCodec.scala → ThriftBufferDecoder.scala}
  13. +2 −2 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientBufferedCodec.scala
  14. +38 −27 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala
  15. +23 −21 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala
  16. +112 −1 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftTracing.scala
  17. +43 −14 finagle-thrift/src/main/thrift/tracing.thrift
  18. +14 −7 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala
View
10 finagle-core/src/main/scala/com/twitter/finagle/Protocol.scala
@@ -28,6 +28,16 @@ trait AbstractCodec[Req, Rep] {
def prepareService(
underlying: Service[IReq, IRep]
): Future[Service[Req, Rep]] = Future.value(underlying)
+
+ // XXX: here we can hide the intermediate types entirely by
+ // producing the channel service here(!).
+ // ChannelService, then
+
+
+ private[finagle] def buildService(
+ underlying: Service[_, _]
+ ): Future[Service[Req, Rep]] =
+ prepareService(underlying.asInstanceOf[Service[IReq, IRep]])
}
trait ClientCodec[Req, Rep] extends AbstractCodec[Req, Rep]
View
3 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -307,7 +307,8 @@ case class ClientBuilder[Req, Rep](
var factory: ServiceFactory[Req, Rep] = null
val bs = buildBootstrap(codec, host)
- factory = new ChannelServiceFactory[Req, Rep](bs, prepareService _, hostStatsReceiver)
+ factory = new ChannelServiceFactory[Req, Rep](
+ bs, prepareService _, hostStatsReceiver)
factory = buildPool(factory, hostStatsReceiver)
if (_requestTimeout < Duration.MaxValue) {
View
4 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
@@ -19,7 +19,9 @@ import com.twitter.finagle.util.{Ok, Error, Cancelled, AsyncLatch}
* channel. It is responsible for requests dispatched to a given
* (connected) channel during its lifetime.
*/
-private[finagle] class ChannelService[Req, Rep](channel: Channel, factory: ChannelServiceFactory[Req, Rep])
+private[finagle] class ChannelService[Req, Rep](
+ channel: Channel,
+ factory: ChannelServiceFactory[Req, Rep])
extends Service[Req, Rep]
{
private[this] val currentReplyFuture = new AtomicReference[Promise[Rep]]
View
10 finagle-core/src/main/scala/com/twitter/finagle/filter/CastingFilter.scala
@@ -0,0 +1,10 @@
+package com.twitter.finagle.filter
+
+import com.twitter.finagle.{Filter, Service}
+
+// class CastingFilter[Req, Rep] extends Filter[Req, Rep, Any, Any]
+// {
+// def apply(request: Req, service: Service[Any, Any]): Future[Rep] =
+// service()
+// }
+
View
8 finagle-core/src/main/scala/com/twitter/finagle/tracing/Endpoint.scala
@@ -0,0 +1,8 @@
+package com.twitter.finagle.tracing
+
+/**
+ * Endpoints describe a TCP endpoint that terminates RPC
+ * communication.
+ */
+
+case class Endpoint(ipv4: Int, port: Short)
View
147 finagle-core/src/main/scala/com/twitter/finagle/tracing/Span.scala
@@ -0,0 +1,147 @@
+package com.twitter.finagle.tracing
+
+/**
+ * The `Span` is the core datastructure in RPC tracing. It denotes the
+ * issuance and handling of a single RPC request.
+ */
+
+import util.Random
+
+import com.twitter.util.RichU64Long
+
+/**
+ * The span itself is an immutable datastructure. Mutations are done
+ * through copying & updating span references elsewhere. If an
+ * explicit root identifier is not specified, it is computed to be
+ * either the parent span or this span itself.
+ *
+ * @param id A 64-bit span identifier
+ * @param parentId Span identifier for the parent span
+ * @param _rootId Span identifier for the root span (aka "Trace")
+ * @param transcript The event-transcript for this span
+ * @param client The client endpoint participating in the span
+ * @param server The server endpoint participating in the span
+ * @param children A sequence of child transcripts
+ */
+case class Span(
+ id : Long,
+ parentId : Option[Long],
+ _rootId : Option[Long],
+ transcript : Transcript,
+ client : Option[Endpoint],
+ server : Option[Endpoint],
+ children : Seq[Span])
+{
+ /**
+ * @return the root identifier for the trace to which this span
+ * belongs
+ */
+ val rootId = _rootId getOrElse (parentId getOrElse id)
+
+ /**
+ * @return whether the transcript for this span is currently
+ * recording
+ */
+ def isRecording = transcript.isRecording
+
+ /**
+ * @return a pretty string for this span ID.
+ */
+ def idString = {
+ val spanHex = new RichU64Long(id).toU64HexString
+ val parentSpanHex = parentId map (new RichU64Long(_).toU64HexString)
+
+ parentSpanHex match {
+ case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex)
+ case None => spanHex
+ }
+ }
+
+ override def toString = {
+ "<Span %s>".format(idString)
+ }
+
+ /**
+ * Print this span (together with its children) to the console.
+ */
+ def print(): Unit = print(0)
+ def print(indent: Int) {
+ transcript foreach { record =>
+ val atMs = record.timestamp.inMilliseconds
+ val lines: Seq[String] = record.annotation match {
+ case Annotation.Message(text) => text.split("\n")
+ case annotation => Seq(annotation.toString)
+ }
+
+ lines foreach { line =>
+ println("%s%s %03dms: %s".format(" " * indent, idString, atMs, line))
+ }
+ }
+
+ // Inline children at the appropriate split-off points (ie. where
+ // we see the ClientSend, etc. events)
+ children foreach { _.print(indent + 2) }
+ }
+
+ /**
+ * Make a copy of this span with recording turned off/on.
+ */
+ def recording(v: Boolean) = {
+ if (v && !isRecording)
+ copy(transcript = new BufferingTranscript)
+ else if (!v && isRecording)
+ copy(transcript = NullTranscript)
+ else
+ this
+ }
+
+ /**
+ * Merge the given spans into this one, splicing them into the span
+ * tree by ID. Any spans that cannot be parented are not merged in.
+ */
+ def merge(spans: Seq[Span]): Span = {
+ val parented = spans map { parent =>
+ val children = spans filter { child =>
+ child.parentId match {
+ case Some(id) if id == parent.id => true
+ case _ => false
+ }
+ }
+
+ parent.copy(children = Set() ++ (parent.children ++ children) toSeq)
+ }
+
+ val (newSpan, _) = splice(parented)
+ newSpan
+ }
+
+ private def splice(spans: Seq[Span]): (Span, Seq[Span]) = {
+ // First split out spans that we need to merge
+ val (spansToMerge, otherSpans) = spans partition { _.id == id }
+
+ // Then splice our children, returning spans yet unmatched.
+ val (splicedChildren, unmatchedSpans) =
+ children.foldLeft((List[Span](), otherSpans)) {
+ case ((children, spans), child) =>
+ val (nextChild, nextSpans) = child.splice(spans)
+ (nextChild :: children, nextSpans)
+ }
+
+ // Now split out new children, and any unmatched spans.
+ val (newChildren, nextSpans) =
+ unmatchedSpans partition { _.parentId map { _ == id } getOrElse false }
+
+ spansToMerge foreach { span => transcript.recordAll(span.transcript.iterator) }
+ val newSpan = copy(children = splicedChildren ++ newChildren)
+
+ (newSpan, nextSpans)
+ }
+}
+
+object Span {
+ private[Span] val rng = new Random
+
+ def apply(): Span = Span(None, None, None)
+ def apply(id: Option[Long], parentId: Option[Long], rootId: Option[Long]): Span =
+ Span(id getOrElse rng.nextLong, parentId, rootId, NullTranscript, None, None, Seq())
+}
View
175 finagle-core/src/main/scala/com/twitter/finagle/tracing/Trace.scala
@@ -1,106 +1,131 @@
package com.twitter.finagle.tracing
-import scala.util.Random
-
-import com.twitter.util.{Local, Time, TimeFormat, RichU64Long}
-
-case class TraceID(
- var span: Long,
- var parentSpan: Option[Long],
- val host: Int,
- val vm: String)
-{
- override def toString = {
- val spanHex = new RichU64Long(span).toU64HexString
- val parentSpanHex = parentSpan map (new RichU64Long(_).toU64HexString)
-
- val spanString = parentSpanHex match {
- case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex)
- case None => spanHex
- }
-
- "%s,%s".format(spanString, vm)
- }
-}
-
-object Span {
- private[Span] val timeFormat =
- new TimeFormat("yyyyMMdd.HHmmss")
-}
-
-case class Span(
- var traceID: TraceID,
- var startTime: Time,
- var endTime: Time,
- var transcript: Transcript)
-{
- override def toString = {
- "%s: %s+%d".format(
- traceID,
- Span.timeFormat.format(startTime),
- (endTime - startTime).inMilliseconds)
- }
-
- def print() {
- transcript foreach { record =>
- val atMs = (record.timestamp - startTime).inMilliseconds
- record.message.split("\n") foreach { line =>
- println("%s %03dms: %s".format(traceID, atMs, line))
- }
- }
- }
-}
-
+/**
+ * This is a tracing system similar to Dapper:
+ *
+ * “Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”,
+ * Benjamin H. Sigelman, Luiz André Barroso, Mike Burrows, Pat
+ * Stephenson, Manoj Plakal, Donald Beaver, Saul Jaspan, Chandan
+ * Shanbhag, 2010.
+ *
+ * It is meant to be general to whatever underlying RPC mechanism, and
+ * it is up to the underlying codec to implement the transport.
+ */
+
+import com.twitter.util.Local
+
+/**
+ * `Trace` specifies global mutable operations on traces.
+ */
object Trace {
- private[this] val rng = new Random
- private[this] val current = new Local[Span]
+ private[this] case class Ref(var span: Span)
+ private[this] val current = new Local[Ref] // The currently active span.
- private[this] def newSpan() = {
- val traceID = TraceID(rng.nextLong(), None, Host(), VMID())
- Span(traceID, Time.now, Time.epoch, NullTranscript)
+ private[this] def update(span: Span) {
+ ref().span = span
}
- def update(ctx: Span) {
- current() = ctx
- }
-
- def apply(): Span = {
+ private[this] def ref(): Ref = {
if (!current().isDefined)
- current() = newSpan()
+ current() = Ref(Span())
current().get
}
+ /**
+ * Get the current span. Each request handled by a server defines a
+ * span.
+ */
+ def apply(): Span = ref().span
+
+ /**
+ * The current trace ID. This is the root span ID, and is the same
+ * for each request handled by the same request tree.
+ */
+ def id(): Long = this().rootId
+
+ /**
+ * Clear the current span.
+ */
def clear() {
current.clear()
}
- def startSpan() {
- this() = newSpan()
+ /**
+ * Start a new span. When identifiers are specified, use those,
+ * otherwise they are generated for you.
+ */
+ def startSpan(id: Option[Long], parentId: Option[Long], rootId: Option[Long]) {
+ clear()
+ this() = Span(id, parentId, rootId)
}
- def startSpan(parentSpanID: Long) {
- startSpan()
- this().traceID.parentSpan = Some(parentSpanID)
+ /**
+ * Start a new span with random identifiers.
+ */
+ def startSpan() {
+ startSpan(None, None, None)
}
+ /**
+ * End the span.
+ *
+ * @return The span that was just ended.
+ */
def endSpan(): Span = {
- Trace().endTime = Time.now
- val span = Trace()
+ val span = this()
clear()
span
}
+ /**
+ * Add a child span to the current span. Used when dispatching a new
+ * request.
+ *
+ * @return the newly-defined child span
+ */
+ def addChild(): Span = {
+ val span = Span(None, Some(this().id), this()._rootId).recording(isRecording)
+ this() = this().copy(children = this().children ++ Seq(span))
+ span
+ }
+
+ /**
+ * Toggle debugging. When debugging is on, all events are recorded,
+ * and (when the codec supports it) piggy-backed in the RPC
+ * transactions.
+ */
def debug(isOn: Boolean) {
- if (isOn && !Trace().transcript.isRecording)
- Trace().transcript = new BufferingTranscript(Trace().traceID)
- else if (!isOn && Trace().transcript.isRecording)
- Trace().transcript = NullTranscript
+ this() = this().recording(isOn)
}
- def spanID = Trace().traceID.span
+ /**
+ * @returns whether we are currently recording events (through
+ * [[com.twitter.finagle.tracing.Trace.record]])
+ */
+ def isRecording = this().isRecording
+
+ /**
+ * Record a new annotation.
+ */
+ def record(annotation: Annotation) {
+ this().transcript.record(annotation)
+ }
+ /**
+ * Record a new [[com.twitter.finagle.tracing.Annotation.Message]]
+ * annotation.
+ */
def record(message: => String) {
- Trace().transcript.record(message)
+ record(Annotation.Message(message))
+ }
+
+ /**
+ * Merge the given spans into the current span.
+ *
+ * @param spans A sequence of spans to merge into the current span.
+ */
+ def merge(spans: Seq[Span]) {
+ this() = this().merge(spans)
}
}
View
1 finagle-core/src/main/scala/com/twitter/finagle/tracing/TracingFilter.scala
@@ -13,7 +13,6 @@ class TracingFilter[Req, Rep](receiver: TraceReceiver)
extends SimpleFilter[Req, Rep]
{
def apply(request: Req, service: Service[Req, Rep]) = {
- Trace.startSpan()
service(request) ensure {
receiver.receiveSpan(Trace.endSpan())
}
View
69 finagle-core/src/main/scala/com/twitter/finagle/tracing/Transcript.scala
@@ -9,62 +9,54 @@ package com.twitter.finagle.tracing
import collection.mutable.ArrayBuffer
import com.twitter.util.Time
-case class Record(
- traceID: TraceID,
- timestamp: Time, // (nanosecond granularity)
- message: String) // an arbitrary string message
-{
- override def toString = "[%s] @ %s: %s".format(traceID, timestamp, message)
+sealed trait Annotation
+object Annotation {
+ case class ClientSend() extends Annotation
+ case class ClientRecv() extends Annotation
+ case class ServerSend() extends Annotation
+ case class ServerRecv() extends Annotation
+ case class Message(content: String) extends Annotation
+}
+
+case class Record(timestamp: Time, annotation: Annotation) {
+ override def toString = "%s: %s".format(timestamp, annotation)
}
trait Transcript extends Iterable[Record] {
// TODO: support log levels?
- def record(message: => String)
- def isRecording = true
- def merge(other: Iterator[Record])
+ def record(annotation: => Annotation)
+ def recordAll(other: Iterator[Record])
- def print() { foreach { println(_) } }
-}
+ // XXX remove.
+ def merge(other: Transcript) = recordAll(other.iterator)
-/**
- * The default transcript uses the current transcript as of per trace
- * context.
- */
-object Transcript extends Transcript {
- def record(message: => String) { Trace().transcript.record(message) }
- def iterator = Trace().transcript.iterator
- override def isRecording = Trace().transcript.isRecording
- def merge(other: Iterator[Record]) = Trace().transcript.merge(other)
+ def isRecording = true
+ def print() { foreach { println(_) } }
}
/**
* Sinks all messages
*/
object NullTranscript extends Transcript {
- def record(message: => String) {}
+ def record(annotation: => Annotation) {}
+ def recordAll(other: Iterator[Record]) {}
def iterator = Iterator.empty
override def isRecording = false
- def merge(other: Iterator[Record]) {}
}
/**
* Buffers messages to an ArrayBuffer.
*/
-class BufferingTranscript(traceID: TraceID) extends Transcript {
+class BufferingTranscript extends Transcript {
private[this] val buffer = new ArrayBuffer[Record]
- def record(message: => String) = synchronized {
- buffer += Record(traceID, Time.now, message)
- }
-
- def iterator = buffer.iterator
-
- def clear() = synchronized {
- buffer.clear()
+ def record(annotation: => Annotation) = synchronized {
+ // TODO: insertion sort?
+ buffer += Record(Time.now, annotation)
}
- def merge(other: Iterator[Record]) = synchronized {
+ def recordAll(other: Iterator[Record]) = synchronized {
// TODO: resolve time drift by causality
var combined = buffer ++ other
combined = combined sortWith { (a, b) => a.timestamp < b.timestamp }
@@ -73,4 +65,17 @@ class BufferingTranscript(traceID: TraceID) extends Transcript {
buffer.clear()
buffer.appendAll(combined)
}
+
+ def iterator = buffer.iterator
+
+ def clear() = synchronized {
+ buffer.clear()
+ }
+}
+
+class FrozenTranscript(underlying: Iterable[Record]) extends Transcript {
+ def record(annotation: => Annotation) {}
+ def recordAll(other: Iterator[Record]) {}
+ def iterator = underlying.iterator
+ override def isRecording = false
}
View
30 finagle-core/src/test/scala/com/twitter/finagle/tracing/TranscriptSpec.scala
@@ -8,40 +8,38 @@ import com.twitter.util.Time
object TrascriptSpec extends Specification {
"BufferingTranscript" should {
- val traceID = TraceID(1L, Some(2L), 0, "myVM")
-
"record traceID, current time, and message" in {
Time.withCurrentTimeFrozen { timeControl =>
- val t = new BufferingTranscript(traceID)
- t.record("hey there")
+ val t = new BufferingTranscript
+ t.record(Annotation.Message("hey there"))
- val expectedRecord = Record(traceID, Time.now, "hey there")
+ val expectedRecord = Record(Time.now, Annotation.Message("hey there"))
t.size must be_==(1)
t.head must be_==(expectedRecord)
}
}
- "merge" in {
+ "recordAll" in {
Time.withCurrentTimeFrozen { timeControl =>
- val t0 = new BufferingTranscript(traceID)
- val t1 = new BufferingTranscript(traceID)
+ val t0 = new BufferingTranscript
+ val t1 = new BufferingTranscript
- t0.record("1")
+ t0.record(Annotation.Message("1"))
timeControl.advance(1.second)
- t1.record("2")
+ t1.record(Annotation.Message("2"))
timeControl.advance(1.second)
- t0.record("3")
+ t0.record(Annotation.Message("3"))
- t0.merge(t1.iterator)
+ t0.recordAll(t1.iterator)
t0 must haveSize(3)
val records = t0.toArray
- records(0).message must be_==("1")
- records(1).message must be_==("2")
- records(2).message must be_==("3")
+ records(0).annotation must be_==(Annotation.Message("1"))
+ records(1).annotation must be_==(Annotation.Message("2"))
+ records(2).annotation must be_==(Annotation.Message("3"))
// Merging again should kill dups:
- t0.merge(t1.iterator)
+ t0.recordAll(t1.iterator)
t0 must haveSize(3)
}
}
View
29 finagle-stress/src/main/scala/com/twitter/finagle/demo/Tracing.scala
@@ -28,8 +28,8 @@ object Tracing1Service extends Tracing1.ServiceIface {
}
def computeSomething(): Future[String] = {
- println("T1 with trace ID", Trace().traceID)
- Trace.record("hey i'm issuing a call")
+ println("T1 with trace ID", Trace.id)
+ Trace.record("ISSUES")
t2Client.computeSomethingElse() map { somethingElse =>
"t1: " + somethingElse
@@ -54,15 +54,14 @@ object Tracing2Service extends Tracing2.ServiceIface {
}
def computeSomethingElse(): Future[String] = {
- println("T2 with trace ID", Trace().traceID)
- Trace.record("hey i'm issuing a call")
+ println("T2 with trace ID", Trace.id)
+ Trace.record("(t2) hey i'm issuing a call")
for {
x <- t3Client.oneMoreThingToCompute()
y <- t3Client.oneMoreThingToCompute()
} yield {
- Trace.record(
- "got my results! (%s and %s), returning".format(x, y))
+ Trace.record("got my results! (%s and %s), returning".format(x, y))
"t2: " + x + y
}
}
@@ -79,11 +78,10 @@ object Tracing3Service extends Tracing3.ServiceIface {
}
def oneMoreThingToCompute(): Future[String] = {
- println("T3 with trace ID", Trace().traceID)
+ println("T3 with trace ID", Trace.id)
val number = count.incrementAndGet()
- Trace.record(
- "(t3) hey i'm issuing a call %s".format(number))
+ Trace.record("(t3) hey i'm issuing a call %s".format(number))
Future("t3: %d".format(number))
}
}
@@ -99,13 +97,14 @@ object Client {
transport, new TBinaryProtocol.Factory())
// Turn (debug) tracing on.
- Trace().transcript = new BufferingTranscript(Trace().traceID)
-
+ Trace.debug(true)
Trace.record("about to start issuing the root request..")
- val result = client.computeSomething()()
- println("result", result)
- println("Trace:")
- Trace().transcript.print()
+ val result = client.computeSomething()
+ result foreach { result =>
+ println("result", result)
+ println("Trace:")
+ Trace().print()
+ }
}
}
View
12 ...er/finagle/thrift/ThriftBufferCodec.scala → .../finagle/thrift/ThriftBufferDecoder.scala
@@ -1,17 +1,23 @@
package com.twitter.finagle.thrift
+/**
+ * A codec for the buffered (unframed) thrift transport.
+ */
+
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.handler.codec.replay.{ReplayingDecoder, VoidEnum}
import org.apache.thrift.protocol.{TProtocolFactory, TProtocolUtil, TType}
-class ThriftBufferCodec(protocolFactory: TProtocolFactory)
+class ThriftBufferDecoder(protocolFactory: TProtocolFactory)
extends ReplayingDecoder[VoidEnum]
{
override def decode(
- ctx: ChannelHandlerContext, channel: Channel,
- buffer: ChannelBuffer, state: VoidEnum
+ ctx: ChannelHandlerContext,
+ channel: Channel,
+ buffer: ChannelBuffer,
+ state: VoidEnum
) = {
val transport = new ChannelBufferToTransport(buffer)
val iprot = protocolFactory.getProtocol(transport)
View
4 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientBufferedCodec.scala
@@ -15,8 +15,8 @@ class ThriftClientBufferedCodec(protocolFactory: TProtocolFactory)
def getPipeline() = {
val pipeline = framedPipelineFactory.getPipeline
pipeline.replace(
- "thriftFrameCodec", "thriftBufferCodec",
- new ThriftBufferCodec(protocolFactory))
+ "thriftFrameCodec", "thriftBufferDecoder",
+ new ThriftBufferDecoder(protocolFactory))
pipeline
}
}
View
65 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftClientFramedCodec.scala
@@ -1,6 +1,7 @@
package com.twitter.finagle.thrift
-import scala.collection.JavaConversions._
+import collection.JavaConversions._
+
import org.jboss.netty.channel.{
ChannelHandlerContext,
SimpleChannelDownstreamHandler, MessageEvent, Channels,
@@ -14,7 +15,9 @@ import com.twitter.util.Time
import com.twitter.finagle._
import com.twitter.finagle.util.{Ok, Error, Cancelled}
import com.twitter.finagle.util.Conversions._
-import com.twitter.finagle.tracing.{TraceID, Trace, Record}
+import com.twitter.finagle.tracing.{Trace, Record, Annotation}
+
+import conversions._
/**
* ThriftClientFramedCodec implements a framed thrift transport that
@@ -44,7 +47,7 @@ class ThriftClientFramedCodec extends ClientCodec[ThriftClientRequest, Array[Byt
val buffer = new OutputBuffer()
buffer().writeMessageBegin(
new TMessage(ThriftTracing.CanTraceMethodName, TMessageType.CALL, 0))
- val options = new TraceOptions
+ val options = new thrift.TraceOptions
options.write(buffer())
buffer().writeMessageEnd()
@@ -105,19 +108,27 @@ private[thrift] class ThriftClientChannelBufferEncoder
* on the wire. It is applied after all framing.
*/
-private[thrift] class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[Byte]]
+private[thrift] class ThriftClientTracingFilter
+ extends SimpleFilter[ThriftClientRequest, Array[Byte]]
{
- def apply(request: ThriftClientRequest,
- service: Service[ThriftClientRequest, Array[Byte]]) =
- {
- val header = new TracedRequest
- header.setParent_span_id(Trace().traceID.span)
+ def apply(
+ request: ThriftClientRequest,
+ service: Service[ThriftClientRequest, Array[Byte]]
+ ) = {
+ // Create a new span identifier for this request.
+ val span = Trace.addChild()
+ val header = new thrift.TracedRequestHeader
+ header.setSpan_id(span.id)
+ header.setTrace_id(span.rootId)
+ span.parentId foreach { header.setParent_span_id(_) }
header.setDebug(Trace().transcript.isRecording)
val tracedRequest = new ThriftClientRequest(
OutputBuffer.messageToArray(header) ++ request.message,
request.oneway)
+ span.transcript.record(Annotation.ClientSend())
+
val reply = service(tracedRequest)
if (tracedRequest.oneway) {
// Oneway requests don't contain replies, and so they can't be
@@ -125,27 +136,27 @@ private[thrift] class ThriftClientTracingFilter extends SimpleFilter[ThriftClien
reply
} else {
reply map { response =>
- val responseHeader = new TracedResponse
+ span.transcript.record(Annotation.ClientRecv())
+
+ // Peel off the TracedResponseHeader and add any piggy-backed
+ // spans to our own transcript (if we're in debug mode).
+ val responseHeader = new thrift.TracedResponseHeader
val rest = InputBuffer.peelMessage(response, responseHeader)
- if (header.debug && responseHeader.isSetTranscript) {
- val records = responseHeader.transcript map { thriftRecord =>
- val traceID = TraceID(
- thriftRecord.getSpan_id(),
- {
- val spanID = thriftRecord.getParent_span_id()
- if (spanID == 0) None else Some(spanID)
- },
- thriftRecord.getHost(),
- thriftRecord.getVm_id())
-
- Record(
- traceID,
- Time.fromMilliseconds(thriftRecord.getTimestamp_ms()),
- thriftRecord.getMessage())
- }
+ if (header.debug && (responseHeader.spans ne null)) {
+ val spans = responseHeader.spans map { _.toFinagleSpan }
+
+ // println("before")
+ // Trace().print()
+ // println("merge")
+ // spans foreach { span =>
+ // println("SPAN", span.idString)
+ // span.print()
+ // }
- Trace().transcript.merge(records.iterator)
+ Trace.merge(spans)
+ // println("after")
+ // Trace().print()
}
rest
View
44 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftServerFramedCodec.scala
@@ -1,5 +1,7 @@
package com.twitter.finagle.thrift
+import collection.JavaConversions._ // XXX
+
import org.apache.thrift.protocol.{TBinaryProtocol, TMessage, TMessageType}
import org.jboss.netty.channel.{
ChannelHandlerContext,
@@ -8,9 +10,13 @@ import org.jboss.netty.channel.{
import org.jboss.netty.buffer.ChannelBuffers
import com.twitter.util.Future
import com.twitter.finagle._
-import com.twitter.finagle.tracing.Trace
+import com.twitter.finagle.tracing.{BufferingTranscript, Trace, Annotation}
+
+import conversions._
-private[thrift] class ThriftServerChannelBufferEncoder extends SimpleChannelDownstreamHandler {
+private[thrift] class ThriftServerChannelBufferEncoder
+ extends SimpleChannelDownstreamHandler
+{
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) = {
e.getMessage match {
// An empty array indicates a oneway reply.
@@ -40,35 +46,31 @@ private[thrift] class ThriftServerTracingFilter
def apply(request: Array[Byte], service: Service[Array[Byte], Array[Byte]]) = {
// What to do on exceptions here?
if (isUpgraded) {
- val header = new TracedRequest
+ val header = new thrift.TracedRequestHeader
val request_ = InputBuffer.peelMessage(request, header)
- Trace.startSpan(header.getParent_span_id)
+ Trace.startSpan(
+ Some(header.getSpan_id),
+ Some(header.getParent_span_id),
+ Some(header.getTrace_id))
+
if (header.debug)
Trace.debug(true) // (don't turn off when !header.debug)
+ Trace.record(Annotation.ServerRecv())
+
service(request_) map { response =>
+ Trace.record(Annotation.ServerSend())
+
// Wrap some trace data.
- val responseHeader = new TracedResponse
+ val responseHeader = new thrift.TracedResponseHeader
if (header.debug) {
- Trace().transcript foreach { record =>
- val thriftRecord = new TranscriptRecord(
- record.traceID.host,
- record.traceID.vm,
- record.traceID.span,
- record.traceID.parentSpan getOrElse 0,
- record.timestamp.inMilliseconds,
- record.message
- )
-
- responseHeader.addToTranscript(thriftRecord)
- }
+ // Piggy-back span data if we're in debug mode.
+ Trace().toThriftSpans foreach { responseHeader.addToSpans(_) }
}
- val responseHeaderBytes =
- OutputBuffer.messageToArray(responseHeader)
- responseHeaderBytes ++ response
+ OutputBuffer.messageToArray(responseHeader) ++ response
}
} else {
val buffer = new InputBuffer(request)
@@ -86,7 +88,7 @@ private[thrift] class ThriftServerTracingFilter
buffer().writeMessageEnd()
// Note: currently there are no options, so there's no need
- // top parse them out.
+ // to parse them out.
Future.value(buffer.toArray)
} else {
service(request)
View
113 finagle-thrift/src/main/scala/com/twitter/finagle/thrift/ThriftTracing.scala
@@ -1,10 +1,121 @@
package com.twitter.finagle.thrift
+/**
+ * Support for finagle tracing in thrift.
+ */
+
+import collection.JavaConversions._
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+
+import org.jboss.netty.buffer.ChannelBuffers
+
+import com.twitter.util.Time
+import com.twitter.finagle.tracing.{
+ Record, Annotation, FrozenTranscript,
+ Transcript, Span, Endpoint}
+
private[thrift] object ThriftTracing {
/**
* v1: transaction id frame
* v2: full tracing header
+ * v3: b3 (big-brother-bird)
*/
- val CanTraceMethodName = "__can__twitter__trace__v2__"
+ val CanTraceMethodName = "__can__finagle__trace__v3__"
}
+private[thrift] object RichSpan {
+ val Utf8 = Charset.forName("UTF-8")
+}
+
+private[thrift] class RichThriftSpan(self: thrift.Span) {
+ import RichSpan._
+
+ private[this] def endpointFromThrift(endpoint: thrift.Endpoint): Option[Endpoint] =
+ Option(endpoint) map { e => Endpoint(e.getIpv4, e.getPort) }
+
+ /**
+ * Creates a Finagle span from this thrift span.
+ */
+ def toFinagleSpan: Span = Span(
+ self.getId,
+ if (self.isSetParent_id) Some(self.getParent_id) else None,
+ if (self.isSetTrace_id) Some(self.getTrace_id) else None,
+ toTranscript,
+ endpointFromThrift(self.getClient),
+ endpointFromThrift(self.getServer),
+ Seq()
+ )
+
+ /**
+ * Translate this thrift-encoded span into a transcript.
+ */
+ def toTranscript: Transcript = {
+ val records = self.annotations map { annotation =>
+ val value = annotation.value match {
+ case thrift.Constants.CLIENT_SEND => Annotation.ClientSend()
+ case thrift.Constants.CLIENT_RECV => Annotation.ClientRecv()
+ case thrift.Constants.SERVER_SEND => Annotation.ServerSend()
+ case thrift.Constants.SERVER_RECV => Annotation.ServerRecv()
+ case value => Annotation.Message(value)
+ }
+
+ Record(Time.fromMilliseconds(annotation.timestamp), value)
+ }
+
+ new FrozenTranscript(records)
+ }
+}
+
+private[thrift] class RichSpan(self: Span) {
+ private[this] def endpointFromFinagle(endpoint: Endpoint): thrift.Endpoint = {
+ val e = new thrift.Endpoint
+ e.setIpv4(endpoint.ipv4)
+ e.setPort(endpoint.port)
+ e
+ }
+
+ /**
+ * Translate this transcript to a set of spans. A transcript may
+ * contain annotations from several spans.
+ */
+ def toThriftSpans: Seq[thrift.Span] = {
+ val span = new thrift.Span
+
+ span.setId(self.id)
+ self.parentId foreach { span.setParent_id(_) }
+ span.setTrace_id(self.rootId)
+
+ // Endpoints (optional)
+ self.client foreach { e => span.setClient(endpointFromFinagle(e)) }
+ self.server foreach { e => span.setServer(endpointFromFinagle(e)) }
+
+ val annotations = self.transcript map { record =>
+ val value = record.annotation match {
+ case Annotation.ClientSend() => thrift.Constants.CLIENT_SEND
+ case Annotation.ClientRecv() => thrift.Constants.CLIENT_RECV
+ case Annotation.ServerSend() => thrift.Constants.SERVER_SEND
+ case Annotation.ServerRecv() => thrift.Constants.SERVER_RECV
+ case Annotation.Message(value) => value
+ }
+
+ val annotation = new thrift.Annotation
+ annotation.setTimestamp(record.timestamp.inMilliseconds.toLong)
+ annotation.setValue(value)
+ annotation
+ }
+
+ annotations foreach { span.addToAnnotations(_) }
+
+ val childThriftSpans =
+ self.children map { new RichSpan(_) } flatMap { _.toThriftSpans }
+ Seq(span) ++ childThriftSpans
+ }
+}
+
+private[thrift] object conversions {
+ implicit def thriftSpanToRichThriftSpan(span: thrift.Span) =
+ new RichThriftSpan(span)
+ implicit def spanToRichSpan(span: Span) =
+ new RichSpan(span)
+}
View
57 finagle-thrift/src/main/thrift/tracing.thrift
@@ -2,24 +2,53 @@
* Define thrift structs for thrift request tracing.
*/
-namespace java com.twitter.finagle.thrift
-
-struct TranscriptRecord {
- 1: i32 host;
- 2: string vm_id;
- 3: i64 span_id;
- 4: i64 parent_span_id;
- 5: i64 timestamp_ms;
- 6: string message;
+namespace java com.twitter.finagle.thrift.thrift
+
+/**
+ * The following is from BigBrotherBird:
+ * http://j.mp/fZZnyD
+ */
+
+const string CLIENT_SEND = "cs"
+const string CLIENT_RECV = "cr"
+const string SERVER_SEND = "ss"
+const string SERVER_RECV = "sr"
+
+struct Annotation {
+ 1: optional i64 timestamp
+ 2: optional string value
+}
+
+struct Endpoint {
+ 1: optional i32 ipv4,
+ 2: optional i16 port
+}
+
+struct Span {
+ 1: optional i64 trace_id // unique trace id, use for all spans in trace
+ 2: optional string name, // span name, rpc method for example
+ 3: optional i64 id, // unique span id, only used for this span
+ 4: optional i64 parent_id, // parent span id
+ 5: optional Endpoint client,
+ 6: optional Endpoint server,
+ 7: optional list<Annotation> annotations, // list of all annotations/events that occured
+ 8: optional map<string, binary> binary_annotations, // any binary annotations
}
/**
+ * The following are for finagle-thrift specific tracing headers &
+ * negotiation.
+ */
+
+/**
* TracedRequest defines trace headers. These carry the span data, and
* a flag indicating whether the request is to be debugged.
*/
-struct TracedRequest {
- 1: i64 parent_span_id;
- 2: bool debug;
+struct TracedRequestHeader {
+ 1: i64 trace_id;
+ 2: i64 span_id;
+ 3: i64 parent_span_id;
+ 4: bool debug;
}
/**
@@ -27,8 +56,8 @@ struct TracedRequest {
* empty unless the request is being debugged, in which case a
* transcript is copied.
*/
-struct TracedResponse {
- 1: list<TranscriptRecord> transcript;
+struct TracedResponseHeader {
+ 1: list<Span> spans;
}
/**
View
21 finagle-thrift/src/test/scala/com/twitter/finagle/thrift/EndToEndSpec.scala
@@ -14,7 +14,8 @@ import org.jboss.netty.channel.local._
import org.jboss.netty.channel.socket.nio._
import com.twitter.test.{B, SomeStruct, AnException, F}
-import com.twitter.finagle.tracing.Trace
+import com.twitter.finagle.tracing
+import com.twitter.finagle.tracing.{Trace, Annotation}
import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
import com.twitter.finagle.util.Conversions._
import com.twitter.silly.Silly
@@ -29,7 +30,7 @@ object EndToEndSpec extends Specification {
def multiply(a: Int, b: Int) = Future { a * b }
def complex_return(someString: String) = Future {
Trace.record("hey it's me!")
- new SomeStruct(123, Trace().traceID.parentSpan.get.toString)
+ new SomeStruct(123, Trace().parentId.get.toString)
}
def someway() = Future.void
}
@@ -53,14 +54,20 @@ object EndToEndSpec extends Specification {
val future = client.multiply(10, 30)
future() must be_==(300)
- import com.twitter.finagle.tracing.BufferingTranscript
- Trace().transcript = new BufferingTranscript(Trace().traceID)
+ Trace.debug(true)
client.complex_return("a string")().arg_two must be_==(
- "%s".format(Trace().traceID.span.toString))
+ "%s".format(Trace().id.toString))
- Trace().transcript must haveSize(1)
- Trace().transcript.head.message must be_==("hey it's me!")
+
+ Trace().children must haveSize(1)
+ val childSpan = Trace().children.head
+
+ childSpan.transcript must haveSize(5)
+ val annotations = childSpan.transcript map { _.annotation }
+ val texts = annotations collect { case Annotation.Message(text) => text }
+ texts must haveSize(1)
+ texts.head must be_==("hey it's me!")
client.add(1, 2)() must throwA[AnException]
client.add_one(1, 2)() // don't block!

No commit comments for this range

Something went wrong with that request. Please try again.