Skip to content

Commit

Permalink
Add SendMessageBatch support.
Browse files Browse the repository at this point in the history
  • Loading branch information
Oberon00 committed Jun 23, 2023
1 parent b24e390 commit 8b48a0b
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 10 deletions.
3 changes: 3 additions & 0 deletions instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ dependencies {
tasks {
test {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)

// NB: If you'd like to change these, there is some cleanup work to be done, as most tests ignore this and
// set the value directly (the "library" does not normally query it, only library-autoconfigure)
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

// this class is only used from SqsAccess from method with @NoMuzzle annotation
Expand Down Expand Up @@ -92,30 +94,63 @@ static SdkRequest modifyRequest(
if (request instanceof SendMessageRequest) {
return injectIntoSendMessageRequest(
(SendMessageRequest) request, otelContext, messagingPropagator);
} else if (request instanceof SendMessageBatchRequest) {
return injectIntoSendMessageBatchRequest(
(SendMessageBatchRequest) request, otelContext, messagingPropagator);
}
// TODO: Support SendMessageBatchRequest
}
return null;
}

private static SdkRequest injectIntoSendMessageBatchRequest(
SendMessageBatchRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
ArrayList<SendMessageBatchRequestEntry> entries = new ArrayList<>(request.entries());
for (int i = 0; i < entries.size(); ++i) {
SendMessageBatchRequestEntry entry = entries.get(i);
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());

// TODO: Per https://github.com/open-telemetry/oteps/pull/220, each message should get
// a separate context. We don't support this yet, also because it would be inconsistent
// with the header-based X-Ray propagation
// (probably could override it here by setting the X-Ray message system attribute)
if (injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
entries.set(i, entry.toBuilder().messageAttributes(messageAttributes).build());
}
}
return request.toBuilder().entries(entries).build();
}

private static SdkRequest injectIntoSendMessageRequest(
SendMessageRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
if (!injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}

private static boolean injectIntoMessageAttributes(
Map<String, MessageAttributeValue> messageAttributes,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});

if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
// Return whether the injection resulted in an attribute count that is still supported.
// See
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
return messageAttributes.size() <= 10;
}

private static SdkRequest modifyReceiveMessageRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest imp
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
true
}

@Override
boolean isXrayInjectionEnabled() {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2S
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
return AttributeValue.builder().s(value).build()
}

def isSqsAttributeInjectionEnabled() {
static boolean isSqsAttributeInjectionEnabled() {
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
return ConfigPropertiesUtil.getBoolean("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import spock.lang.Shared

Expand All @@ -35,19 +37,56 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
@Shared
int sqsPort

static Map<String, MessageAttributeValue> dummyMessageAttributes(count) {
(0..<count).collectEntries {
[
"a$it".toString(),
MessageAttributeValue.builder().stringValue("v$it").dataType("String").build()]
}
}

String queueUrl = "http://localhost:$sqsPort/000000000000/testSdkSqs"

ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.queueUrl(queueUrl)
.build()

ReceiveMessageRequest receiveMessageBatchRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(3)
.messageAttributeNames("All")
.waitTimeSeconds(5)
.build()

CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName("testSdkSqs")
.build()

SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.queueUrl(queueUrl)
.messageBody("{\"type\": \"hello\"}")
.build()

SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(
e -> e.messageBody("e1").id("i1"),
// 8 attributes, injection always possible
e -> e.messageBody("e2").id("i2")
.messageAttributes(dummyMessageAttributes(8)),
// 10 attributes, injection with custom propagator never possible
e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10)))
.build()

boolean isSqsAttributeInjectionEnabled() {
AbstractAws2ClientTest.isSqsAttributeInjectionEnabled()
}

boolean isXrayInjectionEnabled() {
true
}


void configureSdkClient(SqsBaseClientBuilder builder) {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
Expand Down Expand Up @@ -204,4 +243,129 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
resp.messages().size() == 1
assertSqsTraces()
}

def "batch sqs producer-consumer services: sync"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def client = builder.build()

client.createQueue(createQueueRequest)

when:
client.sendMessageBatch(sendMessageBatchRequest)

def resp = client.receiveMessage(receiveMessageBatchRequest)
def totalAttrs = resp.messages().sum {it.messageAttributes().size() }

then:
resp.messages().size() == 3
sqsAttributeInjectionEnabled || totalAttrs == 18 // No additional attributes
!sqsAttributeInjectionEnabled || totalAttrs == 18 + 2 // Once not injected due to too many attrs

assertTraces(xrayInjectionEnabled ? 3 : 4) {
trace(0, 1) {

span(0) {
name "Sqs.CreateQueue"
kind CLIENT
}
}
trace(1, xrayInjectionEnabled ? 4 : 3) {
span(0) {
name "Sqs.SendMessageBatch"
kind CLIENT // TODO: Probably this should be producer, but that would be a breaking change
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
"rpc.system" "aws-api"
"rpc.method" "SendMessageBatch"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) {
span(i) {
name "Sqs.ReceiveMessage"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
}
}
/**
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
*/
trace(2, 1) {
span(0) {
name "Sqs.ReceiveMessage"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
}
if (!xrayInjectionEnabled) {
trace(3, 1) {
span(0) {
name "Sqs.ReceiveMessage"
kind CONSUMER

// TODO This is not nice at all, and can also happen if producer is not instrumented
hasNoParent()

attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
}
}
}
}
}

0 comments on commit 8b48a0b

Please sign in to comment.