Skip to content
This repository

collector: add kafka store #174

Closed
wants to merge 4 commits into from

3 participants

Franklin Hu Chris Aniszczyk Johan Oskarsson
Franklin Hu
Collaborator

Add module that allows users to write their spans from the collector to a Kafka queue

Chris Aniszczyk
Owner

Looks like Kafka is being moved to the ASF incubator which is good:
https://github.com/apache/kafka

What sucks is they have no releases yet there.

We should probably start adding a NOTICE (e.g., https://github.com/twitter/scalding/blob/develop/NOTICE) file to track these third party dependencies...

Chris Aniszczyk
Owner

Apparently there is a 0.7.1 release... not available on any maven repo that I can find...

http://incubator.apache.org/kafka/downloads.html

Franklin Hu
Collaborator

They didn't seem to have an official Scala 2.9.* build, and the 2.8 jar is ABI incompatible with 2.9. I just used one that @jasonjckn published on clojars. I'll add the NOTICE file.

Johan Oskarsson
Collaborator

+1. Perhaps add a readme describing the module but also mentioning that it is experimental as per your email.

Franklin Hu franklinhu closed this pull request from a commit October 11, 2012
Franklin Hu collector: add kafka store
Add module that allows users to write their spans from the collector to
a Kafka queue

Author: @franklinhu
Fixes #174
URL: #174
5bae51e
Franklin Hu franklinhu closed this in 5bae51e October 11, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
21  NOTICE
... ...
@@ -0,0 +1,21 @@
  1
+Zipkin is a distributed tracing system.
  2
+Copyright 2012 Twitter, Inc.
  3
+
  4
+Zipkin has a number of optional third party dependencies:
  5
+
  6
+Cassandra
  7
+Apache Public License 2.0
  8
+http://cassandra.apache.org/
  9
+
  10
+Kafka 0.7.0
  11
+Apache Public License 2.0
  12
+http://incubator.apache.org/kafka/
  13
+
  14
+Scribe
  15
+Apache Public License 2.0
  16
+https://github.com/facebook/scribe
  17
+
  18
+ZooKeeper
  19
+Apache Public License 2.0
  20
+http://zookeeper.apache.org/
  21
+
25  project/Project.scala
@@ -28,8 +28,7 @@ object Zipkin extends Build {
28 28
     Project(
29 29
       id = "zipkin",
30 30
       base = file(".")
31  
-    ) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService)
32  
-  
  31
+    ) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService, kafka)
33 32
 
34 33
   lazy val hadoop = Project(
35 34
     id = "zipkin-hadoop",
@@ -299,6 +298,26 @@ object Zipkin extends Build {
299 298
       libraryDependencies ++= testDependencies
300 299
     ).dependsOn(collectorCore, scrooge)
301 300
 
  301
+  lazy val kafka =
  302
+    Project(
  303
+      id = "zipkin-kafka",
  304
+      base = file("zipkin-kafka"),
  305
+      settings = Project.defaultSettings ++
  306
+        StandardProject.newSettings ++
  307
+        SubversionPublisher.newSettings ++
  308
+        TravisCiRepos.newSettings
  309
+    ).settings(
  310
+      version := "0.3.0-SNAPSHOT",
  311
+      libraryDependencies ++= Seq(
  312
+        "org.clojars.jasonjckn"      % "kafka_2.9.1"    % "0.7.0"
  313
+      ) ++ testDependencies,
  314
+      resolvers ++= (proxyRepo match {
  315
+        case None => Seq(
  316
+          "clojars" at "http://clojars.org/repo")
  317
+        case Some(pr) => Seq() // if proxy is set we assume that it has the artifacts we would get from the above repo
  318
+      })
  319
+    ).dependsOn(collectorCore, scrooge)
  320
+
302 321
   lazy val collectorService = Project(
303 322
     id = "zipkin-collector-service",
304 323
     base = file("zipkin-collector-service"),
@@ -318,7 +337,7 @@ object Zipkin extends Build {
318 337
       base =>
319 338
         (base / "config" +++ base / "src" / "test" / "resources").get
320 339
     }
321  
-  ).dependsOn(collectorCore, collectorScribe, cassandra)
  340
+  ).dependsOn(collectorCore, collectorScribe, cassandra, kafka)
322 341
 
323 342
   lazy val web =
324 343
     Project(
7  zipkin-kafka/README.md
Source Rendered
... ...
@@ -0,0 +1,7 @@
  1
+# zipkin-kafka
  2
+
  3
+WARNING: Experimental
  4
+
  5
+This module adds a collector side store that writes to a Kafka cluster
  6
+based on ZooKeeper discovery.
  7
+
62  zipkin-kafka/src/main/scala/com/twitter/zipkin/collector/Kafka.scala
... ...
@@ -0,0 +1,62 @@
  1
+/*
  2
+ * Copyright 2012 Twitter Inc.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *      http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ *  Unless required by applicable law or agreed to in writing, software
  11
+ *  distributed under the License is distributed on an "AS IS" BASIS,
  12
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ *  See the License for the specific language governing permissions and
  14
+ *  limitations under the License.
  15
+ *
  16
+ */
  17
+package com.twitter.zipkin.collector
  18
+
  19
+import com.twitter.finagle.stats.StatsReceiver
  20
+import com.twitter.finagle.Service
  21
+import com.twitter.logging.Logger
  22
+import com.twitter.util.Future
  23
+import com.twitter.scrooge.BinaryThriftStructSerializer
  24
+import com.twitter.zipkin.common.Span
  25
+import com.twitter.zipkin.conversions.thrift._
  26
+import com.twitter.zipkin.gen
  27
+import kafka.message.Message
  28
+import kafka.producer.{ProducerData, Producer}
  29
+import kafka.serializer.Encoder
  30
+
  31
+class Kafka(
  32
+  kafka: Producer[String, gen.Span],
  33
+  topic: String,
  34
+  statsReceiver: StatsReceiver
  35
+) extends Service[Span, Unit] {
  36
+
  37
+  private[this] val log = Logger.get()
  38
+
  39
+  def apply(req: Span): Future[Unit] = {
  40
+    statsReceiver.counter("try").incr()
  41
+    val producerData = new ProducerData[String, gen.Span](topic, Seq(req.toThrift))
  42
+    Future {
  43
+      kafka.send(producerData)
  44
+    } onSuccess { (_) =>
  45
+      statsReceiver.counter("success").incr()
  46
+    }
  47
+  }
  48
+
  49
+  override def release() {
  50
+    kafka.close()
  51
+  }
  52
+}
  53
+
  54
+class SpanEncoder extends Encoder[gen.Span] {
  55
+  val serializer = new BinaryThriftStructSerializer[gen.Span] {
  56
+    def codec = gen.Span
  57
+  }
  58
+
  59
+  def toMessage(span: gen.Span): Message = {
  60
+    new Message(serializer.toBytes(span))
  61
+  }
  62
+}
40  zipkin-kafka/src/main/scala/com/twitter/zipkin/config/KafkaConfig.scala
... ...
@@ -0,0 +1,40 @@
  1
+/*
  2
+ * Copyright 2012 Twitter Inc.
  3
+ *
  4
+ * Licensed under the Apache License, Version 2.0 (the "License");
  5
+ * you may not use this file except in compliance with the License.
  6
+ * You may obtain a copy of the License at
  7
+ *
  8
+ *      http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ *  Unless required by applicable law or agreed to in writing, software
  11
+ *  distributed under the License is distributed on an "AS IS" BASIS,
  12
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13
+ *  See the License for the specific language governing permissions and
  14
+ *  limitations under the License.
  15
+ *
  16
+ */
  17
+package com.twitter.zipkin.config
  18
+
  19
+import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
  20
+import com.twitter.util.Config
  21
+import com.twitter.zipkin.collector.Kafka
  22
+import com.twitter.zipkin.gen
  23
+import java.util.Properties
  24
+import kafka.producer.Producer
  25
+import kafka.producer.ProducerConfig
  26
+
  27
+trait KafkaConfig extends Config[Kafka] {
  28
+  var zkConnectString: String = "localhost:2181"
  29
+  var topic: String = "zipkin"
  30
+  var statsReceiver: StatsReceiver = NullStatsReceiver
  31
+
  32
+  def apply(): Kafka = {
  33
+    val properties = new Properties
  34
+    properties.put("zk.connect", zkConnectString)
  35
+    properties.put("serializer.class", "com.twitter.zipkin.collector.SpanEncoder")
  36
+    properties.put("producer.type", "sync")
  37
+    val producer = new Producer[String, gen.Span](new ProducerConfig(properties))
  38
+    new Kafka(producer, topic, statsReceiver.scope("kafka"))
  39
+  }
  40
+}
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.