Skip to content

Commit

Permalink
Merge ae7f570 into 1369d37
Browse files Browse the repository at this point in the history
  • Loading branch information
shlushchanka committed Dec 8, 2016
2 parents 1369d37 + ae7f570 commit 7316f62
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,37 @@ import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller

import com.github.levkhomich.akka.tracing._

import scala.concurrent.ExecutionContext

trait BaseTracingDirectives {

/**
* Completes the request using the given function. The input to the function is
* produced with the in-scope entity unmarshaller and the result value of the
* function is marshalled with the in-scope marshaller. Unmarshalled entity is
* sampled for tracing and can be used thereafter to add trace annotations.
* RPC name is set to unmarshalled entity simple class name.
* After marshalling step, trace is automatically closed and sent to collector service.
* tracedHandleWith can be a convenient method combining entity with complete.
*
* @param service service name to be added to trace
*/
def tracedHandleWith[A <: TracingSupport, B](service: String)(f: A => B)
(implicit um: FromRequestUnmarshaller[A],
m: ToResponseMarshaller[B],
ec: ExecutionContext): Route = {
tracedEntity(service)(um).tapply {
case Tuple1(ts) =>
StandardRoute { ctx =>
val completeFut = ctx.complete(ToResponseMarshallable(f(ts)))
completeFut onComplete {
_ => trace.record(ts, TracingAnnotations.ServerSend.text)
}
completeFut
}
}
}

protected def trace: TracingExtensionImpl

private[this] def tracedEntity[T <: TracingSupport](service: String)(implicit um: FromRequestUnmarshaller[T]): Directive1[T] =
Expand All @@ -48,25 +77,6 @@ trait BaseTracingDirectives {
}
}

/**
* Completes the request using the given function. The input to the function is
* produced with the in-scope entity unmarshaller and the result value of the
* function is marshalled with the in-scope marshaller. Unmarshalled entity is
* sampled for tracing and can be used thereafter to add trace annotations.
* RPC name is set to unmarshalled entity simple class name.
* After marshalling step, trace is automatically closed and sent to collector service.
* tracedHandleWith can be a convenient method combining entity with complete.
*
* @param service service name to be added to trace
*/
def tracedHandleWith[A <: TracingSupport, B](service: String)(f: A => B)(implicit um: FromRequestUnmarshaller[A], m: ToResponseMarshaller[B]): Route =
tracedEntity(service)(um).tapply {
case Tuple1(ts) =>
StandardRoute { ctx =>
ctx.complete(ToResponseMarshallable(f(ts))(traceServerSend(ts.tracingId)))
}
}

private[this] def addHttpAnnotations(tracingId: Long, request: HttpRequest): Unit = {
@inline def recordKeyValue(key: String, value: String): Unit =
trace.addBinaryAnnotation(tracingId, key, ByteBuffer.wrap(value.getBytes), thrift.AnnotationType.STRING)
Expand All @@ -85,12 +95,6 @@ trait BaseTracingDirectives {
}
}

private[this] def traceServerSend[T](tracingId: Long)(implicit m: ToResponseMarshaller[T]): ToResponseMarshaller[T] =
m.compose { v =>
trace.addAnnotation(tracingId, TracingAnnotations.ServerSend.text)
v
}

}

trait TracingDirectives extends BaseTracingDirectives { this: Actor with ActorTracing =>
Expand All @@ -105,7 +109,9 @@ trait TracingDirectives extends BaseTracingDirectives { this: Actor with ActorTr
* and sent to collector service. tracedHandleWith can be a convenient method
* combining entity with complete.
*/
def tracedHandleWith[A <: TracingSupport, B](f: A => B)(implicit um: FromRequestUnmarshaller[A], m: ToResponseMarshaller[B]): Route =
def tracedHandleWith[A <: TracingSupport, B](f: A => B)(implicit um: FromRequestUnmarshaller[A],
m: ToResponseMarshaller[B],
executionContext: ExecutionContext): Route =
tracedHandleWith(self.path.name)(f)

}
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package com.github.levkhomich.akka.tracing.http

import com.typesafe.config.Config

import scala.concurrent.Future
import scala.util.Random
import java.util.concurrent.TimeUnit

import akka.http.scaladsl.model.headers.{ `Content-Encoding`, HttpEncodings, RawHeader }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{ HttpEncodings, RawHeader, `Content-Encoding` }
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RejectionHandler
import akka.http.scaladsl.unmarshalling.Unmarshaller
import com.github.levkhomich.akka.tracing._
import com.typesafe.config.Config
import org.specs2.matcher.MatchResult
import org.specs2.mutable.Specification

import com.github.levkhomich.akka.tracing._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import scala.util.Random

class TracingDirectivesSpec extends Specification with TracingTestCommons
with BaseTracingDirectives with MockCollector with Specs2FrameworkInterface {
Expand All @@ -31,6 +32,30 @@ class TracingDirectivesSpec extends Specification with TracingTestCommons
}
}

"send trace to server only after response future has completed" in {
val traceKey = "traced-after-future-completed"
val traceValue = "OK"
val tracedHandledWithResponseFuture =
handleRejections(RejectionHandler.default) {
get {
tracedHandleWith(serviceName) { r: TestMessage =>
val computationPromise = Promise[HttpResponse]
system.scheduler.scheduleOnce(delay = FiniteDuration(10, TimeUnit.MILLISECONDS)) {
trace.recordKeyValue(r, traceKey, traceValue)
computationPromise.success(HttpResponse(StatusCodes.OK))
}
computationPromise.future
}
}
}

Get(testPath) ~> tracedHandledWithResponseFuture ~> check {
response.status mustEqual StatusCodes.OK
val span = receiveSpan()
checkBinaryAnnotation(span, traceKey, traceValue)
}
}

"annotate sampled requests (general)" in {
Get(testPath) ~> tracedHandleWithRoute ~> check {
response.status mustEqual StatusCodes.OK
Expand Down

0 comments on commit 7316f62

Please sign in to comment.