Permalink
Browse files

propagate trace transcripts, test transcripts, etc.

  • Loading branch information...
mariusae committed Feb 16, 2011
1 parent d095dec commit 97c204ec43ae634818cc4d7ee505d522faadd779
@@ -13,9 +13,8 @@ object Host {
dis.readInt
} catch {
case e =>
- log.warning(
- "Failed to retrieve local host address: %s".format(e))
- 0
+ log.warning("Failed to retrieve local host address: %s".format(e))
+ 0
}
def apply(): Int = localHost
@@ -8,18 +8,32 @@ package com.twitter.finagle.tracing
import scala.util.Random
-import com.twitter.util.Local
+import com.twitter.util.{Local, RichU64Long}
+
+case class TraceID(
+ var span: Long,
+ var parentSpan: Option[Long],
+ val host: Int,
+ val vm: String)
+{
+ override def toString = {
+ val spanHex = new RichU64Long(span).toU64HexString
+ val parentSpanHex = parentSpan map (new RichU64Long(_).toU64HexString)
+
+ val spanString = parentSpanHex match {
+ case Some(parentSpanHex) => "%s<:%s".format(spanHex, parentSpanHex)
+ case None => spanHex
+ }
+
+ "%s,%s".format(spanString, vm)
+ }
+}
case class TraceContext(
- var spanID: Long,
- var parentSpanID: Option[Long],
- var transcript: Transcript // an associated transcript
+ var traceID: TraceID,
+ var transcript: Transcript
)
-// Span stuff. Define a thrift struct for carrying this. Blah.
-// transcript: machine identifier (IP address?) subspans, etc? these
-// are usually reconstructed later... it's what we get for logging.
-
object TraceContext {
private[this] val rng = new Random
private[this] val current = new Local[TraceContext]
@@ -35,5 +49,12 @@ object TraceContext {
current().get
}
- def newContext() = TraceContext(rng.nextLong(), None, NullTranscript)
+ def newContext() = {
+ val traceID = TraceID(rng.nextLong(), None, Host(), VMID())
+ TraceContext(traceID, NullTranscript)
+ }
+
+ def reset() {
+ this() = newContext()
+ }
}
@@ -9,19 +9,21 @@ import collection.mutable.ArrayBuffer
import com.twitter.util.Time
case class Record(
- host: Int, // 32-bit IP address
- vmID: String, // virtual machine identifier
- spanID: Long,
- parentSpanID: Option[Long],
+ traceID: TraceID,
timestamp: Time, // (nanosecond granularity)
- message: String // an arbitrary string message
-)
+ message: String) // an arbitrary string message
+{
+ override def toString = "[%s] @ %s: %s".format(traceID, timestamp, message)
+}
trait Transcript extends Iterable[Record] {
- // Log levels?
+ // TODO: support log levels?
def record(message: => String)
def isRecording = true
+ def merge(other: Iterator[Record])
+
+ def print() { foreach { println(_) } }
}
/**
@@ -32,6 +34,7 @@ object Transcript extends Transcript {
def record(message: => String) { TraceContext().transcript.record(message) }
def iterator = TraceContext().transcript.iterator
override def isRecording = TraceContext().transcript.isRecording
+ def merge(other: Iterator[Record]) = TraceContext().transcript.merge(other)
}
/**
@@ -41,25 +44,32 @@ object NullTranscript extends Transcript {
def record(message: => String) {}
def iterator = Iterator.empty
override def isRecording = false
+ def merge(other: Iterator[Record]) {}
}
/**
* Buffers messages to an ArrayBuffer.
*/
-class BufferingTranscript extends Transcript {
+class BufferingTranscript(traceID: TraceID) extends Transcript {
private[this] val buffer = new ArrayBuffer[Record]
def record(message: => String) = synchronized {
- buffer += Record(
- Host(), VMID(),
- TraceContext().spanID, TraceContext().parentSpanID,
- Time.now,
- message)
+ buffer += Record(traceID, Time.now, message)
}
def iterator = buffer.iterator
def clear() = synchronized {
buffer.clear()
}
+
+ def merge(other: Iterator[Record]) = synchronized {
+ // TODO: resolve time drift by causality
+ var combined = buffer ++ other
+ combined = combined sortWith { (a, b) => a.timestamp < b.timestamp }
+ combined = combined distinct
+
+ buffer.clear()
+ buffer.appendAll(combined)
+ }
}
@@ -0,0 +1,49 @@
+package com.twitter.finagle.tracing
+
+import org.specs.Specification
+
+import com.twitter.conversions.time._
+
+import com.twitter.util.Time
+
+object TrascriptSpec extends Specification {
+ "BufferingTranscript" should {
+ val traceID = TraceID(1L, Some(2L), 0, "myVM")
+
+ "record traceID, current time, and message" in {
+ Time.withCurrentTimeFrozen { timeControl =>
+ val t = new BufferingTranscript(traceID)
+ t.record("hey there")
+
+ val expectedRecord = Record(traceID, Time.now, "hey there")
+
+ t.size must be_==(1)
+ t.head must be_==(expectedRecord)
+ }
+ }
+
+ "merge" in {
+ Time.withCurrentTimeFrozen { timeControl =>
+ val t0 = new BufferingTranscript(traceID)
+ val t1 = new BufferingTranscript(traceID)
+
+ t0.record("1")
+ timeControl.advance(1.second)
+ t1.record("2")
+ timeControl.advance(1.second)
+ t0.record("3")
+
+ t0.merge(t1.iterator)
+ t0 must haveSize(3)
+ val records = t0.toArray
+ records(0).message must be_==("1")
+ records(1).message must be_==("2")
+ records(2).message must be_==("3")
+
+ // Merging again should kill dups:
+ t0.merge(t1.iterator)
+ t0 must haveSize(3)
+ }
+ }
+ }
+}
@@ -0,0 +1,112 @@
+package com.twitter.finagle.demo
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.net.InetSocketAddress
+
+import com.twitter.util.Future
+
+import org.apache.thrift.protocol.TBinaryProtocol
+
+import com.twitter.finagle.builder.{ClientBuilder, ServerBuilder}
+import com.twitter.finagle.thrift.{ThriftServerFramedCodec, ThriftClientFramedCodec}
+import com.twitter.finagle.tracing.{TraceContext, BufferingTranscript}
+
+object Tracing1Service extends Tracing1.ServiceIface {
+ private[this] val transport = ClientBuilder()
+ .hosts("localhost:6002")
+ .codec(ThriftClientFramedCodec())
+ .build()
+
+ private[this] val t2Client =
+ new Tracing2.ServiceToClient(transport, new TBinaryProtocol.Factory())
+
+ def main(args: Array[String]) {
+ ServerBuilder()
+ .codec(ThriftServerFramedCodec())
+ .bindTo(new InetSocketAddress(6001))
+ .build(new Tracing1.Service(this, new TBinaryProtocol.Factory()))
+ }
+
+ def computeSomething(): Future[String] = {
+ println("T1 with trace ID", TraceContext().traceID)
+ TraceContext().transcript.record("hey i'm issuing a call")
+
+ t2Client.computeSomethingElse() map { somethingElse =>
+ "t1: " + somethingElse
+ }
+ }
+}
+
+object Tracing2Service extends Tracing2.ServiceIface {
+ private[this] val transport = ClientBuilder()
+ .hosts("localhost:6003")
+ .codec(ThriftClientFramedCodec())
+ .build()
+
+ private[this] val t3Client =
+ new Tracing3.ServiceToClient(transport, new TBinaryProtocol.Factory())
+
+ def main(args: Array[String]) {
+ ServerBuilder()
+ .codec(ThriftServerFramedCodec())
+ .bindTo(new InetSocketAddress(6002))
+ .build(new Tracing2.Service(this, new TBinaryProtocol.Factory()))
+ }
+
+ def computeSomethingElse(): Future[String] = {
+ println("T2 with trace ID", TraceContext().traceID)
+ TraceContext().transcript.record("hey i'm issuing a call")
+
+
+ for {
+ x <- t3Client.oneMoreThingToCompute()
+ y <- t3Client.oneMoreThingToCompute()
+ } yield {
+ TraceContext().transcript.record(
+ "got my results! (%s and %s), returning".format(x, y))
+ "t2: " + x + y
+ }
+ }
+}
+
+object Tracing3Service extends Tracing3.ServiceIface {
+ private[this] val count = new AtomicInteger(0)
+
+ def main(args: Array[String]) {
+ ServerBuilder()
+ .codec(ThriftServerFramedCodec())
+ .bindTo(new InetSocketAddress(6003))
+ .build(new Tracing3.Service(this, new TBinaryProtocol.Factory()))
+ }
+
+ def oneMoreThingToCompute(): Future[String] = {
+ println("T3 with trace ID", TraceContext().traceID)
+
+ val number = count.incrementAndGet()
+ TraceContext().transcript.record(
+ "(t3) hey i'm issuing a call %s".format(number))
+ Future("t3: %d".format(number))
+ }
+}
+
+object Client {
+ def main(args: Array[String]) {
+ val transport = ClientBuilder()
+ .hosts("localhost:6001")
+ .codec(ThriftClientFramedCodec())
+ .build()
+
+ val client = new Tracing1.ServiceToClient(
+ transport, new TBinaryProtocol.Factory())
+
+ // Turn (debug) tracing on.
+ TraceContext().transcript = new BufferingTranscript(TraceContext().traceID)
+
+ TraceContext().transcript.record("about to start issuing the root request..")
+ val result = client.computeSomething()()
+ println("result", result)
+
+ println("Trace:")
+ TraceContext().transcript.print()
+ }
+}
@@ -0,0 +1,14 @@
+
+namespace java com.twitter.finagle.demo
+
+service Tracing1 {
+ string computeSomething();
+}
+
+service Tracing2 {
+ string computeSomethingElse();
+}
+
+service Tracing3 {
+ string oneMoreThingToCompute();
+}
@@ -1,5 +1,7 @@
package com.twitter.finagle.thrift
+import collection.JavaConversions._
+
import org.jboss.netty.channel.{
SimpleChannelHandler, Channel, ChannelEvent, ChannelHandlerContext,
SimpleChannelDownstreamHandler, MessageEvent, Channels,
@@ -10,13 +12,14 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
import org.apache.thrift.protocol.{TBinaryProtocol, TMessage, TMessageType}
import org.apache.thrift.transport.{TMemoryBuffer, TMemoryInputTransport}
-import com.twitter.util.Future
+import com.twitter.conversions.time._
+import com.twitter.util.{Time, Future}
import com.twitter.finagle._
import com.twitter.finagle.util.{Ok, Error, Cancelled, TracingHeader}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.channel.ChannelService
-import com.twitter.finagle.tracing.TraceContext
+import com.twitter.finagle.tracing.{TraceID, TraceContext, Record}
/**
* ThriftClientFramedCodec implements a framed thrift transport that
@@ -117,7 +120,8 @@ class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[
service: Service[ThriftClientRequest, Array[Byte]]) =
{
val header = new TracedRequest
- header.setParent_span_id(TraceContext().spanID)
+ header.setParent_span_id(TraceContext().traceID.span)
+ header.setDebug(TraceContext().transcript.isRecording)
val tracedRequest = request.copy(
message = OutputBuffer.messageToArray(header) ++ request.message)
@@ -129,10 +133,28 @@ class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[
reply
} else {
reply map { response =>
- val header = new TracedResponse
- val rest = InputBuffer.peelMessage(response, header)
+ val responseHeader = new TracedResponse
+ val rest = InputBuffer.peelMessage(response, responseHeader)
+
+ if (header.debug && responseHeader.isSetTranscript) {
+ val records = responseHeader.transcript map { thriftRecord =>
+ val traceID = TraceID(
+ thriftRecord.getSpan_id(),
+ {
+ val spanID = thriftRecord.getParent_span_id()
+ if (spanID == 0) None else Some(spanID)
+ },
+ thriftRecord.getHost(),
+ thriftRecord.getVm_id())
+
+ Record(
+ traceID,
+ Time.fromMilliseconds(thriftRecord.getTimestamp_ms()),
+ thriftRecord.getMessage())
+ }
- // TODO: merge.
+ TraceContext().transcript.merge(records.iterator)
+ }
rest
}
Oops, something went wrong.

0 comments on commit 97c204e

Please sign in to comment.