Skip to content
This repository has been archived by the owner on Mar 29, 2020. It is now read-only.

Commit

Permalink
Finish the span only when the entity is completely drained
Browse files Browse the repository at this point in the history
Also, make the 404 operation re-name configurable
  • Loading branch information
Falmarri committed Jul 19, 2018
1 parent 44f3534 commit 7c9c75d
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 29 deletions.
4 changes: 3 additions & 1 deletion kamon-akka-http/src/main/resources/reference.conf
Expand Up @@ -17,9 +17,11 @@ kamon.akka-http {
# Add http status codes as metric tags. The default value is false
add-http-status-code-as-metric-tag = false

not-found-operation-name = "unhandled"

modules {
kamon-akka-http {
requires-aspectj = yes
}
}
}
}
16 changes: 14 additions & 2 deletions kamon-akka-http/src/main/scala/kamon/akka/http/AkkaHttp.scala
Expand Up @@ -17,7 +17,7 @@
package kamon.akka.http

import akka.actor.ReflectiveDynamicAccess
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.headers.Host
import com.typesafe.config.Config
import kamon.{Kamon, OnReconfigureHook}
Expand All @@ -33,12 +33,19 @@ object AkkaHttp {
def clientOperationName(request: HttpRequest): String =
nameGenerator.clientOperationName(request)

def serverNotFoundOperationName(response: HttpResponse): Option[String] =
nameGenerator.serverNotFoundOperationName(response)

trait OperationNameGenerator {
def serverOperationName(request: HttpRequest): String
def clientOperationName(request: HttpRequest): String

def serverNotFoundOperationName(request: HttpResponse): Option[String] = Some(defaultNotFoundOperationName)
}


@volatile private var defaultNotFoundOperationName = defaultNotFoundOperationNameFromConfig(Kamon.config)

private def defaultOperationNameGenerator(): OperationNameGenerator = new OperationNameGenerator {

def clientOperationName(request: HttpRequest): String = {
Expand All @@ -53,6 +60,10 @@ object AkkaHttp {
request.header[Host].map(_.host.toString())
}

private def defaultNotFoundOperationNameFromConfig(config: Config): String = {
config.getString("kamon.akka-http.not-found-operation-name")
}

private def nameGeneratorFromConfig(config: Config): OperationNameGenerator = {
val nameGeneratorFQN = config.getString("kamon.akka-http.name-generator")
if(nameGeneratorFQN == "default") defaultOperationNameGenerator() else {
Expand All @@ -64,12 +75,13 @@ object AkkaHttp {
@volatile var addHttpStatusCodeAsMetricTag: Boolean = addHttpStatusCodeAsMetricTagFromConfig(Kamon.config())

private def addHttpStatusCodeAsMetricTagFromConfig(config: Config): Boolean =
Kamon.config.getBoolean("kamon.akka-http.add-http-status-code-as-metric-tag")
config.getBoolean("kamon.akka-http.add-http-status-code-as-metric-tag")

Kamon.onReconfigure(new OnReconfigureHook {
override def onReconfigure(newConfig: Config): Unit = {
nameGenerator = nameGeneratorFromConfig(newConfig)
addHttpStatusCodeAsMetricTag = addHttpStatusCodeAsMetricTagFromConfig(newConfig)
defaultNotFoundOperationName = defaultNotFoundOperationNameFromConfig(newConfig)
}
})
}
Expand Up @@ -20,14 +20,16 @@ import akka.NotUsed
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream._
import akka.stream.scaladsl.{BidiFlow, Flow}
import akka.stream.scaladsl.{BidiFlow, Flow, Keep}
import akka.stream.stage._
import akka.util.ByteString
import kamon.Kamon
import kamon.context.{Context => KamonContext}
import kamon.akka.http.{AkkaHttp, AkkaHttpMetrics}
import kamon.context.TextMap
import kamon.context.{TextMap, Context => KamonContext}
import kamon.trace.Span

import scala.util.{Failure, Success}

/**
* Wraps an {@code Flow[HttpRequest,HttpResponse]} with the necessary steps to output
* the http metrics defined in AkkaHttpMetrics.
Expand Down Expand Up @@ -89,15 +91,45 @@ object ServerFlowWrapper {
}

if(status == 404)
span.setOperationName("unhandled")
serverNotFoundOperationName(response).foreach(o => span.setOperationName(o))

if(status >= 500 && status <= 599)
span.addError(response.status.reason())

activeRequests.decrement()
span.finish()

push(responseOut, includeTraceToken(response, Kamon.currentContext()))
span.mark("response-ready")

val resp = includeTraceToken(
response,
Kamon.currentContext()
)

push(
responseOut,
if (!resp.entity.isKnownEmpty()) {
resp.transformEntityDataBytes(
Flow[ByteString]
.watchTermination()(Keep.right)
.mapMaterializedValue { f =>
f.andThen {
case Success(_) =>
activeRequests.decrement()
span.finish()
case Failure(e) =>
span.addError("Response entity stream failed", e)
activeRequests.decrement()
span.finish()

}(materializer.executionContext)
}
)

} else {
activeRequests.decrement()
span.finish()
resp
}

)
}
override def onUpstreamFinish(): Unit = completeStage()
})
Expand All @@ -115,12 +147,9 @@ object ServerFlowWrapper {
def apply(flow: Flow[HttpRequest, HttpResponse, NotUsed], interface: String, port: Int): Flow[HttpRequest, HttpResponse, NotUsed] =
BidiFlow.fromGraph(wrap(interface, port)).join(flow)

private def includeTraceToken(response: HttpResponse, context: KamonContext): HttpResponse = response match {
case response: HttpResponse response.withHeaders(
private def includeTraceToken(response: HttpResponse, context: KamonContext): HttpResponse = response.withHeaders(
response.headers ++ Kamon.contextCodec().HttpHeaders.encode(context).values.map(k => RawHeader(k._1, k._2))
)
case other other
}

private def extractContext(request: HttpRequest) = Kamon.contextCodec().HttpHeaders.decode(new TextMap {
private val headersKeyValueMap = request.headers.map(h => h.name -> h.value()).toMap
Expand Down
Expand Up @@ -51,7 +51,7 @@ class AkkaHttpClientTracingSpec extends WordSpecLike with Matchers with BeforeAn
"the Akka HTTP client instrumentation" should {
"create a client Span when using the request level API - Http().singleRequest(...)" in {
val target = s"http://$interface:$port/$dummyPathOk"
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value
Expand All @@ -68,7 +68,7 @@ class AkkaHttpClientTracingSpec extends WordSpecLike with Matchers with BeforeAn
"pick up customizations from the SpanCustomizer in context" in {
val target = s"http://$interface:$port/$dummyPathOk"
Kamon.withContextKey(SpanCustomizer.ContextKey, SpanCustomizer.forOperationName("get-dummy-path")) {
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())
}

eventually(timeout(10 seconds)) {
Expand Down Expand Up @@ -107,7 +107,7 @@ class AkkaHttpClientTracingSpec extends WordSpecLike with Matchers with BeforeAn

"mark Spans as errors if the client request failed" in {
val target = s"http://$interface:$port/$dummyPathError"
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value
Expand Down
Expand Up @@ -71,6 +71,10 @@ class AkkaHttpServerMetricsSpec extends WordSpecLike with Matchers with BeforeAn
val connectionSettings = ClientConnectionSettings(system).withIdleTimeout(1 second)
Source.single(request)
.via(Http().outgoingConnection(interface, port, settings = connectionSettings))
.map{r =>
r.discardEntityBytes()
r
}
.runWith(Sink.head)
}

Expand Down
Expand Up @@ -18,21 +18,22 @@ package kamon.akka.http

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer
import kamon.Kamon
import kamon.context.{Context, Key, TextMap}
import kamon.context.{Context, Key}
import kamon.testkit._
import kamon.trace.Span.TagValue
import kamon.trace.Span.{Mark, TagValue}
import kamon.trace.{Span, SpanCustomizer}
import kamon.util.Registration
import org.json4s.native.JsonMethods.parse
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpecLike}
import org.scalatest._
import org.scalatest.concurrent.{Eventually, ScalaFutures}

import scala.concurrent.duration._

class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with MetricInspection
class AkkaHttpServerTracingSpec extends WordSpecLike
with Matchers with ScalaFutures with Inside with BeforeAndAfterAll with MetricInspection
with Reconfigure with TestWebServer with Eventually with OptionValues {

import TestWebServer.Endpoints._
Expand All @@ -49,7 +50,7 @@ class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with BeforeAn
"the Akka HTTP server instrumentation" should {
"create a server Span when receiving requests" in {
val target = s"http://$interface:$port/$dummyPathOk"
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value
Expand All @@ -65,7 +66,7 @@ class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with BeforeAn

"change the Span operation name when using the operationName directive" in {
val target = s"http://$interface:$port/$traceOk"
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value
Expand All @@ -81,7 +82,7 @@ class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with BeforeAn

"mark spans as error when request fails" in {
val target = s"http://$interface:$port/$dummyPathError"
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value
Expand All @@ -97,11 +98,12 @@ class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with BeforeAn

"change the operation name to 'unhandled' when the response status code is 404" in {
val target = s"http://$interface:$port/unknown-path"
Http().singleRequest(HttpRequest(uri = target))
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value
val spanTags = stringTag(span) _

span.operationName shouldBe "unhandled"
spanTags("component") shouldBe "akka.http.server"
spanTags("span.kind") shouldBe "server"
Expand Down Expand Up @@ -134,6 +136,29 @@ class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with BeforeAn
}
}


"correctly time entity transfer timings" in {
val target = s"http://$interface:$port/$stream"
Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes())

val span = eventually(timeout(10 seconds)) {
val span = reporter.nextSpan().value

span.operationName shouldBe "stream"
span
}
val spanTags = stringTag(span) _
inside(span.marks){
case List(m2@Mark(_, "response-ready")) =>

}
spanTags("component") shouldBe "akka.http.server"
spanTags("span.kind") shouldBe "server"
spanTags("http.method") shouldBe "GET"
spanTags("http.url") shouldBe target
}


def stringTag(span: Span.FinishedSpan)(tag: String): String = {
span.tags(tag).asInstanceOf[TagValue.String].string
}
Expand Down
20 changes: 20 additions & 0 deletions kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala
Expand Up @@ -18,17 +18,22 @@ package kamon.testkit

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.Connection
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import kamon.Kamon
import kamon.akka.http.TracingDirectives
import kamon.context.Key
import org.json4s.{DefaultFormats, native}
import scala.concurrent.duration._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

trait TestWebServer extends TracingDirectives {
implicit val serialization = native.Serialization
Expand Down Expand Up @@ -86,6 +91,20 @@ trait TestWebServer extends TracingDirectives {
OK
}
}
} ~
path(stream) {
complete{

val longStringContentStream = Source(
Range(1, 16).map{ i =>
ByteString(
100 * ('a' + i).toChar
)
}
)

HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, longStringContentStream))
}
}
}
}
Expand All @@ -104,6 +123,7 @@ trait TestWebServer extends TracingDirectives {
val replyWithHeaders: String = "reply-with-headers"
val basicContext: String = "basic-context"
val waitTen: String = "wait"
val stream: String = "stream"

implicit class Converter(endpoint: String) {
implicit def withSlash: String = "/" + endpoint
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=0.13.13
sbt.version=0.13.17

0 comments on commit 7c9c75d

Please sign in to comment.