Skip to content

Commit

Permalink
Merge pull request #70 from drpacman/downstream-sample-pull
Browse files Browse the repository at this point in the history
Ensure sampling decisions are honoured across services
  • Loading branch information
levkhomich committed May 1, 2016
2 parents fb4a6a1 + 5b7714f commit 2d2e3eb
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ trait TracingSettings extends GlobalSettings with PlayControllerTracing {
lazy val excludedHeaders = Set.empty[String]

protected def sample(request: RequestHeader): Unit = {
trace.sample(request, serviceName)
val upstreamSampling = request.headers.get(TracingHeaders.Sampled).map(_.toLowerCase)
upstreamSampling match {
case Some("0") | Some("false") =>
// do not sample
case Some("1") | Some("true") =>
trace.sample(request, serviceName, true)
case Some(_) | None =>
trace.sample(request, serviceName)
}
}

protected def addHttpAnnotations(request: RequestHeader): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class PlayTracingSpec extends PlaySpecification with TracingTestCommons with Moc
additionalConfiguration = configuration
)

def disabledLocalSamplingApplication: FakeApplication = FakeApplication(
withRoutes = routes,
withGlobal = Some(new GlobalSettings with TracingSettings),
additionalConfiguration = configuration ++ Map(TracingExtension.AkkaTracingSampleRate -> 0)
)

"Play tracing" should {
"sample requests" in new WithApplication(fakeApplication) {
val result = route(FakeRequest("GET", TestPath)).map(Await.result(_, defaultAwaitTimeout.duration))
Expand Down Expand Up @@ -155,6 +161,41 @@ class PlayTracingSpec extends PlaySpecification with TracingTestCommons with Moc
checkAnnotation(span, TracingExtension.getStackTrace(npe))
}

Seq("0", "false").foreach { value =>
s"ensure upstream 'do not sample' decision is honoured for sampled value of $value" in new WithApplication(fakeApplication) {

val spanId = Random.nextLong
val result = route(FakeRequest("GET", TestPath + "?key=value",
FakeHeaders(Seq(
TracingHeaders.TraceId -> Seq(SpanMetadata.idToString(spanId)),
TracingHeaders.Sampled -> Seq("false")
)), AnyContentAsEmpty)).map(Await.result(_, defaultAwaitTimeout.duration))
expectSpans(0)
}
}

Seq("1", "true").foreach { value =>
s"ensure upstream 'do sample' decision is honoured for sampled value $value" in new WithApplication(disabledLocalSamplingApplication) {
val spanId = Random.nextLong
val result = route(FakeRequest("GET", TestPath + "?key=value",
FakeHeaders(Seq(
TracingHeaders.TraceId -> Seq(SpanMetadata.idToString(spanId)),
TracingHeaders.Sampled -> Seq(value)
)), AnyContentAsEmpty)).map(Await.result(_, defaultAwaitTimeout.duration))
expectSpans(1)
}
}

"ensure local sampling decision is honoured if invalid sampled valued is supplied" in new WithApplication(fakeApplication) {
val spanId = Random.nextLong
val result = route(FakeRequest("GET", TestPath + "?key=value",
FakeHeaders(Seq(
TracingHeaders.TraceId -> Seq(SpanMetadata.idToString(spanId)),
TracingHeaders.Sampled -> Seq("unexpected value")
)), AnyContentAsEmpty)).map(Await.result(_, defaultAwaitTimeout.duration))
expectSpans(1)
}

}

step {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,27 @@ trait TracedSprayPipeline {

def tracedPipeline[T](parent: BaseTracingSupport): SendReceive = {
val clientRequest = new TracingSupport {}
trace.createChild(clientRequest, parent).map(metadata =>
addHeaders(List(
HttpHeaders.RawHeader(TracingHeaders.TraceId, SpanMetadata.idToString(metadata.traceId)),
HttpHeaders.RawHeader(TracingHeaders.SpanId, SpanMetadata.idToString(metadata.spanId)),
HttpHeaders.RawHeader(TracingHeaders.ParentSpanId, SpanMetadata.idToString(metadata.parentId.get)),
HttpHeaders.RawHeader(TracingHeaders.Sampled, "true")
)) ~>
startTrace(clientRequest) ~>
sendAndReceive ~>
completeTrace(clientRequest)).getOrElse(sendAndReceive)
trace.createChild(clientRequest, parent).map(tracedRequest(clientRequest, _)).getOrElse(untracedRequest(parent.tracingId))
}

private[this] def untracedRequest(traceId: Long): SendReceive = {
addHeaders(List(
HttpHeaders.RawHeader(TracingHeaders.TraceId, SpanMetadata.idToString(traceId)),
HttpHeaders.RawHeader(TracingHeaders.Sampled, "false")
)) ~>
sendAndReceive
}

private[this] def tracedRequest(clientRequest: BaseTracingSupport, metadata: SpanMetadata): SendReceive = {
addHeaders(List(
HttpHeaders.RawHeader(TracingHeaders.TraceId, SpanMetadata.idToString(metadata.traceId)),
HttpHeaders.RawHeader(TracingHeaders.SpanId, SpanMetadata.idToString(metadata.spanId)),
HttpHeaders.RawHeader(TracingHeaders.ParentSpanId, SpanMetadata.idToString(metadata.parentId.get)),
HttpHeaders.RawHeader(TracingHeaders.Sampled, "true")
)) ~>
startTrace(clientRequest) ~>
sendAndReceive ~>
completeTrace(clientRequest)
}

def startTrace(ts: BaseTracingSupport)(request: HttpRequest): HttpRequest = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.github.levkhomich.akka.tracing.http

import scala.concurrent.Future
import scala.concurrent.{ Promise, Future }

import org.specs2.matcher.FutureMatchers
import org.specs2.mutable.Specification
import org.specs2.matcher.MatchResult
import spray.client.pipelining._
import spray.http._

Expand Down Expand Up @@ -43,6 +44,43 @@ class TracedSprayPipelineSpec extends Specification with FutureMatchers with Tra
childSpan.parent_id mustEqual parentSpan.id
childSpan.id mustNotEqual parentSpan.id
}

"indicate to downstream service trace is not sampled if we are not sampling" in {
val result = Promise[MatchResult[_]]()
val parent = nextRandomMessage
val testPipeline = new TracedSprayPipeline {
val system = self.system
override def sendAndReceive = {
case req: HttpRequest =>
result.trySuccess(
(req.headers.find(_.name == TracingHeaders.Sampled).map(_.value) must beSome("false")) and
(req.headers.find(_.name == TracingHeaders.TraceId).map(_.value) must not beNull)
)
Future.successful(HttpResponse(StatusCodes.OK, bodyEntity))
}
}
testPipeline.tracedPipeline[String](parent)(Get("http://test.com"))
result.future.await
}

"indicate to downstream service that we are sampled along with the trace id if we are sampling" in {
val result = Promise[MatchResult[_]]()
val parent = nextRandomMessage
trace.sample(parent, "test trace", force = true)
val testPipeline = new TracedSprayPipeline {
val system = self.system
override def sendAndReceive = {
case req: HttpRequest =>
result.trySuccess(
(req.headers.find(_.name == TracingHeaders.Sampled).map(_.value) must beSome("true")) and
(req.headers.find(_.name == TracingHeaders.TraceId).map(_.value) must not beNull)
)
Future.successful(HttpResponse(StatusCodes.OK, bodyEntity))
}
}
testPipeline.tracedPipeline[String](parent)(Get("http://test.com"))
result.future.await
}
}

step {
Expand Down

0 comments on commit 2d2e3eb

Please sign in to comment.