Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Modularize remaining collector Scribe dependencies

This change makes it easier to extend the collector to accept different
input types. Doing so would only require writing an `Adapter` and a
`ProcessFilter`.

Changes:
- Add `ProcessorFilter`
  - Composable filter on top of `Processor`s that can transform the
    input data type
- Move all deserialization and extraction of data from
  `CollectorService` and `WriteQueueWorker` to Scribe related
  `ProcessFilter` and `Processor`.
- `WriteQueueWorker` now takes only one `Processor` rather than a
  sequence of them.
- Add `FanoutProcessor` to blast work to multiple `Processor`s
- Update README with module diagram

Author: @franklinhu
Pull Request: #31
URL: #31
  • Loading branch information...
commit 096d3c43ca82b4ce3c4f8a2d1c928edd89e777cc 1 parent d3880db
Franklin Hu authored
Showing with 486 additions and 137 deletions.
  1. +11 −3 README.md
  2. BIN  doc/modules.png
  3. +1 −1  zipkin-scribe/config/collector-dev.scala
  4. 0  {zipkin-server → zipkin-scribe}/src/main/scala/com/twitter/zipkin/collector/ResilientZKNode.scala
  5. +5 −5 zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ScribeCollectorService.scala
  6. +63 −0 zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilter.scala
  7. +21 −0 zipkin-scribe/src/main/scala/com/twitter/zipkin/config/ScribeZipkinCollectorConfig.scala
  8. +6 −7 zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/ScribeCollectorServiceSpec.scala
  9. +55 −0 zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilterSpec.scala
  10. +1 −3 zipkin-server/src/main/scala/com/twitter/zipkin/collector/CollectorService.scala
  11. +20 −12 zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueue.scala
  12. +11 −50 zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala
  13. +36 −0 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/FanoutProcessor.scala
  14. +2 −2 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/IndexProcessor.scala
  15. +2 −2 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/OstrichProcessor.scala
  16. +27 −6 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/Processor.scala
  17. +49 −0 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/ProcessorFilter.scala
  18. +39 −0 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/SamplerProcessorFilter.scala
  19. +2 −3 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/StorageProcessor.scala
  20. +28 −14 zipkin-server/src/main/scala/com/twitter/zipkin/config/ZipkinCollectorConfig.scala
  21. +9 −26 zipkin-server/src/test/scala/com/twitter/zipkin/collector/WriteQueueWorkerSpec.scala
  22. +39 −0 zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/FanoutProcessorSpec.scala
  23. +2 −2 zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/OstrichProcessorSpec.scala
  24. +56 −0 zipkin-server/src/test/scala/com/twitter/zipkin/collector/processor/ProcessorFilterSpec.scala
  25. +1 −1  zipkin-test/src/test/resources/TestCollector.scala
View
14 README.md
@@ -84,6 +84,8 @@ Once the data is stored and indexed we need a way to extract it. This is where t
### UI
Most of our users access the data via our UI. It's a Rails app that uses <a href="http://d3js.org/">D3</a> to visualize the trace data. Note that there is no built in authentication in the UI.
+## Modules
+![Modules (doc/modules.png)](https://github.com/twitter/zipkin/raw/master/doc/modules.png)
## Installation
@@ -122,9 +124,9 @@ Note that the above uses the Twitter version of Scribe with support for using Zo
We've developed Zipkin with <a href="http://www.scala-lang.org/downloads">Scala 2.9.1</a>, <a href="http://www.scala-sbt.org/download.html">SBT 0.11.2</a>, and JDK6.
1. `git clone https://github.com/twitter/zipkin.git`
-1. `cd zipkin/zipkin-server`
-1. `cp config/collector-dev.scala config/collector-prod.scala`
-1. `cp config/query-dev.scala config/query-prod.scala`
+1. `cd zipkin`
+1. `cp zipkin-scribe/config/collector-dev.scala zipkin-scribe/config/collector-prod.scala`
+1. `cp zipkin-server/config/query-dev.scala zipkin-server/config/query-prod.scala`
1. Modify the configs above as needed. Pay particular attention to ZooKeeper and Cassandra server entries.
1. `bin/sbt update package-dist` (This downloads SBT 0.11.2 if it doesn't already exist)
1. `scp dist/zipkin*.zip [server]`
@@ -133,6 +135,12 @@ We've developed Zipkin with <a href="http://www.scala-lang.org/downloads">Scala
1. `mkdir -p /var/log/zipkin`
1. Start the collector and query daemon.
+You can also run the collector and query services through SBT.
+
+To run the Scribe collector service: `bin/sbt 'project zipkin-scribe' 'run -f zipkin-scribe/config/collector-dev.scala'`
+
+To run the query service: `bin/sbt 'project zipkin-server' 'run -f zipkin-server/config/query-dev.scala'`
+
### Zipkin UI
The UI is a standard Rails 3 app.
View
BIN  doc/modules.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
View
2  zipkin-scribe/config/collector-dev.scala
@@ -35,7 +35,7 @@ new ScribeZipkinCollectorConfig {
) :: new TimeSeriesCollectorFactory
)
- def writeQueueConfig = new WriteQueueConfig {
+ def writeQueueConfig = new WriteQueueConfig[T] {
writeQueueMaxSize = 500
flusherPoolSize = 10
}
View
0  ...er/zipkin/collector/ResilientZKNode.scala → ...er/zipkin/collector/ResilientZKNode.scala
File renamed without changes
View
10 zipkin-scribe/src/main/scala/com/twitter/zipkin/collector/ScribeCollectorService.scala
@@ -27,7 +27,7 @@ import org.apache.zookeeper.KeeperException
/**
* This class implements the log method from the Scribe Thrift interface.
*/
-class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue, categories: Set[String])
+class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue[Seq[_ <: gen.LogEntry]], categories: Set[String])
extends gen.ZipkinCollector.FutureIface with CollectorService {
private val log = Logger.get
@@ -81,15 +81,15 @@ class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: Writ
return Ok
}
- val scribeMessages = logEntries.flatMap {
+ val scribeMessages = logEntries.filter {
entry =>
if (!categories.contains(entry.category.toLowerCase())) {
Stats.incr("collector.invalid_category")
- None
+ false
} else {
- Some(entry.message)
+ true
}
- }.toList
+ }
if (scribeMessages.isEmpty) {
Ok
View
63 ...-scribe/src/main/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilter.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.twitter.zipkin.collector.processor
+
+import com.twitter.logging.Logger
+import com.twitter.ostrich.stats.Stats
+import com.twitter.scrooge.BinaryThriftStructSerializer
+import com.twitter.zipkin.adapter.ThriftAdapter
+import com.twitter.zipkin.common.Span
+import com.twitter.zipkin.gen
+
+/**
+ * Transforms a `Seq[gen.LogEntry]` to `Seq[Span]` for a collector service to consume.
+ * Assumes:
+ * - the Scribe struct contains a `message` that is the Base64 encoded Thrift Span struct.
+ * - the sequence of `LogEntry`s only contains messages we want to pass on (already filtered
+ * by category)
+ */
+class ScribeProcessorFilter extends ProcessorFilter[Seq[gen.LogEntry], Seq[Span]] {
+
+ private val log = Logger.get
+
+ val deserializer = new BinaryThriftStructSerializer[gen.Span] {
+ def codec = gen.Span
+ }
+
+ def apply(logEntries: Seq[gen.LogEntry]): Seq[Span] = {
+ logEntries.map {
+ _.`message`
+ }.flatMap {
+ msg =>
+ try {
+ val span = Stats.time("deserializeSpan") {
+ deserializer.fromString(msg)
+ }
+ log.ifDebug("Processing span: " + span + " from " + msg)
+ Some(ThriftAdapter(span))
+ } catch {
+ case e: Exception => {
+ // scribe doesn't have any ResultCode.ERROR or similar
+ // let's just swallow this invalid msg
+ log.warning(e, "Invalid msg: %s", msg)
+ Stats.incr("collector.invalid_msg")
+ None
+ }
+ }
+ }
+ }
+}
View
21 zipkin-scribe/src/main/scala/com/twitter/zipkin/config/ScribeZipkinCollectorConfig.scala
@@ -1,7 +1,28 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
package com.twitter.zipkin.config
+import com.twitter.zipkin.collector.processor.ScribeProcessorFilter
import com.twitter.zipkin.config.collector.CollectorServerConfig
+import com.twitter.zipkin.gen
trait ScribeZipkinCollectorConfig extends ZipkinCollectorConfig {
+ type T = Seq[gen.LogEntry]
val serverConfig: CollectorServerConfig = new ScribeCollectorServerConfig(this)
+
+ def rawDataFilter = new ScribeProcessorFilter
}
View
13 zipkin-scribe/src/test/scala/com/twitter/zipkin/collector/ScribeCollectorServiceSpec.scala
@@ -30,15 +30,14 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val serializer = new BinaryThriftStructSerializer[gen.Span] {
def codec = gen.Span
}
+ val category = "zipkin"
val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil)
- val validList = List(gen.LogEntry("b3", serializer.toString(ThriftAdapter(validSpan))))
+ val validList = List(gen.LogEntry(category, serializer.toString(ThriftAdapter(validSpan))))
val wrongCatList = List(gen.LogEntry("wrongcat", serializer.toString(ThriftAdapter(validSpan))))
- val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA="
-
- val queue = mock[WriteQueue]
+ val queue = mock[WriteQueue[Seq[gen.LogEntry]]]
val zkSampleRateConfig = mock[AdjustableRateConfig]
val config = new ScribeZipkinCollectorConfig {
@@ -52,7 +51,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
override lazy val sampleRateConfig = zkSampleRateConfig
}
- def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set("b3")) {
+ def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set(category)) {
running = true
}
@@ -61,7 +60,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val cs = scribeCollectorService
expect {
- one(queue).add(List(base64)) willReturn(true)
+ one(queue).add(validList) willReturn(true)
}
gen.ResultCode.Ok mustEqual cs.log(validList)()
@@ -71,7 +70,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val cs = scribeCollectorService
expect {
- one(queue).add(List(base64)) willReturn(false)
+ one(queue).add(validList) willReturn(false)
}
gen.ResultCode.TryLater mustEqual cs.log(validList)()
View
55 ...ibe/src/test/scala/com/twitter/zipkin/collector/processor/ScribeProcessorFilterSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.zipkin.collector.processor
+
+import com.twitter.scrooge.BinaryThriftStructSerializer
+import com.twitter.zipkin.adapter.ThriftAdapter
+import com.twitter.zipkin.common.{Annotation, Span}
+import com.twitter.zipkin.gen
+import org.specs.Specification
+
+class ScribeProcessorFilterSpec extends Specification {
+ val serializer = new BinaryThriftStructSerializer[gen.Span] {
+ def codec = gen.Span
+ }
+
+ "ScribeProcessorFilter" should {
+ val category = "zipkin"
+
+ val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA="
+
+ val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil)
+ val serialized = serializer.toString(ThriftAdapter(validSpan))
+
+ val base64LogEntries = Seq(gen.LogEntry(category, base64))
+ val serializedLogEntries = Seq(gen.LogEntry(category, serialized))
+
+ val badLogEntries = Seq(gen.LogEntry(category, "garbage!"))
+ val filter = new ScribeProcessorFilter
+
+ "convert gen.LogEntry to Span" in {
+ filter.apply(base64LogEntries) mustEqual Seq(validSpan)
+ }
+
+ "convert serialized thrift to Span" in {
+ filter.apply(serializedLogEntries) mustEqual Seq(validSpan)
+ }
+
+ "deal with garbage" in {
+ filter.apply(badLogEntries) mustEqual Seq.empty[Span]
+ }
+ }
+}
View
4 zipkin-server/src/main/scala/com/twitter/zipkin/collector/CollectorService.scala
@@ -18,8 +18,7 @@ package com.twitter.zipkin.collector
import com.twitter.ostrich.admin.Service
trait CollectorService extends Service {
-
- val writeQueue: WriteQueue
+ val writeQueue: WriteQueue[_ <: AnyRef]
@volatile var running = false
@@ -29,7 +28,6 @@ trait CollectorService extends Service {
def shutdown() {
running = false
- writeQueue.flushAll()
writeQueue.shutdown()
}
}
View
32 zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueue.scala
@@ -17,45 +17,53 @@
package com.twitter.zipkin.collector
import com.twitter.ostrich.admin.Service
-import java.util.concurrent.{Executors, ArrayBlockingQueue}
import com.twitter.ostrich.stats.Stats
-import processor.Processor
-import sampler.GlobalSampler
+import com.twitter.zipkin.collector.processor.Processor
+import com.twitter.zipkin.collector.sampler.GlobalSampler
+import java.util.concurrent.ArrayBlockingQueue
-class WriteQueue(writeQueueMaxSize: Int,
+class WriteQueue[T](writeQueueMaxSize: Int,
flusherPoolSize: Int,
- processors: Seq[Processor],
+ processor: Processor[T],
sampler: GlobalSampler) extends Service {
- private val queue = new ArrayBlockingQueue[List[String]](writeQueueMaxSize)
+ private val queue = new ArrayBlockingQueue[T](writeQueueMaxSize)
Stats.addGauge("write_queue_qsize") { queue.size }
- private var workers: Seq[WriteQueueWorker] = Seq()
+ private var workers: Seq[WriteQueueWorker[T]] = Seq()
+ @volatile var running: Boolean = false
def start() {
workers = (0 until flusherPoolSize).toSeq map { i: Int =>
- val worker = new WriteQueueWorker(queue, processors, sampler)
+ val worker = new WriteQueueWorker[T](queue, processor, sampler)
worker.start()
worker
}
+ running = true
}
/**
* Will block until all entries in queue have been flushed.
* Assumes now new entries will be added to queue.
*/
- def flushAll() {
+ private def flushAll() {
while(!queue.isEmpty) {
Thread.sleep(100)
}
}
def shutdown() {
+ running = false
+ flushAll()
workers foreach { _.stop() }
workers foreach { _.shutdown() }
- processors.foreach {_.shutdown()}
+ processor.shutdown()
}
- def add(messages: List[String]): Boolean = {
- queue.offer(messages)
+ def add(messages: T): Boolean = {
+ if (running) {
+ queue.offer(messages)
+ } else {
+ false
+ }
}
}
View
61 zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala
@@ -16,69 +16,30 @@
*/
package com.twitter.zipkin.collector
-import com.twitter.zipkin.gen
-import com.twitter.zipkin.common.Span
import com.twitter.logging.Logger
-import com.twitter.util.Future
-import processor.Processor
-import sampler.GlobalSampler
-import com.twitter.ostrich.stats.Stats
-import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.ostrich.admin.BackgroundProcess
+import com.twitter.scrooge.BinaryThriftStructSerializer
+import com.twitter.zipkin.gen
+import com.twitter.zipkin.collector.processor.Processor
+import com.twitter.zipkin.collector.sampler.GlobalSampler
import java.util.concurrent.{TimeUnit, BlockingQueue}
-import com.twitter.zipkin.adapter.ThriftAdapter
-class WriteQueueWorker(queue: BlockingQueue[List[String]],
- processors: Seq[Processor],
+class WriteQueueWorker[T](queue: BlockingQueue[T],
+ processor: Processor[T],
sample: GlobalSampler) extends BackgroundProcess("WriteQueueWorker", false) {
private val log = Logger.get
val deserializer = new BinaryThriftStructSerializer[gen.Span] { def codec = gen.Span }
- def runLoop() = {
+ def runLoop() {
val item = queue.poll(500, TimeUnit.MILLISECONDS)
- if (item ne null) {
- item foreach (processScribeMessage(_))
+ if (item != null) {
+ process(item)
}
}
- def processScribeMessage(msg: String) {
- try {
- val span = Stats.time("deserializeSpan") { deserializer.fromString(msg) }
- log.ifDebug("Processing span: " + span + " from " + msg)
- processSpan(ThriftAdapter(span))
- } catch {
- case e: Exception => {
- // scribe doesn't have any ResultCode.ERROR or similar
- // let's just swallow this invalid msg
- log.warning(e, "Invalid msg: %s", msg)
- Stats.incr("collector.invalid_msg")
- }
- }
- }
-
- def processSpan(span: Span) {
- try {
- span.serviceNames.foreach { name => Stats.incr("received_" + name) }
-
- // check if we want to store this particular trace or not
- if (sample(span.traceId)) {
- Stats.time("processSpan") {
- span.serviceNames.foreach { name => Stats.incr("process_" + name) }
- Future.join {
- processors map { _.processSpan(span) }
- } onSuccess { e =>
- Stats.incr("collector.processSpan_success")
- } onFailure { e =>
- Stats.incr("collector.processSpan_failed")
- }
- }
- }
- } catch {
- case e: Exception =>
- log.error(e, "Processing of " + span + " failed %s", e)
- Stats.incr("collector.invalid_msg")
- }
+ private[collector] def process(item: T) {
+ processor.process(item)
}
}
View
36 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/FanoutProcessor.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.twitter.zipkin.collector.processor
+
+import com.twitter.util.Future
+
+/**
+ * Fans out a single item to a set of `Processor`s
+ * @param processors
+ * @tparam T
+ */
+class FanoutProcessor[T](processors: Seq[Processor[T]]) extends Processor[T] {
+ def process(item: T): Future[Unit] = {
+ Future.join {
+ processors map { _.process(item) }
+ }
+ }
+
+ def shutdown() {
+ processors.foreach { _.shutdown() }
+ }
+}
View
4 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/IndexProcessor.scala
@@ -23,9 +23,9 @@ import com.twitter.util.Future
/**
* Index the incoming spans.
*/
-class IndexProcessor(index: Index, indexFilter: IndexingFilter) extends Processor {
+class IndexProcessor(index: Index, indexFilter: IndexingFilter) extends Processor[Span] {
- def processSpan(span: Span) = {
+ def process(span: Span) = {
if (indexFilter.shouldIndex(span)) {
Future.join(Seq {
index.indexTraceIdByServiceAndName(span) onFailure failureHandler("indexTraceIdByServiceAndName")
View
4 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/OstrichProcessor.scala
@@ -26,9 +26,9 @@ import com.twitter.util.Future
* Adds server side duration data to ostrich, which
* in turn can be sent to a monitoring system where it can be queried.
*/
-class OstrichProcessor(serviceStatsPrefix: String) extends Processor {
+class OstrichProcessor(serviceStatsPrefix: String) extends Processor[Span] {
- def processSpan(span: Span): Future[Unit] = {
+ def process(span: Span): Future[Unit] = {
for {
start <- span.getAnnotation(gen.Constants.SERVER_RECV)
end <- span.getAnnotation(gen.Constants.SERVER_SEND)
View
33 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/Processor.scala
@@ -17,24 +17,23 @@ package com.twitter.zipkin.collector.processor
*
*/
-import com.twitter.zipkin.common.Span
import com.twitter.finagle.TooManyWaitersException
import com.twitter.logging.Logger
import com.twitter.ostrich.stats.Stats
import com.twitter.util.Future
/**
- * A processor that takes a span and processes it some way.
+ * A processor that takes an item (Span) and processes it some way.
* Could be: storing it or aggregating the data in some way for example.
*/
-trait Processor {
+trait Processor[T] {
private val log = Logger.get
/**
- * Process the span.
+ * Process the item.
*/
- def processSpan(span: Span): Future[Unit]
+ def process(item: T): Future[Unit]
/**
* Shut down this processor
@@ -48,4 +47,26 @@ trait Processor {
log.error(e, method)
}
}
-}
+}
+
+class NullProcessor[T] extends Processor[T] {
+ def process(item: T): Future[Unit] = Future.Unit
+ def shutdown() {}
+}
+
+/**
+ * Processes a sequence of items
+ * @param processor
+ * @tparam T
+ */
+class SequenceProcessor[T](processor: Processor[T]) extends Processor[Seq[T]] {
+ def process(items: Seq[T]): Future[Unit] = {
+ Future.join {
+ items.map {
+ processor.process(_)
+ }
+ }
+ }
+
+ def shutdown() { processor.shutdown() }
+}
View
49 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/ProcessorFilter.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.twitter.zipkin.collector.processor
+
+import com.twitter.util.Future
+
+/**
+ * `ProcessFilter`s are filters that can be composed on top of `Processor`s to transform
+ * items between data types.
+ * @tparam T input data type
+ * @tparam U output data type
+ */
+trait ProcessorFilter[T,U] {
+
+ def andThen[V](next: ProcessorFilter[U,V]): ProcessorFilter[T,V] =
+ new ProcessorFilter[T,V] {
+ def apply(item: T): V = {
+ next.apply {
+ ProcessorFilter.this.apply(item)
+ }
+ }
+ }
+
+ def andThen(processor: Processor[U]): Processor[T] = new Processor[T] {
+ def process(item: T): Future[Unit] = {
+ processor.process {
+ ProcessorFilter.this.apply(item)
+ }
+ }
+
+ def shutdown() { processor.shutdown() }
+ }
+
+ def apply(item: T): U
+}
View
39 ...server/src/main/scala/com/twitter/zipkin/collector/processor/SamplerProcessorFilter.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.twitter.zipkin.collector.processor
+
+import com.twitter.ostrich.stats.Stats
+import com.twitter.zipkin.collector.sampler.GlobalSampler
+import com.twitter.zipkin.common.Span
+
+/**
+ * Filters out `Span`s that do not meet a `GlobalSampler`'s criteria
+ * @param sampler
+ */
+class SamplerProcessorFilter(sampler: GlobalSampler) extends ProcessorFilter[Seq[Span], Seq[Span]] {
+ def apply(spans: Seq[Span]): Seq[Span] = {
+ spans.flatMap { span =>
+ span.serviceNames.foreach { name => Stats.incr("received_" + name) }
+
+ if (sampler(span.traceId)) {
+ Some(span)
+ } else {
+ None
+ }
+ }
+ }
+}
View
5 zipkin-server/src/main/scala/com/twitter/zipkin/collector/processor/StorageProcessor.scala
@@ -17,14 +17,13 @@ package com.twitter.zipkin.collector.processor
import com.twitter.zipkin.common.Span
import com.twitter.zipkin.storage.Storage
-import com.twitter.logging.Logger
/**
* Store the incoming span in the storage system.
*/
-class StorageProcessor(storage: Storage) extends Processor {
+class StorageProcessor(storage: Storage) extends Processor[Span] {
- def processSpan(span: Span) =
+ def process(span: Span) =
storage.storeSpan(span) onFailure failureHandler("storeSpan")
def shutdown() = storage.close()
View
42 zipkin-server/src/main/scala/com/twitter/zipkin/config/ZipkinCollectorConfig.scala
@@ -18,7 +18,6 @@ package com.twitter.zipkin.config
import com.twitter.zipkin.storage.{Index, Storage}
import com.twitter.zipkin.collector.{WriteQueue, ZipkinCollector}
import com.twitter.zipkin.collector.filter.{DefaultClientIndexingFilter, IndexingFilter}
-import com.twitter.zipkin.collector.processor.{OstrichProcessor, IndexProcessor, StorageProcessor, Processor}
import com.twitter.zipkin.collector.sampler.{AdaptiveSampler, ZooKeeperGlobalSampler, GlobalSampler}
import com.twitter.zipkin.config.collector.CollectorServerConfig
import com.twitter.zipkin.config.sampler._
@@ -32,6 +31,8 @@ import java.net.{InetAddress, InetSocketAddress}
import org.apache.zookeeper.ZooDefs.Ids
import scala.collection.JavaConverters._
import scala.collection.Set
+import com.twitter.zipkin.collector.processor._
+import com.twitter.zipkin.common.Span
trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] {
@@ -93,14 +94,27 @@ trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] {
def globalSampler: GlobalSampler = new ZooKeeperGlobalSampler(sampleRateConfig)
- lazy val processors: Seq[Processor] = {
- new StorageProcessor(storage) ::
- new IndexProcessor(index, indexingFilter) ::
- new OstrichProcessor(serviceStatsPrefix)
- }
-
- def writeQueueConfig: WriteQueueConfig
- lazy val writeQueue: WriteQueue = writeQueueConfig.apply(processors, globalSampler)
+ /**
+ * To accommodate a particular input type `T`, define a `rawDataFilter` that
+ * converts the input data type (ex: Scrooge-generated Thrift) into a
+ * sequence of `com.twitter.zipkin.common.Span`s
+ */
+ type T
+ def rawDataFilter: ProcessorFilter[T, Seq[Span]]
+
+ lazy val processor: Processor[T] =
+ rawDataFilter andThen
+ new SamplerProcessorFilter(globalSampler) andThen
+ new SequenceProcessor[Span](
+ new FanoutProcessor[Span]({
+ new StorageProcessor(storage) ::
+ new IndexProcessor(index, indexingFilter) ::
+ new OstrichProcessor(serviceStatsPrefix)
+ })
+ )
+
+ def writeQueueConfig: WriteQueueConfig[T]
+ lazy val writeQueue: WriteQueue[T] = writeQueueConfig.apply(processor, globalSampler)
lazy val indexingFilter: IndexingFilter = new DefaultClientIndexingFilter
@@ -113,20 +127,20 @@ trait ZipkinCollectorConfig extends ZipkinConfig[ZipkinCollector] {
}
}
-trait WriteQueueConfig extends Config[WriteQueue] {
+trait WriteQueueConfig[T] extends Config[WriteQueue[T]] {
var writeQueueMaxSize: Int = 500
var flusherPoolSize: Int = 10
- def apply(processors: Seq[Processor], sampler: GlobalSampler): WriteQueue = {
- val wq = new WriteQueue(writeQueueMaxSize, flusherPoolSize, processors, sampler)
+ def apply(processor: Processor[T], sampler: GlobalSampler): WriteQueue[T] = {
+ val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, processor, sampler)
wq.start()
ServiceTracker.register(wq)
wq
}
- def apply(): WriteQueue = {
- val wq = new WriteQueue(writeQueueMaxSize, flusherPoolSize, Seq.empty, new GlobalSampler{})
+ def apply(): WriteQueue[T] = {
+ val wq = new WriteQueue[T](writeQueueMaxSize, flusherPoolSize, new NullProcessor[T], new GlobalSampler{})
wq.start()
ServiceTracker.register(wq)
wq
View
35 zipkin-server/src/test/scala/com/twitter/zipkin/collector/WriteQueueWorkerSpec.scala
@@ -16,44 +16,27 @@
*/
package com.twitter.zipkin.collector
+import com.twitter.zipkin.collector.processor.Processor
+import com.twitter.zipkin.collector.sampler.GlobalSampler
+import com.twitter.zipkin.common.{Annotation, Endpoint, Span}
import org.specs.Specification
import org.specs.mock.{ClassMocker, JMocker}
-import processor.Processor
-import sampler.GlobalSampler
-import scala.collection._
-import com.twitter.zipkin.common.{Annotation, Endpoint, Span}
import java.util.concurrent.BlockingQueue
class WriteQueueWorkerSpec extends Specification with JMocker with ClassMocker {
"WriteQueueWorker" should {
- "sample" in {
+ "hand off to processor" in {
+ val processor = mock[Processor[Span]]
+ val queue = mock[BlockingQueue[Span]]
val sampler = mock[GlobalSampler]
- val processor = mock[Processor]
- val queue = mock[BlockingQueue[List[String]]]
- val w = new WriteQueueWorker(queue, Seq(processor), sampler)
+ val w = new WriteQueueWorker[Span](queue, processor, sampler)
val span = Span(123, "boo", 456, None, List(Annotation(123, "value", Some(Endpoint(1,2,"service")))), Nil)
expect {
- one(sampler).apply(123L) willReturn(true)
- one(processor).processSpan(span)
+ one(processor).process(span)
}
-
- w.processSpan(span)
- }
-
- "deserialize garbage" in {
- val garbage = "garbage!"
- val sampler = mock[GlobalSampler]
- val processor = mock[Processor]
-
- val w = new WriteQueueWorker(null, Seq(processor), sampler)
-
- expect {
- never(processor).processSpan(any)
- }
-
- w.processScribeMessage(garbage)
+ w.process(span)
}
}
}
View
39 ...in-server/src/test/scala/com/twitter/zipkin/collector/processor/FanoutProcessorSpec.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.twitter.zipkin.collector.processor
+
+import org.specs.Specification
+import org.specs.mock.{JMocker, ClassMocker}
+
+class FanoutProcessorSpec extends Specification with JMocker with ClassMocker {
+ "FanoutProcessor" should {
+ "fanout" in {
+ val proc1 = mock[Processor[Int]]
+ val proc2 = mock[Processor[Int]]
+
+ val fanout = new FanoutProcessor[Int](Seq(proc1, proc2))
+ val item = 1
+
+ expect {
+ one(proc1).process(item)
+ one(proc2).process(item)
+ }
+
+ fanout.process(item)
+ }
+ }
+}
View
4 ...n-server/src/test/scala/com/twitter/zipkin/collector/processor/OstrichProcessorSpec.scala
@@ -39,7 +39,7 @@ class OstrichProcessorSpec extends Specification {
val span = Span(12345, "methodcall", 666, None, List(annotation1, annotation2, annotation3), Nil)
- agg.processSpan(span)
+ agg.process(span)
Stats.getMetrics()(prefix + "service") mustEqual distribution
@@ -55,7 +55,7 @@ class OstrichProcessorSpec extends Specification {
val span = Span(12345, "methodcall", 666, None, List(annotation1, annotation2, annotation3), Nil)
- agg.processSpan(span)
+ agg.process(span)
Stats.getMetrics()(prefix + "service") mustNotBe distribution
Stats.getMetrics()(prefix + "service.methodcall") mustNotBe distribution
View
56 ...in-server/src/test/scala/com/twitter/zipkin/collector/processor/ProcessorFilterSpec.scala
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2012 Twitter Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.twitter.zipkin.collector.processor
+
+import org.specs.mock.{JMocker, ClassMocker}
+import org.specs.Specification
+
+class ProcessorFilterSpec extends Specification with JMocker with ClassMocker {
+ "ProcessorFilter" should {
+ "compose" in {
+ val filter1 = new ProcessorFilter[Int, Double] {
+ def apply(item: Int) = (item + 1).toDouble
+ }
+ val filter2 = new ProcessorFilter[Double, Long] {
+ def apply(item: Double) = (item + 1).toLong
+ }
+
+ "with other filter" in {
+ val composed = filter1 andThen filter2
+ val item = 1
+ val expected = 3L
+
+ val actual = composed.apply(item)
+ actual mustEqual expected
+ }
+
+ "with Processor" in {
+ val proc = mock[Processor[Long]]
+ val composed = filter1 andThen filter2 andThen proc
+
+ val item = 1
+ val procExpected = 3L
+
+ expect {
+ one(proc).process(procExpected)
+ }
+
+ composed.process(item)
+ }
+ }
+ }
+}
View
2  zipkin-test/src/test/resources/TestCollector.scala
@@ -37,7 +37,7 @@ new ScribeZipkinCollectorConfig {
) :: new TimeSeriesCollectorFactory
)
- def writeQueueConfig = new WriteQueueConfig {
+ def writeQueueConfig = new WriteQueueConfig[T] {
writeQueueMaxSize = 500
flusherPoolSize = 10
}
Please sign in to comment.
Something went wrong with that request. Please try again.