Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kamon-pekko instrumentation #1264

Merged
merged 5 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,15 @@ lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http")
),
)).dependsOn(`kamon-akka`, `kamon-testkit` % "test")


lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.dependsOn(
`kamon-scala-future` % "compile",
`kamon-testkit` % "test"
)
lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
.disablePlugins(AssemblyPlugin)
Expand Down
37 changes: 37 additions & 0 deletions instrumentation/kamon-pekko/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// The Common configuration should always depend on the latest version of Pekko. All code in the Common configuration
// should be source compatible with all Pekko versions.
inConfig(Compile)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
))

val pekkoVersion = "1.0.1"
libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq(
kanelaAgent,
scalatest % Test,
logbackClassic % Test,
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,
"org.apache.pekko" %% "pekko-remote" % pekkoVersion,
"org.apache.pekko" %% "pekko-cluster" % pekkoVersion,
"org.apache.pekko" %% "pekko-cluster-sharding" % pekkoVersion,
"org.apache.pekko" %% "pekko-protobuf" % pekkoVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test
)}

exportJars := true

/**
* Test-related settings
*/

lazy val baseTestSettings = Seq(
fork := true,
parallelExecution := false,
javaOptions := (Test / javaOptions).value,
dependencyClasspath += (Compile / packageBin).value
)

inConfig(Test)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
))
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kamon.instrumentation.pekko.instrumentations;

import org.apache.pekko.dispatch.Envelope;
import kamon.context.Context;
import kamon.instrumentation.context.HasContext;
import kamon.instrumentation.context.HasTimestamp;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

final public class ActorCellInvokeAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(
@Advice.This Object cell,
@Advice.Argument(0) Object envelope,
@Advice.Local("stateFromStart") Object stateFromStart,
@Advice.Local("processingStartTimestamp") Long processingStartTimestamp,
@Advice.Local("envelopeTimestamp") Long envelopeTimestamp,
@Advice.Local("context") Context context) {

final ActorMonitor actorMonitor = ((HasActorMonitor) cell).actorMonitor();

processingStartTimestamp = actorMonitor.captureProcessingStartTimestamp();
context = ((HasContext) envelope).context();
envelopeTimestamp = ((HasTimestamp) envelope).timestamp();
stateFromStart = actorMonitor.onMessageProcessingStart(context, envelopeTimestamp, (Envelope) envelope);
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit(
@Advice.This Object cell,
@Advice.Local("stateFromStart") Object stateFromStart,
@Advice.Local("processingStartTimestamp") Long processingStartTimestamp,
@Advice.Local("envelopeTimestamp") Long envelopeTimestamp,
@Advice.Local("context") Context context) {

final ActorMonitor actorMonitor = ((HasActorMonitor) cell).actorMonitor();
actorMonitor.onMessageProcessingEnd(context, envelopeTimestamp, processingStartTimestamp, stateFromStart);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package kamon.instrumentation.pekko.instrumentations;

import org.apache.pekko.actor.*;
import org.apache.pekko.dispatch.Mailbox;
import org.apache.pekko.dispatch.sysmsg.SystemMessage;
import org.apache.pekko.pattern.PromiseActorRef;
import org.apache.pekko.routing.RoutedActorCell;
import org.apache.pekko.routing.RoutedActorRef;
import scala.Option;

/**
* This class exposes access to several private[pekko] members that wouldn't be visible from the Scala codebase.
*/
public class PekkoPrivateAccess {

public static boolean isSystemMessage(Object message) {
return message instanceof SystemMessage;
}

public static boolean isPromiseActorRef(ActorRef ref) {
return ref instanceof PromiseActorRef;
}

public static boolean isInternalAndActiveActorRef(ActorRef target) {
return target != null && target instanceof InternalActorRef && !((InternalActorRef) target).isTerminated();
}

public static boolean isRoutedActorRef(ActorRef target) {
return target instanceof RoutedActorRef;
}

public static boolean isRoutedActorCell(Object cell) {
return cell instanceof RoutedActorCell;
}

public static boolean isUnstartedActorCell(Object cell) {
return cell instanceof UnstartedCell;
}

public static Class<?> unstartedActorCellClass() {
return UnstartedCell.class;
}

public static boolean isDeadLettersMailbox(Object cell, Object mailbox) {
final ActorCell actorCell = (ActorCell) cell;
return mailbox == actorCell.dispatcher().mailboxes().deadLetterMailbox();
}

public static long mailboxMessageCount(Object mailbox) {
return ((Mailbox) mailbox).numberOfMessages();
}

public static Option<Props> cellProps(Object cell) {
if(cell != null && cell instanceof Cell)
return Option.apply(((Cell) cell).props());
else
return Option.empty();
}

public static Option<Deploy> lookupDeploy(ActorPath path, ActorSystem system) {
final Deployer deployer = new Deployer(system.settings(), ((ExtendedActorSystem) system).dynamicAccess());
return deployer.lookup(path.$div("$a"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* =========================================================================================
* Copyright © 2013-2022 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.instrumentation.pekko.instrumentations;

import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class SchedulerRunnableAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
runnable = new ContextAwareRunnable(Kamon.currentContext(), runnable);
}

public static class ContextAwareRunnable implements Runnable {
private final Context context;
private final Runnable underlyingRunnable;

public ContextAwareRunnable(Context context, Runnable underlyingRunnable) {
this.context = context;
this.underlyingRunnable = underlyingRunnable;
}

@Override
public void run() {
final Storage.Scope scope = Kamon.storeContext(context);

try {
underlyingRunnable.run();
} finally {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.pekko.actor.instrumentation;

import org.apache.pekko.actor.Cell;
import org.apache.pekko.actor.UnstartedCell;
import org.apache.pekko.actor.instrumentation.CellWrapper;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class ReplaceWithAdvice {

@Advice.OnMethodEnter()
public static Cell enter(@Advice.Argument(value = 0, readOnly = false) Cell cell) {
Cell originalCell = cell;
cell = new CellWrapper(cell);
return originalCell;
}

@Advice.OnMethodExit()
public static void exit(@Advice.This UnstartedCell self, @Advice.Enter Cell originalCell) {
self.self().swapCell(originalCell);
}
}