Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement basic tracing for the AWS Java SDK 2.x #1167

Merged
merged 4 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: [push, pull_request]

jobs:
ci:
runs-on: ubuntu-latest
runs-on: self-hosted
env:
JAVA_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
JVM_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
Expand Down
21 changes: 20 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-caffeine`,
`kamon-lagom`,
`kamon-finagle`,
`kamon-aws-sdk`
)

lazy val instrumentation = (project in file("instrumentation"))
Expand Down Expand Up @@ -597,6 +598,23 @@ lazy val `kamon-finagle` = (project in file("instrumentation/kamon-finagle"))
)
).dependsOn(`kamon-core`, `kamon-instrumentation-common`, `kamon-testkit` % "test")

lazy val `kamon-aws-sdk` = (project in file("instrumentation/kamon-aws-sdk"))
.disablePlugins(AssemblyPlugin)
.enablePlugins(JavaAgent)
.settings(instrumentationSettings)
.settings(
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"com.amazonaws" % "aws-java-sdk-lambda" % "1.12.225" % "provided",
"software.amazon.awssdk" % "dynamodb" % "2.17.191" % "provided",
"software.amazon.awssdk" % "sqs" % "2.17.191" % "provided",

scalatest % "test",
logbackClassic % "test",
"org.testcontainers" % "dynalite" % "1.17.1"
)
).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test")

/**
* Reporters
*/
Expand Down Expand Up @@ -871,7 +889,8 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle
`kamon-redis`,
`kamon-okhttp`,
`kamon-caffeine`,
`kamon-lagom`
`kamon-lagom`,
`kamon-aws-sdk`
)

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kamon.instrumentation.aws.sdk.AwsSdkRequestHandler
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kamon.instrumentation.aws.sdk.AwsSdkClientExecutionInterceptor
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* ==========================================================================================
* Copyright © 2013-2022 The Kamon Project <https://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* ==========================================================================================
*/

package kamon.instrumentation.aws.sdk

import kamon.Kamon
import kamon.trace.Span
import software.amazon.awssdk.core.interceptor.{Context, ExecutionAttribute, ExecutionAttributes, ExecutionInterceptor, SdkExecutionAttribute}

/**
* Execution Interceptor for the AWS Java SDK Version 2.x
*
* Bare-bones interceptor that creates Spans for all client requests made with the AWS SDK. There is no need to
* add this interceptor by hand anywhere, the AWS SDK will pick it up automatically from the classpath because it is
* included in the "software/amazon/awssdk/global/handlers/execution.interceptors" file shipped with this module.
*/
class AwsSdkClientExecutionInterceptor extends ExecutionInterceptor {
private val ClientSpanAttribute = new ExecutionAttribute[Span]("SdkClientSpan")

override def afterMarshalling(context: Context.AfterMarshalling, executionAttributes: ExecutionAttributes): Unit = {
val operationName = executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME)
val serviceName = executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME)
val clientType = executionAttributes.getAttribute(SdkExecutionAttribute.CLIENT_TYPE)

val clientSpan = Kamon.clientSpanBuilder(operationName, serviceName)
.tag("aws.sdk.client_type", clientType.name())
.start()

executionAttributes.putAttribute(ClientSpanAttribute, clientSpan)
}

override def afterExecution(context: Context.AfterExecution, executionAttributes: ExecutionAttributes): Unit = {
val kamonSpan = executionAttributes.getAttribute(ClientSpanAttribute)
if(kamonSpan != null) {
kamonSpan.finish()
}
}

override def onExecutionFailure(context: Context.FailedExecution, executionAttributes: ExecutionAttributes): Unit = {
val kamonSpan = executionAttributes.getAttribute(ClientSpanAttribute)
if(kamonSpan != null) {
kamonSpan.fail(context.exception()).finish()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* ==========================================================================================
* Copyright © 2013-2022 The Kamon Project <https://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* ==========================================================================================
*/

package kamon.instrumentation.aws.sdk

import com.amazonaws.{Request, Response}
import com.amazonaws.handlers.{HandlerContextKey, RequestHandler2}
import kamon.Kamon
import kamon.trace.Span

/**
* RequestHandler for the AWS Java SDK version 1.x
*
* Bare-bones request handler that creates Spans for all client requests made with the AWS SDK. There is no need to
* add this interceptor by hand anywhere, the AWS SDK will pick it up automatically from the classpath because it is
* included in the "software/amazon/awssdk/global/handlers/execution.interceptors" file shipped with this module.
*/
class AwsSdkRequestHandler extends RequestHandler2 {
val SpanContextKey = new HandlerContextKey[Span](classOf[Span].getName)

override def beforeRequest(request: Request[_]): Unit = {
val serviceName = request.getServiceName
val originalRequestName = {
// Remove the "Request" part of the request class name, if present
var requestClassName = request.getOriginalRequest.getClass.getSimpleName
if(requestClassName.endsWith("Request"))
requestClassName = requestClassName.substring(0, requestClassName.length - 7)
requestClassName
}

val operationName = serviceName + "." + originalRequestName

val clientSpan = serviceName match {
case "AmazonSQS" => Kamon.producerSpanBuilder(operationName, serviceName).start()
case _ => Kamon.clientSpanBuilder(operationName, serviceName).start()
}

request.addHandlerContext(SpanContextKey, clientSpan)
}

override def afterResponse(request: Request[_], response: Response[_]): Unit = {
val requestSpan = request.getHandlerContext(SpanContextKey)
if(requestSpan != null) {
requestSpan.finish()
}
}

override def afterError(request: Request[_], response: Response[_], e: Exception): Unit = {
val requestSpan = request.getHandlerContext(SpanContextKey)
if(requestSpan != null) {
requestSpan
.fail(e)
.finish()
}
}
}
14 changes: 14 additions & 0 deletions instrumentation/kamon-aws-sdk/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<configuration>
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="kamon.log.LogbackFilter"/>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* ==========================================================================================
* Copyright © 2013-2022 The Kamon Project <https://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* ==========================================================================================
*/

package kamon.instrumentation.aws.sdk

import kamon.tag.Lookups.plain
import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter}
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.testcontainers.containers.GenericContainer
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, KeyType, ProvisionedThroughput}

import scala.concurrent.duration._
import java.net.URI

class DynamoDBTracingSpec extends AnyWordSpec with Matchers with OptionValues with InitAndStopKamonAfterAll with Eventually with TestSpanReporter {
val dynamoPort = 8000
val dynamoContainer: GenericContainer[_] = new GenericContainer("amazon/dynamodb-local:latest")
.withExposedPorts(dynamoPort)

lazy val client = DynamoDbClient.builder()
.endpointOverride(URI.create(s"http://${dynamoContainer.getHost}:${dynamoContainer.getMappedPort(dynamoPort)}"))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("dummy", "dummy")))
.region(Region.US_EAST_1)
.build()


"the DynamoDB instrumentation on the SDK" should {
"create a Span for simple calls" in {
client.createTable(CreateTableRequest.builder()
.tableName("people")
.attributeDefinitions(AttributeDefinition.builder().attributeName("customerId").attributeType("S").build())
.keySchema(KeySchemaElement.builder().attributeName("customerId").keyType(KeyType.HASH).build())
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5l).writeCapacityUnits(5l).build())
.build())


eventually(timeout(5 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "CreateTable"
span.metricTags.get(plain("component")) shouldBe "DynamoDb"
}
}
}

override protected def beforeAll(): Unit = {
super.beforeAll()
dynamoContainer.start()
}
}