diff --git a/CHANGELOG.md b/CHANGELOG.md index db85c114f8..f336be0cba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1350](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1350)) - `opentelemetry-instrumentation-starlette` Add support for regular expression matching and sanitization of HTTP headers. ([#1404](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1404)) +- `opentelemetry-instrumentation-botocore` Add support for SNS `publish` and `publish_batch`. + ([#1409](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1409)) - Strip leading comments from SQL queries when generating the span name. ([#1434](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1434)) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py index 6b67767281..85a4904022 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py @@ -34,6 +34,7 @@ def loader(): _KNOWN_EXTENSIONS = { "dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"), "lambda": _lazy_load(".lmbd", "_LambdaExtension"), + "sns": _lazy_load(".sns", "_SnsExtension"), "sqs": _lazy_load(".sqs", "_SqsExtension"), } diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/_messaging.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/_messaging.py new file mode 100644 index 0000000000..271a8475e6 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/_messaging.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Any, MutableMapping + +from opentelemetry.propagate import get_global_textmap, inject +from opentelemetry.propagators.textmap import CarrierT, Setter + +_logger = logging.getLogger(__name__) + +_MAX_MESSAGE_ATTRIBUTES = 10 + + +class MessageAttributesSetter(Setter[CarrierT]): + def set(self, carrier: CarrierT, key: str, value: str): + carrier[key] = { + "DataType": "String", + "StringValue": value, + } + + +message_attributes_setter = MessageAttributesSetter() + + +def inject_propagation_context( + carrier: MutableMapping[str, Any] +) -> MutableMapping[str, Any]: + if carrier is None: + carrier = {} + + fields = get_global_textmap().fields + if len(carrier.keys()) + len(fields) <= _MAX_MESSAGE_ATTRIBUTES: + inject(carrier, setter=message_attributes_setter) + else: + _logger.warning( + "botocore instrumentation: cannot set context propagation on " + "SQS/SNS message due to maximum amount of MessageAttributes" + ) + + return carrier diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sns.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sns.py new file mode 100644 index 0000000000..7849daa286 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/sns.py @@ -0,0 +1,166 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import inspect +from typing import Any, Dict, MutableMapping, Optional, Tuple + +from opentelemetry.instrumentation.botocore.extensions._messaging import ( + inject_propagation_context, +) +from opentelemetry.instrumentation.botocore.extensions.types import ( + _AttributeMapT, + _AwsSdkCallContext, + _AwsSdkExtension, +) +from opentelemetry.semconv.trace import ( + MessagingDestinationKindValues, + SpanAttributes, +) +from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import Span + +################################################################################ +# SNS operations +################################################################################ + + +class _SnsOperation(abc.ABC): + @classmethod + @abc.abstractmethod + def operation_name(cls) -> str: + pass + + @classmethod + def span_kind(cls) -> SpanKind: + return SpanKind.CLIENT + + @classmethod + def extract_attributes( + cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT + ): + pass + + @classmethod + def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span): + pass + + +class _OpPublish(_SnsOperation): + _arn_arg_names = ("TopicArn", "TargetArn") + _phone_arg_name = "PhoneNumber" + + @classmethod + def operation_name(cls) -> str: + return "Publish" + + @classmethod + def span_kind(cls) -> SpanKind: + return SpanKind.PRODUCER + + @classmethod + def extract_attributes( + cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT + ): + destination_name, is_phone_number = cls._extract_destination_name( + call_context + ) + attributes[ + SpanAttributes.MESSAGING_DESTINATION_KIND + ] = MessagingDestinationKindValues.TOPIC.value + attributes[SpanAttributes.MESSAGING_DESTINATION] = destination_name + + call_context.span_name = ( + f"{'phone_number' if is_phone_number else destination_name} send" + ) + + @classmethod + def _extract_destination_name( + cls, call_context: _AwsSdkCallContext + ) -> Tuple[str, bool]: + arn = cls._extract_input_arn(call_context) + if arn: + return arn.rsplit(":", 1)[-1], False + + if cls._phone_arg_name: + phone_number = call_context.params.get(cls._phone_arg_name) + if phone_number: + return phone_number, True + + return "unknown", False + + @classmethod + def _extract_input_arn( + cls, call_context: _AwsSdkCallContext + ) -> Optional[str]: + for input_arn in cls._arn_arg_names: + arn = call_context.params.get(input_arn) + if arn: + return arn + return None + + @classmethod + def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span): + cls._inject_span_into_entry(call_context.params) + + @classmethod + def _inject_span_into_entry(cls, entry: MutableMapping[str, Any]): + entry["MessageAttributes"] = inject_propagation_context( + entry.get("MessageAttributes") + ) + + +class _OpPublishBatch(_OpPublish): + _arn_arg_names = ("TopicArn",) + _phone_arg_name = None + + @classmethod + def operation_name(cls) -> str: + return "PublishBatch" + + @classmethod + def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span): + for entry in call_context.params.get("PublishBatchRequestEntries", ()): + cls._inject_span_into_entry(entry) + + +################################################################################ +# SNS extension +################################################################################ + +_OPERATION_MAPPING = { + op.operation_name(): op + for op in globals().values() + if inspect.isclass(op) + and issubclass(op, _SnsOperation) + and not inspect.isabstract(op) +} # type: Dict[str, _SnsOperation] + + +class _SnsExtension(_AwsSdkExtension): + def __init__(self, call_context: _AwsSdkCallContext): + super().__init__(call_context) + self._op = _OPERATION_MAPPING.get(call_context.operation) + if self._op: + call_context.span_kind = self._op.span_kind() + + def extract_attributes(self, attributes: _AttributeMapT): + attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.sns" + + if self._op: + self._op.extract_attributes(self._call_context, attributes) + + def before_service_call(self, span: Span): + if self._op: + self._op.before_service_call(self._call_context, span) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_messaging.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_messaging.py new file mode 100644 index 0000000000..d8a92e0cf9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_messaging.py @@ -0,0 +1,52 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.instrumentation.botocore.extensions._messaging import ( + inject_propagation_context, + message_attributes_setter, +) +from opentelemetry.test.test_base import TestBase + + +class TestMessageAttributes(TestBase): + def test_message_attributes_setter(self): + carrier = {} + + message_attributes_setter.set(carrier, "key", "value") + self.assertEqual( + {"key": {"DataType": "String", "StringValue": "value"}}, carrier + ) + + def test_inject_propagation_context(self): + carrier = { + "key1": {"DataType": "String", "StringValue": "value1"}, + "key2": {"DataType": "String", "StringValue": "value2"}, + } + + tracer = self.tracer_provider.get_tracer("test-tracer") + with tracer.start_as_current_span("span"): + inject_propagation_context(carrier) + + self.assertGreater(len(carrier), 2) + + def test_inject_propagation_context_too_many_attributes(self): + carrier = { + f"key{idx}": {"DataType": "String", "StringValue": f"value{idx}"} + for idx in range(10) + } + tracer = self.tracer_provider.get_tracer("test-tracer") + with tracer.start_as_current_span("span"): + inject_propagation_context(carrier) + + self.assertEqual(10, len(carrier)) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sns.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sns.py new file mode 100644 index 0000000000..33f2531027 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_sns.py @@ -0,0 +1,189 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +from typing import Any, Dict +from unittest import mock + +import botocore.session +from botocore.awsrequest import AWSResponse +from moto import mock_sns + +from opentelemetry.instrumentation.botocore import BotocoreInstrumentor +from opentelemetry.semconv.trace import ( + MessagingDestinationKindValues, + SpanAttributes, +) +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind +from opentelemetry.trace.span import Span + + +class TestSnsExtension(TestBase): + def setUp(self): + super().setUp() + BotocoreInstrumentor().instrument() + + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + self.client = session.create_client("sns", region_name="us-west-2") + self.topic_name = "my-topic" + + def tearDown(self): + super().tearDown() + BotocoreInstrumentor().uninstrument() + + def _create_topic(self, name: str = None) -> str: + if name is None: + name = self.topic_name + + response = self.client.create_topic(Name=name) + + self.memory_exporter.clear() + return response["TopicArn"] + + @contextlib.contextmanager + def _mocked_aws_endpoint(self, response): + response_func = self._make_aws_response_func(response) + with mock.patch( + "botocore.endpoint.Endpoint.make_request", new=response_func + ): + yield + + @staticmethod + def _make_aws_response_func(response): + def _response_func(*args, **kwargs): + return AWSResponse("http://127.0.0.1", 200, {}, "{}"), response + + return _response_func + + def assert_span(self, name: str) -> Span: + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(1, len(spans)) + span = spans[0] + + self.assertEqual(SpanKind.PRODUCER, span.kind) + self.assertEqual(name, span.name) + self.assertEqual( + "aws.sns", span.attributes[SpanAttributes.MESSAGING_SYSTEM] + ) + + return span + + def assert_injected_span(self, message_attrs: Dict[str, Any], span: Span): + # traceparent: --- + trace_parent = message_attrs["traceparent"]["StringValue"].split("-") + span_context = span.get_span_context() + + self.assertEqual(span_context.trace_id, int(trace_parent[1], 16)) + self.assertEqual(span_context.span_id, int(trace_parent[2], 16)) + + @mock_sns + def test_publish_to_topic_arn(self): + self._test_publish_to_arn("TopicArn") + + @mock_sns + def test_publish_to_target_arn(self): + self._test_publish_to_arn("TargetArn") + + def _test_publish_to_arn(self, arg_name: str): + target_arn = self._create_topic(self.topic_name) + + self.client.publish( + **{ + arg_name: target_arn, + "Message": "Hello message", + } + ) + + span = self.assert_span(f"{self.topic_name} send") + self.assertEqual( + MessagingDestinationKindValues.TOPIC.value, + span.attributes[SpanAttributes.MESSAGING_DESTINATION_KIND], + ) + self.assertEqual( + self.topic_name, + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + ) + + @mock_sns + def test_publish_to_phone_number(self): + phone_number = "+10000000000" + self.client.publish( + PhoneNumber=phone_number, + Message="Hello SNS", + ) + + span = self.assert_span("phone_number send") + self.assertEqual( + phone_number, span.attributes[SpanAttributes.MESSAGING_DESTINATION] + ) + + @mock_sns + def test_publish_injects_span(self): + message_attrs = {} + topic_arn = self._create_topic() + self.client.publish( + TopicArn=topic_arn, + Message="Hello Message", + MessageAttributes=message_attrs, + ) + + span = self.assert_span(f"{self.topic_name} send") + self.assert_injected_span(message_attrs, span) + + def test_publish_batch_to_topic(self): + topic_arn = f"arn:aws:sns:region:000000000:{self.topic_name}" + message1_attrs = {} + message2_attrs = {} + mock_response = { + "Successful": [ + {"Id": "1", "MessageId": "11", "SequenceNumber": "1"}, + {"Id": "2", "MessageId": "22", "SequenceNumber": "2"}, + ], + "Failed": [], + } + + # publish_batch not implemented by moto so mock the endpoint instead + with self._mocked_aws_endpoint(mock_response): + self.client.publish_batch( + TopicArn=topic_arn, + PublishBatchRequestEntries=[ + { + "Id": "1", + "Message": "Hello message 1", + "MessageAttributes": message1_attrs, + }, + { + "Id": "2", + "Message": "Hello message 2", + "MessageAttributes": message2_attrs, + }, + ], + ) + + span = self.assert_span(f"{self.topic_name} send") + self.assertEqual( + MessagingDestinationKindValues.TOPIC.value, + span.attributes[SpanAttributes.MESSAGING_DESTINATION_KIND], + ) + self.assertEqual( + self.topic_name, + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + ) + + self.assert_injected_span(message1_attrs, span) + self.assert_injected_span(message2_attrs, span)