diff --git a/build.sbt b/build.sbt
index 47d223789..e153dfcd5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -126,7 +126,8 @@ lazy val instrumentation = (project in file("instrumentation"))
`kamon-system-metrics`,
`kamon-akka`,
`kamon-akka-http`,
- `kamon-play`
+ `kamon-play`,
+ `kamon-okhttp`,
)
@@ -386,6 +387,23 @@ lazy val `kamon-play` = (project in file("instrumentation/kamon-play"))
)
+lazy val `kamon-okhttp` = (project in file("instrumentation/kamon-okhttp"))
+ .disablePlugins(AssemblyPlugin)
+ .enablePlugins(JavaAgent)
+ .settings(instrumentationSettings)
+ .settings(
+ libraryDependencies ++= Seq(
+ kanelaAgent % "provided",
+ "com.squareup.okhttp3" % "okhttp" % "3.14.9" % "provided",
+
+ scalatest % "test",
+ logbackClassic % "test",
+ "org.eclipse.jetty" % "jetty-server" % "9.4.25.v20191220" % "test",
+ "org.eclipse.jetty" % "jetty-servlet" % "9.4.25.v20191220" % "test",
+ )
+ ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")
+
+
/**
* Reporters
*/
@@ -597,5 +615,6 @@ val `kamon-bundle` = (project in file("bundle/kamon-bundle"))
`kamon-system-metrics` % "shaded",
`kamon-akka` % "shaded",
`kamon-akka-http` % "shaded",
- `kamon-play` % "shaded"
+ `kamon-play` % "shaded",
+ `kamon-okhttp` % "shaded",
)
\ No newline at end of file
diff --git a/instrumentation/kamon-okhttp/src/main/resources/reference.conf b/instrumentation/kamon-okhttp/src/main/resources/reference.conf
new file mode 100644
index 000000000..0e90da290
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/main/resources/reference.conf
@@ -0,0 +1,104 @@
+# ============================================ #
+# kamon okhttp3 client reference configuration #
+# ============================================ #
+
+kamon.instrumentation.okhttp {
+
+ http-client {
+ #
+ # Configuration for HTTP context propagation.
+ #
+ propagation {
+
+ # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
+ # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
+ # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
+ enabled = yes
+
+ # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
+ # configuration for more details on how to configure the detault HTTP context propagation.
+ channel = "default"
+ }
+
+ tracing {
+
+ # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests
+ # and finish them when the response is received from the server.
+ enabled = yes
+
+ # Enables collection of span metrics using the `span.processing-time` metric.
+ span-metrics = on
+
+ # Select which tags should be included as span and span metric tags. The possible options are:
+ # - span: the tag is added as a Span tag (i.e. using span.tag(...))
+ # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
+ # - off: the tag is not used.
+ #
+ tags {
+
+ # Use the http.url tag.
+ url = span
+
+ # Use the http.method tag.
+ method = metric
+
+ # Use the http.status_code tag.
+ status-code = metric
+
+ # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type
+ # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration
+ # should be added:
+ #
+ # from-context {
+ # customer_type = span
+ # }
+ #
+ from-context {
+
+ }
+ }
+
+ operations {
+
+ # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP
+ # Client instrumentation will always try to use the HTTP Operation Name Generator configured bellow to get
+ # a name, but if it fails to generate it then this name will be used.
+ default = "http.client.request"
+
+ # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms:
+ # - hostname: Uses the request Host as the operation name.
+ # - method: Uses the request HTTP method as the operation name.
+ #
+ name-generator = "kamon.okhttp3.instrumentation.OkHttpOperationNameGenerator"
+ }
+ }
+ }
+}
+
+kamon {
+ okhttp {
+ # Fully qualified name of the implementation of kamon.okhttp3.NameGenerator that will be used for assigning names
+ # names to Spans.
+ name-generator = kamon.okhttp3.DefaultNameGenerator
+ # Metrics for okhttp
+ metrics {
+ enabled = true
+ }
+ }
+}
+
+kanela {
+ modules {
+ okhttp-module {
+ name = "OkHttp Instrumentation Module"
+ description = "Provides context propagation, distributed tracing and HTTP client and server metrics for OkHttp"
+ stoppable = true
+ instrumentations = [
+ "kamon.okhttp3.instrumentation.OkHttpInstrumentation"
+ ]
+ within = [
+ "okhttp3..*"
+ ]
+ }
+ }
+}
diff --git a/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/KamonOkHttpTracing.scala b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/KamonOkHttpTracing.scala
new file mode 100644
index 000000000..133b37330
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/KamonOkHttpTracing.scala
@@ -0,0 +1,95 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2020 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.okhttp3.instrumentation
+
+import java.util
+
+import kamon.Kamon
+import kamon.context.HttpPropagation.HeaderWriter
+import kamon.instrumentation.http.{HttpClientInstrumentation, HttpMessage}
+import okhttp3.{Request, Response}
+
+import scala.collection.immutable.Map
+import scala.collection.{JavaConverters, mutable}
+
+object KamonOkHttpTracing {
+ private val httpClientConfig = Kamon.config.getConfig("kamon.instrumentation.okhttp.http-client")
+ private val instrumentation = HttpClientInstrumentation.from(httpClientConfig, "okhttp-client")
+
+ def withNewSpan(request: Request): HttpClientInstrumentation.RequestHandler[Request] = {
+ instrumentation.createHandler(getRequestBuilder(request), Kamon.currentContext)
+ }
+
+ def successContinuation(requestHandler: HttpClientInstrumentation.RequestHandler[Request], response: Response): Response = {
+ requestHandler.processResponse(toKamonResponse(response))
+ response
+ }
+
+ def failureContinuation(requestHandler: HttpClientInstrumentation.RequestHandler[Request], error: Throwable): Unit = {
+ requestHandler.span.fail(error)
+ requestHandler.span.finish()
+ }
+
+ def getRequestBuilder(request: Request): HttpMessage.RequestBuilder[Request] = new HttpMessage.RequestBuilder[Request]() {
+ private val _headers = mutable.Map[String, String]()
+
+ override def read(header: String): Option[String] = Option.apply(request.header(header))
+
+ override def readAll: Map[String, String] = {
+ JavaConverters
+ .mapAsScalaMapConverter(request.headers.toMultimap)
+ .asScala
+ .mapValues((values: util.List[String]) => values.get(0))
+ .toMap
+ }
+
+ override def url: String = request.url.toString
+
+ override def path: String = request.url.uri.getPath
+
+ override def method: String = request.method
+
+ override def host: String = request.url.host
+
+ override def port: Int = request.url.port
+
+ override def write(header: String, value: String): Unit = {
+ _headers += (header -> value)
+ }
+
+ override def build: Request = {
+ val newHeadersMap = request.headers.newBuilder
+ _headers.foreach { case (key, value) => newHeadersMap.add(key, value) }
+ request.newBuilder.headers(newHeadersMap.build).build
+ }
+ }
+
+ def toKamonResponse(response: Response): HttpMessage.Response = new HttpMessage.Response() {
+ override def statusCode: Int = response.code()
+ }
+
+ trait HeaderHandler extends HeaderWriter {
+ private val _headers = mutable.Map[String, String]()
+
+ override def write(header: String, value: String): Unit = {
+ _headers += (header -> value)
+ }
+
+ def headers: mutable.Map[String, String] = _headers
+ }
+
+}
diff --git a/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/KamonTracingInterceptor.scala b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/KamonTracingInterceptor.scala
new file mode 100644
index 000000000..a53c0b92e
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/KamonTracingInterceptor.scala
@@ -0,0 +1,35 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2020 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.okhttp3.instrumentation
+
+import okhttp3.{Interceptor, Response}
+
+final class KamonTracingInterceptor extends Interceptor {
+
+ override def intercept(chain: Interceptor.Chain): Response = {
+ val clientRequestHandler = KamonOkHttpTracing.withNewSpan(chain.request)
+ val request = clientRequestHandler.request
+ try {
+ val response = chain.proceed(request)
+ KamonOkHttpTracing.successContinuation(clientRequestHandler, response)
+ } catch {
+ case error: Throwable =>
+ KamonOkHttpTracing.failureContinuation(clientRequestHandler, error)
+ throw error
+ }
+ }
+}
diff --git a/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/OkHttpInstrumentation.scala b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/OkHttpInstrumentation.scala
new file mode 100644
index 000000000..5d58bc9ab
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/OkHttpInstrumentation.scala
@@ -0,0 +1,50 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2020 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.okhttp3.instrumentation
+
+import kanela.agent.api.instrumentation.InstrumentationBuilder
+import kanela.agent.libs.net.bytebuddy.asm.Advice
+import okhttp3.OkHttpClient
+
+class OkHttpInstrumentation extends InstrumentationBuilder {
+
+ /**
+ * Instrument:
+ *
+ * okhttp3.OkHttpClient::constructor
+ */
+ onType("okhttp3.OkHttpClient")
+ .advise(isConstructor() and takesOneArgumentOf("okhttp3.OkHttpClient$Builder"), classOf[OkHttpClientBuilderAdvisor])
+}
+
+/**
+ * Avisor for okhttp3.OkHttpClient::constructor(OkHttpClient.Builder)
+ */
+class OkHttpClientBuilderAdvisor
+
+object OkHttpClientBuilderAdvisor {
+
+ import scala.collection.JavaConverters._
+
+ @Advice.OnMethodEnter(suppress = classOf[Throwable])
+ def addKamonInterceptor(@Advice.Argument(0) builder: OkHttpClient.Builder): Unit = {
+ val interceptors = builder.networkInterceptors.asScala
+ if (!interceptors.exists(_.isInstanceOf[KamonTracingInterceptor])) {
+ builder.addNetworkInterceptor(new KamonTracingInterceptor)
+ }
+ }
+}
diff --git a/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/OkHttpOperationNameGenerator.scala b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/OkHttpOperationNameGenerator.scala
new file mode 100644
index 000000000..f98f1a3eb
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/main/scala/kamon/okhttp3/instrumentation/OkHttpOperationNameGenerator.scala
@@ -0,0 +1,25 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2020 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.okhttp3.instrumentation
+
+import kamon.instrumentation.http.{HttpMessage, HttpOperationNameGenerator}
+
+class OkHttpOperationNameGenerator extends HttpOperationNameGenerator {
+ override def name(request: HttpMessage.Request): Option[String] = {
+ Option(request.url)
+ }
+}
diff --git a/instrumentation/kamon-okhttp/src/test/resources/logback.xml b/instrumentation/kamon-okhttp/src/test/resources/logback.xml
new file mode 100644
index 000000000..742815603
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/test/resources/logback.xml
@@ -0,0 +1,12 @@
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/instrumentation/kamon-okhttp/src/test/scala/kamon/okhttp3/instrumentation/OkHttpTracingInstrumentationSpec.scala b/instrumentation/kamon-okhttp/src/test/scala/kamon/okhttp3/instrumentation/OkHttpTracingInstrumentationSpec.scala
new file mode 100644
index 000000000..667976724
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/test/scala/kamon/okhttp3/instrumentation/OkHttpTracingInstrumentationSpec.scala
@@ -0,0 +1,308 @@
+/* =========================================================================================
+ * Copyright © 2013-2020 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.okhttp3.instrumentation
+
+import java.io.IOException
+
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+import kamon.Kamon
+import kamon.context.Context
+import kamon.okhttp3.utils.{JettySupport, ServletTestSupport}
+import kamon.tag.Lookups.{plain, plainBoolean, plainLong}
+import kamon.testkit.{Reconfigure, TestSpanReporter}
+import kamon.trace.Span
+import kamon.trace.SpanPropagation.B3
+import okhttp3._
+import org.scalatest.concurrent.Eventually
+import org.scalatest.Matchers
+import org.scalatest.time.SpanSugar
+import org.scalatest.WordSpec
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, OptionValues}
+
+class OkHttpTracingInstrumentationSpec extends WordSpec
+ with Matchers
+ with Eventually
+ with SpanSugar
+ with BeforeAndAfterAll
+ with BeforeAndAfterEach
+ with TestSpanReporter
+ with JettySupport
+ with Reconfigure
+ with OptionValues {
+
+ val customTag = "requestId"
+ val customHeaderName = "X-Request-Id"
+
+ val uriError = "/path/fail"
+
+ "the OkHttp Tracing Instrumentation" should {
+ "propagate the current context and generate a span around an sync request" in {
+ val okSpan = Kamon.spanBuilder("ok-sync-operation-span").start()
+ val client = new OkHttpClient.Builder().build()
+ val uri = "/path/sync"
+ val url = s"http://$host:$port$uri"
+ val request = new Request.Builder()
+ .url(url)
+ .build()
+
+ Kamon.runWithContext(Context.of(Span.Key, okSpan)) {
+ val response = client.newCall(request).execute()
+ response.body().close()
+ }
+
+ val span: Span.Finished = eventually(timeout(3 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+
+ span.operationName shouldBe url
+ span.kind shouldBe Span.Kind.Client
+ span.metricTags.get(plain("component")) shouldBe "okhttp-client"
+ span.metricTags.get(plain("http.method")) shouldBe "GET"
+ span.metricTags.get(plainLong("http.status_code")) shouldBe 200
+ span.metricTags.get(plainBoolean("error")) shouldBe false
+ span.tags.get(plain("http.url")) shouldBe url
+
+ okSpan.id == span.parentId
+
+ testSpanReporter().nextSpan() shouldBe None
+
+ span
+ }
+
+ val requests = consumeSentRequests()
+
+ requests.size should be(1)
+ requests.head.uri should be(uri)
+ requests.head.header(B3.Headers.TraceIdentifier).value should be(span.trace.id.string)
+ requests.head.header(B3.Headers.SpanIdentifier).value should be(span.id.string)
+ requests.head.header(B3.Headers.ParentSpanIdentifier).value should be(span.parentId.string)
+ requests.head.header(B3.Headers.Sampled).value should be("1")
+ }
+
+ "propagate the current context and generate a span around an async request" in {
+ val okAsyncSpan = Kamon.spanBuilder("ok-async-operation-span").start()
+ val client = new OkHttpClient.Builder().build()
+ val uri = "/path/async"
+ val url = s"http://$host:$port$uri"
+ val request = new Request.Builder()
+ .url(url)
+ .build()
+
+ Kamon.runWithContext(Context.of(Span.Key, okAsyncSpan)) {
+ client.newCall(request).enqueue(new Callback() {
+ override def onResponse(call: Call, response: Response): Unit = {}
+
+ override def onFailure(call: Call, e: IOException): Unit =
+ e.printStackTrace()
+ })
+ }
+
+ val span: Span.Finished = eventually(timeout(3 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+
+ span.operationName shouldBe url
+ span.kind shouldBe Span.Kind.Client
+ span.metricTags.get(plain("component")) shouldBe "okhttp-client"
+ span.metricTags.get(plain("http.method")) shouldBe "GET"
+ span.metricTags.get(plainLong("http.status_code")) shouldBe 200
+ span.metricTags.get(plainBoolean("error")) shouldBe false
+ span.tags.get(plain("http.url")) shouldBe url
+
+ okAsyncSpan.id == span.parentId
+
+ testSpanReporter().nextSpan() shouldBe None
+
+ span
+ }
+
+ val requests = consumeSentRequests()
+
+ requests.size should be(1)
+ requests.head.uri should be(uri)
+ requests.head.header(B3.Headers.TraceIdentifier).value should be(span.trace.id.string)
+ requests.head.header(B3.Headers.SpanIdentifier).value should be(span.id.string)
+ requests.head.header(B3.Headers.ParentSpanIdentifier).isEmpty should be(true)
+ requests.head.header(B3.Headers.Sampled).value should be("1")
+ }
+
+ "propagate context tags" in {
+ val okSpan = Kamon.internalSpanBuilder("ok-span-with-extra-tags", "user-app").start()
+ val client = new OkHttpClient.Builder().build()
+ val uri = "/path/sync/with-extra-tags"
+ val url = s"http://$host:$port$uri"
+ val request = new Request.Builder()
+ .url(url)
+ .build()
+
+ Kamon.runWithContext(Context.of(Span.Key, okSpan).withTag(customTag, "1234")) {
+ val response = client.newCall(request).execute()
+ response.body().close()
+ }
+
+ val span: Span.Finished = eventually(timeout(3 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+
+ span.operationName shouldBe url
+ span.kind shouldBe Span.Kind.Client
+ span.metricTags.get(plain("component")) shouldBe "okhttp-client"
+ span.metricTags.get(plain("http.method")) shouldBe "GET"
+ span.metricTags.get(plainLong("http.status_code")) shouldBe 200
+ span.metricTags.get(plainBoolean("error")) shouldBe false
+ span.tags.get(plain("http.url")) shouldBe url
+ span.tags.get(plain(customTag)) shouldBe "1234"
+
+ okSpan.id == span.parentId
+
+ testSpanReporter().nextSpan() shouldBe None
+
+ span
+ }
+
+ val requests = consumeSentRequests()
+
+ requests.size should be(1)
+ requests.head.uri should be(uri)
+ requests.head.header(B3.Headers.TraceIdentifier).value should be(span.trace.id.string)
+ requests.head.header(B3.Headers.SpanIdentifier).value should be(span.id.string)
+ requests.head.header(B3.Headers.ParentSpanIdentifier).value should be(span.parentId.string)
+ requests.head.header(B3.Headers.Sampled).value should be("1")
+ requests.head.header(customHeaderName).value should be("1234")
+ }
+
+ "mark span as failed when server response with 5xx on sync execution" in {
+ val okSpan = Kamon.spanBuilder("ok-sync-operation-span").start()
+ val client = new OkHttpClient.Builder().build()
+ val uri = uriError
+ val url = s"http://$host:$port$uri"
+ val request = new Request.Builder()
+ .url(url)
+ .build()
+
+ Kamon.runWithContext(Context.of(Span.Key, okSpan)) {
+ val response = client.newCall(request).execute()
+ response.body().close()
+ }
+
+ val span: Span.Finished = eventually(timeout(3 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+
+ span.operationName shouldBe url
+ span.kind shouldBe Span.Kind.Client
+ span.metricTags.get(plain("component")) shouldBe "okhttp-client"
+ span.metricTags.get(plain("http.method")) shouldBe "GET"
+ span.metricTags.get(plainBoolean("error")) shouldBe true
+ span.metricTags.get(plainLong("http.status_code")) shouldBe 500
+ span.tags.get(plain("http.url")) shouldBe url
+
+ okSpan.id == span.parentId
+
+ testSpanReporter().nextSpan() shouldBe None
+
+ span
+ }
+ val requests = consumeSentRequests()
+
+ requests.size should be(1)
+ requests.head.uri should be(uri)
+ requests.head.header(B3.Headers.TraceIdentifier).value should be(span.trace.id.string)
+ requests.head.header(B3.Headers.SpanIdentifier).value should be(span.id.string)
+ requests.head.header(B3.Headers.ParentSpanIdentifier).value should be(span.parentId.string)
+ requests.head.header(B3.Headers.Sampled).value should be("1")
+ }
+
+ "mark span as failed when server response with 5xx on async execution" in {
+ val okAsyncSpan = Kamon.spanBuilder("ok-async-operation-span").start()
+ val client = new OkHttpClient.Builder().build()
+ val uri = uriError
+ val url = s"http://$host:$port$uri"
+ val request = new Request.Builder()
+ .url(url)
+ .build()
+
+ Kamon.runWithContext(Context.of(Span.Key, okAsyncSpan)) {
+ client.newCall(request).enqueue(new Callback() {
+ override def onResponse(call: Call, response: Response): Unit = {}
+
+ override def onFailure(call: Call, e: IOException): Unit =
+ e.printStackTrace()
+ })
+ }
+
+ val span: Span.Finished = eventually(timeout(3 seconds)) {
+ val span = testSpanReporter().nextSpan().value
+
+ span.operationName shouldBe url
+ span.kind shouldBe Span.Kind.Client
+ span.metricTags.get(plain("component")) shouldBe "okhttp-client"
+ span.metricTags.get(plain("http.method")) shouldBe "GET"
+ span.metricTags.get(plainBoolean("error")) shouldBe true
+ span.metricTags.get(plainLong("http.status_code")) shouldBe 500
+ span.tags.get(plain("http.url")) shouldBe url
+
+ okAsyncSpan.id == span.parentId
+
+ testSpanReporter().nextSpan() shouldBe None
+
+ span
+ }
+ val requests = consumeSentRequests()
+
+ requests.size should be(1)
+ requests.head.uri should be(uri)
+ requests.head.header(B3.Headers.TraceIdentifier).value should be(span.trace.id.string)
+ requests.head.header(B3.Headers.SpanIdentifier).value should be(span.id.string)
+ requests.head.header(B3.Headers.ParentSpanIdentifier).isEmpty should be(true)
+ requests.head.header(B3.Headers.Sampled).value should be("1")
+ }
+ }
+
+ val servlet: ServletTestSupport = new HttpServlet() with ServletTestSupport {
+ override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
+ addRequest(req)
+ resp.addHeader("Content-Type", "text/plain")
+
+ req.getRequestURI match {
+ case path if path == uriError => resp.setStatus(500)
+ case _ => resp.setStatus(200)
+ }
+ }
+ }
+
+ override protected def beforeAll(): Unit = {
+ applyConfig(
+ s"""
+ |kamon {
+ | propagation.http.default.tags.mappings {
+ | $customTag = $customHeaderName
+ | }
+ | instrumentation.http-client.default.tracing.tags.from-context {
+ | $customTag = span
+ | }
+ |}
+ |""".stripMargin)
+ enableFastSpanFlushing()
+ sampleAlways()
+
+ startServer()
+ }
+
+ override protected def afterAll(): Unit = {
+ stopServer()
+ }
+
+ override protected def beforeEach(): Unit = {
+ consumeSentRequests()
+ }
+}
diff --git a/instrumentation/kamon-okhttp/src/test/scala/kamon/okhttp3/utils/JettySupport.scala b/instrumentation/kamon-okhttp/src/test/scala/kamon/okhttp3/utils/JettySupport.scala
new file mode 100644
index 000000000..0472ff594
--- /dev/null
+++ b/instrumentation/kamon-okhttp/src/test/scala/kamon/okhttp3/utils/JettySupport.scala
@@ -0,0 +1,119 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2020 the kamon project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.okhttp3.utils
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+
+import javax.servlet.Servlet
+import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.{Server, ServerConnector}
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+import org.slf4j.LoggerFactory
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+/**
+ * Runs a Servlet or a Filter on an embedded Jetty server.
+ */
+private class JettyServer(socketAddress: InetSocketAddress = new InetSocketAddress(0)) {
+ val server = new Server(socketAddress)
+ val context = new ServletContextHandler(server, "/")
+ val _requests: mutable.ListBuffer[HttpServletRequest] = mutable.ListBuffer()
+
+ def start(servlet: Servlet, path: String = "/*"): this.type = {
+ context.addServlet(new ServletHolder(servlet), "/")
+ server.start()
+ this
+ }
+
+ def stop(): this.type = {
+ server.stop()
+ this
+ }
+
+ def join(): this.type = {
+ server.join()
+ this
+ }
+
+ def selectedPort: Int = {
+ server.getConnectors()(0).asInstanceOf[ServerConnector].getLocalPort
+ }
+
+ def host: String = {
+ server.getConnectors()(0).asInstanceOf[ServerConnector].getHost
+ }
+
+ def requests: List[HttpServletRequest] = _requests.toList
+}
+
+case class RequestHolder(uri: String, headers: Map[String, String]) {
+ def header(name: String): Option[String] = headers.get(name)
+}
+
+trait ServletTestSupport extends Servlet {
+
+ import collection.JavaConverters._
+
+ private val _requests: BlockingQueue[RequestHolder] = new LinkedBlockingQueue[RequestHolder]()
+
+ def requests: List[RequestHolder] = {
+ val result: java.util.List[RequestHolder] = new java.util.ArrayList[RequestHolder]
+ _requests.drainTo(result)
+ result.asScala.toList
+ }
+
+ def addRequest(req: HttpServletRequest): Unit = {
+ _requests.offer(RequestHolder(req.getRequestURI, headers(req)))
+ }
+
+ private def headers(req: HttpServletRequest): Map[String, String] = {
+ val headersIterator = req.getHeaderNames
+ val headers = Map.newBuilder[String, String]
+ while (headersIterator.hasMoreElements) {
+ val name = headersIterator.nextElement()
+ headers += (name -> req.getHeader(name))
+ }
+ TreeMap(headers.result().toList: _*)(Ordering.comparatorToOrdering(String.CASE_INSENSITIVE_ORDER))
+ }
+}
+
+trait JettySupport {
+
+ private val logger = LoggerFactory.getLogger(classOf[JettySupport])
+
+ val servlet: ServletTestSupport
+
+ private var jetty = Option.empty[JettyServer]
+
+ def startServer(): Unit = {
+ jetty = Some(new JettyServer().start(servlet))
+ logger.info(s"Jetty started at ${host}:${port}")
+ }
+
+ def stopServer(): Unit = {
+ jetty.foreach(_.stop())
+ }
+
+ def host: String = jetty.get.host
+
+ def port: Int = jetty.get.selectedPort
+
+ def consumeSentRequests(): List[RequestHolder] = servlet.requests
+}