Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka typed actor tag and auto-grouping changes #898

Merged
merged 4 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kamon.instrumentation.akka

import kamon.Kamon
import kamon.instrumentation.akka.instrumentations.ActorCellInfo
import kamon.metric.InstrumentGroup
import kamon.tag.TagSet

Expand Down Expand Up @@ -35,13 +36,14 @@ object AkkaMetrics {
description = "Counts the number of processing errors experienced by an Actor"
)

def forActor(path: String, system: String, dispatcher: String, actorClass: String): ActorInstruments =
new ActorInstruments(TagSet.builder()
def forActor(path: String, system: String, dispatcher: String, actorClass: Class[_]): ActorInstruments = {
val tags = TagSet.builder()
.add("path", path)
.add("system", system)
.add("dispatcher", dispatcher)
.add("class", actorClass)
.build())
if (!ActorCellInfo.isTyped(actorClass)) tags.add("class", actorClass.getName)
new ActorInstruments(tags.build())
}

class ActorInstruments(tags: TagSet) extends InstrumentGroup(tags) {
val timeInMailbox = register(ActorTimeInMailbox)
Expand Down Expand Up @@ -85,14 +87,16 @@ object AkkaMetrics {
description = "Counts the number of processing errors experienced by the routees of a router"
)

def forRouter(path: String, system: String, dispatcher: String, routerClass: String, routeeClass: String): RouterInstruments =
new RouterInstruments(TagSet.builder()
def forRouter(path: String, system: String, dispatcher: String, routerClass: Class[_], routeeClass: String): RouterInstruments = {
val tags = TagSet.builder()
.add("path", path)
.add("system", system)
.add("dispatcher", dispatcher)
.add("routerClass", routerClass)
.add("routeeClass", routeeClass)
.build())
if (!ActorCellInfo.isTyped(routerClass)) tags.add("routerClass", routerClass.getName)
SimunKaracic marked this conversation as resolved.
Show resolved Hide resolved
new RouterInstruments(tags.build())

}

class RouterInstruments(tags: TagSet) extends InstrumentGroup(tags) {
val routingTime = register(RouterRoutingTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ object ActorCellInfo {
}}
}

def isTyped(className: Class[_]): Boolean = {
simpleClassName(className) == "ActorAdapter"
}

private def hasRouterProps(props: Props): Boolean =
props.deploy.routerConfig != NoRouter

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ object ActorMonitor {
val trackingGroups: Seq[ActorGroupInstruments] = if (cell.isRootSupervisor) List() else {
val configuredMatchingGroups = AkkaInstrumentation.matchingActorGroups(cell.path)

if (configuredMatchingGroups.isEmpty && !isTracked && settings.autoGrouping && !cell.isRouter && !cell.isRoutee) {
if (configuredMatchingGroups.isEmpty && !isTracked
&& settings.autoGrouping && !cell.isRouter
&& !cell.isRoutee && !ActorCellInfo.isTyped(cell.actorOrRouterClass)) {
if (!trackedFilter.excludes(cell.path) && Kamon.filter(TrackAutoGroupFilterName).accept(autoGroupingPath))
List(AkkaMetrics.forGroup(autoGroupingPath, system.name))
else
Expand Down Expand Up @@ -130,7 +132,7 @@ object ActorMonitor {
cellInfo.path,
cellInfo.systemName,
cellInfo.dispatcherName,
cellInfo.actorOrRouterClass.getName
cellInfo.actorOrRouterClass
))
}

Expand All @@ -149,8 +151,8 @@ object ActorMonitor {
cellInfo.path,
cellInfo.systemName,
cellInfo.dispatcherName,
cellInfo.actorOrRouterClass.getName,
cellInfo.routeeClass.map(_.getName).getOrElse("Unknown")
cellInfo.actorOrRouterClass,
cellInfo.routeeClass.filterNot(ActorCellInfo.isTyped).map(_.getName).getOrElse("Unknown")
)

new TrackedRoutee(routerMetrics, groupMetrics, cellInfo)
Expand All @@ -177,8 +179,6 @@ object ActorMonitor {
* Wraps another ActorMonitor implementation and provides tracing capabilities on top of it.
*/
class TracedMonitor(cellInfo: ActorCellInfo, startsTrace: Boolean, monitor: ActorMonitor) extends ActorMonitor {
private val _actorClassName = cellInfo.actorOrRouterClass.getName
private val _actorSimpleClassName = ActorCellInfo.simpleClassName(cellInfo.actorOrRouterClass)

override def captureEnvelopeTimestamp(): Long =
monitor.captureEnvelopeTimestamp()
Expand Down Expand Up @@ -225,23 +225,24 @@ object ActorMonitor {
val messageClass = ActorCellInfo.simpleClassName(envelope.message.getClass)
val parentSpan = context.get(Span.Key)

Kamon.internalSpanBuilder(operationName(messageClass, envelope.sender), "akka.actor")
val spanBuilder = Kamon.internalSpanBuilder(operationName(messageClass, envelope.sender), "akka.actor")
.asChildOf(parentSpan)
.doNotTrackMetrics()
.tag("akka.system", cellInfo.systemName)
.tag("akka.actor.path", cellInfo.path)
.tag("akka.actor.class", _actorClassName)
.tag("akka.actor.message-class", messageClass)
.delay(Kamon.clock().toInstant(envelopeTimestamp))
if (!ActorCellInfo.isTyped(cellInfo.actorOrRouterClass)) {
spanBuilder.tag("akka.actor.class", cellInfo.actorOrRouterClass.getName)
}
spanBuilder.delay(Kamon.clock().toInstant(envelopeTimestamp))
}

private def operationName(messageClass: String, sender: ActorRef): String = {
val operationType = if(AkkaPrivateAccess.isPromiseActorRef(sender)) "ask(" else "tell("
val operationType = if(AkkaPrivateAccess.isPromiseActorRef(sender)) "ask" else "tell"

StringBuilder.newBuilder
.append(operationType)
.append(_actorSimpleClassName)
.append(", ")
.append("(")
.append(messageClass)
.append(")")
.result()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ object RouterMonitor {
cell.path,
cell.systemName,
cell.dispatcherName,
cell.actorOrRouterClass.getName,
cell.routeeClass.map(_.getName).getOrElse("Unknown")
cell.actorOrRouterClass,
cell.routeeClass.filterNot(ActorCellInfo.isTyped).map(_.getName).getOrElse("Unknown")
)
)
else NoOpRouterMonitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
spanTags("component") shouldBe "akka.actor"
span.operationName shouldBe("tell(TracingTestActor, String)")
span.operationName shouldBe("tell(String)")
spanTags("akka.actor.path") shouldNot include ("filteredout")
spanTags("akka.actor.path") should be ("MessageTracing/user/traced-probe-1")
}
Expand All @@ -48,7 +48,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
eventually(timeout(2 seconds)) {
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.operationName shouldBe("tell(TracingTestActor, String)")
span.operationName shouldBe("tell(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced"
Expand All @@ -62,7 +62,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
eventually(timeout(2 seconds)) {
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.operationName shouldBe("ask(TracingTestActor, String)")
span.operationName shouldBe("ask(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced"
Expand All @@ -83,7 +83,6 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _

span.operationName should include("tell(TracingTestActor")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-first"
Expand All @@ -97,7 +96,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.parentId shouldBe firstSpanID
span.operationName should include("tell(TracingTestActor, String)")
span.operationName should include("tell(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-second"
Expand All @@ -118,7 +117,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val firstSpanID = eventually(timeout(2 seconds)) {
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.operationName shouldBe("tell(TracingTestActor, Tuple2)")
span.operationName shouldBe("tell(Tuple2)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-chain-first"
Expand All @@ -133,7 +132,7 @@ class MessageTracingSpec extends TestKit(ActorSystem("MessageTracing")) with Wor
val span = testSpanReporter.nextSpan().value
val spanTags = stringTag(span) _
span.parentId shouldBe firstSpanID
span.operationName shouldBe("tell(TracingTestActor, String)")
span.operationName shouldBe("tell(String)")
spanTags("component") shouldBe "akka.actor"
spanTags("akka.system") shouldBe "MessageTracing"
spanTags("akka.actor.path") shouldBe "MessageTracing/user/traced-chain-last"
Expand Down Expand Up @@ -199,4 +198,4 @@ class TracingTestActor extends Actor {
Thread.sleep(50)
sender ! "pong"
}
}
}