From 5bae51e2be2a27d55669faa15059dcab42cac0b9 Mon Sep 17 00:00:00 2001 From: Franklin Hu Date: Thu, 11 Oct 2012 14:35:09 -0700 Subject: [PATCH] collector: add kafka store Add module that allows users to write their spans from the collector to a Kafka queue Author: @franklinhu Fixes #174 URL: https://github.com/twitter/zipkin/pull/174 --- NOTICE | 21 +++++++ project/Project.scala | 25 +++++++- zipkin-kafka/README.md | 7 +++ .../com/twitter/zipkin/collector/Kafka.scala | 62 +++++++++++++++++++ .../twitter/zipkin/config/KafkaConfig.scala | 40 ++++++++++++ 5 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 NOTICE create mode 100644 zipkin-kafka/README.md create mode 100644 zipkin-kafka/src/main/scala/com/twitter/zipkin/collector/Kafka.scala create mode 100644 zipkin-kafka/src/main/scala/com/twitter/zipkin/config/KafkaConfig.scala diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000000..981e7de322 --- /dev/null +++ b/NOTICE @@ -0,0 +1,21 @@ +Zipkin is a distributed tracing system. +Copyright 2012 Twitter, Inc. 
+ +Zipkin has a number of optional third-party dependencies: + +Cassandra +Apache License 2.0 +http://cassandra.apache.org/ + +Kafka 0.7.0 +Apache License 2.0 +http://incubator.apache.org/kafka/ + +Scribe +Apache License 2.0 +https://github.com/facebook/scribe + +ZooKeeper +Apache License 2.0 +http://zookeeper.apache.org/ + diff --git a/project/Project.scala b/project/Project.scala index 15f1837808..436e82516e 100644 --- a/project/Project.scala +++ b/project/Project.scala @@ -28,8 +28,7 @@ object Zipkin extends Build { Project( id = "zipkin", base = file(".") - ) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService) - + ) aggregate(hadoop, hadoopjobrunner, test, thrift, queryCore, queryService, common, scrooge, collectorScribe, web, cassandra, collectorCore, collectorService, kafka) lazy val hadoop = Project( id = "zipkin-hadoop", @@ -299,6 +298,26 @@ object Zipkin extends Build { libraryDependencies ++= testDependencies ).dependsOn(collectorCore, scrooge) + lazy val kafka = + Project( + id = "zipkin-kafka", + base = file("zipkin-kafka"), + settings = Project.defaultSettings ++ + StandardProject.newSettings ++ + SubversionPublisher.newSettings ++ + TravisCiRepos.newSettings + ).settings( + version := "0.3.0-SNAPSHOT", + libraryDependencies ++= Seq( + "org.clojars.jasonjckn" % "kafka_2.9.1" % "0.7.0" + ) ++ testDependencies, + resolvers ++= (proxyRepo match { + case None => Seq( + "clojars" at "http://clojars.org/repo") + case Some(pr) => Seq() // if proxy is set we assume that it has the artifacts we would get from the above repo + }) + ).dependsOn(collectorCore, scrooge) + lazy val collectorService = Project( id = "zipkin-collector-service", base = file("zipkin-collector-service"), @@ -318,7 +337,7 @@ object Zipkin extends Build { base => (base / "config" +++ base / "src" / "test" / "resources").get } - ).dependsOn(collectorCore, 
collectorScribe, cassandra) + ).dependsOn(collectorCore, collectorScribe, cassandra, kafka) lazy val web = Project( diff --git a/zipkin-kafka/README.md b/zipkin-kafka/README.md new file mode 100644 index 0000000000..14a599615b --- /dev/null +++ b/zipkin-kafka/README.md @@ -0,0 +1,7 @@ +# zipkin-kafka + +WARNING: Experimental + +This module adds a collector side store that writes to a Kafka cluster +based on ZooKeeper discovery. + diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/collector/Kafka.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/collector/Kafka.scala new file mode 100644 index 0000000000..519e4a4b03 --- /dev/null +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/collector/Kafka.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package com.twitter.zipkin.collector + +import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finagle.Service +import com.twitter.logging.Logger +import com.twitter.util.Future +import com.twitter.scrooge.BinaryThriftStructSerializer +import com.twitter.zipkin.common.Span +import com.twitter.zipkin.conversions.thrift._ +import com.twitter.zipkin.gen +import kafka.message.Message +import kafka.producer.{ProducerData, Producer} +import kafka.serializer.Encoder + +class Kafka( + kafka: Producer[String, gen.Span], + topic: String, + statsReceiver: StatsReceiver +) extends Service[Span, Unit] { + + private[this] val log = Logger.get() + + def apply(req: Span): Future[Unit] = { + statsReceiver.counter("try").incr() + val producerData = new ProducerData[String, gen.Span](topic, Seq(req.toThrift)) + Future { + kafka.send(producerData) + } onSuccess { (_) => + statsReceiver.counter("success").incr() + } + } + + override def release() { + kafka.close() + } +} + +class SpanEncoder extends Encoder[gen.Span] { + val serializer = new BinaryThriftStructSerializer[gen.Span] { + def codec = gen.Span + } + + def toMessage(span: gen.Span): Message = { + new Message(serializer.toBytes(span)) + } +} diff --git a/zipkin-kafka/src/main/scala/com/twitter/zipkin/config/KafkaConfig.scala b/zipkin-kafka/src/main/scala/com/twitter/zipkin/config/KafkaConfig.scala new file mode 100644 index 0000000000..8a2ba4a3c6 --- /dev/null +++ b/zipkin-kafka/src/main/scala/com/twitter/zipkin/config/KafkaConfig.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.twitter.zipkin.config + +import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver} +import com.twitter.util.Config +import com.twitter.zipkin.collector.Kafka +import com.twitter.zipkin.gen +import java.util.Properties +import kafka.producer.Producer +import kafka.producer.ProducerConfig + +trait KafkaConfig extends Config[Kafka] { + var zkConnectString: String = "localhost:2181" + var topic: String = "zipkin" + var statsReceiver: StatsReceiver = NullStatsReceiver + + def apply(): Kafka = { + val properties = new Properties + properties.put("zk.connect", zkConnectString) + properties.put("serializer.class", "com.twitter.zipkin.collector.SpanEncoder") + properties.put("producer.type", "sync") + val producer = new Producer[String, gen.Span](new ProducerConfig(properties)) + new Kafka(producer, topic, statsReceiver.scope("kafka")) + } +}