Skip to content

Commit

Permalink
finagle-zipkin: Improve concurrency of DeadlineSpanMap
Browse files Browse the repository at this point in the history
Problem

`c.t.f.zipkin.thrift.DeadlineSpanMap` uses coarse grained locks for
thread-safety.

Solution

Use finer grained synchronization on the individual MutableSpan's via
a `j.u.c.ConcurrentHashMap`.

RB_ID=714446
  • Loading branch information
kevinoliver authored and jenkins committed Jul 20, 2015
1 parent 3bbb502 commit ed7e492
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 52 deletions.
@@ -1,85 +1,78 @@
package com.twitter.finagle.zipkin.thrift

import collection.mutable.{ArrayBuffer, HashMap}
import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.thrift.Constants
import com.twitter.finagle.tracing.TraceId
import com.twitter.util.{Time, Timer, Duration, Future, TimerTask}
import com.twitter.util.{Duration, Future, Time, Timer}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

/**
* Takes care of storing the spans in a thread safe fashion. If a span
* is not removed from the map it will expire after the deadline is reached
* and sent off to scribe despite being incomplete.
*/
private class DeadlineSpanMap(
logSpans: Seq[Span] => Future[Unit],
ttl: Duration,
statsReceiver: StatsReceiver,
timer: Timer
) {
logSpans: Seq[Span] => Future[Unit],
ttl: Duration,
statsReceiver: StatsReceiver,
timer: Timer)
{

private[this] val spanMap = HashMap[TraceId, MutableSpan]()
private[this] val unfinishedCounter = statsReceiver.scope("log_span").counter("unfinished")
private[this] val spanMap = new ConcurrentHashMap[TraceId, MutableSpan](64)

private[this] val timerTask = timer.schedule(ttl / 2) { flush(ttl.ago) }

/**
* Synchronously update the mutable span.
* Update the mutable span.
*
* This will create a new MutableSpan if one does not exist otherwise the existing
* span will be provided.
*
* If the span is deemed complete it will be removed from the map and sent to `logSpans`
*/
def update(traceId: TraceId)(f: MutableSpan => Unit): Unit = synchronized {
val span = spanMap.get(traceId) match {
case Some(span) =>
f(span)
span

case None =>
val span = new MutableSpan(traceId, Time.now)
spanMap.put(traceId, span)
f(span)
def update(traceId: TraceId)(f: MutableSpan => Unit): Unit = {
val span: MutableSpan = {
val span = spanMap.get(traceId)
if (span != null) {
span
} else {
val newSpan = new MutableSpan(traceId, Time.now)
val prev = spanMap.putIfAbsent(traceId, newSpan)
if (prev == null) newSpan else prev
}
}

f(span)

if (span.isComplete) {
remove(traceId)
spanMap.remove(traceId, span)
logSpans(Seq(span.toSpan))
}
}

/**
* Synchronize and remove the span from the map.
*/
def remove(traceId: TraceId): Option[MutableSpan] = synchronized {
spanMap.remove(traceId)
}

/**
* Flush spans created earlier than `now`
*
* @return Future indicating completion.
*/
def flush(deadline: Time): Future[Unit] = {
val spans = new ArrayBuffer[Span](spanMap.size)

synchronized {
spanMap.keys.toList foreach { traceId =>
spanMap.get(traceId) match {
case Some(span) if span.started <= deadline =>
remove(traceId)
span.addAnnotation(ZipkinAnnotation(deadline, "finagle.flush", span.endpoint, None))
spans.append(span.toSpan)
case _ => ()
}
val ss = new ArrayBuffer[Span](spanMap.size)

val iter = spanMap.entrySet.iterator
while (iter.hasNext) {
val kv = iter.next()
val span = kv.getValue
if (span.started <= deadline) {
spanMap.remove(kv.getKey, span)
span.addAnnotation(ZipkinAnnotation(deadline, "finagle.flush", span.endpoint, None))
ss.append(span.toSpan)
}
}

if (spans.isEmpty) Future.Done
else logSpans(spans.toSeq)
if (ss.isEmpty) Future.Done
else logSpans(ss)
}

/**
Expand All @@ -97,22 +90,22 @@ private final class MutableSpan(val traceId: TraceId, val started: Time) {
private[this] var _service: Option[String] = None
private[this] var _endpoint: Endpoint = Endpoint.Unknown

private[this] var annotations = ArrayBuffer.empty[ZipkinAnnotation]
private[this] val annotations = ArrayBuffer.empty[ZipkinAnnotation]
private[this] val binaryAnnotations = ArrayBuffer.empty[BinaryAnnotation]

def endpoint: Endpoint = _endpoint
def endpoint: Endpoint = synchronized { _endpoint }

def setName(n: String): MutableSpan = {
def setName(n: String): MutableSpan = synchronized {
_name = Some(n)
this
}

def setServiceName(n: String): MutableSpan = {
def setServiceName(n: String): MutableSpan = synchronized {
_service = Some(n)
this
}

def addAnnotation(ann: ZipkinAnnotation): MutableSpan = {
def addAnnotation(ann: ZipkinAnnotation): MutableSpan = synchronized {
if (!_isComplete && (
ann.value.equals(Constants.CLIENT_RECV) ||
ann.value.equals(Constants.SERVER_SEND) ||
Expand All @@ -123,12 +116,12 @@ private final class MutableSpan(val traceId: TraceId, val started: Time) {
this
}

def addBinaryAnnotation(ann: BinaryAnnotation): MutableSpan = {
def addBinaryAnnotation(ann: BinaryAnnotation): MutableSpan = synchronized {
binaryAnnotations.append(ann)
this
}

def setEndpoint(ep: Endpoint): MutableSpan = {
def setEndpoint(ep: Endpoint): MutableSpan = synchronized {
_endpoint = ep
var idx = 0
while (idx < annotations.size) {
Expand All @@ -139,8 +132,9 @@ private final class MutableSpan(val traceId: TraceId, val started: Time) {
this
}

def toSpan: Span =
def toSpan: Span = synchronized {
Span(traceId, _service, _name, annotations, binaryAnnotations, _endpoint)
}

def isComplete: Boolean = _isComplete
def isComplete: Boolean = synchronized { _isComplete }
}
Expand Up @@ -27,8 +27,7 @@ class DeadlineSpanMapTest extends FunSuite {
tc.advance(10.seconds) // advance timer
timer.tick() // execute scheduled event

// span must have been removed and logged
assert(map.remove(traceId) === None)
// span must have been logged
assert(spansLogged)
}
}
Expand Down

0 comments on commit ed7e492

Please sign in to comment.