Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context Tags, HTTP Propagation and HTTP Server Instrumentation #552

Merged
merged 20 commits into from
Oct 13, 2018
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ language: scala
script:
- sbt +test
scala:
- 2.12.4
- 2.12.6
jdk:
- oraclejdk8
before_script:
Expand Down
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ lazy val kamon = (project in file("."))
.aggregate(core, testkit, coreTests)

val commonSettings = Seq(
scalaVersion := "2.12.4",
scalaVersion := "2.12.6",
javacOptions += "-XDignore.symbol.file",
resolvers += Resolver.mavenLocal,
crossScalaVersions := Seq("2.12.4", "2.11.8", "2.10.6"),
crossScalaVersions := Seq("2.12.6", "2.11.8", "2.10.6"),
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
scalacOptions ++= Seq(
"-deprecation",
Expand Down Expand Up @@ -71,6 +71,8 @@ lazy val coreTests = (project in file("kamon-core-tests"))
.settings(
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
"ch.qos.logback" % "logback-classic" % "1.2.2" % "test"
"ch.qos.logback" % "logback-classic" % "1.2.2" % "test",
"com.squareup.okhttp3" % "okhttp" % "3.11.0" % "test",
"com.typesafe.akka" % "akka-http-core_2.12" % "10.1.4" % "test"
)
).dependsOn(testkit)
33 changes: 33 additions & 0 deletions kamon-core-tests/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,37 @@ kamon {
context.codecs.string-keys {
request-id = "X-Request-ID"
}
}



kamon {

trace {
sampler = always
}

propagation.http.default {
tags.mappings {
"correlation-id" = "x-correlation-id"
}
}

instrumentation {
http-server {
default {
tracing.trace-id-tag = "correlation-id"
}

no-span-metrics {
tracing.span-metrics = off
}

noop {
propagation.enabled = no
metrics.enabled = no
tracing.enabled = no
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.scalatest.time.SpanSugar._

class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{

"the Kamon lifecycle" should {
"the Kamon lifecycle" ignore {
"keep the JVM running if reporters are running" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter"))
Thread.sleep(5000)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package kamon.context

import java.io.ByteArrayOutputStream

import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Propagation.{EntryReader, EntryWriter}
import org.scalatest.{Matchers, OptionValues, WordSpec}

import scala.util.Random

class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {

"The Binary Context Propagation" should {
"return an empty context if there is no data to read from" in {
val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0)))
context.isEmpty() shouldBe true
}

"not write any data to the medium if the context is empty" in {
val writer = inspectableByteStreamWriter()
binaryPropagation.write(Context.Empty, writer)
writer.size() shouldBe 0
}

"handle malformed data in when reading a context" in {
val randomBytes = Array.ofDim[Byte](42)
Random.nextBytes(randomBytes)

val context = binaryPropagation.read(ByteStreamReader.of(randomBytes))
context.isEmpty() shouldBe true
}

"handle read failures in an entry reader" in {
val context = Context.of(
BinaryPropagationSpec.StringKey, "string-value",
BinaryPropagationSpec.FailStringKey, "fail-read"
)
val writer = inspectableByteStreamWriter()
binaryPropagation.write(context, writer)

val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
rtContext.tags shouldBe empty
rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
}

"handle write failures in an entry writer" in {
val context = Context.of(
BinaryPropagationSpec.StringKey, "string-value",
BinaryPropagationSpec.FailStringKey, "fail-write"
)
val writer = inspectableByteStreamWriter()
binaryPropagation.write(context, writer)

val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
rtContext.tags shouldBe empty
rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
}

"handle write failures in an entry writer when the context is too big" in {
val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20)
val writer = inspectableByteStreamWriter()
binaryPropagation.write(context, writer)

val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
rtContext shouldBe empty
}

"round trip a Context that only has tags" in {
val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
val writer = inspectableByteStreamWriter()
binaryPropagation.write(context, writer)

val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
rtContext.entries shouldBe empty
rtContext.tags should contain theSameElementsAs (context.tags)
}

"round trip a Context that only has entries" in {
val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42)
val writer = inspectableByteStreamWriter()
binaryPropagation.write(context, writer)

val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
rtContext.tags shouldBe empty
rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
}

"round trip a Context that with tags and entries" in {
val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
.withKey(BinaryPropagationSpec.StringKey, "string-value")
.withKey(BinaryPropagationSpec.IntegerKey, 42)

val writer = inspectableByteStreamWriter()
binaryPropagation.write(context, writer)
val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))

rtContext.tags should contain theSameElementsAs (context.tags)
rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
}
}

val binaryPropagation = BinaryPropagation.from(
ConfigFactory.parseString(
"""
|max-outgoing-size = 64
|entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
|entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
|entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
|entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
|
""".stripMargin
).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)


def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter

}

object BinaryPropagationSpec {

val StringKey = Context.key[String]("string", null)
val FailStringKey = Context.key[String]("failString", null)
val IntegerKey = Context.key[Int]("integer", 0)

class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {

override def read(medium: ByteStreamReader, context: Context): Context = {
val valueData = medium.readAll()

if(valueData.length > 0) {
context.withKey(StringKey, new String(valueData))
} else context
}

override def write(context: Context, medium: ByteStreamWriter): Unit = {
val value = context.get(StringKey)
if(value != null) {
medium.write(value.getBytes)
}
}
}

class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {

override def read(medium: ByteStreamReader, context: Context): Context = {
val valueData = medium.readAll()

if(valueData.length > 0) {
val stringValue = new String(valueData)
if(stringValue == "fail-read") {
sys.error("The fail string entry reader has triggered")
}

context.withKey(FailStringKey, stringValue)
} else context
}

override def write(context: Context, medium: ByteStreamWriter): Unit = {
val value = context.get(FailStringKey)
if(value != null && value != "fail-write") {
medium.write(value.getBytes)
} else {
medium.write(42) // malformed data on purpose
sys.error("The fail string entry writer has triggered")
}
}
}
}

This file was deleted.

Loading