Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add Kafka Finagle Service

  • Loading branch information...
commit 0545fbb357230154128ea8822464d0f44df9dc67 1 parent 2a4515d
Franklin Hu authored
View
25 project/Project.scala
@@ -28,8 +28,7 @@ object Zipkin extends Build {
Project(
id = "zipkin",
base = file(".")
- ) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService)
-
+ ) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService, kafka)
lazy val hadoop = Project(
id = "zipkin-hadoop",
@@ -299,6 +298,26 @@ object Zipkin extends Build {
libraryDependencies ++= testDependencies
).dependsOn(collectorCore, scrooge)
+ lazy val kafka =
+ Project(
+ id = "zipkin-kafka",
+ base = file("zipkin-kafka"),
+ settings = Project.defaultSettings ++
+ StandardProject.newSettings ++
+ SubversionPublisher.newSettings ++
+ TravisCiRepos.newSettings
+ ).settings(
+ version := "0.3.0-SNAPSHOT",
+ libraryDependencies ++= Seq(
+ "org.clojars.jasonjckn" % "kafka_2.9.1" % "0.7.0"
+ ) ++ testDependencies,
+ resolvers ++= (proxyRepo match {
+ case None => Seq(
+ "clojars" at "http://clojars.org/repo")
+ case Some(pr) => Seq() // if proxy is set we assume that it has the artifacts we would get from the above repo
+ })
+ ).dependsOn(collectorCore, scrooge)
+
lazy val collectorService = Project(
id = "zipkin-collector-service",
base = file("zipkin-collector-service"),
@@ -318,7 +337,7 @@ object Zipkin extends Build {
base =>
(base / "config" +++ base / "src" / "test" / "resources").get
}
- ).dependsOn(collectorCore, collectorScribe, cassandra)
+ ).dependsOn(collectorCore, collectorScribe, cassandra, kafka)
lazy val web =
Project(
View
46 zipkin-kafka/src/main/scala/com/twitter/zipkin/collector/Kafka.scala
@@ -0,0 +1,46 @@
+package com.twitter.zipkin.collector
+
+import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.finagle.Service
+import com.twitter.logging.Logger
+import com.twitter.util.Future
+import com.twitter.scrooge.BinaryThriftStructSerializer
+import com.twitter.zipkin.common.Span
+import com.twitter.zipkin.conversions.thrift._
+import com.twitter.zipkin.gen
+import kafka.message.Message
+import kafka.producer.{ProducerData, Producer}
+import kafka.serializer.Encoder
+
+class Kafka(
+ kafka: Producer[String, gen.Span],
+ topic: String,
+ statsReceiver: StatsReceiver
+) extends Service[Span, Unit] {
+
+ private[this] val log = Logger.get()
+
+ def apply(req: Span): Future[Unit] = {
+ statsReceiver.counter("try").incr()
+ val producerData = new ProducerData[String, gen.Span](topic, Seq(req.toThrift))
+ Future {
+ kafka.send(producerData)
+ } onSuccess { (_) =>
+ statsReceiver.counter("success").incr()
+ }
+ }
+
+ override def release() {
+ kafka.close()
+ }
+}
+
+class SpanEncoder extends Encoder[gen.Span] {
+ val serializer = new BinaryThriftStructSerializer[gen.Span] {
+ def codec = gen.Span
+ }
+
+ def toMessage(span: gen.Span): Message = {
+ new Message(serializer.toBytes(span))
+ }
+}
View
24 zipkin-kafka/src/main/scala/com/twitter/zipkin/config/KafkaConfig.scala
@@ -0,0 +1,24 @@
+package com.twitter.zipkin.config
+
+import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
+import com.twitter.util.Config
+import com.twitter.zipkin.collector.Kafka
+import com.twitter.zipkin.gen
+import java.util.Properties
+import kafka.producer.Producer
+import kafka.producer.ProducerConfig
+
+trait KafkaConfig extends Config[Kafka] {
+ var zkConnectString: String = "localhost:2181"
+ var topic: String = "zipkin"
+ var statsReceiver: StatsReceiver = NullStatsReceiver
+
+ def apply(): Kafka = {
+ val properties = new Properties
+ properties.put("zk.connect", zkConnectString)
+ properties.put("serializer.class", "com.twitter.zipkin.collector.SpanEncoder")
+ properties.put("producer.type", "sync")
+ val producer = new Producer[String, gen.Span](new ProducerConfig(properties))
+ new Kafka(producer, topic, statsReceiver.scope("kafka"))
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.