Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KamonRemoteInstrument #917

Merged
merged 12 commits into from
Jan 20, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package kamon
package context

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}

import com.typesafe.config.Config
import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry, Tags => ColferTags}
import kamon.context.generated.binary.context.{BooleanTag => ColferBooleanTag, LongTag => ColferLongTag, StringTag => ColferStringTag}
import kamon.tag.{Tag, TagSet}
import kamon.trace.Span.TagKeys
import org.slf4j.LoggerFactory

import java.nio.ByteBuffer
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.util.Try
Expand Down Expand Up @@ -123,6 +123,20 @@ object BinaryPropagation {
override def write(byte: Int): Unit =
outputStream.write(byte)
}

/**
* Creates a new [[ByteStreamWriter]] from a ByteBuffer.
*/
def of(byteBuffer: ByteBuffer): ByteStreamWriter = new ByteStreamWriter {
override def write(bytes: Array[Byte]): Unit =
byteBuffer.put(bytes)

override def write(bytes: Array[Byte], offset: Int, count: Int): Unit =
byteBuffer.put(bytes, offset, count)

override def write(byte: Int): Unit =
byteBuffer.put(byte.toByte)
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,5 @@ object Propagation {
*/
def write(context: Context, medium: Medium): Unit
}

}
4 changes: 2 additions & 2 deletions instrumentation/kamon-akka/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Def.Initialize

val `Akka-2.4-version` = "2.4.20"
val `Akka-2.5-version` = "2.5.26"
val `Akka-2.6-version` = "2.6.0"
val `Akka-2.6-version` = "2.6.11"

/**
* Compile Configurations
Expand Down Expand Up @@ -156,4 +156,4 @@ test in Test := Def.taskDyn {
(test in `Test-Akka-2.5`).value
(test in `Test-Akka-2.6`).value
}
}.value
}.value
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package kamon.instrumentation.akka.instrumentations.akka_25.remote

import akka.actor.ActorSystem
import akka.remote.kamon.instrumentation.akka.instrumentations.akka_25.remote.{ArteryMessageDispatcherAdvice, CaptureContextOnInboundEnvelope, DeserializeForArteryAdvice, SerializeForArteryAdvice}
import akka.kamon.instrumentation.akka.instrumentations.akka_25.remote.{AkkaPduProtobufCodecConstructMessageMethodInterceptor, AkkaPduProtobufCodecDecodeMessage}
import akka.remote.kamon.instrumentation.akka.instrumentations.akka_25.remote.{ArteryMessageDispatcherAdvice, CaptureContextOnInboundEnvelope, DeserializeForArteryAdvice, SerializeForArteryAdvice}
import kamon.Kamon
import kamon.context.Storage
import kamon.context.Storage.Scope
import kamon.instrumentation.akka.AkkaRemoteInstrumentation
import kamon.instrumentation.akka.AkkaRemoteMetrics.SerializationInstruments
Expand Down Expand Up @@ -75,9 +74,7 @@ class RemotingInstrumentation extends InstrumentationBuilder with VersionFilteri

}


object CopyContextOnReusableEnvelope {

@Advice.OnMethodExit
def exit(@Advice.This oldEnvelope: Any, @Advice.Return newEnvelope: Any): Unit =
newEnvelope.asInstanceOf[HasContext].setContext(oldEnvelope.asInstanceOf[HasContext].context)
Expand Down Expand Up @@ -131,7 +128,6 @@ object MeasureSerializationTime {
def enter(): Long = {
if(AkkaRemoteInstrumentation.settings().trackSerializationMetrics) System.nanoTime() else 0L
}

@Advice.OnMethodExit
def exit(@Advice.Argument(0) system: AnyRef, @Advice.Enter startNanoTime: Long): Unit = {
if(startNanoTime != 0L) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package akka.remote.artery

import akka.actor.{ActorRef, ExtendedActorSystem}
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.instrumentation.akka.AkkaRemoteMetrics
import kamon.instrumentation.context.HasContext
import kanela.agent.libs.net.bytebuddy.asm.Advice
import org.slf4j.LoggerFactory

import java.nio.ByteBuffer
import scala.util.control.NonFatal

class KamonRemoteInstrument(system: ExtendedActorSystem) extends RemoteInstrument {
private val logger = LoggerFactory.getLogger(classOf[KamonRemoteInstrument])
private val lengthMask: Int = ~(31 << 26)
private val serializationInstruments = AkkaRemoteMetrics.serializationInstruments(system.name)

override def identifier: Byte = 8

override def serializationTimingEnabled: Boolean = true

override def remoteWriteMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = {
val currentContext = Kamon.currentContext()
if (currentContext.nonEmpty()) {
Kamon.defaultBinaryPropagation().write(currentContext, ByteStreamWriter.of(buffer))
}
}

override def remoteReadMetadata(recipient: ActorRef, message: Object, sender: ActorRef, buffer: ByteBuffer): Unit = {
def getLength(kl: Int): Int = kl & lengthMask

try {

// We need to figure out the length of the incoming Context before passing it to BinaryPropagation and
// the only way we can do so at this point is to go back a few positions on `buffer` to read the key/length
// Integer stored by Akka and figure out the length from there.
val keyLength = buffer.getInt(buffer.position() - 4)
val contextLength = getLength(keyLength)
val contextData = Array.ofDim[Byte](contextLength)
buffer.get(contextData)

val incomingContext = Kamon.defaultBinaryPropagation().read(ByteStreamReader.of(contextData))

Option(CaptureCurrentInboundEnvelope.CurrentInboundEnvelope.get())
.foreach(_.asInstanceOf[HasContext].setContext(incomingContext))

} catch {
case NonFatal(t) =>
logger.warn("Failed to deserialized incoming Context", t)
}
}

override def remoteMessageSent(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = {
serializationInstruments.outboundMessageSize.record(size)
serializationInstruments.serializationTime.record(time)
}

override def remoteMessageReceived(recipient: ActorRef, message: Object, sender: ActorRef, size: Int, time: Long): Unit = {
serializationInstruments.inboundMessageSize.record(size)
serializationInstruments.deserializationTime.record(time)
}

/**
* Creates a new [[ByteStreamWriter]] from a ByteBuffer.
*/
def of(byteBuffer: ByteBuffer): ByteStreamWriter = new ByteStreamWriter {
override def write(bytes: Array[Byte]): Unit =
byteBuffer.put(bytes)

override def write(bytes: Array[Byte], offset: Int, count: Int): Unit =
byteBuffer.put(bytes, offset, count)

override def write(byte: Int): Unit =
byteBuffer.put(byte.toByte)
}
}

class CaptureCurrentInboundEnvelope

object CaptureCurrentInboundEnvelope {

val CurrentInboundEnvelope = new ThreadLocal[InboundEnvelope]() {
override def initialValue(): InboundEnvelope = null
}

@Advice.OnMethodEnter
def enter(@Advice.Argument(0) inboundEnvelope: InboundEnvelope): Unit = {
CurrentInboundEnvelope.set(inboundEnvelope)
}

@Advice.OnMethodExit
def exit(): Unit = {
CurrentInboundEnvelope.remove()
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package kamon.instrumentation.akka.instrumentations.akka_26

import akka.actor.WrappedMessage
import akka.dispatch.Envelope
import akka.remote.artery.KamonRemoteInstrument
import kamon.instrumentation.akka.instrumentations.{ActorCellInfo, VersionFiltering}
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation.Argument
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal

class ActorMonitorInstrumentation extends InstrumentationBuilder with VersionFiltering {

onAkka("2.6") {
/*
* Changes implementation of extractMessageClass for our ActorMonitor.
* In akka 2.6, all typed messages are converted to AdaptMessage,
* so we're forced to extract the original message type.
*/
onSubTypesOf("kamon.instrumentation.akka.instrumentations.ActorMonitor")
.intercept(method("extractMessageClass"), MessageClassAdvice)
}
}

class MessageClassAdvice
object MessageClassAdvice {
private val logger = LoggerFactory.getLogger(classOf[MessageClassAdvice])

def extractMessageClass(@Argument(0) envelope: Envelope): String = {
try {
envelope.message match {
case message: WrappedMessage => ActorCellInfo.simpleClassName(message.message.getClass)
case _ => ActorCellInfo.simpleClassName(envelope.message.getClass)
}
} catch {
// NoClassDefFound is thrown in early versions of akka 2.6
// so we can safely fallback to the original method
case _: NoClassDefFoundError =>
ActorCellInfo.simpleClassName(envelope.message.getClass)
case NonFatal(e) =>
logger.info(s"Expected NoClassDefFoundError, got: ${e}")
ActorCellInfo.simpleClassName(envelope.message.getClass)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package kamon.instrumentation.akka.instrumentations.akka_26.remote
import akka.actor.ActorSystem
import akka.remote.kamon.instrumentation.akka.instrumentations.akka_26.remote.{CaptureContextOnInboundEnvelope, DeserializeForArteryAdvice, SerializeForArteryAdvice}
import akka.kamon.instrumentation.akka.instrumentations.akka_26.remote.internal.{AkkaPduProtobufCodecConstructMessageMethodInterceptor, AkkaPduProtobufCodecDecodeMessage}
import akka.remote.artery.CaptureCurrentInboundEnvelope
import kamon.Kamon
import kamon.context.Storage
import kamon.context.{Context, Storage}
import kamon.context.Storage.Scope
import kamon.instrumentation.akka.AkkaRemoteInstrumentation
import kamon.instrumentation.akka.AkkaRemoteMetrics.SerializationInstruments
Expand Down Expand Up @@ -46,11 +47,6 @@ class RemotingInstrumentation extends InstrumentationBuilder with VersionFilteri
.mixin(classOf[HasSerializationInstruments.Mixin])
.advise(isConstructor, InitializeActorSystemAdvice)

onType("akka.remote.MessageSerializer$")
.advise(method("serialize"), MeasureSerializationTime)
.advise(method("deserialize"), MeasureDeserializationTime)


/**
* Artery
*/
Expand All @@ -61,13 +57,11 @@ class RemotingInstrumentation extends InstrumentationBuilder with VersionFilteri
onType("akka.remote.artery.Association")
.advise(method("createOutboundEnvelope$1"), CaptureCurrentContextOnReusableEnvelope)

onType("akka.remote.MessageSerializer$")
.advise(method("serializeForArtery"), classOf[SerializeForArteryAdvice])
.advise(method("deserializeForArtery"), classOf[DeserializeForArteryAdvice])
onType("akka.remote.artery.RemoteInstruments")
.advise(method("deserialize"), classOf[CaptureCurrentInboundEnvelope])

onType("akka.remote.artery.ReusableInboundEnvelope")
.mixin(classOf[HasContext.Mixin])
.advise(method("withMessage"), classOf[CaptureContextOnInboundEnvelope])
.advise(method("copyForLane"), CopyContextOnReusableEnvelope)

onType("akka.remote.artery.MessageDispatcher")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,4 @@ object CaptureContextOnInboundEnvelope {
}
}

}
}
19 changes: 11 additions & 8 deletions instrumentation/kamon-akka/src/common/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ kamon.instrumentation.akka {
shard-metrics-sample-interval = ${kamon.metric.tick-interval}
}
}

# Signals to akka that it should load KamonRemoteInstrument
akka.remote.artery.advanced.instruments += "akka.remote.artery.KamonRemoteInstrument"
kanela.modules {

akka {
Expand All @@ -140,16 +141,18 @@ kanela.modules {
"kamon.instrumentation.akka.instrumentations.AskPatternInstrumentation",
"kamon.instrumentation.akka.instrumentations.EventStreamInstrumentation",
"kamon.instrumentation.akka.instrumentations.ActorRefInstrumentation",
"kamon.instrumentation.akka.instrumentations.akka_25.DispatcherInstrumentation"
"kamon.instrumentation.akka.instrumentations.akka_26.DispatcherInstrumentation"
"kamon.instrumentation.akka.instrumentations.akka_25.DispatcherInstrumentation",
"kamon.instrumentation.akka.instrumentations.akka_26.DispatcherInstrumentation",
"kamon.instrumentation.akka.instrumentations.akka_26.ActorMonitorInstrumentation"
]

within = [
"^akka.dispatch..*",
"^akka.event..*",
"^akka.actor..*",
"^akka.pattern..*",
"^akka.routing..*"
"^akka.routing..*",
"kamon.instrumentation.akka.instrumentations..*"
]
}

Expand All @@ -160,16 +163,16 @@ kanela.modules {

instrumentations = [
"kamon.instrumentation.akka.remote.MessageBufferInstrumentation",
"kamon.instrumentation.akka.instrumentations.akka_25.remote.RemotingInstrumentation"
"kamon.instrumentation.akka.instrumentations.akka_25.remote.RemotingInstrumentation",
"kamon.instrumentation.akka.instrumentations.akka_26.remote.RemotingInstrumentation"
]

within = [
"akka.dispatch..*",
"akka.util..*",
"akka.remote..*",
"akka.actor..*"
"akka.cluster..*"
"akka.actor..*",
"akka.cluster..*",
"akka.serialization..*"
]
}
Expand All @@ -192,4 +195,4 @@ kanela.modules {
"akka.serialization..*"
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,12 @@ object ActorMonitor {
override def cleanup(): Unit =
monitor.cleanup()

private def extractMessageClass(envelope: Envelope): String = {
ActorCellInfo.simpleClassName(envelope.message.getClass)
}

private def buildSpan(cellInfo: ActorCellInfo, context: Context, envelopeTimestamp: Long, envelope: Envelope): Span.Delayed = {
val messageClass = ActorCellInfo.simpleClassName(envelope.message.getClass)
val messageClass = extractMessageClass(envelope)
val parentSpan = context.get(Span.Key)

val spanBuilder = Kamon.internalSpanBuilder(operationName(messageClass, envelope.sender), "akka.actor")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,39 @@ trait VersionFiltering {
* Runs the code block if a version of Akka starting with the provided version is known to be present.
*/
def onAkka(version: String*)(block: => Unit): Unit = {
if(akkaVersion().filter(av => version.exists(v => av.startsWith(v))).isDefined)
if(akkaVersion().exists(av => version.exists(v => av.startsWith(v))))
block
}

/**
* Runs the code block if compatible Akka version is known to be present.
* example major: 2.6, minor: 8 would run the code block on versions
* 2.6.0-2.6.8
*/
def untilAkkaVersion(major: String, minor: Int)(block: => Unit): Unit = {
val version = akkaVersion()
val lastSeparator = version.map(_.lastIndexOf('.'))
val versions: Option[(String, String)] = version.map(_.splitAt(lastSeparator.getOrElse(0)))

if(versions.exists(x => x._1.startsWith(major) && minor >= x._2.drop(1).toInt)) {
block
}
}

/**
* Runs the code block if compatible Akka version is known to be present.
* example major: 2.6, minor: 8 would run the code block on versions 2.6.8 and higher
*/
def afterAkkaVersion(major: String, minor: Int)(block: => Unit): Unit = {
val version = akkaVersion()
val lastSeparator = version.map(_.lastIndexOf('.'))
val versions: Option[(String, String)] = version.map(_.splitAt(lastSeparator.getOrElse(0)))

if(versions.exists(x => x._1.startsWith(major) && minor <= x._2.drop(1).toInt)) {
block
}
}

// This should only succeed when Akka is on the classpath.
private def akkaVersion(): Option[String] = {
try {
Expand Down
Loading