Skip to content

Commit

Permalink
Modularize remaining collector Scribe dependencies
Browse files Browse the repository at this point in the history
This change makes it easier to extend the collector to accept different
input types. Doing so would only require writing an `Adapter` and a
`ProcessFilter`.

Changes:
- Add `ProcessorFilter`
  - Composable filter on top of `Processor`s that can transform the
    input data type
- Move all deserialization and extraction of data from
  `CollectorService` and `WriteQueueWorker` to Scribe related
  `ProcessFilter` and `Processor`.
- `WriteQueueWorker` now takes only one `Processor` rather than a
  sequence of them.
- Add `FanoutProcessor` to blast work to multiple `Processor`s
- Update README with module diagram

Author: @franklinhu
Pull Request: #31
URL: #31
  • Loading branch information
Franklin Hu committed Jun 14, 2012
1 parent 0a3d75e commit 9e5c223
Show file tree
Hide file tree
Showing 25 changed files with 486 additions and 137 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ Once the data is stored and indexed we need a way to extract it. This is where t
### UI
Most of our users access the data via our UI. It's a Rails app that uses <a href="http://d3js.org/">D3</a> to visualize the trace data. Note that there is no built in authentication in the UI.

## Modules
![Modules (doc/modules.png)](https://github.com/twitter/zipkin/raw/master/doc/modules.png)

## Installation

Expand Down Expand Up @@ -122,9 +124,9 @@ Note that the above uses the Twitter version of Scribe with support for using Zo
We've developed Zipkin with <a href="http://www.scala-lang.org/downloads">Scala 2.9.1</a>, <a href="http://www.scala-sbt.org/download.html">SBT 0.11.2</a>, and JDK6.

1. `git clone https://github.com/twitter/zipkin.git`
1. `cd zipkin/zipkin-server`
1. `cp config/collector-dev.scala config/collector-prod.scala`
1. `cp config/query-dev.scala config/query-prod.scala`
1. `cd zipkin`
1. `cp zipkin-scribe/config/collector-dev.scala zipkin-scribe/config/collector-prod.scala`
1. `cp zipkin-server/config/query-dev.scala zipkin-server/config/query-prod.scala`
1. Modify the configs above as needed. Pay particular attention to ZooKeeper and Cassandra server entries.
1. `bin/sbt update package-dist` (This downloads SBT 0.11.2 if it doesn't already exist)
1. `scp dist/zipkin*.zip [server]`
Expand All @@ -133,6 +135,12 @@ We've developed Zipkin with <a href="http://www.scala-lang.org/downloads">Scala
1. `mkdir -p /var/log/zipkin`
1. Start the collector and query daemon.

You can also run the collector and query services through SBT.

To run the Scribe collector service: `bin/sbt 'project zipkin-scribe' 'run -f zipkin-scribe/config/collector-dev.scala'`

To run the query service: `bin/sbt 'project zipkin-server' 'run -f zipkin-server/config/query-dev.scala'`

### Zipkin UI
The UI is a standard Rails 3 app.

Expand Down
Binary file added doc/modules.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion zipkin-scribe/config/collector-dev.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ new ScribeZipkinCollectorConfig {
) :: new TimeSeriesCollectorFactory
)

def writeQueueConfig = new WriteQueueConfig {
def writeQueueConfig = new WriteQueueConfig[T] {
writeQueueMaxSize = 500
flusherPoolSize = 10
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.zookeeper.KeeperException
/**
* This class implements the log method from the Scribe Thrift interface.
*/
class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue, categories: Set[String])
class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: WriteQueue[Seq[_ <: gen.LogEntry]], categories: Set[String])
extends gen.ZipkinCollector.FutureIface with CollectorService {
private val log = Logger.get

Expand Down Expand Up @@ -81,15 +81,15 @@ class ScribeCollectorService(config: ZipkinCollectorConfig, val writeQueue: Writ
return Ok
}

val scribeMessages = logEntries.flatMap {
val scribeMessages = logEntries.filter {
entry =>
if (!categories.contains(entry.category.toLowerCase())) {
Stats.incr("collector.invalid_category")
None
false
} else {
Some(entry.message)
true
}
}.toList
}

if (scribeMessages.isEmpty) {
Ok
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.twitter.zipkin.collector.processor

import com.twitter.logging.Logger
import com.twitter.ostrich.stats.Stats
import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.zipkin.adapter.ThriftAdapter
import com.twitter.zipkin.common.Span
import com.twitter.zipkin.gen

/**
* Transforms a `Seq[gen.LogEntry]` to `Seq[Span]` for a collector service to consume.
* Assumes:
* - the Scribe struct contains a `message` that is the Base64 encoded Thrift Span struct.
* - the sequence of `LogEntry`s only contains messages we want to pass on (already filtered
* by category)
*/
class ScribeProcessorFilter extends ProcessorFilter[Seq[gen.LogEntry], Seq[Span]] {

private val log = Logger.get

val deserializer = new BinaryThriftStructSerializer[gen.Span] {
def codec = gen.Span
}

def apply(logEntries: Seq[gen.LogEntry]): Seq[Span] = {
logEntries.map {
_.`message`
}.flatMap {
msg =>
try {
val span = Stats.time("deserializeSpan") {
deserializer.fromString(msg)
}
log.ifDebug("Processing span: " + span + " from " + msg)
Some(ThriftAdapter(span))
} catch {
case e: Exception => {
// scribe doesn't have any ResultCode.ERROR or similar
// let's just swallow this invalid msg
log.warning(e, "Invalid msg: %s", msg)
Stats.incr("collector.invalid_msg")
None
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,28 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.twitter.zipkin.config

import com.twitter.zipkin.collector.processor.ScribeProcessorFilter
import com.twitter.zipkin.config.collector.CollectorServerConfig
import com.twitter.zipkin.gen

trait ScribeZipkinCollectorConfig extends ZipkinCollectorConfig {
type T = Seq[gen.LogEntry]
val serverConfig: CollectorServerConfig = new ScribeCollectorServerConfig(this)

def rawDataFilter = new ScribeProcessorFilter
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val serializer = new BinaryThriftStructSerializer[gen.Span] {
def codec = gen.Span
}
val category = "zipkin"

val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil)
val validList = List(gen.LogEntry("b3", serializer.toString(ThriftAdapter(validSpan))))
val validList = List(gen.LogEntry(category, serializer.toString(ThriftAdapter(validSpan))))

val wrongCatList = List(gen.LogEntry("wrongcat", serializer.toString(ThriftAdapter(validSpan))))

val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA="

val queue = mock[WriteQueue]
val queue = mock[WriteQueue[Seq[gen.LogEntry]]]
val zkSampleRateConfig = mock[AdjustableRateConfig]

val config = new ScribeZipkinCollectorConfig {
Expand All @@ -52,7 +51,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
override lazy val sampleRateConfig = zkSampleRateConfig
}

def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set("b3")) {
def scribeCollectorService = new ScribeCollectorService(config, config.writeQueue, Set(category)) {
running = true
}

Expand All @@ -61,7 +60,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val cs = scribeCollectorService

expect {
one(queue).add(List(base64)) willReturn(true)
one(queue).add(validList) willReturn(true)
}

gen.ResultCode.Ok mustEqual cs.log(validList)()
Expand All @@ -71,7 +70,7 @@ class ScribeCollectorServiceSpec extends Specification with JMocker with ClassMo
val cs = scribeCollectorService

expect {
one(queue).add(List(base64)) willReturn(false)
one(queue).add(validList) willReturn(false)
}

gen.ResultCode.TryLater mustEqual cs.log(validList)()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.zipkin.collector.processor

import com.twitter.scrooge.BinaryThriftStructSerializer
import com.twitter.zipkin.adapter.ThriftAdapter
import com.twitter.zipkin.common.{Annotation, Span}
import com.twitter.zipkin.gen
import org.specs.Specification

class ScribeProcessorFilterSpec extends Specification {
val serializer = new BinaryThriftStructSerializer[gen.Span] {
def codec = gen.Span
}

"ScribeProcessorFilter" should {
val category = "zipkin"

val base64 = "CgABAAAAAAAAAHsLAAMAAAADYm9vCgAEAAAAAAAAAcgPAAYMAAAAAQoAAQAAAAAAAAABCwACAAAAA2JhaAAPAAgMAAAAAAA="

val validSpan = Span(123, "boo", 456, None, List(new Annotation(1, "bah", None)), Nil)
val serialized = serializer.toString(ThriftAdapter(validSpan))

val base64LogEntries = Seq(gen.LogEntry(category, base64))
val serializedLogEntries = Seq(gen.LogEntry(category, serialized))

val badLogEntries = Seq(gen.LogEntry(category, "garbage!"))
val filter = new ScribeProcessorFilter

"convert gen.LogEntry to Span" in {
filter.apply(base64LogEntries) mustEqual Seq(validSpan)
}

"convert serialized thrift to Span" in {
filter.apply(serializedLogEntries) mustEqual Seq(validSpan)
}

"deal with garbage" in {
filter.apply(badLogEntries) mustEqual Seq.empty[Span]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package com.twitter.zipkin.collector
import com.twitter.ostrich.admin.Service

trait CollectorService extends Service {

val writeQueue: WriteQueue
val writeQueue: WriteQueue[_ <: AnyRef]

@volatile var running = false

Expand All @@ -29,7 +28,6 @@ trait CollectorService extends Service {

def shutdown() {
running = false
writeQueue.flushAll()
writeQueue.shutdown()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,53 @@
package com.twitter.zipkin.collector

import com.twitter.ostrich.admin.Service
import java.util.concurrent.{Executors, ArrayBlockingQueue}
import com.twitter.ostrich.stats.Stats
import processor.Processor
import sampler.GlobalSampler
import com.twitter.zipkin.collector.processor.Processor
import com.twitter.zipkin.collector.sampler.GlobalSampler
import java.util.concurrent.ArrayBlockingQueue

class WriteQueue(writeQueueMaxSize: Int,
class WriteQueue[T](writeQueueMaxSize: Int,
flusherPoolSize: Int,
processors: Seq[Processor],
processor: Processor[T],
sampler: GlobalSampler) extends Service {

private val queue = new ArrayBlockingQueue[List[String]](writeQueueMaxSize)
private val queue = new ArrayBlockingQueue[T](writeQueueMaxSize)
Stats.addGauge("write_queue_qsize") { queue.size }
private var workers: Seq[WriteQueueWorker] = Seq()
private var workers: Seq[WriteQueueWorker[T]] = Seq()
@volatile var running: Boolean = false

def start() {
workers = (0 until flusherPoolSize).toSeq map { i: Int =>
val worker = new WriteQueueWorker(queue, processors, sampler)
val worker = new WriteQueueWorker[T](queue, processor, sampler)
worker.start()
worker
}
running = true
}

/**
* Will block until all entries in queue have been flushed.
* Assumes now new entries will be added to queue.
*/
def flushAll() {
private def flushAll() {
while(!queue.isEmpty) {
Thread.sleep(100)
}
}

def shutdown() {
running = false
flushAll()
workers foreach { _.stop() }
workers foreach { _.shutdown() }
processors.foreach {_.shutdown()}
processor.shutdown()
}

def add(messages: List[String]): Boolean = {
queue.offer(messages)
def add(messages: T): Boolean = {
if (running) {
queue.offer(messages)
} else {
false
}
}
}
Loading

0 comments on commit 9e5c223

Please sign in to comment.