Skip to content

Commit

Permalink
finagle-zipkin-core: Address race condition in DeadlineSpanMap
Browse files Browse the repository at this point in the history
Problem

DeadlineSpanMap uses a mutable structure, `MutableSpan`, to buffer up
changes for a given TraceId. It has an async task, `flush`, that runs
periodically and can cause calls to `update` to be dropped.

Solution

Use locking to check when this occurs and if it does, run the update
operation again and immediately run `logSpans` on that.

Result

No more lost updates.

JIRA Issues: CSL-8210

Differential Revision: https://phabricator.twitter.biz/D319367
  • Loading branch information
kevinoliver authored and jenkins committed Jun 5, 2019
1 parent 256b79b commit 53901a2
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 108 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -52,6 +52,12 @@ Runtime Behavior Changes
failures (e.g. backup request cancellations). To represent these cases, we
introduce a new ResponseClass: Ignorable. ``PHAB_ID=D316884``

Bug Fixes
~~~~~~~~~

* finagle-zipkin-core: Fix a race condition which could cause a span to get logged
missing some annotations. ``PHAB_ID=D319367``

19.5.1
------

Expand Down
@@ -1,13 +1,10 @@
package com.twitter.finagle.zipkin.core

import com.twitter.conversions.DurationOps._
import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.thrift.thrift.Constants
import com.twitter.finagle.tracing.TraceId
import com.twitter.util.{Duration, Future, Time, Timer}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer

/**
* Takes care of storing the spans in a thread safe fashion. If a span
Expand All @@ -32,75 +29,80 @@ import scala.collection.mutable.ArrayBuffer
private class DeadlineSpanMap(
logSpans: Seq[Span] => Future[Unit],
ttl: Duration,
statsReceiver: StatsReceiver,
timer: Timer,
hold: Duration = 500.milliseconds) {

private[this] val spanMap = new ConcurrentHashMap[TraceId, MutableSpan](64)

private[this] val timerTask = timer.schedule(ttl / 2) { flush(ttl.ago) }
private[this] val flushTask = timer.schedule(ttl / 2) { flush(ttl.ago) }

/**
* Update the mutable span.
* Update the mutable span for the given TraceId.
*
* This will create a new MutableSpan if one does not exist otherwise the existing
* span will be provided.
*
* If the span is on hold (almost complete), wait a short time for any more locally
* generated annotations before removing from the map and sending to `logSpans`.
* generated annotations before removing from the map and sending to [[logSpans]].
*
* @param f expected to be a "quick" operation. No bitcoin mining, please.
*/
def update(traceId: TraceId)(f: MutableSpan => Unit): Unit = {
val span: MutableSpan = {
val span = spanMap.get(traceId)
if (span != null) {
span
// there are a few "dances" here in order to avoid race conditions.
// avoiding a removal from the map and a logSpans call is important
// or updates written to the MutableSpan may not get logged.
val ms: MutableSpan =
spanMap.computeIfAbsent(traceId, new java.util.function.Function[TraceId, MutableSpan]() {
def apply(t: TraceId): MutableSpan = new MutableSpan(traceId, Time.now)
})

val toFlush: Option[MutableSpan] = ms.synchronized {
if (ms.wasFlushed) {
// it's already been flushed. copy most of it and immediately log it.
val copy = ms.copyForImmediateLogging()
f(copy)
copy.addAnnotation(ZipkinAnnotation(Time.now, "finagle.zipkin.late_arrival", ms.endpoint))
Some(copy)
} else {
val newSpan = new MutableSpan(traceId, Time.now)
val prev = spanMap.putIfAbsent(traceId, newSpan)
if (prev == null) newSpan else prev
f(ms)
if (ms.isOnHold) {
timer.doLater(hold) { complete(traceId, ms) }
}
None
}
}

f(span)

if (span.isOnHold) {
timer.doLater(hold) { complete(traceId, span) }
toFlush match {
case Some(copy) => logSpans(Seq(copy.toSpan))
case None => ()
}
}

/**
* Flush spans created earlier than `now`
* Flush spans created at or before the given `deadline`.
*
* They will be removed and have [[logSpans]] called on them.
*
* @return Future indicating completion.
*/
def flush(deadline: Time): Future[Unit] = {
val ss = new ArrayBuffer[Span](spanMap.size)
val beforeDeadline = new ListBuffer[Span]()

val iter = spanMap.entrySet.iterator
val iter = spanMap.entrySet.iterator()
while (iter.hasNext) {
val kv = iter.next()
val span = kv.getValue
if (span.started <= deadline) {
spanMap.remove(kv.getKey, span)
span.addAnnotation(ZipkinAnnotation(deadline, "finagle.flush", span.endpoint))
ss.append(span.toSpan)
val ms = iter.next().getValue
if (ms.started <= deadline) {
ms.synchronized {
ms.addAnnotation(ZipkinAnnotation(deadline, "finagle.flush", ms.endpoint))
beforeDeadline.append(ms.toSpan)
preventMoreUpdates(ms)
iter.remove()
}
}
}

if (ss.isEmpty) Future.Done
else logSpans(ss)
}

/**
* Remove and log the span.
* @param span
* @return Future indicating done.
*/
def complete(traceId: TraceId, span: MutableSpan): Future[Unit] = {
val removed = spanMap.remove(traceId, span)
if (removed)
logSpans(Seq(span.toSpan))
Future.Done
if (beforeDeadline.isEmpty) Future.Done
else logSpans(beforeDeadline)
}

/**
Expand All @@ -110,68 +112,27 @@ private class DeadlineSpanMap(
*/
def flush(): Future[Unit] =
flush(Time.Top)
}

private final class MutableSpan(val traceId: TraceId, val started: Time) {
private[this] var _name: Option[String] = None
private[this] var _service: Option[String] = None
private[this] var _endpoint: Endpoint = Endpoint.Unknown
private[this] var _onHold: Boolean = false

private[this] val annotations = ArrayBuffer.empty[ZipkinAnnotation]
private[this] val binaryAnnotations = ArrayBuffer.empty[BinaryAnnotation]

def endpoint: Endpoint = synchronized { _endpoint }

def setName(n: String): MutableSpan = synchronized {
_name = Some(n)
this
}

def setServiceName(n: String): MutableSpan = synchronized {
_service = Some(n)
this
}

/**
* Add an annotation to the map.
* Remove and log the span.
*
* The special annotations ClientRecv and ServerSend, at client or server respectively,
* are taken as a hint that the span will complete soon. In this case, we don't set the span
* complete right away because additional annotations may still arrive. We want to try and avoid
* annotations that arrive after the rest of the span has been logged because they are likely to
* get dropped at the collector.
* @return Future indicating done.
*/
def addAnnotation(ann: ZipkinAnnotation): MutableSpan = synchronized {
if (ann.value.equals(Constants.CLIENT_RECV) ||
ann.value.equals(Constants.SERVER_SEND) ||
ann.value.equals(TimeoutFilter.TimeoutAnnotation)) {
_onHold = true
}

annotations.append(ann)
this
}

def addBinaryAnnotation(ann: BinaryAnnotation): MutableSpan = synchronized {
binaryAnnotations.append(ann)
this
}

def setEndpoint(ep: Endpoint): MutableSpan = synchronized {
_endpoint = ep
var idx = 0
while (idx < annotations.size) {
val a = annotations(idx)
if (a.endpoint == Endpoint.Unknown) annotations(idx) = a.copy(endpoint = ep)
idx += 1
def complete(traceId: TraceId, ms: MutableSpan): Future[Unit] = {
val logIt: Boolean = ms.synchronized {
val removed = spanMap.remove(traceId, ms)
if (removed) {
preventMoreUpdates(ms)
true
} else {
false
}
}
this
if (logIt) logSpans(Seq(ms.toSpan))
else Future.Done
}

def toSpan: Span = synchronized {
Span(traceId, _service, _name, annotations, binaryAnnotations, _endpoint, started)
}
private[this] def preventMoreUpdates(ms: MutableSpan): Unit =
ms.flush()

def isOnHold: Boolean = synchronized { _onHold }
}
@@ -0,0 +1,115 @@
package com.twitter.finagle.zipkin.core

import com.twitter.finagle.service.TimeoutFilter
import com.twitter.finagle.thrift.thrift.Constants
import com.twitter.finagle.tracing.TraceId
import com.twitter.util.Time
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

/**
* Note that with regards to thread-safety, [[DeadlineSpanMap]] effectively
* "owns" this class and also participates in synchronizing on the instance itself.
*
* @see [[DeadlineSpanMap]]
*/
private final class MutableSpan(val traceId: TraceId, val started: Time) {
private[this] val flushed = new AtomicBoolean(false)

// thread-safety for all mutable state is handled by synchronizing on `this`.
private[this] var _name: Option[String] = None
private[this] var _service: Option[String] = None
private[this] var _endpoint: Endpoint = Endpoint.Unknown
private[this] var _onHold: Boolean = false

private[this] val annotations = ArrayBuffer.empty[ZipkinAnnotation]
private[this] val binaryAnnotations = ArrayBuffer.empty[BinaryAnnotation]

/**
* Mark this as having been [[DeadlineSpanMap.flush()]]-ed.
* No further updates will be included when logged.
*
* @see [[copyForImmediateLogging()]]
*/
def flush(): Unit = flushed.set(true)

/**
* Whether or not this has been flushed.
*/
def wasFlushed: Boolean = flushed.get

/**
* Used for races during [[DeadlineSpanMap.update]]. Creates a copy
* of most of the fields, but does not include annotations, binary annotations,
* or onHold.
*/
def copyForImmediateLogging(): MutableSpan = synchronized {
val ms = new MutableSpan(traceId, started)
ms.setEndpoint(endpoint)
_service match {
case Some(s) => ms.setServiceName(s)
case _ => // no-op
}
_name match {
case Some(n) => ms.setName(n)
case _ => // no-op
}
// do not copy onHold since we are about to log it.
// do not copy annotations or binaryAnnotations since they would already be logged.
ms
}

def endpoint: Endpoint = synchronized { _endpoint }

def setName(n: String): MutableSpan = synchronized {
_name = Some(n)
this
}

def setServiceName(n: String): MutableSpan = synchronized {
_service = Some(n)
this
}

/**
* Add an annotation to the map.
*
* The special annotations ClientRecv and ServerSend, at client or server respectively,
* are taken as a hint that the span will complete soon. In this case, we don't set the span
* complete right away because additional annotations may still arrive. We want to try and avoid
* annotations that arrive after the rest of the span has been logged because they are likely to
* get dropped at the collector.
*/
def addAnnotation(ann: ZipkinAnnotation): MutableSpan = synchronized {
if (ann.value.equals(Constants.CLIENT_RECV) ||
ann.value.equals(Constants.SERVER_SEND) ||
ann.value.equals(TimeoutFilter.TimeoutAnnotation)) {
_onHold = true
}

annotations.append(ann)
this
}

def addBinaryAnnotation(ann: BinaryAnnotation): MutableSpan = synchronized {
binaryAnnotations.append(ann)
this
}

def setEndpoint(ep: Endpoint): MutableSpan = synchronized {
_endpoint = ep
var idx = 0
while (idx < annotations.size) {
val a = annotations(idx)
if (a.endpoint == Endpoint.Unknown) annotations(idx) = a.copy(endpoint = ep)
idx += 1
}
this
}

def toSpan: Span = synchronized {
Span(traceId, _service, _name, annotations.toList, binaryAnnotations.toList, _endpoint, started)
}

def isOnHold: Boolean = synchronized { _onHold }
}
Expand Up @@ -27,7 +27,7 @@ abstract class RawZipkinTracer(statsReceiver: StatsReceiver, timer: Timer = Defa

// this sends off spans after the deadline is hit, no matter if it ended naturally or not.
private[this] val spanMap: DeadlineSpanMap =
new DeadlineSpanMap(sendSpans, 120.seconds, statsReceiver, timer)
new DeadlineSpanMap(sendSpans, 120.seconds, timer)

protected[core] def flush(): Future[Unit] = spanMap.flush()

Expand Down

0 comments on commit 53901a2

Please sign in to comment.