Skip to content

Commit

Permalink
Reduce the garbage generated in the collector. Before, we got about 40…
Browse files Browse the repository at this point in the history
…0kb of garbage per sampled request; after, it's at about 300kb.

Includes these changes
* don't recalculate the lowercase version of service names in span each time they are used
* don't create the incoming data log message unless debug is turned on
* reuse some data instead of recalculating when indexing in Cassandra

Author: johanoskarsson
Pull Request: #25
URL: #25
  • Loading branch information
johanoskarsson committed Jun 13, 2012
1 parent 8edcc1b commit 02fbd24
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 11 deletions.
Expand Up @@ -80,9 +80,10 @@ case class Span(traceId: Long, name: String, id: Long, parentId: Option[Long],
binaryAnnotations)
}

/**
* Gathers the service name of every annotation that has a host, lower-cases
* each one and removes duplicates.
*
* @return the distinct lower-cased service names found on this span's annotations
*/
def serviceNames: Set[String] = {
  val lowered = for {
    annotation <- annotations
    endpoint <- annotation.host
  } yield endpoint.serviceName.toLowerCase
  lowered.toSet
}
/**
* Lower-cased service names taken from the hosts of this span's annotations.
* Annotations without a host contribute nothing. Being a lazy val, the set is
* built on first access and the same instance is returned afterwards.
*
* @return the unique set of service names as lower case
*/
lazy val serviceNames: Set[String] =
  annotations.flatMap(_.host).map(_.serviceName.toLowerCase).toSet

/**
* Tries to extract the best possible service name
Expand Down
Expand Up @@ -27,7 +27,7 @@ class SpanSpec extends Specification {
List(expectedAnnotation), Nil)

val annotation1 = Annotation(1, "value1", Some(Endpoint(1, 2, "service")))
val annotation2 = Annotation(2, "value2", Some(Endpoint(3, 4, "service")))
val annotation2 = Annotation(2, "value2", Some(Endpoint(3, 4, "Service"))) // upper case service name
val annotation3 = Annotation(3, "value3", Some(Endpoint(5, 6, "service")))

val spanWith3Annotations = Span(12345, "methodcall", 666, None,
Expand All @@ -52,7 +52,13 @@ class SpanSpec extends Specification {
Span.fromThrift(noBinaryAnnotationsSpan) mustEqual Span(0, "name", 0, None, List(), Seq())
}

"getAnnotationsAsMap" in {
// `serviceNames` is a Set[String]. Applying a Set to an argument is a membership
// test that expects a String, so `names(0)` is not positional access. Comparing
// against the expected set checks in one assertion that exactly one name is
// present and that it is lower case (annotation2 is declared with "Service").
"serviceNames is lowercase" in {
  val names = spanWith3Annotations.serviceNames
  names mustEqual Set("service")
}

"serviceNames" in {
val map = expectedSpan.getAnnotationsAsMap
val actualAnnotation = map.get(annotationValue).get
expectedAnnotation mustEqual actualAnnotation
Expand Down
Expand Up @@ -45,7 +45,7 @@ class WriteQueueWorker(queue: BlockingQueue[List[String]],
def processScribeMessage(msg: String) {
try {
val span = Stats.time("deserializeSpan") { deserializer.fromString(msg) }
log.debug("Processing span: " + span + " from " + msg)
log.ifDebug("Processing span: " + span + " from " + msg)
processSpan(Span.fromThrift(span))
} catch {
case e: Exception => {
Expand Down
@@ -0,0 +1,24 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.twitter.zipkin.collector.sampler

/**
* Let through everything.
*
* A GlobalSampler whose decision is constant: `apply` returns `true` for any
* trace id, so no trace is rejected by this sampler.
*/
object EverythingGlobalSampler extends GlobalSampler {
// The trace id is never inspected; the answer is unconditionally true.
override def apply(traceId: Long): Boolean = true
}
Expand Up @@ -220,14 +220,13 @@ trait CassandraIndex extends Index with Cassandra {

val futures = serviceNames.map(serviceName => {
WRITE_REQUEST_COUNTER.incr()
val serviceSpanIndexKey = serviceName.toLowerCase + "." + span.name.toLowerCase
val serviceSpanIndexKey = serviceName + "." + span.name.toLowerCase
val serviceSpanIndexCol = Column[Long, Long](timestamp, span.traceId).ttl(config.tracesTimeToLive)
val serviceSpanNameFuture = serviceSpanNameIndex.insert(serviceSpanIndexKey, serviceSpanIndexCol)

WRITE_REQUEST_COUNTER.incr()
val serviceIndexKey = serviceName.toLowerCase
val serviceIndexCol = Column[Long, Long](timestamp, span.traceId).ttl(config.tracesTimeToLive)
val serviceNameFuture = serviceNameIndex.insert(serviceIndexKey, serviceIndexCol)
val serviceNameFuture = serviceNameIndex.insert(serviceName, serviceIndexCol)
List(serviceSpanNameFuture, serviceNameFuture)
}).toList.flatten
Future.join(futures)
Expand Down Expand Up @@ -264,9 +263,10 @@ trait CassandraIndex extends Index with Cassandra {
ba.host match {
case Some(endpoint) => {
WRITE_REQUEST_COUNTER.incr(2)
val key = encode(endpoint.serviceName, ba.key).getBytes
val col = Column[Long, Long](timestamp, span.traceId).ttl(config.tracesTimeToLive)
batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName, ba.key).getBytes ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col)
batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName, ba.key).getBytes), col)
batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col)
batch.insert(ByteBuffer.wrap(key), col)
}
case None =>
}
Expand Down

0 comments on commit 02fbd24

Please sign in to comment.