Skip to content

Commit

Permalink
aws-sdk-2.2.: Support injection into SQS.SendMessageBatch message att…
Browse files Browse the repository at this point in the history
…ributes (#8798)

Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
  • Loading branch information
Oberon00 and Mateusz Rzeszutek committed Jun 29, 2023
1 parent eaf11ab commit a1f6917
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 84 deletions.
3 changes: 3 additions & 0 deletions instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ testing {
tasks {
withType<Test> {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)

// NB: If you'd like to change these, there is some cleanup work to be done, as most tests ignore this and
// set the value directly (the "library" does not normally query it, only library-autoconfigure)
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@

import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

// helper class for calling methods that use sqs types in SqsImpl
// if SqsImpl is not present these methods are no op
Expand All @@ -23,42 +20,26 @@ private SqsAccess() {}
private static final boolean enabled = PluginImplUtil.isImplPresent("SqsImpl");

@NoMuzzle
static boolean isSendMessageRequest(SdkRequest request) {
return enabled && request instanceof SendMessageRequest;
}

@NoMuzzle
static SdkRequest injectIntoSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
assert enabled; // enabled checked already in instance check.
return SqsImpl.injectIntoSendMessageRequest(messagingPropagator, rawRequest, otelContext);
}

@NoMuzzle
static boolean isReceiveMessageRequest(SdkRequest request) {
return enabled && request instanceof ReceiveMessageRequest;
}

@NoMuzzle
public static SdkRequest modifyReceiveMessageRequest(
SdkRequest request, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
assert enabled; // enabled checked already in instance check.
return SqsImpl.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
}

@NoMuzzle
static boolean isReceiveMessageResponse(SdkResponse response) {
return enabled && response instanceof ReceiveMessageResponse;
static boolean afterReceiveMessageExecution(
Context.AfterExecution context,
ExecutionAttributes executionAttributes,
TracingExecutionInterceptor config) {
return enabled && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config);
}

/**
* Returns {@code null} (not the unmodified {@code request}!) if nothing matched, so that other
* handling can be tried.
*/
@Nullable
@NoMuzzle
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
Context.AfterExecution context,
ExecutionAttributes executionAttributes) {
assert enabled; // enabled checked already in instance check.
SqsImpl.afterReceiveMessageExecution(config, executionAttributes, context);
static SdkRequest modifyRequest(
SdkRequest request,
io.opentelemetry.context.Context otelContext,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
return enabled
? SqsImpl.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator)
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpResponse;
Expand All @@ -20,6 +22,8 @@
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

// this class is only used from SqsAccess from method with @NoMuzzle annotation
Expand All @@ -33,44 +37,30 @@ final class SqsImpl {

private SqsImpl() {}

static SdkRequest injectIntoSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
SendMessageRequest request = (SendMessageRequest) rawRequest;
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());

messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});
static boolean afterReceiveMessageExecution(
Context.AfterExecution context,
ExecutionAttributes executionAttributes,
TracingExecutionInterceptor config) {

if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
return request;
SdkResponse rawResponse = context.response();
if (!(rawResponse instanceof ReceiveMessageResponse)) {
return false;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}

/** Create and close CONSUMER span for each message consumed. */
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
ExecutionAttributes executionAttributes,
Context.AfterExecution context) {
ReceiveMessageResponse response = (ReceiveMessageResponse) context.response();
ReceiveMessageResponse response = (ReceiveMessageResponse) rawResponse;
SdkHttpResponse httpResponse = context.httpResponse();
for (Message message : response.messages()) {
createConsumerSpan(config, message, executionAttributes, httpResponse);
createConsumerSpan(message, httpResponse, executionAttributes, config);
}

return true;
}

private static void createConsumerSpan(
TracingExecutionInterceptor config,
Message message,
SdkHttpResponse httpResponse,
ExecutionAttributes executionAttributes,
SdkHttpResponse httpResponse) {
TracingExecutionInterceptor config) {

io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();

Expand Down Expand Up @@ -99,9 +89,81 @@ private static void createConsumerSpan(
}
}

static SdkRequest modifyReceiveMessageRequest(
SdkRequest rawRequest, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
ReceiveMessageRequest request = (ReceiveMessageRequest) rawRequest;
@Nullable
static SdkRequest modifyRequest(
SdkRequest request,
io.opentelemetry.context.Context otelContext,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
if (request instanceof ReceiveMessageRequest) {
return modifyReceiveMessageRequest(
(ReceiveMessageRequest) request, useXrayPropagator, messagingPropagator);
} else if (messagingPropagator != null) {
if (request instanceof SendMessageRequest) {
return injectIntoSendMessageRequest(
(SendMessageRequest) request, otelContext, messagingPropagator);
} else if (request instanceof SendMessageBatchRequest) {
return injectIntoSendMessageBatchRequest(
(SendMessageBatchRequest) request, otelContext, messagingPropagator);
}
}
return null;
}

private static SdkRequest injectIntoSendMessageBatchRequest(
SendMessageBatchRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
ArrayList<SendMessageBatchRequestEntry> entries = new ArrayList<>(request.entries());
for (int i = 0; i < entries.size(); ++i) {
SendMessageBatchRequestEntry entry = entries.get(i);
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());

// TODO: Per https://github.com/open-telemetry/oteps/pull/220, each message should get
// a separate context. We don't support this yet, also because it would be inconsistent
// with the header-based X-Ray propagation
// (probably could override it here by setting the X-Ray message system attribute)
if (injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
entries.set(i, entry.toBuilder().messageAttributes(messageAttributes).build());
}
}
return request.toBuilder().entries(entries).build();
}

private static SdkRequest injectIntoSendMessageRequest(
SendMessageRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
if (!injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}

private static boolean injectIntoMessageAttributes(
Map<String, MessageAttributeValue> messageAttributes,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});

// Return whether the injection resulted in an attribute count that is still supported.
// See
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
return messageAttributes.size() <= 10;
}

private static SdkRequest modifyReceiveMessageRequest(
ReceiveMessageRequest request,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
boolean hasXrayAttribute = true;
List<String> existingAttributeNames = null;
if (useXrayPropagator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ public SdkRequest modifyRequest(
throw throwable;
}

if (SqsAccess.isReceiveMessageRequest(request)) {
return SqsAccess.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
} else if (messagingPropagator != null) {
if (SqsAccess.isSendMessageRequest(request)) {
return SqsAccess.injectIntoSendMessageRequest(messagingPropagator, request, otelContext);
}
// TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry)
SdkRequest sqsModifiedRequest =
SqsAccess.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator);
if (sqsModifiedRequest != null) {
return sqsModifiedRequest;
}

// Insert other special handling here, following the same pattern as SQS.

return request;
}

Expand Down Expand Up @@ -225,9 +225,9 @@ private void populateRequestAttributes(
@Override
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
if (SqsAccess.isReceiveMessageResponse(context.response())) {
SqsAccess.afterReceiveMessageExecution(this, context, executionAttributes);
}

// Other special handling could be shortcut-&&ed after this (false is returned if not handled).
SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this);

io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
if (otelContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest imp
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
true
}

@Override
boolean isXrayInjectionEnabled() {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2S
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT

@Unroll
abstract class AbstractAws2ClientCoreTest extends InstrumentationSpecification {
def isSqsAttributeInjectionEnabled() {
static boolean isSqsAttributeInjectionEnabled() {
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
return ConfigPropertiesUtil.getBoolean("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false)
}
Expand Down
Loading

0 comments on commit a1f6917

Please sign in to comment.