Skip to content

Commit

Permalink
feat: collecting messaging.* span attributes in botocore instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
saartochner-lumigo committed Nov 13, 2022
1 parent 5f85a5b commit aa4a59f
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1424](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1424))
- `opentelemetry-instrumentation-fastapi` Add support for regular expression matching and sanitization of HTTP headers.
([#1403](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1403))
- `opentelemetry-instrumentation-botocore` add support for `messaging.*` in the sqs extension.
([#1350](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1350))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
_BotoResultT,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span

_SUPPORTED_OPERATIONS = ["SendMessage", "SendMessageBatch", "ReceiveMessage"]

_logger = logging.getLogger(__name__)


class _SqsExtension(_AwsSdkExtension):
Expand All @@ -24,3 +32,38 @@ def extract_attributes(self, attributes: _AttributeMapT):
if queue_url:
# TODO: update when semantic conventions exist
attributes["aws.queue_url"] = queue_url
attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.sqs"
attributes[SpanAttributes.MESSAGING_URL] = queue_url
try:
attributes[
SpanAttributes.MESSAGING_DESTINATION
] = queue_url.split("/")[-1]
except IndexError:
_logger.error(
"Could not extract messaging destination from '%s'",
queue_url,
)

def on_success(self, span: Span, result: _BotoResultT):
operation = self._call_context.operation
if operation in _SUPPORTED_OPERATIONS:
try:
if operation == "SendMessage":
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
result.get("MessageId"),
)
elif operation == "SendMessageBatch" and result.get(
"Successful"
):
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
result["Successful"][0]["MessageId"],
)
elif operation == "ReceiveMessage" and result.get("Messages"):
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID,
result["Messages"][0]["MessageId"],
)
except (IndexError, KeyError):
_logger.error("Could not extract the messaging message ID")
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import botocore.session
from moto import mock_sqs

from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase


class TestSqsExtension(TestBase):
def setUp(self):
super().setUp()
BotocoreInstrumentor().instrument()

session = botocore.session.get_session()
session.set_credentials(
access_key="access-key", secret_key="secret-key"
)
self.region = "us-west-2"
self.client = session.create_client("sqs", region_name=self.region)

def tearDown(self):
super().tearDown()
BotocoreInstrumentor().uninstrument()

@mock_sqs
def test_sqs_messaging_send_message(self):
create_queue_result = self.client.create_queue(
QueueName="test_queue_name"
)
queue_url = create_queue_result["QueueUrl"]
response = self.client.send_message(
QueueUrl=queue_url, MessageBody="content"
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
span = spans[1]
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], queue_url
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
"test_queue_name",
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID],
response["MessageId"],
)

@mock_sqs
def test_sqs_messaging_send_message_batch(self):
create_queue_result = self.client.create_queue(
QueueName="test_queue_name"
)
queue_url = create_queue_result["QueueUrl"]
response = self.client.send_message_batch(
QueueUrl=queue_url,
Entries=[
{"Id": "1", "MessageBody": "content"},
{"Id": "2", "MessageBody": "content2"},
],
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
span = spans[1]
self.assertEqual(span.attributes["rpc.method"], "SendMessageBatch")
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], queue_url
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
"test_queue_name",
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID],
response["Successful"][0]["MessageId"],
)

@mock_sqs
def test_sqs_messaging_receive_message(self):
create_queue_result = self.client.create_queue(
QueueName="test_queue_name"
)
queue_url = create_queue_result["QueueUrl"]
self.client.send_message(QueueUrl=queue_url, MessageBody="content")
message_result = self.client.receive_message(
QueueUrl=create_queue_result["QueueUrl"]
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 3)
span = spans[-1]
self.assertEqual(span.attributes["rpc.method"], "ReceiveMessage")
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], queue_url
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
"test_queue_name",
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_MESSAGE_ID],
message_result["Messages"][0]["MessageId"],
)

@mock_sqs
def test_sqs_messaging_failed_operation(self):
with self.assertRaises(Exception):
self.client.send_message(
QueueUrl="non-existing", MessageBody="content"
)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.attributes["rpc.method"], "SendMessage")
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_SYSTEM], "aws.sqs"
)
self.assertEqual(
span.attributes[SpanAttributes.MESSAGING_URL], "non-existing"
)

0 comments on commit aa4a59f

Please sign in to comment.