Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dynamic resolution of SQS to allow working without it. #2421

Merged
merged 3 commits into from
Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ testSets {
setFailOnNoMatchingTests(false)
}
}

// We test SQS separately since we have special logic for it and want to make sure the presence of
// SQS on the classpath doesn't conflict with tests for usage of the core SDK.
testSqs
}

configurations {
Expand All @@ -49,31 +53,26 @@ dependencies {

library group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.11.0'

compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent')
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-rds', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-ec2', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-dynamodb', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106'

// Make sure doesn't add HTTP headers
testSqsImplementation group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106'
// needed for SQS - using emq directly as localstack references emq v0.15.7 ie WITHOUT AWS trace header propagation
testSqsImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.12', version: '1.0.0'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent')

// needed for kinesis:
testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: versions.jackson

// needed for SQS - using emq directly as localstack references emq v0.15.7 ie WITHOUT AWS trace header propagation
testLibrary group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.12', version: '1.0.0'

test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-rds', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-ec2', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-dynamodb', version: '1.11.0')
}

Expand All @@ -82,7 +81,7 @@ test {
}

if (!testLatestDeps) {
test.dependsOn test_before_1_11_106
check.dependsOn test_before_1_11_106, testSqs
}

tasks.withType(Test) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;

import static java.lang.invoke.MethodType.methodType;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Reflective access to aws-sdk-java-sqs class Message.
*
* <p>We currently don't have a good pattern of instrumenting a core library with various plugins
* that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*/
final class SqsMessageAccess {

@Nullable private static final MethodHandle GET_ATTRIBUTES;

static {
Class<?> messageClass = null;
try {
messageClass = Class.forName("com.amazonaws.services.sqs.model.Message");
} catch (Throwable t) {
// Ignore.
}
if (messageClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle getAttributes = null;
try {
getAttributes = lookup.findVirtual(messageClass, "getAttributes", methodType(Map.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_ATTRIBUTES = getAttributes;
} else {
GET_ATTRIBUTES = null;
}
}

@SuppressWarnings("unchecked")
static Map<String, String> getAttributes(Object message) {
if (GET_ATTRIBUTES == null) {
return Collections.emptyMap();
}
try {
return (Map<String, String>) GET_ATTRIBUTES.invoke(message);
} catch (Throwable t) {
return Collections.emptyMap();
}
}

private SqsMessageAccess() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;

import static java.lang.invoke.MethodType.methodType;

import com.amazonaws.AmazonWebServiceRequest;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import javax.annotation.Nullable;

/**
* Reflective access to aws-sdk-java-sqs class ReceiveMessageRequest.
*
* <p>We currently don't have a good pattern of instrumenting a core library with various plugins
* that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*/
final class SqsReceiveMessageRequestAccess {

@Nullable private static final MethodHandle WITH_ATTRIBUTE_NAMES;

static {
Class<?> receiveMessageRequestClass = null;
try {
receiveMessageRequestClass =
Class.forName("com.amazonaws.services.sqs.model.ReceiveMessageRequest");
} catch (Throwable t) {
// Ignore.
}
if (receiveMessageRequestClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle withAttributeNames = null;
try {
withAttributeNames =
lookup.findVirtual(
receiveMessageRequestClass,
"withAttributeNames",
methodType(receiveMessageRequestClass, String[].class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
WITH_ATTRIBUTE_NAMES = withAttributeNames;
} else {
WITH_ATTRIBUTE_NAMES = null;
}
}

static boolean isInstance(AmazonWebServiceRequest request) {
return request
.getClass()
.getName()
.equals("com.amazonaws.services.sqs.model.ReceiveMessageRequest");
}

static void withAttributeNames(AmazonWebServiceRequest request, String name) {
if (WITH_ATTRIBUTE_NAMES == null) {
return;
}
try {
WITH_ATTRIBUTE_NAMES.invoke(request, name);
} catch (Throwable throwable) {
// Ignore
}
}

private SqsReceiveMessageRequestAccess() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;

import static java.lang.invoke.MethodType.methodType;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

/**
* Reflective access to aws-sdk-java-sqs class ReceiveMessageResult.
*
* <p>We currently don't have a good pattern of instrumenting a core library with various plugins
* that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*/
final class SqsReceiveMessageResultAccess {

@Nullable private static final MethodHandle GET_MESSAGES;

static {
Class<?> receiveMessageResultClass = null;
try {
receiveMessageResultClass =
Class.forName("com.amazonaws.services.sqs.model.ReceiveMessageResult");
} catch (Throwable t) {
// Ignore.
}
if (receiveMessageResultClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle getMessages = null;
try {
getMessages =
lookup.findVirtual(receiveMessageResultClass, "getMessages", methodType(List.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_MESSAGES = getMessages;
} else {
GET_MESSAGES = null;
}
}

static List<?> getMessages(Object result) {
if (GET_MESSAGES == null) {
return Collections.emptyList();
}
try {
return (List<?>) GET_MESSAGES.invoke(result);
} catch (Throwable t) {
return Collections.emptyList();
}
}

private SqsReceiveMessageResultAccess() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
Expand Down Expand Up @@ -48,23 +44,25 @@ public void beforeRequest(Request<?> request) {
}

private boolean isSqsProducer(AmazonWebServiceRequest request) {
return (request instanceof SendMessageRequest);
return request
.getClass()
.getName()
.equals("com.amazonaws.services.sqs.model.SendMessageRequest");
}

@Override
public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
if (isSqsConsumer(request)) {
ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) request;
receiveMessageRequest.withAttributeNames(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE);
if (SqsReceiveMessageRequestAccess.isInstance(request)) {
SqsReceiveMessageRequestAccess.withAttributeNames(
request, SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE);
}
return request;
}

@Override
public void afterResponse(Request<?> request, Response<?> response) {
if (isSqsConsumer(request.getOriginalRequest())) {
afterConsumerResponse(
(Request<ReceiveMessageRequest>) request, (Response<ReceiveMessageResult>) response);
if (SqsReceiveMessageRequestAccess.isInstance(request.getOriginalRequest())) {
afterConsumerResponse(request, response);
}
// close outstanding "client" span
ContextScopePair scope = request.getHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY);
Expand All @@ -76,22 +74,18 @@ public void afterResponse(Request<?> request, Response<?> response) {
tracer().end(scope.getContext(), response);
}

private boolean isSqsConsumer(AmazonWebServiceRequest request) {
return (request instanceof ReceiveMessageRequest);
}

/** Create and close CONSUMER span for each message consumed. */
private void afterConsumerResponse(
Request<ReceiveMessageRequest> request, Response<ReceiveMessageResult> response) {
ReceiveMessageResult receiveMessageResult = response.getAwsResponse();
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
private void afterConsumerResponse(Request<?> request, Response<?> response) {
Object receiveMessageResult = response.getAwsResponse();
List<?> messages = SqsReceiveMessageResultAccess.getMessages(receiveMessageResult);
for (Object message : messages) {
createConsumerSpan(message, request, response);
}
}

private void createConsumerSpan(Message message, Request<?> request, Response<?> response) {
Context parentContext = SqsParentContext.ofSystemAttributes(message.getAttributes());
private void createConsumerSpan(Object message, Request<?> request, Response<?> response) {
Context parentContext =
SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message));
AmazonWebServiceRequest originalRequest = request.getOriginalRequest();
RequestMeta requestMeta = contextStore.get(originalRequest);
Context context = tracer().startSpan(SpanKind.CONSUMER, parentContext, request, requestMeta);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import com.amazonaws.services.rds.AmazonRDSClientBuilder
import com.amazonaws.services.rds.model.DeleteOptionGroupRequest
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.CreateQueueRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
Expand Down Expand Up @@ -169,22 +166,6 @@ class Aws1ClientTest extends AgentInstrumentationSpecification {
"S3" | "GetObject" | "GET" | "/someBucket/someKey" | 1 | AmazonS3ClientBuilder.standard().withPathStyleAccessEnabled(true).withEndpointConfiguration(endpoint).withCredentials(credentialsProvider).build() | { client -> client.getObject("someBucket", "someKey") } | ["aws.bucket.name": "someBucket"] | ""
"DynamoDBv2" | "CreateTable" | "POST" | "/" | 1 | AmazonDynamoDBClientBuilder.standard().withEndpointConfiguration(endpoint).withCredentials(credentialsProvider).build() | { c -> c.createTable(new CreateTableRequest("sometable", null)) } | ["aws.table.name": "sometable"] | ""
"Kinesis" | "DeleteStream" | "POST" | "/" | 1 | AmazonKinesisClientBuilder.standard().withEndpointConfiguration(endpoint).withCredentials(credentialsProvider).build() | { c -> c.deleteStream(new DeleteStreamRequest().withStreamName("somestream")) } | ["aws.stream.name": "somestream"] | ""
"SQS" | "CreateQueue" | "POST" | "/" | 4 | AmazonSQSClientBuilder.standard().withEndpointConfiguration(endpoint).withCredentials(credentialsProvider).build() | { c -> c.createQueue(new CreateQueueRequest("somequeue")) } | ["aws.queue.name": "somequeue"] | """
<CreateQueueResponse>
<CreateQueueResult><QueueUrl>https://queue.amazonaws.com/123456789012/MyQueue</QueueUrl></CreateQueueResult>
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"SQS" | "SendMessage" | "POST" | "/someurl" | 4 | AmazonSQSClientBuilder.standard().withEndpointConfiguration(endpoint).withCredentials(credentialsProvider).build() | { c -> c.sendMessage(new SendMessageRequest("someurl", "")) } | ["aws.queue.url": "someurl"] | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>
<MD5OfMessageAttributes>3ae8f24a165a8cedc005670c81a27295</MD5OfMessageAttributes>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
</SendMessageResult>
<ResponseMetadata><RequestId>27daac76-34dd-47df-bd01-1f6e873584a0</RequestId></ResponseMetadata>
</SendMessageResponse>
"""
"EC2" | "AllocateAddress" | "POST" | "/" | 4 | AmazonEC2ClientBuilder.standard().withEndpointConfiguration(endpoint).withCredentials(credentialsProvider).build() | { client -> client.allocateAddress() } | [:] | """
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
Expand Down