-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract transport agnostic zipkin code into finagle-zipkin-core #513
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
target(name='finagle-zipkin-core', | ||
dependencies=[ | ||
'finagle/finagle-zipkin-core/src/main/scala' | ||
] | ||
) | ||
|
||
target(name='tests', | ||
dependencies=[ | ||
'finagle/finagle-zipkin-core/src/test/scala' | ||
] | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
scala_library(name='scala', | ||
provides = scala_artifact( | ||
org = 'com.twitter', | ||
name = 'finagle-zipkin-core', | ||
repo = artifactory, | ||
), | ||
dependencies=[ | ||
'3rdparty/jvm/com/fasterxml/jackson/core:jackson-core', | ||
'3rdparty/jvm/com/fasterxml/jackson/core:jackson-databind', | ||
'3rdparty/jvm/com/fasterxml/jackson/module:jackson-module-scala', | ||
'3rdparty/jvm/org/apache/thrift:thrift-0.5.0', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one option is to drop this dep and instead use io.zipkin.java:zipkin:1.0.0 as that includes an implicit codec. In other words change to use the canonical model library. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very good point. Although I'm going to try keeping the scope of these changes as small as possible. Let's look into it when the rest is done. |
||
'finagle/finagle-core', | ||
'finagle/finagle-thrift', | ||
'finagle/finagle-zipkin/src/main/thrift:thrift-scala', | ||
'util/util-codec', | ||
'util/util-core', | ||
'util/util-events', | ||
], | ||
sources=rglobs('*.scala'), | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
package com.twitter.finagle.zipkin.thrift | ||
package com.twitter.finagle.zipkin.core | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moving the package is a breaking change. i think its probably worth it, but please note these in the "Breaking" section of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the breaking section I listed the public classes (and companion objects) affected.:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indeed you did! apologies for the noise. |
||
|
||
import com.twitter.finagle.thrift.thrift | ||
import com.twitter.util.Time | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
package com.twitter.finagle.zipkin.core | ||
|
||
import com.twitter.conversions.time._ | ||
import com.twitter.finagle.stats.StatsReceiver | ||
import com.twitter.finagle.thrift.thrift | ||
import com.twitter.finagle.tracing | ||
import com.twitter.finagle.tracing._ | ||
import com.twitter.finagle.util.DefaultTimer | ||
import com.twitter.util._ | ||
import java.net.InetSocketAddress | ||
import java.nio.ByteBuffer | ||
import scala.language.reflectiveCalls | ||
|
||
/** | ||
* Receives the Finagle generated traces and sends them off to Zipkin | ||
* @param statsReceiver We generate stats to keep track of traces sent, failures and so on | ||
* @param timer A Timer used for timing out spans in the [[DeadlineSpanMap]] | ||
*/ | ||
abstract private[zipkin] class RawZipkinTracer( | ||
statsReceiver: StatsReceiver, | ||
timer: Timer = DefaultTimer.twitter) | ||
extends Tracer | ||
{ | ||
private[this] val ErrorAnnotation = "%s: %s" // annotation: errorMessage | ||
|
||
// this sends off spans after the deadline is hit, no matter if it ended naturally or not. | ||
private[this] val spanMap: DeadlineSpanMap = | ||
new DeadlineSpanMap(sendSpans, 120.seconds, statsReceiver, timer) | ||
|
||
protected[core] def flush(): Future[Unit] = spanMap.flush() | ||
|
||
/** | ||
* Always sample the request. | ||
*/ | ||
def sampleTrace(traceId: TraceId): Option[Boolean] = Some(true) | ||
|
||
/** | ||
* Mutate the Span with whatever new info we have. | ||
* If we see an "end" annotation we remove the span and send it off. | ||
*/ | ||
protected def mutate(traceId: TraceId)(f: MutableSpan => Unit): Unit = { | ||
spanMap.update(traceId)(f) | ||
} | ||
|
||
private[this] val TrueBB: ByteBuffer = ByteBuffer.wrap(Array[Byte](1)) | ||
private[this] val FalseBB: ByteBuffer = ByteBuffer.wrap(Array[Byte](0)) | ||
|
||
def record(record: Record): Unit = { | ||
record.annotation match { | ||
case tracing.Annotation.WireSend => | ||
annotate(record, thrift.Constants.WIRE_SEND) | ||
case tracing.Annotation.WireRecv => | ||
annotate(record, thrift.Constants.WIRE_RECV) | ||
case tracing.Annotation.WireRecvError(error: String) => | ||
annotate(record, ErrorAnnotation.format(thrift.Constants.WIRE_RECV_ERROR, error)) | ||
case tracing.Annotation.ClientSend() => | ||
annotate(record, thrift.Constants.CLIENT_SEND) | ||
case tracing.Annotation.ClientRecv() => | ||
annotate(record, thrift.Constants.CLIENT_RECV) | ||
case tracing.Annotation.ClientRecvError(error: String) => | ||
annotate(record, ErrorAnnotation.format(thrift.Constants.CLIENT_RECV_ERROR, error)) | ||
case tracing.Annotation.ServerSend() => | ||
annotate(record, thrift.Constants.SERVER_SEND) | ||
case tracing.Annotation.ServerRecv() => | ||
annotate(record, thrift.Constants.SERVER_RECV) | ||
case tracing.Annotation.ServerSendError(error: String) => | ||
annotate(record, ErrorAnnotation.format(thrift.Constants.SERVER_SEND_ERROR, error)) | ||
case tracing.Annotation.ClientSendFragment() => | ||
annotate(record, thrift.Constants.CLIENT_SEND_FRAGMENT) | ||
case tracing.Annotation.ClientRecvFragment() => | ||
annotate(record, thrift.Constants.CLIENT_RECV_FRAGMENT) | ||
case tracing.Annotation.ServerSendFragment() => | ||
annotate(record, thrift.Constants.SERVER_SEND_FRAGMENT) | ||
case tracing.Annotation.ServerRecvFragment() => | ||
annotate(record, thrift.Constants.SERVER_RECV_FRAGMENT) | ||
case tracing.Annotation.Message(value) => | ||
annotate(record, value) | ||
case tracing.Annotation.Rpc(name: String) => | ||
spanMap.update(record.traceId)(_.setName(name)) | ||
case tracing.Annotation.ServiceName(serviceName: String) => | ||
spanMap.update(record.traceId)(_.setServiceName(serviceName)) | ||
case tracing.Annotation.Rpcname(service: String, rpc: String) => | ||
spanMap.update(record.traceId)(_.setServiceName(service).setName(rpc)) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: Boolean) => | ||
binaryAnnotation(record, key, (if (value) TrueBB else FalseBB).duplicate(), thrift.AnnotationType.BOOL) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: Array[Byte]) => | ||
binaryAnnotation(record, key, ByteBuffer.wrap(value), thrift.AnnotationType.BYTES) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: ByteBuffer) => | ||
binaryAnnotation(record, key, value, thrift.AnnotationType.BYTES) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: Short) => | ||
binaryAnnotation(record, key, ByteBuffer.allocate(2).putShort(0, value), thrift.AnnotationType.I16) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: Int) => | ||
binaryAnnotation(record, key, ByteBuffer.allocate(4).putInt(0, value), thrift.AnnotationType.I32) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: Long) => | ||
binaryAnnotation(record, key, ByteBuffer.allocate(8).putLong(0, value), thrift.AnnotationType.I64) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: Double) => | ||
binaryAnnotation(record, key, ByteBuffer.allocate(8).putDouble(0, value), thrift.AnnotationType.DOUBLE) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value: String) => | ||
binaryAnnotation(record, key, ByteBuffer.wrap(value.getBytes), thrift.AnnotationType.STRING) | ||
case tracing.Annotation.BinaryAnnotation(key: String, value) => // Throw error? | ||
case tracing.Annotation.LocalAddr(ia: InetSocketAddress) => | ||
setEndpoint(record, ia) | ||
case tracing.Annotation.ClientAddr(ia: InetSocketAddress) => | ||
// use a binary annotation over a regular annotation to avoid a misleading timestamp | ||
spanMap.update(record.traceId) { _.addBinaryAnnotation(BinaryAnnotation( | ||
thrift.Constants.CLIENT_ADDR, | ||
TrueBB.duplicate(), | ||
thrift.AnnotationType.BOOL, | ||
Endpoint.fromSocketAddress(ia))) | ||
} | ||
case tracing.Annotation.ServerAddr(ia: InetSocketAddress) => | ||
spanMap.update(record.traceId) { _.addBinaryAnnotation(BinaryAnnotation( | ||
thrift.Constants.SERVER_ADDR, | ||
TrueBB.duplicate(), | ||
thrift.AnnotationType.BOOL, | ||
Endpoint.fromSocketAddress(ia))) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Sets the endpoint in the span for any future annotations. Also | ||
* sets the endpoint in any previous annotations that lack one. | ||
*/ | ||
protected def setEndpoint(record: Record, ia: InetSocketAddress): Unit = { | ||
spanMap.update(record.traceId)(_.setEndpoint(Endpoint.fromSocketAddress(ia).boundEndpoint)) | ||
} | ||
|
||
protected def binaryAnnotation( | ||
record: Record, | ||
key: String, | ||
value: ByteBuffer, | ||
annotationType: thrift.AnnotationType | ||
): Unit = { | ||
spanMap.update(record.traceId) { span => | ||
span.addBinaryAnnotation(BinaryAnnotation(key, value, annotationType, span.endpoint)) | ||
} | ||
} | ||
|
||
/** | ||
* Add this record as a time based annotation. | ||
*/ | ||
protected def annotate(record: Record, value: String): Unit = { | ||
spanMap.update(record.traceId) { span => | ||
span.addAnnotation(ZipkinAnnotation(record.timestamp, value, span.endpoint)) | ||
} | ||
} | ||
|
||
def sendSpans(spans: Seq[Span]): Future[Unit] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,6 @@ | ||
package com.twitter.finagle.zipkin.thrift | ||
package com.twitter.finagle.zipkin.core | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. weird this didn't get caught as a rename.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, it did in my local git diff There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw, I fixed up the commits so this appears as a rename |
||
|
||
import com.twitter.finagle.stats.{DefaultStatsReceiver, NullStatsReceiver, StatsReceiver} | ||
import com.twitter.finagle.tracing.{TraceId, Record, Tracer, Annotation, Trace} | ||
import com.twitter.finagle.zipkin.{host => Host, initialSampleRate => sampleRateFlag} | ||
import com.twitter.io.Buf | ||
import com.twitter.util.events.{Event, Sink} | ||
import com.twitter.util.{Time, Throw, Try} | ||
|
@@ -69,9 +67,7 @@ private object Json { | |
} | ||
} | ||
|
||
object ZipkinTracer { | ||
|
||
lazy val default: Tracer = mk() | ||
object SamplingTracer { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto re breaking change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and likewise for the other public classes in this file There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I replaced Now I realize I can remove Actions speak louder than words so maybe this commit explains this better than I do: 262614b |
||
|
||
/** | ||
* The [[com.twitter.util.events.Event.Type Event.Type]] for trace events. | ||
|
@@ -110,50 +106,6 @@ object ZipkinTracer { | |
} | ||
} | ||
|
||
/** | ||
* @param scribeHost Host to send trace data to | ||
* @param scribePort Port to send trace data to | ||
* @param statsReceiver Where to log information about tracing success/failures | ||
* @param sampleRate How much data to collect. Default sample rate 0.1%. Max is 1, min 0. | ||
*/ | ||
@deprecated("Use mk() instead", "6.1.0") | ||
def apply( | ||
scribeHost: String = Host().getHostName, | ||
scribePort: Int = Host().getPort, | ||
statsReceiver: StatsReceiver = NullStatsReceiver, | ||
sampleRate: Float = Sampler.DefaultSampleRate | ||
): Tracer.Factory = () => mk(scribeHost, scribePort, statsReceiver, sampleRate) | ||
|
||
/** | ||
* @param host Host to send trace data to | ||
* @param port Port to send trace data to | ||
* @param statsReceiver Where to log information about tracing success/failures | ||
* @param sampleRate How much data to collect. Default sample rate 0.1%. Max is 1, min 0. | ||
*/ | ||
def mk( | ||
host: String = Host().getHostName, | ||
port: Int = Host().getPort, | ||
statsReceiver: StatsReceiver = NullStatsReceiver, | ||
sampleRate: Float = Sampler.DefaultSampleRate | ||
): Tracer = | ||
new ZipkinTracer( | ||
RawZipkinTracer(host, port, statsReceiver), | ||
sampleRate) | ||
|
||
/** | ||
* Util method since named parameters can't be called from Java | ||
* @param sr stats receiver to send successes/failures to | ||
*/ | ||
@deprecated("Use mk() instead", "6.1.0") | ||
def apply(sr: StatsReceiver): Tracer.Factory = () => | ||
mk(Host().getHostName, Host().getPort, sr, Sampler.DefaultSampleRate) | ||
|
||
/** | ||
* Util method since named parameters can't be called from Java | ||
* @param statsReceiver stats receiver to send successes/failures to | ||
*/ | ||
def mk(statsReceiver: StatsReceiver): Tracer = | ||
mk(Host().getHostName, Host().getPort, statsReceiver, Sampler.DefaultSampleRate) | ||
} | ||
|
||
/** | ||
|
@@ -179,13 +131,6 @@ class SamplingTracer( | |
def this(underlyingTracer: Tracer, initialSampleRate: Float) = | ||
this(underlyingTracer, initialSampleRate, Sink.default) | ||
|
||
/** | ||
* Tracer that supports sampling. Will pass through a subset of the records. | ||
*/ | ||
def this() = this( | ||
RawZipkinTracer(Host().getHostName, Host().getPort, DefaultStatsReceiver.scope("zipkin")), | ||
sampleRateFlag()) | ||
|
||
private[this] val sampler = new Sampler | ||
setSampleRate(initialSampleRate) | ||
|
||
|
@@ -200,15 +145,13 @@ class SamplingTracer( | |
if (sink.recording) { | ||
if (Trace.hasId) { | ||
val traceId = Trace.id | ||
sink.event(ZipkinTracer.Trace, objectVal = record.annotation, | ||
sink.event(SamplingTracer.Trace, objectVal = record.annotation, | ||
traceIdVal = traceId.traceId.self, spanIdVal = traceId.spanId.self) | ||
} else { | ||
sink.event(ZipkinTracer.Trace, objectVal = record.annotation) | ||
sink.event(SamplingTracer.Trace, objectVal = record.annotation) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
class ZipkinTracer(tracer: RawZipkinTracer, initialRate: Float) | ||
extends SamplingTracer(tracer, initialRate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for doing all the pants build files! above and beyond what's exepcted! 💯