Navigation Menu

Skip to content

Commit

Permalink
collector: add kafka store
Browse files Browse the repository at this point in the history
Add module that allows users to write their spans from the collector to
a Kafka queue

Author: @franklinhu
Fixes #174
URL: #174
  • Loading branch information
Franklin Hu committed Oct 11, 2012
1 parent 65f7b15 commit 5bae51e
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 3 deletions.
21 changes: 21 additions & 0 deletions NOTICE
@@ -0,0 +1,21 @@
Zipkin is a distributed tracing system.
Copyright 2012 Twitter, Inc.

Zipkin has a number of optional third party dependencies:

Cassandra
Apache Public License 2.0
http://cassandra.apache.org/

Kafka 0.7.0
Apache Public License 2.0
http://incubator.apache.org/kafka/

Scribe
Apache Public License 2.0
https://github.com/facebook/scribe

ZooKeeper
Apache Public License 2.0
http://zookeeper.apache.org/

25 changes: 22 additions & 3 deletions project/Project.scala
Expand Up @@ -28,8 +28,7 @@ object Zipkin extends Build {
Project(
id = "zipkin",
base = file(".")
) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService)

) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService, kafka)

lazy val hadoop = Project(
id = "zipkin-hadoop",
Expand Down Expand Up @@ -299,6 +298,26 @@ object Zipkin extends Build {
libraryDependencies ++= testDependencies
).dependsOn(collectorCore, scrooge)

lazy val kafka =
Project(
id = "zipkin-kafka",
base = file("zipkin-kafka"),
settings = Project.defaultSettings ++
StandardProject.newSettings ++
SubversionPublisher.newSettings ++
TravisCiRepos.newSettings
).settings(
version := "0.3.0-SNAPSHOT",
libraryDependencies ++= Seq(
"org.clojars.jasonjckn" % "kafka_2.9.1" % "0.7.0"
) ++ testDependencies,
resolvers ++= (proxyRepo match {
case None => Seq(
"clojars" at "http://clojars.org/repo")
case Some(pr) => Seq() // if proxy is set we assume that it has the artifacts we would get from the above repo
})
).dependsOn(collectorCore, scrooge)

lazy val collectorService = Project(
id = "zipkin-collector-service",
base = file("zipkin-collector-service"),
Expand All @@ -318,7 +337,7 @@ object Zipkin extends Build {
base =>
(base / "config" +++ base / "src" / "test" / "resources").get
}
).dependsOn(collectorCore, collectorScribe, cassandra)
).dependsOn(collectorCore, collectorScribe, cassandra, kafka)

lazy val web =
Project(
Expand Down
7 changes: 7 additions & 0 deletions zipkin-kafka/README.md
@@ -0,0 +1,7 @@
# zipkin-kafka

WARNING: Experimental

This module adds a collector side store that writes to a Kafka cluster
based on ZooKeeper discovery.

@@ -0,0 +1,62 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.twitter.zipkin.collector

import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.Service
import com.twitter.logging.Logger
import com.twitter.util.Future
import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.zipkin.common.Span
import com.twitter.zipkin.conversions.thrift._
import com.twitter.zipkin.gen
import kafka.message.Message
import kafka.producer.{ProducerData, Producer}
import kafka.serializer.Encoder

class Kafka(
kafka: Producer[String, gen.Span],
topic: String,
statsReceiver: StatsReceiver
) extends Service[Span, Unit] {

private[this] val log = Logger.get()

def apply(req: Span): Future[Unit] = {
statsReceiver.counter("try").incr()
val producerData = new ProducerData[String, gen.Span](topic, Seq(req.toThrift))
Future {
kafka.send(producerData)
} onSuccess { (_) =>
statsReceiver.counter("success").incr()
}
}

override def release() {
kafka.close()
}
}

class SpanEncoder extends Encoder[gen.Span] {
val serializer = new BinaryThriftStructSerializer[gen.Span] {
def codec = gen.Span
}

def toMessage(span: gen.Span): Message = {
new Message(serializer.toBytes(span))
}
}
@@ -0,0 +1,40 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.twitter.zipkin.config

import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.util.Config
import com.twitter.zipkin.collector.Kafka
import com.twitter.zipkin.gen
import java.util.Properties
import kafka.producer.Producer
import kafka.producer.ProducerConfig

trait KafkaConfig extends Config[Kafka] {
var zkConnectString: String = "localhost:2181"
var topic: String = "zipkin"
var statsReceiver: StatsReceiver = NullStatsReceiver

def apply(): Kafka = {
val properties = new Properties
properties.put("zk.connect", zkConnectString)
properties.put("serializer.class", "com.twitter.zipkin.collector.SpanEncoder")
properties.put("producer.type", "sync")
val producer = new Producer[String, gen.Span](new ProducerConfig(properties))
new Kafka(producer, topic, statsReceiver.scope("kafka"))
}
}

0 comments on commit 5bae51e

Please sign in to comment.