Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature sttp client3 #1

Merged
merged 7 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import akka.grpc.gen.BuildInfo.akkaVersion

/* =========================================================================================
* Copyright © 2013-2021 the kamon project <http://kamon.io/>
*
Expand Down Expand Up @@ -143,6 +145,7 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-caffeine`,
`kamon-lagom`,
`kamon-finagle`,
`kamon-sttp-client3`,
)

lazy val instrumentation = (project in file("instrumentation"))
Expand Down Expand Up @@ -519,6 +522,24 @@ lazy val `kamon-okhttp` = (project in file("instrumentation/kamon-okhttp"))
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

lazy val `kamon-sttp-client3` = (project in file("instrumentation/kamon-sttp-client3"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(
instrumentationSettings,
crossScalaVersions := Seq(
`scala_2.11_version`,
`scala_2.12_version`,
`scala_2.13_version`
),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"com.softwaremill.sttp.client3" %% "core" % "3.3.0-RC2" % "provided",
scalatest % "test",
logbackClassic % "test",
)
).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test")

lazy val `kamon-tapir` = (project in file("instrumentation/kamon-tapir"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
Expand Down Expand Up @@ -868,7 +889,9 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle
`kamon-redis`,
`kamon-okhttp`,
`kamon-caffeine`,
`kamon-lagom`
`kamon-lagom`,
`kamon-sttp-client3`

)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ class AkkaHttpClientInstrumentation extends InstrumentationBuilder with VersionF
.advise(method("singleRequestImpl"), classOf[HttpExtSingleRequestAdvice])
}

onAkkaHttp("10.2") {
onType("akka.http.scaladsl.HttpExt")
.advise(method("singleRequest"), classOf[HttpExtSingleRequestAdvice])
}

onType("akka.http.impl.engine.client.PoolMaster")
.advise(method("dispatchRequest"), classOf[PoolMasterDispatchRequestAdvice])
}

object AkkaHttpClientInstrumentation {

@volatile var httpClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation
@volatile var httpClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation()

private[http] def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = {
val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.akka.http.client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object InstrumentNewExecutorServiceOnAkka24 {
val systemTags = TagSet.of("akka.system", actorSystemName)


if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) {
if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(scheduledActionName)) {
val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext

if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) {
Expand All @@ -147,7 +147,7 @@ object InstrumentNewExecutorServiceOnAkka25 {
val scheduledActionName = actorSystemName + "/" + dispatcherName
val systemTags = TagSet.of("akka.system", actorSystemName)

if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) {
if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(scheduledActionName)) {
val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext

if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ object InstrumentNewExecutorServiceOnAkka26 {
val scheduledActionName = actorSystemName + "/" + dispatcherName
val systemTags = TagSet.of("akka.system", actorSystemName)

if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) {
if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(scheduledActionName)) {
val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext

if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.sttp;

import kamon.Kamon;
import kamon.context.Storage;
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler;
import kamon.trace.Span;
import kamon.trace.SpanBuilder;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import sttp.client3.RequestT;
import sttp.client3.Response;


public class RequestSendAdvice {

@Advice.OnMethodEnter()
public static RequestHandler<RequestT> enter(@Advice.Argument(value = 0, readOnly = false) RequestT request,
@Advice.Local("scope") Storage.Scope scope) {
RequestHandler<RequestT> handler = SttpClientInstrumentation.getHandler(request);
String path = request.uri().toString();
SpanBuilder spanBuilder = Kamon.spanBuilder(path)
.tag("protocol", "http2->1")
.tag("component", "sttp.client3")
.tag("http.method", request.method().toString())
.tag("path", path);

if (!Kamon.currentSpan().isEmpty()) {
spanBuilder.asChildOf(Kamon.currentSpan());
}
Span span = spanBuilder.start().takeSamplingDecision();
scope = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key(), span));
request = handler.request();
return handler;

}

@Advice.OnMethodExit
public static <T> void exit(@Advice.Enter RequestHandler<RequestT> handler,
@Advice.Return(readOnly = false) Response<T> response,
@Advice.Local("scope") Storage.Scope scope) {
try {

handler.processResponse(SttpClientInstrumentation.toResponseBuilder(response));

} catch (Exception e) {
handler.span().fail(e);
throw e;
} finally {
handler.span().finish();
scope.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# =================================== #
# kamon-sttp-client3 reference configuration #
# =================================== #

kanela.modules.sttp-client3 {
name = "Sttp client3"
description = "The Scala HTTP client you always wanted!"
instrumentations = [
"kamon.instrumentation.sttp.SttpClientInstrumentation"
]
within = [
"^sttp.client3.*"
]
}

kamon.instrumentation.sttp.client3 {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kamon.instrumentation.sttp

import kamon.context.Context
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
import kamon.instrumentation.sttp.SttpClientInstrumentation.Request
import kamon.Kamon
import kamon.trace.{ Identifier, Span }
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation.{ Argument, SuperCall }
import sttp.capabilities.Effect
import sttp.client3.{ Identity, RequestT, Response }

import java.util.concurrent.Callable
import scala.concurrent.Future

/**
*
* @author guobin.li@ascendex.io
* @version 1.0,2022/4/21
*/
class AsyncHttpClientBackendInterceptor

object AsyncHttpClientBackendInterceptor {

def send[T, R >: Any with Effect[Future]](@Argument(0) arg: Any, @SuperCall superCall: Callable[Future[Response[T]]]): Future[Response[T]] = {
var span: Span = null
val traceKey: Context.Key[String] = Context.key[String]("parentTraceId", "undefined")
var traceIdVal: Option[String] = None
var requestHandler: RequestHandler[Request[T, R]] = null
val response = arg match {
case request: RequestT[Identity, T, R] =>
val zcall = superCall.call()
requestHandler = SttpClientInstrumentation.getHandler[T, R](request, Kamon.currentSpan())
val traceId = request.headers.find(header => header.name == "traceid")
val parentSpanId = request.headers.find(header => header.name == "spanid")
val parentSpanIdVal = parentSpanId.map(_.value)

traceIdVal = traceId.map(_.value)

val path = s"${request.uri.path.mkString("/")}"
val spanBuilder = Kamon.spanBuilder(path)
.tag("protocol", "http2->1")
.tag("component", "sttp.client3")
.tag("http.method", request.method.method)
.tag("path", path)

traceIdVal.foreach(tid => {
spanBuilder
.traceId(Identifier.Scheme.Single.traceIdFactory.from(tid))
.setParentId(parentSpanIdVal)
})

if (!Kamon.currentSpan().isEmpty) {
spanBuilder.asChildOf(Kamon.currentSpan())
}
span = spanBuilder.start().takeSamplingDecision()
zcall
case x =>
superCall.call()
}

Kamon.runWithSpan(Kamon.currentSpan(), finishSpan = false) {
Kamon.runWithContextEntry(traceKey, traceIdVal.getOrElse("undefined")) {
SttpClientInstrumentation.handleResponse[T, R](response, requestHandler)
}
}
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package kamon.instrumentation.sttp

import kamon.instrumentation.http.HttpMessage
import sttp.client3._

/**
* @author guobin.li@ascendex.io
* @version 1.0, 2022/4/20
*/
private[sttp] trait RequestReader[T, R] extends HttpMessage.Request {

def request: RequestT[Identity, T, R]

override def url: String = request.uri.toString

override def path: String = request.uri.path.mkString("/")

override def method: String = request.method.method

override def host: String = request.uri.authority.map(_.host).orNull

override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(80)

override def read(header: String): Option[String] = request.headers(header).headOption

override def readAll(): Map[String, String] = {
val builder = Map.newBuilder[String, String]
request.headers.foreach(h => builder += (h.name -> h.value))
builder.result()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package kamon.instrumentation.sttp

import kamon.Kamon
import kamon.instrumentation.http._
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler
import kamon.util.CallingThreadExecutionContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import sttp.client3
import sttp.client3.{ Identity, RequestT, Response }
import sttp.model.Header

import kamon.trace.Span

import scala.concurrent.Future
import scala.util.{ Failure, Success }

/**
*
* @author guobin.li@ascendex.io
* @version 1.0,2022/4/20
*/
class SttpClientInstrumentation extends InstrumentationBuilder {

onType("sttp.client3.HttpURLConnectionBackend")
.advise(method("send"), classOf[RequestSendAdvice])

onType("sttp.client3.asynchttpclient.AsyncHttpClientBackend")
.intercept(method("send"), classOf[AsyncHttpClientBackendInterceptor])
}


object SttpClientInstrumentation {

type Request[T, -R] = RequestT[Identity, T, R]


@volatile var httpClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation()

Kamon.onReconfigure(_ => httpClientInstrumentation = rebuildHttpClientInstrumentation())

private[sttp] def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = {
val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.sttp.client3")
httpClientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "sttp.client3")
httpClientInstrumentation
}

def getHandler[T, R](request: Request[T, R]): RequestHandler[Request[T, R]] = {
httpClientInstrumentation.createHandler[Request[T, R]](toRequestBuilder[T, R](request), Kamon.currentContext())
}

def getHandler[T, R](request: Request[T, R], span: Span): RequestHandler[Request[T, R]] = {
httpClientInstrumentation.createHandler[Request[T, R]](toRequestBuilder[T, R](request),
Kamon.currentContext().withEntry(Span.Key, span))
}

def toResponseBuilder[T](response: Response[T]): HttpMessage.Response = new HttpMessage.Response {
override def statusCode: Int = response.code.code
}

def toRequestBuilder[T, R](httpRequest: Request[T, R]): HttpMessage.RequestBuilder[Request[T, R]] =
new RequestReader[T, R] with HttpMessage.RequestBuilder[Request[T, R]] {

private var _extraHeaders = List.empty[Header]

override val request: Request[T, R] = httpRequest

override def write(header: String, value: String): Unit =
_extraHeaders = Header(header, value) :: _extraHeaders

override def build(): Request[T, R] = request.copy(headers = request.headers ++ _extraHeaders)
}

def handleResponse[T, R](response: Future[Response[T]], handler: RequestHandler[Request[T, R]]): Future[client3.Response[T]] = {
response.onComplete {
case Success(res) => handler.processResponse(toResponseBuilder(res))
case Failure(t) => handler.span.fail(t).finish()
}(CallingThreadExecutionContext)

response
}
}
Loading