Skip to content

Commit

Permalink
sqs-messaging-system from current lumigo's main
Browse files Browse the repository at this point in the history
  • Loading branch information
CircleCI committed Oct 18, 2022
1 parent c8020cb commit c8314a8
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
_BotoResultT,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span

_SUPPORTED_OPERATIONS = ["SendMessage", "SendMessageBatch", "ReceiveMessage"]


class _SqsExtension(_AwsSdkExtension):
Expand All @@ -24,3 +29,27 @@ def extract_attributes(self, attributes: _AttributeMapT):
if queue_url:
# TODO: update when semantic conventions exist
attributes["aws.queue_url"] = queue_url
attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.sqs"
attributes[SpanAttributes.MESSAGING_URL] = queue_url
attributes[SpanAttributes.MESSAGING_DESTINATION] = queue_url.split(
"/"
)[-1]

def on_success(self, span: Span, result: _BotoResultT):
operation = self._call_context.operation
if operation in _SUPPORTED_OPERATIONS:
if operation == "SendMessage":
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
result.get("MessageId"),
)
elif operation == "SendMessageBatch" and result.get("Successful"):
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
result["Successful"][0]["MessageId"],
)
elif operation == "ReceiveMessage" and result.get("Messages"):
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
result["Messages"][0]["MessageId"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import botocore.session
from moto import mock_sqs

from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase


class TestDynamoDbExtension(TestBase):
def setUp(self):
super().setUp()
BotocoreInstrumentor().instrument()

session = botocore.session.get_session()
session.set_credentials(
access_key="access-key", secret_key="secret-key"
)
self.region = "us-west-2"
self.client = session.create_client("sqs", region_name=self.region)

def tearDown(self):
super().tearDown()
BotocoreInstrumentor().uninstrument()

@mock_sqs
def test_sqs_messaging_send_message(self):
create_queue_result = self.client.create_queue(
QueueName="test_queue_name"
)
queue_url = create_queue_result["QueueUrl"]
response = self.client.send_message(
QueueUrl=queue_url, MessageBody="content"
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
span = spans[1]
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], queue_url
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
"test_queue_name",
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID],
response["MessageId"],
)

@mock_sqs
def test_sqs_messaging_send_message_batch(self):
create_queue_result = self.client.create_queue(
QueueName="test_queue_name"
)
queue_url = create_queue_result["QueueUrl"]
response = self.client.send_message_batch(
QueueUrl=queue_url,
Entries=[
{"Id": "1", "MessageBody": "content"},
{"Id": "2", "MessageBody": "content2"},
],
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
span = spans[1]
self.assertEqual(span.attributes["rpc.method"], "SendMessageBatch")
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], queue_url
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
"test_queue_name",
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID],
response["Successful"][0]["MessageId"],
)

@mock_sqs
def test_sqs_messaging_receive_message(self):
create_queue_result = self.client.create_queue(
QueueName="test_queue_name"
)
queue_url = create_queue_result["QueueUrl"]
self.client.send_message(QueueUrl=queue_url, MessageBody="content")
message_result = self.client.receive_message(
QueueUrl=create_queue_result["QueueUrl"]
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 3)
span = spans[-1]
self.assertEqual(span.attributes["rpc.method"], "ReceiveMessage")
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], queue_url
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
"test_queue_name",
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID],
message_result["Messages"][0]["MessageId"],
)

@mock_sqs
def test_sqs_messaging_failed_operation(self):
with self.assertRaises(Exception):
self.client.send_message(
QueueUrl="non-existing", MessageBody="content"
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.attributes["rpc.method"], "SendMessage")
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], "non-existing"
)

0 comments on commit c8314a8

Please sign in to comment.