Skip to content

Commit

Permalink
move span submission routines into separate actor (possible fix for #47
Browse files Browse the repository at this point in the history
…, #55)
  • Loading branch information
levkhomich committed Dec 28, 2014
1 parent 9c8eff8 commit a4a3527
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }

import akka.actor._
import com.github.levkhomich.akka.tracing.actor.SpanHolder
import org.apache.thrift.transport.{ TSocket, TFramedTransport }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,25 @@
* limitations under the License.
*/

package com.github.levkhomich.akka.tracing
package com.github.levkhomich.akka.tracing.actor

import java.net.{ SocketException, NoRouteToHostException, ConnectException, InetAddress }
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util
import javax.xml.bind.DatatypeConverter

import akka.actor.{ Props, Actor, ActorLogging, Cancellable }
import com.github.levkhomich.akka.tracing.{ BaseTracingSupport, thrift }
import org.apache.thrift.transport.TTransport

import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.{ Success, Try }
import scala.util.control.ControlThrowable

import akka.actor.{ Actor, ActorLogging, Cancellable }
import org.apache.thrift.TApplicationException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{ TTransportException, TTransport }

import com.github.levkhomich.akka.tracing.thrift.TReusableTransport

private[tracing] object SpanHolder {
private final case class Enqueue(spanId: Long, cancelJob: Boolean)

final case class Sample(ts: BaseTracingSupport, serviceName: String, rpcName: String, timestamp: Long)
final case class Receive(ts: BaseTracingSupport, serviceName: String, rpcName: String, timestamp: Long)
final case class Enqueue(spanId: Long, cancelJob: Boolean)
case object SendEnqueued
final case class AddAnnotation(spanId: Long, timestamp: Long, msg: String)
final case class AddBinaryAnnotation(spanId: Long, key: String, value: ByteBuffer, valueType: thrift.AnnotationType)
final case class CreateChildSpan(spanId: Long, parentId: Long, optTraceId: Option[Long], spanName: String)
Expand All @@ -49,33 +43,21 @@ private[tracing] object SpanHolder {
*/
private[tracing] class SpanHolder(transport: TTransport) extends Actor with ActorLogging {

import SpanHolder._
import com.github.levkhomich.akka.tracing.actor.SpanHolder._

private[this] val submitter = context.system.actorOf(Props(classOf[SpanSubmitter], transport))

// map of spanId -> span for uncompleted traces
private[this] val spans = mutable.Map[Long, thrift.Span]()
// scheduler jobs which send incomplete traces by timeout
private[this] val sendJobs = mutable.Map[Long, Cancellable]()
// next submission batch
private[this] val nextBatch = mutable.UnrolledBuffer[thrift.Span]()

// buffer for submitted spans, which should be resent in case of connectivity problems
private[this] var submittedSpans: mutable.Buffer[thrift.LogEntry] = mutable.Buffer.empty
// buffer's size limit
private[this] val maxSubmissionBufferSize = 1000

private[this] val protocolFactory = new TBinaryProtocol.Factory()
private[this] val thriftBuffer = new TReusableTransport()

private[this] val endpoints = mutable.Map[Long, thrift.Endpoint]()
private[this] val localAddress = ByteBuffer.wrap(InetAddress.getLocalHost.getAddress).getInt
private[this] val unknownEndpoint = new thrift.Endpoint(localAddress, 0, "unknown")

private[this] val microTimeAdjustment = System.currentTimeMillis * 1000 - System.nanoTime / 1000

private[this] val client = new thrift.Scribe.Client(new TBinaryProtocol(transport))

scheduleNextBatch()

override def receive: Receive = {
case Sample(ts, serviceName, rpcName, timestamp) =>
lookup(ts.$spanId) match {
Expand All @@ -99,9 +81,6 @@ private[tracing] class SpanHolder(transport: TTransport) extends Actor with Acto
case Enqueue(spanId, cancelJob) =>
enqueue(spanId, cancelJob)

case SendEnqueued =>
send()

case AddAnnotation(spanId, timestamp, msg) =>
lookup(spanId) foreach { spanInt =>
val a = new thrift.Annotation(adjustedMicroTime(timestamp), msg)
Expand All @@ -127,24 +106,9 @@ private[tracing] class SpanHolder(transport: TTransport) extends Actor with Acto
}

override def postStop(): Unit = {
import scala.collection.JavaConversions._
// we don't want to resend at this point
submittedSpans.clear()
spans.keys.foreach(id =>
enqueue(id, cancelJob = true)
)
if (!nextBatch.isEmpty) {
Try {
client.Log(nextBatch.map(spanToLogEntry))
if (transport.isOpen) {
transport.close()
}
} recover {
case e =>
handleSubmissionError(e)
log.error(s"Zipkin collector is unavailable. Failed to send ${nextBatch.size} spans during postStop.")
}
}
super.postStop()
}

Expand All @@ -165,62 +129,12 @@ private[tracing] class SpanHolder(transport: TTransport) extends Actor with Acto

private[this] def enqueue(id: Long, cancelJob: Boolean): Unit = {
sendJobs.remove(id).foreach(job => if (cancelJob) job.cancel())
spans.remove(id).foreach(span => nextBatch.append(span))
spans.remove(id).foreach(submitter ! SpanSubmitter.Enqueue(_))
endpoints.remove(id)
}

private[this] def send(): Unit = {
import scala.collection.JavaConversions._
if (!nextBatch.isEmpty) {
submittedSpans ++= nextBatch.map(spanToLogEntry)
nextBatch.clear()
}
if (!submittedSpans.isEmpty) {
Future {
if (!transport.isOpen) {
transport.open()
TracingExtension(context.system).markCollectorAsAvailable()
log.warning("Successfully connected to Zipkin collector.")
}
client.Log(submittedSpans)
} recover {
case e =>
handleSubmissionError(e)
// reconnect next time
transport.close()
thrift.ResultCode.TRY_LATER
} onComplete {
case Success(thrift.ResultCode.OK) =>
submittedSpans.clear()
scheduleNextBatch()
case _ =>
log.warning(s"Zipkin collector unavailable. Failed to send ${submittedSpans.size} spans.")
limitSubmittedSpansSize()
scheduleNextBatch()
}
} else {
scheduleNextBatch()
}
}

private[this] def limitSubmittedSpansSize(): Unit = {
val delta = submittedSpans.size - maxSubmissionBufferSize
if (delta > 0) {
log.error(s"Dropping $delta spans because of maxSubmissionBufferSize policy.")
submittedSpans = submittedSpans.takeRight(maxSubmissionBufferSize)
}
}

private[this] def spanToLogEntry(spanInt: thrift.Span): thrift.LogEntry = {
spanInt.write(protocolFactory.getProtocol(thriftBuffer))
val thriftBytes = thriftBuffer.getArray.take(thriftBuffer.length)
thriftBuffer.reset()
val encodedSpan = DatatypeConverter.printBase64Binary(thriftBytes) + '\n'
new thrift.LogEntry("zipkin", encodedSpan)
}

private[this] def endpointFor(spanId: Long): thrift.Endpoint =
endpoints.get(spanId).getOrElse(unknownEndpoint)
endpoints.getOrElse(spanId, unknownEndpoint)

private[this] def recvAnnotationList(nanoTime: Long, endpont: thrift.Endpoint): util.ArrayList[thrift.Annotation] = {
val serverRecvAnn = new thrift.Annotation(adjustedMicroTime(nanoTime), thrift.zipkinConstants.SERVER_RECV)
Expand All @@ -230,36 +144,4 @@ private[tracing] class SpanHolder(transport: TTransport) extends Actor with Acto
annotations
}

private[this] def handleSubmissionError(e: Throwable): Unit =
e match {
case te: TTransportException =>
te.getCause match {
case null =>
log.error("Thrift transport error: " + te.getMessage)
case e: ConnectException =>
log.error("Can't connect to Zipkin: " + e.getMessage)
case e: NoRouteToHostException =>
log.error("No route to Zipkin: " + e.getMessage)
case e: SocketException =>
log.error("Socket error: " + TracingExtension.getStackTrace(e))
case t: Throwable =>
log.error("Unknown transport error: " + TracingExtension.getStackTrace(t))
}
TracingExtension(context.system).markCollectorAsUnavailable()
case t: TApplicationException =>
log.error("Thrift client error: " + t.getMessage)
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
log.error("Oh, look! We have an unknown error here: " + TracingExtension.getStackTrace(t))
}

private[this] def scheduleNextBatch(): Unit =
if (TracingExtension(context.system).isEnabled) {
context.system.scheduler.scheduleOnce(2.seconds, self, SendEnqueued)
} else {
log.error("Trying to reconnect in 10 seconds")
context.system.scheduler.scheduleOnce(10.seconds, self, SendEnqueued)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/**
* Copyright 2014 the Akka Tracing contributors. See AUTHORS for more details.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.levkhomich.akka.tracing.actor

import java.net.{ ConnectException, NoRouteToHostException, SocketException }
import javax.xml.bind.DatatypeConverter
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.util.control.ControlThrowable
import scala.util.{ Success, Try }

import akka.actor.{ Actor, ActorLogging }
import org.apache.thrift.TApplicationException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{ TTransport, TTransportException }

import com.github.levkhomich.akka.tracing.thrift.TReusableTransport
import com.github.levkhomich.akka.tracing.{ TracingExtension, thrift }

/**
 * Message protocol understood by the [[SpanSubmitter]] actor.
 */
private[actor] object SpanSubmitter {
// Asks the submitter to append `span` to its next submission batch.
final case class Enqueue(span: thrift.Span)
// Timer tick the submitter schedules to itself; receiving it triggers a submission attempt.
// Declared private, so only the companion class can reference it.
private case object SendEnqueued
}

/**
 * Internal API
 *
 * Actor which buffers spans received via [[SpanSubmitter.Enqueue]] and periodically
 * submits them through a Scribe client bound to `transport`.
 *
 * Submission is driven by a self-scheduled `SendEnqueued` message: every tick the pending
 * spans are serialized into log entries and sent synchronously. Entries which could not be
 * delivered are kept (up to `maxSubmissionBufferSize`) and retried on the next tick.
 *
 * @param transport Thrift transport used for submission; it is opened lazily on the first
 *                  submission attempt and closed when the actor stops
 */
private[actor] class SpanSubmitter(transport: TTransport) extends Actor with ActorLogging {

  import SpanSubmitter._

  // next submission batch
  private[this] val nextBatch = mutable.UnrolledBuffer[thrift.Span]()

  // buffer for submitted spans, which should be resent in case of connectivity problems
  private[this] var submittedSpans: mutable.Buffer[thrift.LogEntry] = mutable.Buffer.empty
  // buffer's size limit
  private[this] val maxSubmissionBufferSize = 1000

  private[this] val protocolFactory = new TBinaryProtocol.Factory()
  // scratch transport shared by all spanToLogEntry calls
  private[this] val thriftBuffer = new TReusableTransport()

  private[this] val client = new thrift.Scribe.Client(new TBinaryProtocol(transport))

  // kick off the periodic submission loop; must stay below the field initializers above
  scheduleNextBatch()

  override def receive: Receive = {
    case Enqueue(span) =>
      nextBatch.append(span)

    case SendEnqueued =>
      send()
  }

  /**
   * Makes a last attempt to deliver spans which were enqueued but never sent, then
   * releases the transport.
   */
  override def postStop(): Unit = {
    import scala.collection.JavaConverters._
    // we don't want to resend at this point
    submittedSpans.clear()
    if (nextBatch.nonEmpty) {
      Try {
        client.Log(nextBatch.map(spanToLogEntry).asJava)
      } recover {
        case e =>
          handleSubmissionError(e)
          log.error(s"Zipkin collector is unavailable. Failed to send ${nextBatch.size} spans during postStop.")
      }
    }
    // close the transport unconditionally: previously it stayed open when there was
    // nothing to flush or when the final flush failed
    if (transport.isOpen) {
      transport.close()
    }
    super.postStop()
  }

  /**
   * Moves pending spans into the resend buffer, tries to submit the whole buffer and
   * schedules the next attempt regardless of the outcome.
   */
  private[this] def send(): Unit = {
    import scala.collection.JavaConverters._
    if (nextBatch.nonEmpty) {
      submittedSpans ++= nextBatch.map(spanToLogEntry)
      nextBatch.clear()
    }
    if (submittedSpans.nonEmpty) {
      Try {
        if (!transport.isOpen) {
          transport.open()
          TracingExtension(context.system).markCollectorAsAvailable()
          log.warning("Successfully connected to Zipkin collector.")
        }
        client.Log(submittedSpans.asJava)
      } recover {
        case e =>
          handleSubmissionError(e)
          // reconnect next time
          transport.close()
          thrift.ResultCode.TRY_LATER
      } match {
        case Success(thrift.ResultCode.OK) =>
          submittedSpans.clear()
        case _ =>
          // either the call failed or the collector answered with a non-OK code:
          // keep the entries for the next attempt, bounded by the buffer limit
          log.warning(s"Zipkin collector unavailable. Failed to send ${submittedSpans.size} spans.")
          limitSubmittedSpansSize()
      }
    }
    scheduleNextBatch()
  }

  /**
   * Enforces `maxSubmissionBufferSize` on the resend buffer by keeping only the most
   * recently added entries.
   */
  private[this] def limitSubmittedSpansSize(): Unit = {
    val delta = submittedSpans.size - maxSubmissionBufferSize
    if (delta > 0) {
      log.error(s"Dropping $delta spans because of maxSubmissionBufferSize policy.")
      submittedSpans = submittedSpans.takeRight(maxSubmissionBufferSize)
    }
  }

  /**
   * Serializes a span with the binary Thrift protocol and wraps its Base64 representation
   * (terminated by a newline) into a log entry of the "zipkin" category.
   *
   * @param spanInt span to serialize
   * @return log entry carrying the encoded span
   */
  private[this] def spanToLogEntry(spanInt: thrift.Span): thrift.LogEntry = {
    val thriftBytes =
      try {
        spanInt.write(protocolFactory.getProtocol(thriftBuffer))
        thriftBuffer.getArray.take(thriftBuffer.length)
      } finally {
        // reset the shared buffer even if serialization fails, so that a partially
        // written span is not left behind for the next call
        thriftBuffer.reset()
      }
    val encodedSpan = DatatypeConverter.printBase64Binary(thriftBytes) + '\n'
    new thrift.LogEntry("zipkin", encodedSpan)
  }

  /**
   * Logs a submission failure. Transport-level failures additionally mark the collector
   * as unavailable in the tracing extension.
   *
   * @param e failure raised by a submission attempt
   */
  private[this] def handleSubmissionError(e: Throwable): Unit =
    e match {
      case te: TTransportException =>
        te.getCause match {
          case null =>
            log.error("Thrift transport error: " + te.getMessage)
          case ce: ConnectException =>
            log.error("Can't connect to Zipkin: " + ce.getMessage)
          case nre: NoRouteToHostException =>
            log.error("No route to Zipkin: " + nre.getMessage)
          case se: SocketException =>
            log.error("Socket error: " + se.getMessage)
          case cause: Throwable =>
            log.error("Unknown transport error: " + TracingExtension.getStackTrace(cause))
        }
        TracingExtension(context.system).markCollectorAsUnavailable()
      case ae: TApplicationException =>
        log.error("Thrift client error: " + ae.getMessage)
      case ct: ControlThrowable =>
        // control-flow throwables must never be swallowed
        throw ct
      case t: Throwable =>
        log.error("Oh, look! We have an unknown error here: " + TracingExtension.getStackTrace(t))
    }

  /**
   * Schedules the next `SendEnqueued` tick: after 2 seconds while the tracing extension
   * reports itself enabled, after 10 seconds (with an error log entry) otherwise.
   */
  private[this] def scheduleNextBatch(): Unit = {
    import context.dispatcher
    val delay =
      if (TracingExtension(context.system).isEnabled) {
        2.seconds
      } else {
        log.error("Trying to reconnect in 10 seconds")
        10.seconds
      }
    context.system.scheduler.scheduleOnce(delay, self, SendEnqueued)
  }

}

0 comments on commit a4a3527

Please sign in to comment.