Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws-sdk-2.2.: Support injection into SQS.SendMessageBatch message attributes #8798

Merged
merged 10 commits into from
Jun 29, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ dependencies {
tasks {
test {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)

// NB: If you'd like to change these, there is some cleanup work to be done, as most tests ignore this and
// set the value directly (the "library" does not normally query it, only library-autoconfigure)
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@

import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

// helper class for calling methods that use sqs types in SqsImpl
// if SqsImpl is not present these methods are no op
final class SqsAccess {
private static final Logger logger = Logger.getLogger(SqsAccess.class.getName());

private SqsAccess() {}

private static final boolean enabled = isSqsImplPresent();
Expand All @@ -32,47 +33,32 @@ private static boolean isSqsImplPresent() {
Class.forName(SqsAccess.class.getName().replace(".SqsAccess", ".SqsImpl"));
return true;
} catch (ClassNotFoundException e) {
logger.log(Level.FINE, "SqsImpl not present, probably incompatible version", e);
return false;
}
}

@NoMuzzle
static boolean isSendMessageRequest(SdkRequest request) {
return enabled && request instanceof SendMessageRequest;
}

@NoMuzzle
static SdkRequest injectIntoSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
assert enabled; // enabled checked already in instance check.
return SqsImpl.injectIntoSendMessageRequest(messagingPropagator, rawRequest, otelContext);
}

@NoMuzzle
static boolean isReceiveMessageRequest(SdkRequest request) {
return enabled && request instanceof ReceiveMessageRequest;
}

@NoMuzzle
public static SdkRequest modifyReceiveMessageRequest(
SdkRequest request, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
assert enabled; // enabled checked already in instance check.
return SqsImpl.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
}

@NoMuzzle
static boolean isReceiveMessageResponse(SdkResponse response) {
return enabled && response instanceof ReceiveMessageResponse;
static boolean afterReceiveMessageExecution(
Context.AfterExecution context,
ExecutionAttributes executionAttributes,
TracingExecutionInterceptor config) {
return enabled && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config);
}

/**
* Returns {@code null} (not the unmodified {@code request}!) if nothing matched, so that other
* handling can be tried.
*/
@Nullable
@NoMuzzle
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
Context.AfterExecution context,
ExecutionAttributes executionAttributes) {
assert enabled; // enabled checked already in instance check.
SqsImpl.afterReceiveMessageExecution(config, executionAttributes, context);
static SdkRequest modifyRequest(
SdkRequest request,
io.opentelemetry.context.Context otelContext,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
return enabled
? SqsImpl.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator)
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,58 +11,48 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

// this class is only used from SqsAccess from method with @NoMuzzle annotation
final class SqsImpl {
private SqsImpl() {}

static SdkRequest injectIntoSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
SendMessageRequest request = (SendMessageRequest) rawRequest;
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());

messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});
static boolean afterReceiveMessageExecution(
Context.AfterExecution context,
ExecutionAttributes executionAttributes,
TracingExecutionInterceptor config) {

if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
return request;
SdkResponse rawResponse = context.response();
if (!(rawResponse instanceof ReceiveMessageResponse)) {
return false;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}

/** Create and close CONSUMER span for each message consumed. */
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
ExecutionAttributes executionAttributes,
Context.AfterExecution context) {
ReceiveMessageResponse response = (ReceiveMessageResponse) context.response();
ReceiveMessageResponse response = (ReceiveMessageResponse) rawResponse;
SdkHttpResponse httpResponse = context.httpResponse();
for (Message message : response.messages()) {
createConsumerSpan(config, message, executionAttributes, httpResponse);
createConsumerSpan(message, httpResponse, executionAttributes, config);
}

return true;
}

private static void createConsumerSpan(
TracingExecutionInterceptor config,
Message message,
SdkHttpResponse httpResponse,
ExecutionAttributes executionAttributes,
SdkHttpResponse httpResponse) {
TracingExecutionInterceptor config) {

io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();

Expand Down Expand Up @@ -91,9 +81,81 @@ private static void createConsumerSpan(
}
}

static SdkRequest modifyReceiveMessageRequest(
SdkRequest rawRequest, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
ReceiveMessageRequest request = (ReceiveMessageRequest) rawRequest;
@Nullable
static SdkRequest modifyRequest(
SdkRequest request,
io.opentelemetry.context.Context otelContext,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
if (request instanceof ReceiveMessageRequest) {
return modifyReceiveMessageRequest(
(ReceiveMessageRequest) request, useXrayPropagator, messagingPropagator);
} else if (messagingPropagator != null) {
if (request instanceof SendMessageRequest) {
return injectIntoSendMessageRequest(
(SendMessageRequest) request, otelContext, messagingPropagator);
} else if (request instanceof SendMessageBatchRequest) {
return injectIntoSendMessageBatchRequest(
(SendMessageBatchRequest) request, otelContext, messagingPropagator);
}
}
return null;
}

private static SdkRequest injectIntoSendMessageBatchRequest(
SendMessageBatchRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
ArrayList<SendMessageBatchRequestEntry> entries = new ArrayList<>(request.entries());
for (int i = 0; i < entries.size(); ++i) {
SendMessageBatchRequestEntry entry = entries.get(i);
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());

// TODO: Per https://github.com/open-telemetry/oteps/pull/220, each message should get
// a separate context. We don't support this yet, also because it would be inconsistent
// with the header-based X-Ray propagation
// (probably could override it here by setting the X-Ray message system attribute)
if (injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
entries.set(i, entry.toBuilder().messageAttributes(messageAttributes).build());
}
}
return request.toBuilder().entries(entries).build();
}

private static SdkRequest injectIntoSendMessageRequest(
SendMessageRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
if (!injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}

private static boolean injectIntoMessageAttributes(
Map<String, MessageAttributeValue> messageAttributes,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});

// Return whether the injection resulted in an attribute count that is still supported.
// See
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
return messageAttributes.size() <= 10;
}

private static SdkRequest modifyReceiveMessageRequest(
ReceiveMessageRequest request,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
boolean hasXrayAttribute = true;
List<String> existingAttributeNames = null;
if (useXrayPropagator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ public SdkRequest modifyRequest(
throw throwable;
}

if (SqsAccess.isReceiveMessageRequest(request)) {
return SqsAccess.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
} else if (messagingPropagator != null) {
if (SqsAccess.isSendMessageRequest(request)) {
return SqsAccess.injectIntoSendMessageRequest(messagingPropagator, request, otelContext);
}
// TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry)
SdkRequest sqsModifiedRequest =
SqsAccess.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator);
if (sqsModifiedRequest != null) {
return sqsModifiedRequest;
}

// Insert other special handling here, following the same pattern as SQS.

return request;
}

Expand Down Expand Up @@ -225,9 +225,9 @@ private void populateRequestAttributes(
@Override
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
if (SqsAccess.isReceiveMessageResponse(context.response())) {
SqsAccess.afterReceiveMessageExecution(this, context, executionAttributes);
}

// Other special handling could be shortcut-&&ed after this (false is returned if not handled).
SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this);

io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
if (otelContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,9 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest imp
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
true
}

@Override
boolean isXrayInjectionEnabled() {
false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2S
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.build()
.newExecutionInterceptor())
}

@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
return AttributeValue.builder().s(value).build()
}

def isSqsAttributeInjectionEnabled() {
static boolean isSqsAttributeInjectionEnabled() {
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
return ConfigPropertiesUtil.getBoolean("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false)
}
Expand Down
Loading