Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument ConnectionSource in Akka/Pekko HTTP Servers #11103

Merged
merged 7 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ muzzle {
pass {
group.set("com.typesafe.akka")
module.set("akka-actor_2.11")
versions.set("[2.5,)")
versions.set("[2.5,2.6)") // Akka's custom ForkJoin was removed in 2.6, replaced by the java.concurrent version
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

akka-actor_2.11 doesn't look like it goes above 2.5.x

maybe instead (or in addition), add

pass {
    group.set("com.typesafe.akka")
    module.set("akka-actor_2.12")
    versions.set("[2.5,2.6)") // Akka's custom ForkJoin was removed in 2.6, replaced by the java.concurrent version
    assertInverse.set(true)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Pushed a new commit with that change.

assertInverse.set(true)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public boolean isIndyModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
return asList(
new HttpExtServerInstrumentation(),
new GraphInterpreterInstrumentation(),
new AkkaHttpServerSourceInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp.server;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Flow;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class AkkaHttpServerSourceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("akka.http.scaladsl.Http$IncomingConnection");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("handleWith").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))),
this.getClass().getName() + "$AkkaBindAndHandleAdvice");
}

@SuppressWarnings("unused")
public static class AkkaBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
handler = AkkaFlowWrapper.wrap(handler);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.StatusCodes.Found
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._

import java.util.function.Supplier
import scala.concurrent.Await

object AkkaHttpTestServerSourceWebServer {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

var route = get {
concat(
path(SUCCESS.rawPath()) {
complete(
AbstractHttpServerTest.controller(SUCCESS, supplier(SUCCESS.getBody))
)
},
path(INDEXED_CHILD.rawPath()) {
parameterMap { map =>
val supplier = new Supplier[String] {
def get(): String = {
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
map.get(name).orNull
})
""
}
}
complete(AbstractHttpServerTest.controller(INDEXED_CHILD, supplier))
}
},
path(QUERY_PARAM.rawPath()) {
extractUri { uri =>
complete(
AbstractHttpServerTest
.controller(INDEXED_CHILD, supplier(uri.queryString().orNull))
)
}
},
path(REDIRECT.rawPath()) {
redirect(
AbstractHttpServerTest
.controller(REDIRECT, supplier(REDIRECT.getBody)),
Found
)
},
path(ERROR.rawPath()) {
complete(
500 -> AbstractHttpServerTest
.controller(ERROR, supplier(ERROR.getBody))
)
},
path("path" / LongNumber / "param") { id =>
complete(
AbstractHttpServerTest.controller(PATH_PARAM, supplier(id.toString))
)
},
path(
"test1" / IntNumber / HexIntNumber / LongNumber / HexLongNumber /
DoubleNumber / JavaUUID / Remaining
) { (_, _, _, _, _, _, _) =>
complete(SUCCESS.getBody)
},
pathPrefix("test2") {
concat(
path("first") {
complete(SUCCESS.getBody)
},
path("second") {
complete(SUCCESS.getBody)
}
)
}
)
}

private var binding: ServerBinding = null

def start(port: Int): Unit = synchronized {
if (null == binding) {
import scala.concurrent.duration._
binding = Await.result(
Http()
.bind("localhost", port)
.map(_.handleWith(route))
.to(Sink.ignore)
.run(),
10.seconds
)
}
}

def stop(): Unit = synchronized {
if (null != binding) {
binding.unbind()
system.terminate()
binding = null
}
}

def supplier(string: String): Supplier[String] = {
new Supplier[String] {
def get(): String = {
string
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public boolean isIndyModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
return asList(
new HttpExtServerInstrumentation(),
new GraphInterpreterInstrumentation(),
new PekkoHttpServerSourceInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.stream.scaladsl.Flow;

public class PekkoHttpServerSourceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pekko.http.scaladsl.Http$IncomingConnection");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("handleWith").and(takesArgument(0, named("org.apache.pekko.stream.scaladsl.Flow"))),
this.getClass().getName() + "$PekkoBindAndHandleAdvice");
}

@SuppressWarnings("unused")
public static class PekkoBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
handler = PekkoFlowWrapper.wrap(handler);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0

import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.Http.ServerBinding
import org.apache.pekko.http.scaladsl.model.StatusCodes.Found
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.stream.ActorMaterializer
import org.apache.pekko.stream.scaladsl.Sink

import java.util.function.Supplier
import scala.concurrent.Await

object PekkoHttpTestServerSourceWebServer {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

var route = get {
concat(
path(SUCCESS.rawPath()) {
complete(
AbstractHttpServerTest.controller(SUCCESS, supplier(SUCCESS.getBody))
)
},
path(INDEXED_CHILD.rawPath()) {
parameterMap { map =>
val supplier = new Supplier[String] {
def get(): String = {
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
map.get(name).orNull
})
""
}
}
complete(AbstractHttpServerTest.controller(INDEXED_CHILD, supplier))
}
},
path(QUERY_PARAM.rawPath()) {
extractUri { uri =>
complete(
AbstractHttpServerTest
.controller(INDEXED_CHILD, supplier(uri.queryString().orNull))
)
}
},
path(REDIRECT.rawPath()) {
redirect(
AbstractHttpServerTest
.controller(REDIRECT, supplier(REDIRECT.getBody)),
Found
)
},
path(ERROR.rawPath()) {
complete(
500 -> AbstractHttpServerTest
.controller(ERROR, supplier(ERROR.getBody))
)
},
path("path" / LongNumber / "param") { id =>
complete(
AbstractHttpServerTest.controller(PATH_PARAM, supplier(id.toString))
)
},
path(
"test1" / IntNumber / HexIntNumber / LongNumber / HexLongNumber /
DoubleNumber / JavaUUID / Remaining
) { (_, _, _, _, _, _, _) =>
complete(SUCCESS.getBody)
},
pathPrefix("test2") {
concat(
path("first") {
complete(SUCCESS.getBody)
},
path("second") {
complete(SUCCESS.getBody)
}
)
}
)
}

private var binding: ServerBinding = null

def start(port: Int): Unit = synchronized {
if (null == binding) {
import scala.concurrent.duration._
binding = Await.result(
Http()
.bind("localhost", port)
.map(_.handleWith(route))
.to(Sink.ignore)
.run(),
10.seconds
)
}
}

def stop(): Unit = synchronized {
if (null != binding) {
binding.unbind()
system.terminate()
binding = null
}
}

def supplier(string: String): Supplier[String] = {
new Supplier[String] {
def get(): String = {
string
}
}
}
}
Loading