Skip to content

Commit

Permalink
Merge pull request #898 from SimunKaracic/akka-typed-instrumentation
Browse files Browse the repository at this point in the history
Akka typed actor tag and auto-grouping changes
  • Loading branch information
SimunKaracic authored Nov 24, 2020
2 parents 2e7ac74 + e0879d4 commit bcfc41e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kamon.instrumentation.akka

import kamon.Kamon
import kamon.instrumentation.akka.instrumentations.ActorCellInfo
import kamon.metric.InstrumentGroup
import kamon.tag.TagSet

Expand Down Expand Up @@ -35,13 +36,14 @@ object AkkaMetrics {
description = "Counts the number of processing errors experienced by an Actor"
)

def forActor(path: String, system: String, dispatcher: String, actorClass: String): ActorInstruments =
new ActorInstruments(TagSet.builder()
def forActor(path: String, system: String, dispatcher: String, actorClass: Class[_]): ActorInstruments = {
val tags = TagSet.builder()
.add("path", path)
.add("system", system)
.add("dispatcher", dispatcher)
.add("class", actorClass)
.build())
if (!ActorCellInfo.isTyped(actorClass)) tags.add("class", actorClass.getName)
new ActorInstruments(tags.build())
}

class ActorInstruments(tags: TagSet) extends InstrumentGroup(tags) {
val timeInMailbox = register(ActorTimeInMailbox)
Expand Down Expand Up @@ -85,14 +87,16 @@ object AkkaMetrics {
description = "Counts the number of processing errors experienced by the routees of a router"
)

def forRouter(path: String, system: String, dispatcher: String, routerClass: String, routeeClass: String): RouterInstruments =
new RouterInstruments(TagSet.builder()
def forRouter(path: String, system: String, dispatcher: String, routerClass: Class[_], routeeClass: String): RouterInstruments = {
val tags = TagSet.builder()
.add("path", path)
.add("system", system)
.add("dispatcher", dispatcher)
.add("routerClass", routerClass)
.add("routeeClass", routeeClass)
.build())
if (!ActorCellInfo.isTyped(routerClass)) tags.add("routerClass", routerClass.getName)
new RouterInstruments(tags.build())

}

class RouterInstruments(tags: TagSet) extends InstrumentGroup(tags) {
val routingTime = register(RouterRoutingTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ object ActorCellInfo {
}}
}

def isTyped(className: Class[_]): Boolean = {
simpleClassName(className) == "ActorAdapter"
}

private def hasRouterProps(props: Props): Boolean =
props.deploy.routerConfig != NoRouter

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ object ActorMonitor {
val trackingGroups: Seq[ActorGroupInstruments] = if (cell.isRootSupervisor) List() else {
val configuredMatchingGroups = AkkaInstrumentation.matchingActorGroups(cell.path)

if (configuredMatchingGroups.isEmpty && !isTracked && settings.autoGrouping && !cell.isRouter && !cell.isRoutee) {
if (configuredMatchingGroups.isEmpty && !isTracked
&& settings.autoGrouping && !cell.isRouter
&& !cell.isRoutee && !ActorCellInfo.isTyped(cell.actorOrRouterClass)) {
if (!trackedFilter.excludes(cell.path) && Kamon.filter(TrackAutoGroupFilterName).accept(autoGroupingPath))
List(AkkaMetrics.forGroup(autoGroupingPath, system.name))
else
Expand Down Expand Up @@ -130,7 +132,7 @@ object ActorMonitor {
cellInfo.path,
cellInfo.systemName,
cellInfo.dispatcherName,
cellInfo.actorOrRouterClass.getName
cellInfo.actorOrRouterClass
))
}

Expand All @@ -149,8 +151,8 @@ object ActorMonitor {
cellInfo.path,
cellInfo.systemName,
cellInfo.dispatcherName,
cellInfo.actorOrRouterClass.getName,
cellInfo.routeeClass.map(_.getName).getOrElse("Unknown")
cellInfo.actorOrRouterClass,
cellInfo.routeeClass.filterNot(ActorCellInfo.isTyped).map(_.getName).getOrElse("Unknown")
)

new TrackedRoutee(routerMetrics, groupMetrics, cellInfo)
Expand All @@ -177,8 +179,6 @@ object ActorMonitor {
* Wraps another ActorMonitor implementation and provides tracing capabilities on top of it.
*/
class TracedMonitor(cellInfo: ActorCellInfo, startsTrace: Boolean, monitor: ActorMonitor) extends ActorMonitor {
private val _actorClassName = cellInfo.actorOrRouterClass.getName
private val _actorSimpleClassName = ActorCellInfo.simpleClassName(cellInfo.actorOrRouterClass)

override def captureEnvelopeTimestamp(): Long =
monitor.captureEnvelopeTimestamp()
Expand Down Expand Up @@ -225,23 +225,24 @@ object ActorMonitor {
val messageClass = ActorCellInfo.simpleClassName(envelope.message.getClass)
val parentSpan = context.get(Span.Key)

Kamon.internalSpanBuilder(operationName(messageClass, envelope.sender), "akka.actor")
val spanBuilder = Kamon.internalSpanBuilder(operationName(messageClass, envelope.sender), "akka.actor")
.asChildOf(parentSpan)
.doNotTrackMetrics()
.tag("akka.system", cellInfo.systemName)
.tag("akka.actor.path", cellInfo.path)
.tag("akka.actor.class", _actorClassName)
.tag("akka.actor.message-class", messageClass)
.delay(Kamon.clock().toInstant(envelopeTimestamp))
if (!ActorCellInfo.isTyped(cellInfo.actorOrRouterClass)) {
spanBuilder.tag("akka.actor.class", cellInfo.actorOrRouterClass.getName)
}
spanBuilder.delay(Kamon.clock().toInstant(envelopeTimestamp))
}

private def operationName(messageClass: String, sender: ActorRef): String = {
val operationType = if(AkkaPrivateAccess.isPromiseActorRef(sender)) "ask(" else "tell("
val operationType = if(AkkaPrivateAccess.isPromiseActorRef(sender)) "ask" else "tell"

StringBuilder.newBuilder
.append(operationType)
.append(_actorSimpleClassName)
.append(", ")
.append("(")
.append(messageClass)
.append(")")
.result()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ object RouterMonitor {
cell.path,
cell.systemName,
cell.dispatcherName,
cell.actorOrRouterClass.getName,
cell.routeeClass.map(_.getName).getOrElse("Unknown")
cell.actorOrRouterClass,
cell.routeeClass.filterNot(ActorCellInfo.isTyped).map(_.getName).getOrElse("Unknown")
)
)
else NoOpRouterMonitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
spanTags("component") shouldBe "akka.actor"
span.operationName shouldBe("tell(TracingTestActor, String)")
span.operationName shouldBe("tell(String)")
spanTags("akka.actor.path") shouldNot include ("filteredout")
spanTags("akka.actor.path") should be ("MessageTracing/user/traced-probe-1")
}
Expand All @@ -48,7 +48,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
eventually(timeout(2 seconds)) {
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.operationName shouldBe("tell(TracingTestActor, String)")
span.operationName shouldBe("tell(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced"
Expand All @@ -62,7 +62,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
eventually(timeout(2 seconds)) {
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.operationName shouldBe("ask(TracingTestActor, String)")
span.operationName shouldBe("ask(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced"
Expand All @@ -83,7 +83,6 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _

span.operationName should include("tell(TracingTestActor")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-first"
Expand All @@ -97,7 +96,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.parentId shouldBe firstSpanID
span.operationName should include("tell(TracingTestActor, String)")
span.operationName should include("tell(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-second"
Expand All @@ -118,7 +117,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val firstSpanID = eventually(timeout(2 seconds)) {
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.operationName shouldBe("tell(TracingTestActor, Tuple2)")
span.operationName shouldBe("tell(Tuple2)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-chain-first"
Expand All @@ -133,7 +132,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.parentId shouldBe firstSpanID
span.operationName shouldBe("tell(TracingTestActor, String)")
span.operationName shouldBe("tell(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-chain-last"
Expand Down Expand Up @@ -199,4 +198,4 @@ class TracingTestActor extends Actor {
Thread.sleep(50)
sender ! "pong"
}
}
}

0 comments on commit bcfc41e

Please sign in to comment.