Add KamonRemoteInstrument
As of this commit, context propagation looks ok, but there are deserialization
errors happening on HandshakeReq
SimunKaracic committed Dec 15, 2020
1 parent f7ae498 commit 1a50fc0
Showing 8 changed files with 168 additions and 91 deletions.
6 changes: 3 additions & 3 deletions core/kamon-core/src/main/scala/kamon/ContextPropagation.scala
@@ -27,7 +27,7 @@ import kamon.context.{BinaryPropagation, HttpPropagation, Propagation}
trait ContextPropagation { self: Configuration =>
@volatile private var _propagationComponents: ContextPropagation.Channels = _
@volatile private var _defaultHttpPropagation: Propagation[HeaderReader, HeaderWriter] = _
@volatile private var _defaultBinaryPropagation: Propagation[ByteStreamReader, ByteStreamWriter] = _
@volatile private var _defaultBinaryPropagation: BinaryPropagation[ByteStreamReader, ByteStreamWriter] = _

// Initial configuration and reconfigures
init(self.config)
@@ -59,7 +59,7 @@ trait ContextPropagation { self: Configuration =>
* Retrieves the default binary propagation channel. Configuration for this channel can be found under the
* kamon.propagation.binary.default configuration section.
*/
def defaultBinaryPropagation(): Propagation[ByteStreamReader, ByteStreamWriter] =
def defaultBinaryPropagation(): BinaryPropagation[ByteStreamReader, ByteStreamWriter] =
_defaultBinaryPropagation
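
A minimal round trip through this default channel, as a sketch (it assumes the ByteStreamReader.of and ByteStreamWriter.of factories from kamon.context.BinaryPropagation, with a plain ByteArrayOutputStream as the transport):

import java.io.ByteArrayOutputStream
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}

// Serialize the current Context through the default binary channel...
val out = new ByteArrayOutputStream()
Kamon.defaultBinaryPropagation().write(Kamon.currentContext(), ByteStreamWriter.of(out))

// ...and restore it on the receiving side.
val restored = Kamon.defaultBinaryPropagation().read(ByteStreamReader.of(out.toByteArray))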

private def init(config: Config): Unit = synchronized {
@@ -76,7 +76,7 @@ object ContextPropagation {

case class Channels(
httpChannels: Map[String, Propagation[HeaderReader, HeaderWriter]],
binaryChannels: Map[String, Propagation[ByteStreamReader, ByteStreamWriter]]
binaryChannels: Map[String, BinaryPropagation[ByteStreamReader, ByteStreamWriter]]
)

object Channels {
194 changes: 115 additions & 79 deletions core/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -18,14 +18,14 @@ package kamon
package context

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}

import com.typesafe.config.Config
import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry, Tags => ColferTags}
import kamon.context.generated.binary.context.{BooleanTag => ColferBooleanTag, LongTag => ColferLongTag, StringTag => ColferStringTag}
import kamon.tag.{Tag, TagSet}
import kamon.trace.Span.TagKeys
import org.slf4j.LoggerFactory

import java.nio.{BufferOverflowException, ByteBuffer}
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.util.Try
@@ -38,7 +38,17 @@ import scala.util.Try
* Binary propagation uses the ByteStreamReader and ByteStreamWriter abstraction which closely model the APIs from
* InputStream and OutputStream, respectively, but without exposing additional functionality that wouldn't have any
* well defined behavior for Context propagation, e.g. flush or close functions on OutputStreams.
*
* Additionally, it provides a function that serializes a context into an array of bytes and stores it in a buffer. It
* does so by first writing the size of the serialized context as an Int, followed by the serialized context itself as
* an Array[Byte].
*/

trait BinaryPropagation[ReaderMedium, WriterMedium] extends Propagation[ReaderMedium, WriterMedium] {
// This'll get refactored into something smarter
// but only after I get RemoteInstrument up and running
def storeContextInBuffer(context: Context, buffer: ByteBuffer): ByteBuffer
}
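
To illustrate that framing, the receiving side would first read the Int size prefix and then hand exactly that many bytes back to read. A sketch (readContextFromBuffer is a hypothetical helper, not part of this commit):

import java.nio.ByteBuffer
import kamon.Kamon
import kamon.context.BinaryPropagation.ByteStreamReader
import kamon.context.Context

// Hypothetical inverse of storeContextInBuffer: read the size prefix,
// then deserialize exactly that many bytes into a Context.
def readContextFromBuffer(buffer: ByteBuffer): Context = {
  val contextSize = buffer.getInt()
  val contextBytes = Array.ofDim[Byte](contextSize)
  buffer.get(contextBytes)
  Kamon.defaultBinaryPropagation().read(ByteStreamReader.of(contextBytes))
}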

object BinaryPropagation {

/**
Expand Down Expand Up @@ -126,11 +136,10 @@ object BinaryPropagation {
}



/**
* Create a new Binary Propagation instance from the provided configuration.
*/
def from(config: Config): Propagation[ByteStreamReader, ByteStreamWriter] = {
def from(config: Config): BinaryPropagation[ByteStreamReader, ByteStreamWriter] = {
new BinaryPropagation.Default(Settings.from(config))
}

@@ -139,7 +148,9 @@
* entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by the all
* configured entry readers and writers.
*/
class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] {

class Default(settings: Settings) extends BinaryPropagation[ByteStreamReader, ByteStreamWriter] {

private val _logger = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] {
override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
@@ -150,7 +161,7 @@ object BinaryPropagation {
}

override def read(reader: ByteStreamReader): Context = {
if(reader.available() > 0) {
if (reader.available() > 0) {
val contextData = Try {
val cContext = new ColferContext()
cContext.unmarshal(reader.readAll(), 0)
@@ -165,7 +176,7 @@

// Context tags
val tagsBuilder = Map.newBuilder[String, Any]
if(colferContext.tags != null) {
if (colferContext.tags != null) {
colferContext.tags.strings.foreach(t => tagsBuilder += (t.key -> t.value))
colferContext.tags.longs.foreach(t => tagsBuilder += (t.key -> t.value))
colferContext.tags.booleans.foreach(t => tagsBuilder += (t.key -> t.value))
@@ -186,117 +197,141 @@ object BinaryPropagation {
contextWithEntry
}.getOrElse(context)
}
} getOrElse(Context.Empty)
} getOrElse (Context.Empty)
} else Context.Empty
}

override def write(context: Context, writer: ByteStreamWriter): Unit = {
if (context.nonEmpty()) {
val contextData = new ColferContext()
val output = _streamPool.get()
val contextOutgoingBuffer = _contextBufferPool.get()

if(context.tags.nonEmpty() || settings.includeUpstreamName) {
val tagsData = new ColferTags()
val strings = Array.newBuilder[ColferStringTag]

if(context.tags.nonEmpty()) {
val longs = Array.newBuilder[ColferLongTag]
val booleans = Array.newBuilder[ColferBooleanTag]

context.tags.iterator().foreach {
case t: Tag.String =>
val st = new ColferStringTag()
st.setKey(t.key)
st.setValue(t.value)
strings += st

case t: Tag.Long =>
val lt = new ColferLongTag()
lt.setKey(t.key)
lt.setValue(t.value)
longs += lt

case t: Tag.Boolean =>
val bt = new ColferBooleanTag()
bt.setKey(t.key)
bt.setValue(t.value)
booleans += bt
}
val contextData: ColferContext = contextToColferContext(context)
val contextOutgoingBuffer: Array[Byte] = _contextBufferPool.get()

tagsData.setLongs(longs.result())
tagsData.setBooleans(booleans.result())
}
try {
val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
writer.write(contextOutgoingBuffer, 0, contextSize)
} catch {
case NonFatal(t) => _logger.warn("Failed to write Context to ByteStreamWriter", t)
}
}
}

override def storeContextInBuffer(context: Context, buffer: ByteBuffer): ByteBuffer = {
val binaryContext = contextToColferContext(context)

val outputBuffer: Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize)
var contextSize = 0
try {
contextSize = binaryContext.marshal(outputBuffer, 0)
} catch {
// Not sure yet how to handle these failures; they have not been triggered in practice.
case _: BufferOverflowException => _logger.warn("Buffer overflow while marshalling Context into buffer")
case _: IllegalStateException => _logger.warn("Illegal state while marshalling Context into buffer")
}
buffer.putInt(contextSize)
buffer.put(outputBuffer.slice(0, contextSize))
}

if(settings.includeUpstreamName) {
val st = new ColferStringTag()
st.setKey(TagKeys.UpstreamName)
st.setValue(Kamon.environment.service)
strings += st
private def contextToColferContext(context: Context): ColferContext = {
val contextData = new ColferContext()
val output = _streamPool.get()

if (context.tags.nonEmpty() || settings.includeUpstreamName) {
val tagsData = new ColferTags()
val strings = Array.newBuilder[ColferStringTag]

if (context.tags.nonEmpty()) {
val longs = Array.newBuilder[ColferLongTag]
val booleans = Array.newBuilder[ColferBooleanTag]

context.tags.iterator().foreach {
case t: Tag.String =>
val st = new ColferStringTag()
st.setKey(t.key)
st.setValue(t.value)
strings += st

case t: Tag.Long =>
val lt = new ColferLongTag()
lt.setKey(t.key)
lt.setValue(t.value)
longs += lt

case t: Tag.Boolean =>
val bt = new ColferBooleanTag()
bt.setKey(t.key)
bt.setValue(t.value)
booleans += bt
}

tagsData.setStrings(strings.result())
contextData.setTags(tagsData)
tagsData.setLongs(longs.result())
tagsData.setBooleans(booleans.result())
}

if (settings.includeUpstreamName) {
val st = new ColferStringTag()
st.setKey(TagKeys.UpstreamName)
st.setValue(Kamon.environment.service)
strings += st
}

val entriesBuilder = Array.newBuilder[ColferEntry]
context.entries().foreach { entry =>
settings.outgoingEntries.get(entry.key).foreach { entryWriter =>
val colferEntry = new ColferEntry()
try {
output.reset()
entryWriter.write(context, output)
tagsData.setStrings(strings.result())
contextData.setTags(tagsData)
}

colferEntry.key = entry.key
colferEntry.value = output.toByteArray()
} catch {
case NonFatal(t) => _logger.warn("Failed to write entry [{}]", entry.key.asInstanceOf[Any], t)
}

entriesBuilder += colferEntry
}
}
val entriesBuilder = Array.newBuilder[ColferEntry]
context.entries().foreach { entry =>
settings.outgoingEntries.get(entry.key).foreach { entryWriter =>
val colferEntry = new ColferEntry()
try {
output.reset()
entryWriter.write(context, output)

contextData.entries = entriesBuilder.result()
colferEntry.key = entry.key
colferEntry.value = output.toByteArray()
} catch {
case NonFatal(t) => _logger.warn("Failed to write entry [{}]", entry.key.asInstanceOf[Any], t)
}

try {
val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
writer.write(contextOutgoingBuffer, 0, contextSize)
} catch {
case NonFatal(t) => _logger.warn("Failed to write Context to ByteStreamWriter", t)
entriesBuilder += colferEntry
}
}

contextData.entries = entriesBuilder.result()
contextData
}
}

object Default {
private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter {
class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter {
def underlying(): Array[Byte] = this.buf
}

}

case class Settings(
maxOutgoingSize: Int,
includeUpstreamName: Boolean,
incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
)
maxOutgoingSize: Int,
includeUpstreamName: Boolean,
incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
)

object Settings {
private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings])

def from(config: Config): BinaryPropagation.Settings = {
def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
def buildInstances[ExpectedType: ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
val instanceMap = Map.newBuilder[String, ExpectedType]

mappings.foreach {
case (contextKey, componentClass) =>
try {
instanceMap += (contextKey -> ClassLoading.createInstance[ExpectedType](componentClass, Nil))
} catch { case exception: Exception => log.warn("Failed to instantiate {} [{}] due to []",
implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
}
} catch {
case exception: Exception => log.warn("Failed to instantiate {} [{}] due to []",
implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
}
}

instanceMap.result()
@@ -310,4 +345,5 @@ object BinaryPropagation {
)
}
}
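
For reference, Settings.from consumes a channel-level configuration section; building a standalone instance might look like the sketch below. The key names (max-outgoing-size, tags.include-upstream-name and the entries block) are assumed from Kamon's reference configuration for kamon.propagation.binary.default:

import com.typesafe.config.ConfigFactory
import kamon.context.BinaryPropagation

// Assumed key names, mirroring kamon.propagation.binary.default.
val channelConfig = ConfigFactory.parseString(
  """
    |max-outgoing-size = 2048
    |tags.include-upstream-name = yes
    |entries {
    |  incoming.span = "kamon.trace.SpanPropagation$Colfer"
    |  outgoing.span = "kamon.trace.SpanPropagation$Colfer"
    |}
    |""".stripMargin)

val binaryPropagation = BinaryPropagation.from(channelConfig)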

}
core/kamon-core/src/main/scala/kamon/context/Propagation.scala
@@ -67,4 +67,5 @@ object Propagation {
*/
def write(context: Context, medium: Medium): Unit
}

}
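
To make the EntryReader/EntryWriter contract concrete, a codec for a custom String entry could implement both sides as follows (CorrelationIdCodec and the "correlation-id" key are illustrative only, not part of Kamon); such a class would then be wired in through the entries.incoming/entries.outgoing mappings shown earlier:

import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.{Context, Propagation}

// Hypothetical codec for a String context entry named "correlation-id".
class CorrelationIdCodec
    extends Propagation.EntryReader[ByteStreamReader]
    with Propagation.EntryWriter[ByteStreamWriter] {

  private val CorrelationIdKey = Context.key[String]("correlation-id", "")

  override def read(medium: ByteStreamReader, context: Context): Context =
    context.withEntry(CorrelationIdKey, new String(medium.readAll(), "UTF-8"))

  override def write(context: Context, medium: ByteStreamWriter): Unit =
    medium.write(context.get(CorrelationIdKey).getBytes("UTF-8"))
}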
4 changes: 3 additions & 1 deletion instrumentation/kamon-akka/build.sbt
@@ -2,7 +2,9 @@ import sbt.Tests.{Group, SubProcess}
import Def.Initialize

val `Akka-2.4-version` = "2.4.20"
// latest is 2.5.32
val `Akka-2.5-version` = "2.5.26"
// latest is 2.6.10
val `Akka-2.6-version` = "2.6.0"

/**
@@ -156,4 +158,4 @@ test in Test := Def.taskDyn {
(test in `Test-Akka-2.5`).value
(test in `Test-Akka-2.6`).value
}
}.value
}.value
instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/remote/RemotingInstrumentation.scala
@@ -1,10 +1,9 @@
package kamon.instrumentation.akka.instrumentations.akka_25.remote

import akka.actor.ActorSystem
import akka.remote.kamon.instrumentation.akka.instrumentations.akka_25.remote.{ArteryMessageDispatcherAdvice, CaptureContextOnInboundEnvelope, DeserializeForArteryAdvice, SerializeForArteryAdvice}
import akka.kamon.instrumentation.akka.instrumentations.akka_25.remote.{AkkaPduProtobufCodecConstructMessageMethodInterceptor, AkkaPduProtobufCodecDecodeMessage}
import akka.remote.kamon.instrumentation.akka.instrumentations.akka_25.remote.{ArteryMessageDispatcherAdvice, CaptureContextOnInboundEnvelope, DeserializeForArteryAdvice, SerializeForArteryAdvice}
import kamon.Kamon
import kamon.context.Storage
import kamon.context.Storage.Scope
import kamon.instrumentation.akka.AkkaRemoteInstrumentation
import kamon.instrumentation.akka.AkkaRemoteMetrics.SerializationInstruments
@@ -75,9 +74,7 @@ class RemotingInstrumentation extends InstrumentationBuilder with VersionFilteri

}


object CopyContextOnReusableEnvelope {

@Advice.OnMethodExit
def exit(@Advice.This oldEnvelope: Any, @Advice.Return newEnvelope: Any): Unit =
newEnvelope.asInstanceOf[HasContext].setContext(oldEnvelope.asInstanceOf[HasContext].context)
@@ -131,7 +128,6 @@ object MeasureSerializationTime {
def enter(): Long = {
if(AkkaRemoteInstrumentation.settings().trackSerializationMetrics) System.nanoTime() else 0L
}

@Advice.OnMethodExit
def exit(@Advice.Argument(0) system: AnyRef, @Advice.Enter startNanoTime: Long): Unit = {
if(startNanoTime != 0L) {