Permalink
Browse files

new (and cleaner) core datastructures for tracing

  • Loading branch information...
1 parent e12b4e6 commit 1f197a7b01636f783888aca596e074baac31e74b @mariusae mariusae committed Feb 15, 2011
@@ -11,7 +11,8 @@ import scala.util.Random
import com.twitter.util.Local
case class TraceContext(
- var transactionID: Long, // 64-bit transction identifier
+ var spanID: Long,
+ var parentSpanID: Option[Long],
var transcript: Transcript // an associated transcript
)
@@ -34,5 +35,5 @@ object TraceContext {
current().get
}
- def newContext() = TraceContext(rng.nextLong(), NullTranscript)
+ def newContext() = TraceContext(rng.nextLong(), None, NullTranscript)
}
@@ -10,6 +10,9 @@ import com.twitter.util.Time
case class Record(
host: Int, // 32-bit IP address
+ vmID: String, // virtual machine identifier
+ spanID: Long,
+ parentSpanID: Option[Long],
timestamp: Time, // (nanosecond granularity)
message: String // an arbitrary string message
)
@@ -38,7 +41,7 @@ object NullTranscript extends Transcript {
def record(message: => String) {}
def iterator = Iterator.empty
override def isRecording = false
-}
+}
/**
* Buffers messages to an ArrayBuffer.
@@ -47,7 +50,11 @@ class BufferingTranscript extends Transcript {
private[this] val buffer = new ArrayBuffer[Record]
def record(message: => String) = synchronized {
- buffer += Record(Host(), Time.now, message)
+ buffer += Record(
+ Host(), VMID(),
+ TraceContext().spanID, TraceContext().parentSpanID,
+ Time.now,
+ message)
}
def iterator = buffer.iterator
@@ -0,0 +1,6 @@
+package com.twitter.finagle.tracing
+
+object VMID {
+ private[this] val id = management.ManagementFactory.getRuntimeMXBean.getName
+ def apply() = id
+}
@@ -0,0 +1,26 @@
+package com.twitter.finagle.thrift
+
+import org.apache.thrift.TBase
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.TMemoryInputTransport
+
+object InputBuffer {
+ private[thrift] val protocolFactory = new TBinaryProtocol.Factory()
+
+ def peelMessage(bytes: Array[Byte], message: TBase[_, _]) = {
+ val buffer = new InputBuffer(bytes)
+ message.read(buffer())
+ buffer.remainder
+ }
+}
+
+class InputBuffer(bytes: Array[Byte]) {
+ import InputBuffer._
+
+ private[this] val memoryTransport = new TMemoryInputTransport(bytes)
+ private[this] val iprot = protocolFactory.getProtocol(memoryTransport)
+
+ def apply() = iprot
+
+ def remainder = bytes drop memoryTransport.getBufferPosition
+}
@@ -0,0 +1,36 @@
+package com.twitter.finagle.thrift
+
+/**
+ * OutputBuffers are convenient ways of getting at TProtocols for
+ * output to byte arrays
+ */
+
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.TMemoryBuffer
+import org.apache.thrift.TBase
+
+object OutputBuffer {
+ private[thrift] val protocolFactory = new TBinaryProtocol.Factory()
+
+ def messageToArray(message: TBase[_, _]) = {
+ val buffer = new OutputBuffer
+ message.write(buffer())
+ buffer.toArray
+ }
+}
+
+class OutputBuffer {
+ import OutputBuffer._
+
+ private[this] val memoryBuffer = new TMemoryBuffer(512)
+ private[this] val oprot = protocolFactory.getProtocol(memoryBuffer)
+
+ def apply() = oprot
+
+ def toArray = {
+ oprot.getTransport().flush()
+ java.util.Arrays.copyOfRange(
+ memoryBuffer.getArray(), 0, memoryBuffer.length())
+ }
+}
+
@@ -1,14 +1,18 @@
package com.twitter.finagle.thrift
+/**
+ * ThriftChannel decoder: this simply converts the underlying
+ * ChannelBuffers (which have been deframed) into byte arrays.
+ */
+
import org.jboss.netty.channel.{ChannelHandlerContext, Channel}
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
class ThriftChannelBufferDecoder extends OneToOneDecoder {
- def decode(ctx: ChannelHandlerContext, ch: Channel, message: Object) = {
+ def decode(ctx: ChannelHandlerContext, ch: Channel, message: Object) =
message match {
case buffer: ChannelBuffer => buffer.array() // is this kosher?
case _ => throw new IllegalArgumentException("no byte buffer")
}
- }
}
@@ -18,83 +18,124 @@ import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.channel.ChannelService
import com.twitter.finagle.tracing.TraceContext
-class ThriftClientChannelBufferEncoder extends SimpleChannelDownstreamHandler {
- override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
- e.getMessage match {
- case ThriftClientRequest(message, oneway) =>
- Channels.write(ctx, e.getFuture, ChannelBuffers.wrappedBuffer(message))
- if (oneway) {
- // oneway RPCs are satisfied when the write is complete.
- e.getFuture() {
- case Ok(_) =>
- Channels.fireMessageReceived(ctx, ChannelBuffers.EMPTY_BUFFER)
- case Error(e) =>
- Channels.fireExceptionCaught(ctx, e)
- case Cancelled =>
- Channels.fireExceptionCaught(ctx, new CancelledRequestException)
- }
- }
-
- case _ =>
- throw new IllegalArgumentException("no byte array")
- }
- }
-}
-
+/**
+ * ThriftClientFramedCodec implements a framed thrift transport that
+ * supports upgrading in order to provide TraceContexts across
+ * requests.
+ */
object ThriftClientFramedCodec {
def apply() = new ThriftClientFramedCodec
}
-class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]] {
+class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]]
+{
val clientPipelineFactory =
new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
pipeline.addLast("thriftFrameCodec", new ThriftFrameCodec)
- pipeline.addLast("byteEncoder", new ThriftClientChannelBufferEncoder)
- pipeline.addLast("byteDecoder", new ThriftChannelBufferDecoder)
+ pipeline.addLast("byteEncoder", new ThriftClientChannelBufferEncoder)
+ pipeline.addLast("byteDecoder", new ThriftChannelBufferDecoder)
pipeline
}
}
val serverPipelineFactory = clientPipelineFactory
- override def prepareClientChannel(
- underlying: Service[ThriftClientRequest, Array[Byte]]) =
+ override def prepareClientChannel(underlying: Service[ThriftClientRequest, Array[Byte]]) =
{
// Attempt to upgrade the protocol the first time around by
// sending a magic method invocation.
- val memoryBuffer = new TMemoryBuffer(512)
- val protocolFactory = new TBinaryProtocol.Factory()
- val oprot = protocolFactory.getProtocol(memoryBuffer)
- oprot.writeMessageBegin(
+ val buffer = new OutputBuffer()
+ buffer().writeMessageBegin(
new TMessage(ThriftTracing.CanTraceMethodName, TMessageType.CALL, 0))
- val args = new CanTwitterTrace.can_twitter_trace_args()
- args.write(oprot)
- oprot.writeMessageEnd()
- oprot.getTransport().flush()
-
- val message = java.util.Arrays.copyOfRange(
- memoryBuffer.getArray(), 0, memoryBuffer.length())
-
- underlying(ThriftClientRequest(message, false)) map { reply =>
- val memoryTransport = new TMemoryInputTransport(reply)
- val iprot = protocolFactory.getProtocol(memoryTransport)
- val msg = iprot.readMessageBegin()
- if (msg.`type` == TMessageType.EXCEPTION)
+ val options = new TraceOptions
+ options.write(buffer())
+ buffer().writeMessageEnd()
+
+ underlying(ThriftClientRequest(buffer.toArray, false)) map { bytes =>
+ val protocolFactory = new TBinaryProtocol.Factory()
+ val memoryTransport = new TMemoryInputTransport(bytes)
+ val iprot = protocolFactory.getProtocol(memoryTransport)
+ val reply = iprot.readMessageBegin()
+
+ if (reply.`type` == TMessageType.EXCEPTION) {
+ // Return just the underlying service if we caused an
+ // exception: this means the remote end didn't support
+ // tracing.
underlying
- else
+ } else {
+ // Otherwise, apply our tracing filter first. This will read
+ // the TraceData frames, and apply them to the current
+ // TraceContext.
(new ThriftClientTracingFilter) andThen underlying
+ }
}
}
}
+/**
+ * ThriftClientChannelBufferEncoder translates ThriftClientRequests to
+ * bytes on the wire. It satisfies the request immediately if it is a
+ * "oneway" request.
+ */
+class ThriftClientChannelBufferEncoder
+ extends SimpleChannelDownstreamHandler
+{
+ override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
+ e.getMessage match {
+ case ThriftClientRequest(message, oneway) =>
+ Channels.write(ctx, e.getFuture, ChannelBuffers.wrappedBuffer(message))
+ if (oneway) {
+ // oneway RPCs are satisfied when the write is complete.
+ e.getFuture() {
+ case Ok(_) =>
+ Channels.fireMessageReceived(ctx, ChannelBuffers.EMPTY_BUFFER)
+ case Error(e) =>
+ Channels.fireExceptionCaught(ctx, e)
+ case Cancelled =>
+ Channels.fireExceptionCaught(ctx, new CancelledRequestException)
+ }
+ }
+
+ case _ =>
+ throw new IllegalArgumentException("No ThriftClientRequest on the wire")
+ }
+ }
+}
+
+/**
+ * ThriftClientTracingFilter implements TraceContext support for
+ * thrift. This is applied *after* the Channel has been upgraded (via
+ * negotiation). It serializes the current TraceContext into a header
+ * on the wire. It is applied after all framing.
+ */
+
class ThriftClientTracingFilter extends SimpleFilter[ThriftClientRequest, Array[Byte]]
{
def apply(request: ThriftClientRequest,
- service: Service[ThriftClientRequest, Array[Byte]]) = {
- val message = TracingHeader.encode(TraceContext().transactionID, request.message)
- val tracedRequest = ThriftClientRequest(message, request.oneway)
- service(tracedRequest)
+ service: Service[ThriftClientRequest, Array[Byte]]) =
+ {
+ val header = new TracedRequest
+ header.setParent_span_id(TraceContext().spanID)
+
+ val tracedRequest = request.copy(
+ message = OutputBuffer.messageToArray(header) ++ request.message)
+
+ val reply = service(tracedRequest)
+ if (tracedRequest.oneway) {
+ // Oneway requests don't contain replies, and so they can't be
+ // traced.
+ reply
+ } else {
+ reply map { response =>
+ val header = new TracedResponse
+ val rest = InputBuffer.peelMessage(response, header)
+
+ // TODO: merge.
+
+ rest
+ }
+ }
}
}
@@ -40,42 +40,45 @@ class ThriftServerTracingFilter
{
// Concurrency is not an issue here since we have an instance per
// channel, and receive only one request at a time (thrift does no
- // pipelining). We don't protect against this in the underlying
- // codec, however.
+ // pipelining). Furthermore, finagle will guarantee this by
+ // serializing requests.
private[this] var isUpgraded = false
private[this] val protocolFactory = new TBinaryProtocol.Factory()
def apply(request: Array[Byte], service: Service[Array[Byte], Array[Byte]]) = {
// What to do on exceptions here?
if (isUpgraded) {
- val (body, txid) = TracingHeader.decode(request)
-
- val memoryTransport = new TMemoryInputTransport(body)
- val iprot = protocolFactory.getProtocol(memoryTransport)
- val msg = iprot.readMessageBegin()
-
- TraceContext().transactionID = txid
- service(body)
+ val header = new TracedRequest
+ val request_ = InputBuffer.peelMessage(request, header)
+
+ // TODO: Check isset?
+ TraceContext().parentSpanID = Some(header.getParent_span_id)
+
+ // has a problem with dups-- but we filter them out.
+
+ service(request_) map { response =>
+ // Wrap some trace data.
+ val header = new TracedResponse
+ val headerBytes = OutputBuffer.messageToArray(header)
+ headerBytes ++ response
+ }
} else {
- val memoryTransport = new TMemoryInputTransport(request)
- val iprot = protocolFactory.getProtocol(memoryTransport)
- val msg = iprot.readMessageBegin()
+ val buffer = new InputBuffer(request)
+ val msg = buffer().readMessageBegin()
- // Only try once?
- if (msg.`type` == TMessageType.CALL && msg.name == ThriftTracing.CanTraceMethodName) {
+ // TODO: only try once?
+ if (msg.`type` == TMessageType.CALL &&
+ msg.name == ThriftTracing.CanTraceMethodName) {
// upgrade & reply.
isUpgraded = true
- val memoryBuffer = new TMemoryBuffer(512)
- val protocolFactory = new TBinaryProtocol.Factory()
- val oprot = protocolFactory.getProtocol(memoryBuffer)
- oprot.writeMessageBegin(
+ val buffer = new OutputBuffer
+ buffer().writeMessageBegin(
new TMessage(ThriftTracing.CanTraceMethodName, TMessageType.REPLY, msg.seqid))
- oprot.writeMessageEnd()
+ buffer().writeMessageEnd()
- Future.value(
- java.util.Arrays.copyOfRange(
- memoryBuffer.getArray(), 0, memoryBuffer.length()))
+ // TODO: parse out options?
+ Future.value(buffer.toArray)
} else {
service(request)
}
Oops, something went wrong.

0 comments on commit 1f197a7

Please sign in to comment.