Skip to content

Commit

Permalink
Kanela support (#6) (#7)
Browse files Browse the repository at this point in the history
* Context Propagation Throught Kanela instrumentation for ExecutorService(submit|execute|invokeAll|invokeAny)`
* JMH Instrumentation Benchmark module
* remove akka.dispatch package from instrumentation
* * Introduce Runnable|Callable module
* * Disable All Intrumentations by default
* Kanela instrumentation + update to kamon 1.2.0-M1 (#9)
* update to kamon-core 1.2.0-M1, latest kamon-sbt-umbrella and sbt 1.2.6
  • Loading branch information
dpsoft committed Nov 27, 2018
1 parent ba47b14 commit bec8359
Show file tree
Hide file tree
Showing 20 changed files with 611 additions and 27 deletions.
48 changes: 35 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* =========================================================================================
* Copyright © 2013-2016 the kamon project <http://kamon.io/>
* Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -12,21 +12,43 @@
* and limitations under the License.
* =========================================================================================
*/
scalaVersion := "2.11.8"
crossScalaVersions := Seq("2.10.6", "2.11.8", "2.12.2")

resolvers += Resolver.bintrayRepo("kamon-io", "snapshots")
val kamonCore = "io.kamon" %% "kamon-core" % "1.0.1"
val kamonTestkit = "io.kamon" %% "kamon-testkit" % "1.0.1"
val kamonCore = "io.kamon" %% "kamon-core" % "1.2.0-M1"
val kamonTestkit = "io.kamon" %% "kamon-testkit" % "1.2.0-M1"

val kanelaScala = "io.kamon" %% "kanela-scala-extension" % "0.0.14"

val guava = "com.google.guava" % "guava" % "24.1-jre"

lazy val root = (project in file("."))
.settings(name := "kamon-executors")
.settings(aspectJSettings: _*)
.settings(noPublishing: _*)
.aggregate(executors, benchmark)


val commonSettings = Seq(
scalaVersion := "2.12.6",
resolvers += Resolver.mavenLocal,
crossScalaVersions := Seq("2.12.6", "2.11.12", "2.10.7")
)

lazy val executors = (project in file("kamon-executors"))
.enablePlugins(JavaAgent)
.settings(moduleName := "kamon-executors")
.settings(commonSettings: _*)
.settings(javaAgents += "io.kamon" % "kanela-agent" % "0.0.15" % "compile;test")
.settings(
libraryDependencies ++=
compileScope(kamonCore) ++
testScope(scalatest, kamonTestkit) ++
providedScope(aspectJ)
libraryDependencies ++=
compileScope(kamonCore, kanelaScala) ++
testScope(scalatest, logbackClassic, kamonTestkit, guava)
)

fork := true
lazy val benchmark = (project in file("kamon-executors-bench"))
.enablePlugins(JmhPlugin)
.settings(
moduleName := "kamon-executors-bench",
resolvers += Resolver.mavenLocal,
fork in Test := true)
.settings(noPublishing: _*)
.settings(commonSettings: _*)
.settings(libraryDependencies ++= compileScope(guava))
.dependsOn(executors)
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* =========================================================================================
* Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/


package kamon.executors.bench

import java.util.concurrent.{Executor, ExecutorService, TimeUnit}

import com.google.common.util.concurrent.MoreExecutors
import kamon.Kamon
import kamon.executors.util.ContextAwareRunnable
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

class ExecutorInstrumentationBenchmark {

/**
* This benchmark attempts to measure the performance without any context propagation.
*
* @param blackhole a { @link Blackhole} object supplied by JMH
*/
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork
def none(blackhole: Blackhole): Unit = {
MoreExecutors.directExecutor.execute(new BlackholeRunnable(blackhole))
}

/**
* This benchmark attempts to measure the performance with manual context propagation.
*
* @param blackhole a { @link Blackhole} object supplied by JMH
*/
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork
def manual(blackhole: Blackhole): Unit = {
MoreExecutors.directExecutor.execute(new ContextAwareRunnable(new BlackholeRunnable(blackhole)))
}

/**
* This benchmark attempts to measure the performance with automatic context propagation.
*
* @param blackhole a { @link Blackhole} object supplied by JMH
*/
@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Fork(jvmArgsAppend = Array("-javaagent:/home/diego/.m2/repository/io/kamon/kanela-agent/0.0.15/kanela-agent-0.0.15.jar"))
def automatic(blackhole: Blackhole): Unit = {
MoreExecutors.directExecutor.execute(new BlackholeRunnable(blackhole))
}
}

private class BlackholeRunnable(blackhole: Blackhole) extends Runnable {
override def run(): Unit = {
blackhole.consume(Kamon.currentContext())
}
}

object DirectExecutor extends Executor {
override def execute(command: Runnable): Unit =
command.run()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/* =========================================================================================
* Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.executors.instrumentation;

import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage.Scope;
import kamon.executors.instrumentation.ExecutorsInstrumentationAdvisors.CallableCollectionWrapperAdvisor;
import kamon.executors.instrumentation.ExecutorsInstrumentationAdvisors.CallableWrapperAdvisor;
import kamon.executors.instrumentation.ExecutorsInstrumentationAdvisors.RunnableWrapperAdvisor;
import kanela.agent.api.instrumentation.KanelaInstrumentation;
import kanela.agent.bootstrap.context.ContextHandler;
import kanela.agent.bootstrap.context.ContextProvider;

import java.util.Collection;
import java.util.concurrent.Callable;

public final class ExecutorInstrumentation extends KanelaInstrumentation {

public ExecutorInstrumentation() {
/**
* Set the ContextProvider
*/
ContextHandler.setContexProvider(new KamonContextProvider());

/**
* Instrument all implementations of:
*
* java.util.concurrent.Executor::execute
*
*/
forSubtypeOf(() -> "java.util.concurrent.Executor", builder ->
builder
.withAdvisorFor(method("execute").and(withArgument(Runnable.class)), () -> RunnableWrapperAdvisor.class)
.build());


/**
* Instrument all implementations of:
*
* java.util.concurrent.ExecutorService::submit(Runnable)
* java.util.concurrent.ExecutorService::submit(Callable)
* java.util.concurrent.ExecutorService::[invokeAny|invokeAll](Collection[Callable])
*
*/
forSubtypeOf(() -> "java.util.concurrent.ExecutorService", builder ->
builder
.withAdvisorFor(method("submit").and(withArgument(Runnable.class)), () -> RunnableWrapperAdvisor.class)
.withAdvisorFor(method("submit").and(withArgument(Callable.class)), () -> CallableWrapperAdvisor.class)
.withAdvisorFor(anyMethods("invokeAny", "invokeAll").and(withArgument(Collection.class)), () -> CallableCollectionWrapperAdvisor.class)
.build());
}

/**
* Runs a Runnable within Kamon Context
*/
private static class ContextAwareRunnable implements Runnable {

private final Runnable underlying;
private final Context context;

ContextAwareRunnable(Runnable r) {
this.context = Kamon.currentContext();
this.underlying = r;
}

@Override
public void run() {
final Scope scope = Kamon.storeContext(context);
try {
underlying.run();
} finally {
scope.close();
}
}
}

/**
* Runs a Callable within Kamon Context
*/
private static class ContextAwareCallable<A> implements Callable<A> {

private final Callable<A> underlying;
private final Context context;

ContextAwareCallable(Callable<A> c) {
this.context = Kamon.currentContext();
this.underlying = c;
}

public A call() throws Exception {
final Scope scope = Kamon.storeContext(context);
try {
return underlying.call();
} finally {
scope.close();
}
}
}

/**
* implementation of kanela.agent.bootstrap.context.ContextProvider
*/
private static class KamonContextProvider implements ContextProvider {
@Override
public Runnable wrapInContextAware(Runnable runnable) {
return new ContextAwareRunnable(runnable);
}

@Override
public <A> Callable wrapInContextAware(Callable<A> callable) {
return new ContextAwareCallable<>(callable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/* =========================================================================================
* Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.executors.instrumentation;

import kanela.agent.bootstrap.context.ContextHandler;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;

final class ExecutorsInstrumentationAdvisors {

public static class RunnableWrapperAdvisor {
/**
* Wraps a {@link Runnable} so that it executes with the current context.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapParam(@Advice.Argument(value = 0, readOnly = false) Runnable runnable) {
runnable = ContextHandler.wrapInContextAware(runnable);
}
}

public static class CallableWrapperAdvisor {
/**
* Wraps a {@link Callable} so that it executes with the current context.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapParam(@Advice.Argument(value = 0, readOnly = false) Callable<?> callable) {
callable = ContextHandler.wrapInContextAware(callable);
}
}

public static class CallableCollectionWrapperAdvisor {
/**
* Wraps all elements of a list of {@link Callable}'s so that it executes with the current context.
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapParam(@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable<?> task : tasks) {
if(task != null) wrappedTasks.add(ContextHandler.wrapInContextAware(task));
}
tasks = wrappedTasks;
}
}
}
41 changes: 41 additions & 0 deletions kamon-executors/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
kamon.executors {

# Interval at which all registered executor metrics will be sampled.
sample-interval = 500 milliseconds
}

kanela {
show-banner = true
log-level = "ERROR"

modules {
runnable-callable-module {
stopable = true
enabled = false
name = "Runnable|Callable Module"
instrumentations = [
"kamon.executors.instrumentation.RunnableOrCallableInstrumentation"
]
}

executors-module {
inject-in-bootstrap = true
stopable = true
enabled = false
disable-class-format-changes = true
name = "Kamon Executors Module"
instrumentations = [
"kamon.executors.instrumentation.ExecutorInstrumentation"
]
within = [
"com.google.common.util.concurrent..*",
"java.util.concurrent..*",
"scala.concurrent..*",
"scala.concurrent.impl..*"
"scala.concurrent.forkjoin.ForkJoinPool",
"akka.actor..*",
"play.api.libs.streams..*",
]
}
}
}

0 comments on commit bec8359

Please sign in to comment.