From 4fb566910963a186f6117cdc5dee285ccb4b4948 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 2 May 2022 12:26:46 +0300 Subject: [PATCH 01/11] Add Boto3SQS Instrumentation --- .github/component_owners.yml | 4 + .../README.rst | 23 ++ .../setup.cfg | 58 +++ .../setup.py | 42 +++ .../instrumentation/boto3sqs/__init__.py | 332 ++++++++++++++++++ .../instrumentation/boto3sqs/package.py | 16 + .../instrumentation/boto3sqs/version.py | 15 + .../tests/__init__.py | 0 8 files changed, 490 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/version.py create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/tests/__init__.py diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 669ce38751..8292e2f527 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -7,6 +7,10 @@ components: - oxeye-nikolay - nikosokolik + instrumentation/opentelemetry-instrumentation-boto3sqs: + - oxeye-nikolay + - nikosokolik + propagator/opentelemetry-propagator-aws-xray: - NathanielRN diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst b/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst new file mode 100644 index 0000000000..06901901f9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst @@ -0,0 +1,23 @@ +OpenTelemetry Boto3 SQS Instrumentation +=========================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-boto3sqs.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-boto3sqs/ + +This library allows tracing requests made by the Boto3 library to the SQS service. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-boto3sqs + + +References +---------- + +* `OpenTelemetry boto3sqs/ Tracing `_ +* `OpenTelemetry Project `_ diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg new file mode 100644 index 0000000000..4ed71bae67 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg @@ -0,0 +1,58 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +# opentelemetry-instrumentation plus the name of the library being instrument e.g +# name = opentelemetry-instrumentation-sqlalchemy +name = opentelemetry-instrumentation-boto3sqs +# a description of the instrumentation e.g +# description = SQLAlchemy tracing for OpenTelemetry +description = Boto3 SQS service tracing for OpenTelemetry +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +# url of the instrumentation e.g +url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-boto3sqs +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api ~= 1.3 + +[options.extras_require] +test = + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + boto3sqs = opentelemetry.instrumentation.boto3sqs:Boto3SQSInstrumentation \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py new file mode 100644 index 0000000000..6faafd8dd4 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py @@ -0,0 +1,42 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + # REPLACE ME: the path to the version file e.g + # BASE_DIR, "src", "opentelemetry", "instrumentation", "sqlalchemy", "version.py" + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "boto3sqs", + "version.py", +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup( + version=PACKAGE_INFO["__version__"], + entry_points={ + "opentelemetry_instrumentor": [ + # boto3sqs: the entrypoint for the instrumentor e.g + # "sqlalchemy = opentelemetry.instrumentation.sqlalchemy:SQLAlchemyInstrumentor" + "boto3sqs = opentelemetry.instrumentation.Boto3SQSInstrumentor" + ] + }, +) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py new file mode 100644 index 0000000000..272de8e2a9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -0,0 +1,332 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `boto3sqs`_ to trace SQS applications. + +.. _boto3sqs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html + + +Usage +----- + +.. code:: python + + import boto3 + from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor + + + Boto3SQSInstrumentor().instrument() +""" + +import wrapt +import logging +import boto3 +import botocore.client +from typing import Dict, Collection, List, Optional, Any, Tuple +from opentelemetry import context, propagate, trace +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.boto3sqs.version import __version__ +from opentelemetry.instrumentation.boto3sqs.package import _instruments +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagators.textmap import Getter, Setter, CarrierT +from opentelemetry.trace import SpanKind, Tracer, Span, TracerProvider, Link +from opentelemetry.semconv.trace import ( + MessagingOperationValues, + SpanAttributes, + MessagingDestinationKindValues, +) + +logger = logging.getLogger(__name__) +OPENTELEMETRY_ATTRIBUTE_IDENTIFIER = "otel." + + +class Boto3SQSGetter(Getter): + def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: + if not ( + value := carrier.get( + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {} + ) + ): + return None + return [value.get("StringValue", None)] + + def keys(self, carrier: CarrierT) -> List[str]: + return [ + key.rstrip(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) + if key.startswith(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) + else key + for key in carrier.keys() + ] + + +class Boto3SQSSetter(Setter): + def set(self, carrier: CarrierT, key: str, value: str) -> None: + if len(carrier.items()) < 10: + carrier[f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] = { + "StringValue": value, + "DataType": "String", + } + else: + logger.warning( + "Boto3 SQS instrumentation: cannot set context propagation on SQS/SNS message due to maximum amount of " + "MessageAttributes" + ) + + +boto3sqs_getter = Boto3SQSGetter() +boto3sqs_setter = Boto3SQSSetter() + + +class Boto3SQSInstrumentor(BaseInstrumentor): + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + @staticmethod + def _enrich_span( + span: Span, + queue_name: str, + conversation_id: Optional[str] = None, + operation: Optional[MessagingOperationValues] = None, + message_id: Optional[str] = None, + ) -> None: + if not span.is_recording(): + return + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.sqs") + span.set_attribute( + SpanAttributes.MESSAGING_DESTINATION_KIND, + MessagingDestinationKindValues.QUEUE.value, + ) + if operation: + span.set_attribute( + SpanAttributes.MESSAGING_OPERATION, operation.value + ) + else: + span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_name) + if conversation_id: + span.set_attribute( + SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id + ) + if message_id: + span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message_id) + + @staticmethod + def _extract_queue_name_from_url(queue_url: str) -> str: + # A Queue name cannot have the `/` char, therefore we can return the part after the last / + return queue_url.split("/")[-1] + + def _wrap_send_message(self) -> None: + def send_wrapper(wrapped, instance, args, kwargs): + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + queue_url = kwargs.get("QueueUrl") + # The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the + # original exception + queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url( + queue_url + ) + span = self._tracer.start_span( + name=f"{queue_name} send", + kind=SpanKind.PRODUCER, + ) + if span.is_recording(): + Boto3SQSInstrumentor._enrich_span(span, queue_name) + with trace.use_span(span, end_on_exit=True): + attributes = kwargs.pop("MessageAttributes", {}) + propagate.inject(attributes, setter=boto3sqs_setter) + retval = wrapped(*args, MessageAttributes=attributes, **kwargs) + if message_id := retval.get("MessageId"): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, message_id + ) + return retval + + wrapt.wrap_function_wrapper( + self._sqs_class, "send_message", send_wrapper + ) + + def _wrap_send_message_batch(self) -> None: + def send_batch_wrapper(wrapped, instance, args, kwargs): + queue_url = kwargs.get("QueueUrl") + entries = kwargs.get("Entries") + # The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the + # origial exception + if ( + context.get_value(_SUPPRESS_INSTRUMENTATION_KEY) + or not queue_url + or not entries + ): + return wrapped(*args, **kwargs) + queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url( + queue_url + ) + ids_to_spans: Dict[str, Span] = {} + for entry in entries: + entry_id = entry["Id"] + span = self._tracer.start_span( + name=f"{queue_name} send", + kind=SpanKind.PRODUCER, + ) + ids_to_spans[entry_id] = span + if span.is_recording(): + Boto3SQSInstrumentor._enrich_span( + span, queue_name, conversation_id=entry_id + ) + with trace.use_span(span): + if "MessageAttributes" not in entry: + entry["MessageAttributes"] = {} + propagate.inject( + entry["MessageAttributes"], setter=boto3sqs_setter + ) + retval = wrapped(*args, **kwargs) + for successful_messages in retval["Successful"]: + message_identifier = successful_messages["Id"] + if message_span := ids_to_spans.get(message_identifier): + message_span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + successful_messages.get("MessageId"), + ) + for span in ids_to_spans.values(): + span.end() + return retval + + wrapt.wrap_function_wrapper( + self._sqs_class, "send_message_batch", send_batch_wrapper + ) + + def _wrap_receive_message(self) -> None: + def receive_message_wrapper(wrapped, instance, args, kwargs): + queue_url = kwargs.get("QueueUrl") + message_attribute_names = kwargs.pop("MessageAttributeNames", []) + message_attribute_names.append( + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}*" + ) + queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url( + queue_url + ) + with self._tracer.start_as_current_span(name=f"{queue_name} receive", end_on_exit=True) as span: + Boto3SQSInstrumentor._enrich_span( + span, queue_name + ) + retval = wrapped( + *args, MessageAttributeNames=message_attribute_names, **kwargs + ) + messages = retval.get("Messages", []) + for message in messages: + if not (receipt_handle := message["ReceiptHandle"]): + continue + if receipt_handle in self._received_messages_spans: + span, token = self._received_messages_spans[receipt_handle] + context.detach(token) + span.end() + message_attributes = message.get("MessageAttributes", {}) + links = [] + if ctx := propagate.extract( + message_attributes, getter=boto3sqs_getter + ): + for item in ctx.values(): + if hasattr(item, "get_span_context"): + links.append(Link(context=item.get_span_context())) + span = self._tracer.start_span( + name=f"{queue_name} produce", + links=links, + kind=SpanKind.CONSUMER, + ) + with trace.use_span(span): + new_context = trace.set_span_in_context(span, ctx) + token = context.attach(new_context) + message_id = message.get("MessageId", None) + self._received_messages_spans[receipt_handle] = ( + span, + token, + ) + Boto3SQSInstrumentor._enrich_span( + span, queue_name, message_id=message_id + ) + return retval + + wrapt.wrap_function_wrapper( + self._sqs_class, "receive_message", receive_message_wrapper + ) + + def _wrap_delete_message(self) -> None: + def delete_message_wrapper(wrapped, instance, args, kwargs): + if receipt_handle := kwargs.get("ReceiptHandle"): + if receipt_handle in self._received_messages_spans: + span, token = self._received_messages_spans[receipt_handle] + context.detach(token) + span.end() + return wrapped(*args, **kwargs) + + wrapt.wrap_function_wrapper( + self._sqs_class, "delete_message", delete_message_wrapper + ) + + def _wrap_client_creation(self) -> None: + """ + Since botocore creates classes on the fly using schemas, the SQS class is not necesraily created upon the call + of `instrument()`. Therefore we need to wrap the creation of the boto3 client, which triggers the creation of + the SQS client. + """ + + def client_wrapper(wrapped, instance, args, kwargs): + retval = wrapped(*args, **kwargs) + if not self._did_decorate: + self._decorate_sqs() + return retval + + wrapt.wrap_function_wrapper(boto3, "client", client_wrapper) + + def _decorate_sqs(self) -> None: + """ + Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base + class and defines SQS to wrap. + """ + sqs_class = [ + cls + for cls in botocore.client.BaseClient.__subclasses__() + if "botocore.client.SQS" in str(cls) + ] + if sqs_class: + self._sqs_class = sqs_class[0] + self._did_decorate = True + self._wrap_send_message() + self._wrap_send_message_batch() + self._wrap_receive_message() + self._wrap_delete_message() + + def _un_decorate_sqs(self) -> None: + if self._did_decorate: + unwrap(self._sqs_class, "send_message") + unwrap(self._sqs_class, "send_message_batch") + unwrap(self._sqs_class, "receive_message") + unwrap(self._sqs_class, "delete_message") + self._did_decorate = False + + def _instrument(self, **kwargs: Dict[str, Any]) -> None: + self._did_decorate: bool = False + self._received_messages_spans: Dict[str, Tuple[Span, Any]] = {} + self._tracer_provider: Optional[TracerProvider] = kwargs.get( + "tracer_provider" + ) + self._tracer: Tracer = trace.get_tracer( + __name__, __version__, self._tracer_provider + ) + self._wrap_client_creation() + self._decorate_sqs() + + def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: + unwrap(boto3, "client") + self._un_decorate_sqs() diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py new file mode 100644 index 0000000000..2e241cc488 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Collection + +_instruments: Collection[str] = ("boto3 >= 1.21.46",) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/version.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/version.py new file mode 100644 index 0000000000..88015aae34 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.30b1" diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 From c0659855455279ef86d0b254c2b14f8e7857c547 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 2 May 2022 18:24:43 +0300 Subject: [PATCH 02/11] Add basic tests --- .../instrumentation/boto3sqs/__init__.py | 135 +++++++++++------- .../tests/test_boto3sqs_instrumentation.py | 60 ++++++++ .../tests/test_getter.py | 59 ++++++++ .../tests/test_setter.py | 38 +++++ 4 files changed, 238 insertions(+), 54 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py create mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 272de8e2a9..ee962df521 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -30,10 +30,10 @@ """ import wrapt -import logging import boto3 +import logging import botocore.client -from typing import Dict, Collection, List, Optional, Any, Tuple +from typing import Dict, Collection, List, Optional, Any from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.utils import unwrap from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -64,7 +64,7 @@ def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: def keys(self, carrier: CarrierT) -> List[str]: return [ - key.rstrip(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) + key[len(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER):] if key.startswith(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) else key for key in carrier.keys() @@ -104,6 +104,7 @@ def _enrich_span( if not span.is_recording(): return span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.sqs") + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_name) span.set_attribute( SpanAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.QUEUE.value, @@ -114,7 +115,6 @@ def _enrich_span( ) else: span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) - span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_name) if conversation_id: span.set_attribute( SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id @@ -122,6 +122,36 @@ def _enrich_span( if message_id: span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message_id) + def _create_processing_span( + self, queue_name: str, receipt_handle: str, message: Dict[str, Any] + ) -> None: + message_attributes = message.get("MessageAttributes", {}) + links = [] + if ctx := propagate.extract( + message_attributes, getter=boto3sqs_getter + ): + for item in ctx.values(): + if hasattr(item, "get_span_context"): + links.append(Link(context=item.get_span_context())) + span = self._tracer.start_span( + name=f"{queue_name} process", + links=links, + kind=SpanKind.CONSUMER, + ) + with trace.use_span(span): + message_id = message.get("MessageId", None) + self._received_messages_spans[receipt_handle] = span + Boto3SQSInstrumentor._enrich_span( + span, + queue_name, + message_id=message_id, + operation=MessagingOperationValues.PROCESS, + ) + + def _safe_end_processing_span(self, receipt_handle: str): + if receipt_handle in self._received_messages_spans: + self._received_messages_spans[receipt_handle].end() + @staticmethod def _extract_queue_name_from_url(queue_url: str) -> str: # A Queue name cannot have the `/` char, therefore we can return the part after the last / @@ -141,16 +171,16 @@ def send_wrapper(wrapped, instance, args, kwargs): name=f"{queue_name} send", kind=SpanKind.PRODUCER, ) - if span.is_recording(): - Boto3SQSInstrumentor._enrich_span(span, queue_name) + Boto3SQSInstrumentor._enrich_span(span, queue_name) with trace.use_span(span, end_on_exit=True): attributes = kwargs.pop("MessageAttributes", {}) propagate.inject(attributes, setter=boto3sqs_setter) retval = wrapped(*args, MessageAttributes=attributes, **kwargs) if message_id := retval.get("MessageId"): - span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, message_id - ) + if span.is_recording(): + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, message_id + ) return retval wrapt.wrap_function_wrapper( @@ -162,7 +192,7 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): queue_url = kwargs.get("QueueUrl") entries = kwargs.get("Entries") # The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the - # origial exception + # original exception if ( context.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or not queue_url @@ -180,10 +210,9 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): kind=SpanKind.PRODUCER, ) ids_to_spans[entry_id] = span - if span.is_recording(): - Boto3SQSInstrumentor._enrich_span( - span, queue_name, conversation_id=entry_id - ) + Boto3SQSInstrumentor._enrich_span( + span, queue_name, conversation_id=entry_id + ) with trace.use_span(span): if "MessageAttributes" not in entry: entry["MessageAttributes"] = {} @@ -194,10 +223,11 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): for successful_messages in retval["Successful"]: message_identifier = successful_messages["Id"] if message_span := ids_to_spans.get(message_identifier): - message_span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, - successful_messages.get("MessageId"), - ) + if message_span.is_recording(): + message_span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + successful_messages.get("MessageId"), + ) for span in ids_to_spans.values(): span.end() return retval @@ -216,45 +246,29 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url( queue_url ) - with self._tracer.start_as_current_span(name=f"{queue_name} receive", end_on_exit=True) as span: + with self._tracer.start_as_current_span( + name=f"{queue_name} receive", + end_on_exit=True, + kind=SpanKind.CONSUMER, + ) as span: Boto3SQSInstrumentor._enrich_span( - span, queue_name + span, + queue_name, + operation=MessagingOperationValues.RECEIVE, ) retval = wrapped( - *args, MessageAttributeNames=message_attribute_names, **kwargs + *args, + MessageAttributeNames=message_attribute_names, + **kwargs, ) messages = retval.get("Messages", []) for message in messages: if not (receipt_handle := message["ReceiptHandle"]): continue - if receipt_handle in self._received_messages_spans: - span, token = self._received_messages_spans[receipt_handle] - context.detach(token) - span.end() - message_attributes = message.get("MessageAttributes", {}) - links = [] - if ctx := propagate.extract( - message_attributes, getter=boto3sqs_getter - ): - for item in ctx.values(): - if hasattr(item, "get_span_context"): - links.append(Link(context=item.get_span_context())) - span = self._tracer.start_span( - name=f"{queue_name} produce", - links=links, - kind=SpanKind.CONSUMER, + self._safe_end_processing_span(receipt_handle) + self._create_processing_span( + queue_name, receipt_handle, message ) - with trace.use_span(span): - new_context = trace.set_span_in_context(span, ctx) - token = context.attach(new_context) - message_id = message.get("MessageId", None) - self._received_messages_spans[receipt_handle] = ( - span, - token, - ) - Boto3SQSInstrumentor._enrich_span( - span, queue_name, message_id=message_id - ) return retval wrapt.wrap_function_wrapper( @@ -264,16 +278,27 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): def _wrap_delete_message(self) -> None: def delete_message_wrapper(wrapped, instance, args, kwargs): if receipt_handle := kwargs.get("ReceiptHandle"): - if receipt_handle in self._received_messages_spans: - span, token = self._received_messages_spans[receipt_handle] - context.detach(token) - span.end() + self._safe_end_processing_span(receipt_handle) return wrapped(*args, **kwargs) wrapt.wrap_function_wrapper( self._sqs_class, "delete_message", delete_message_wrapper ) + def _wrap_delete_message_batch(self) -> None: + def delete_message_wrapper_batch(wrapped, instance, args, kwargs): + entries = kwargs.get("Entries") + for entry in entries: + if receipt_handle := entry.get("ReceiptHandle"): + self._safe_end_processing_span(receipt_handle) + return wrapped(*args, **kwargs) + + wrapt.wrap_function_wrapper( + self._sqs_class, + "delete_message_batch", + delete_message_wrapper_batch, + ) + def _wrap_client_creation(self) -> None: """ Since botocore creates classes on the fly using schemas, the SQS class is not necesraily created upon the call @@ -297,7 +322,7 @@ class and defines SQS to wrap. sqs_class = [ cls for cls in botocore.client.BaseClient.__subclasses__() - if "botocore.client.SQS" in str(cls) + if hasattr(cls, "send_message_batch") ] if sqs_class: self._sqs_class = sqs_class[0] @@ -306,6 +331,7 @@ class and defines SQS to wrap. self._wrap_send_message_batch() self._wrap_receive_message() self._wrap_delete_message() + self._wrap_delete_message_batch() def _un_decorate_sqs(self) -> None: if self._did_decorate: @@ -313,11 +339,12 @@ def _un_decorate_sqs(self) -> None: unwrap(self._sqs_class, "send_message_batch") unwrap(self._sqs_class, "receive_message") unwrap(self._sqs_class, "delete_message") + unwrap(self._sqs_class, "delete_message_batch") self._did_decorate = False def _instrument(self, **kwargs: Dict[str, Any]) -> None: self._did_decorate: bool = False - self._received_messages_spans: Dict[str, Tuple[Span, Any]] = {} + self._received_messages_spans: Dict[str, Span] = {} self._tracer_provider: Optional[TracerProvider] = kwargs.get( "tracer_provider" ) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py new file mode 100644 index 0000000000..e4f2caac09 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -0,0 +1,60 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import boto3 +import botocore.client +from unittest import TestCase +from wrapt import BoundFunctionWrapper, FunctionWrapper +from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor + + +class TestBoto3SQSInstrumentor(TestCase): + def define_sqs_mock(self) -> None: + class SQSClientMock(botocore.client.BaseClient): + def send_message(self, *args, **kwargs): + ... + + def send_message_batch(self, *args, **kwargs): + ... + + def receive_message(self, *args, **kwargs): + ... + + def delete_message(self, *args, **kwargs): + ... + + def delete_message_batch(self, *args, **kwargs): + ... + + self._boto_sqs_mock = SQSClientMock + + def test_instrument_api_before_client_init(self) -> None: + instrumentation = Boto3SQSInstrumentor() + + instrumentation.instrument() + self.assertTrue(isinstance(boto3.client, FunctionWrapper)) + instrumentation.uninstrument() + + def test_instrument_api_after_client_init(self) -> None: + self.define_sqs_mock() + instrumentation = Boto3SQSInstrumentor() + + instrumentation.instrument() + self.assertTrue(isinstance(boto3.client, FunctionWrapper)) + self.assertTrue(isinstance(self._boto_sqs_mock.send_message, BoundFunctionWrapper)) + self.assertTrue(isinstance(self._boto_sqs_mock.send_message_batch, BoundFunctionWrapper)) + self.assertTrue(isinstance(self._boto_sqs_mock.receive_message, BoundFunctionWrapper)) + self.assertTrue(isinstance(self._boto_sqs_mock.delete_message, BoundFunctionWrapper)) + self.assertTrue(isinstance(self._boto_sqs_mock.delete_message_batch, BoundFunctionWrapper)) + instrumentation.uninstrument() diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py new file mode 100644 index 0000000000..ec1f592c4a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py @@ -0,0 +1,59 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase +from opentelemetry.instrumentation.boto3sqs import ( + Boto3SQSGetter, + OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, +) + + +class TestBoto3SQSGetter(TestCase): + def setUp(self) -> None: + self.getter = Boto3SQSGetter() + + def test_get_none(self) -> None: + carrier = {} + value = self.getter.get(carrier, "test") + self.assertIsNone(value) + + def test_get_value(self) -> None: + key = "test" + value = "value" + carrier = { + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}": { + "StringValue": value, + "DataType": "String", + } + } + val = self.getter.get(carrier, key) + self.assertEqual(val, [value]) + + def test_keys(self): + key1 = "test1" + value1 = "value1" + key2 = "test2" + value2 = "value2" + carrier = { + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key1}": { + "StringValue": value1, + "DataType": "String", + }, + key2: {"StringValue": value2, "DataType": "String"}, + } + keys = self.getter.keys(carrier) + self.assertEqual(keys, [key1, key2]) + + def test_keys_empty(self): + keys = self.getter.keys({}) + self.assertEqual(keys, []) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py new file mode 100644 index 0000000000..4034950c50 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py @@ -0,0 +1,38 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase +from opentelemetry.instrumentation.boto3sqs import ( + Boto3SQSSetter, + OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, +) + + +class TestBoto3SQSSetter(TestCase): + def setUp(self) -> None: + self.setter = Boto3SQSSetter() + + def test_simple(self): + original_key = "SomeHeader" + original_value = {"NumberValue": 1, "DataType": "Number"} + carrier = {original_key: original_value.copy()} + key = "test" + value = "value" + self.setter.set(carrier, key, value) + # Ensure the original value is not harmed + for k, v in carrier[original_key].items(): + self.assertEqual(original_value[k], v) + # Ensure the new key is added well + self.assertIn(f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys()) + new_value = carrier[f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] + self.assertEqual(new_value["StringValue"], value) From 732c57e93d717f5a2baf893a67d5716bf38f57eb Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 15:17:52 +0300 Subject: [PATCH 03/11] Add context setting list --- .../instrumentation/boto3sqs/__init__.py | 107 ++++++++++++++---- .../tests/test_boto3sqs_instrumentation.py | 28 ++++- .../tests/test_setter.py | 4 +- 3 files changed, 113 insertions(+), 26 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index ee962df521..3dfcefbfee 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -33,7 +33,7 @@ import boto3 import logging import botocore.client -from typing import Dict, Collection, List, Optional, Any +from typing import Dict, Collection, List, Optional, Any, Generator from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.utils import unwrap from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -49,6 +49,8 @@ ) logger = logging.getLogger(__name__) +# We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming +# existing filters OPENTELEMETRY_ATTRIBUTE_IDENTIFIER = "otel." @@ -64,7 +66,7 @@ def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: def keys(self, carrier: CarrierT) -> List[str]: return [ - key[len(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER):] + key[len(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) :] if key.startswith(OPENTELEMETRY_ATTRIBUTE_IDENTIFIER) else key for key in carrier.keys() @@ -73,6 +75,7 @@ def keys(self, carrier: CarrierT) -> List[str]: class Boto3SQSSetter(Setter): def set(self, carrier: CarrierT, key: str, value: str) -> None: + # This is a limitation defined by AWS for SQS MessageAttributes size if len(carrier.items()) < 10: carrier[f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] = { "StringValue": value, @@ -90,6 +93,51 @@ def set(self, carrier: CarrierT, key: str, value: str) -> None: class Boto3SQSInstrumentor(BaseInstrumentor): + received_messages_spans: Dict[str, Span] = {} + current_span_related_to_token: Span = None + current_context_token = None + + class ContextableList(list): + """ + Since the classic way to process SQS messages is using a `for` loop, without a well defined scope like a + callback - we are doing something similar to the instrumentaiton of Kafka-python and instrumenting the + `__iter__` functions and the `__getitem__` functions to set the span context of the addressed message. Since + the return value from an `SQS.ReceiveMessage` returns a builtin list, we cannot wrap it and change all of the + calls for `list.__iter__` and `list.__getitem__` - therefore we use ContextableList. It is bound to the + received_messages_spans dict + """ + + def __getitem__( + self, *args: List[Any], **kwargs: Dict[str, Any] + ) -> Any: + retval = super( + Boto3SQSInstrumentor.ContextableList, self + ).__getitem__(*args, **kwargs) + if isinstance(retval, dict): + if receipt_handle := retval.get("ReceiptHandle", None): + if started_span := Boto3SQSInstrumentor.received_messages_spans.get( + receipt_handle, None + ): + if Boto3SQSInstrumentor.current_context_token: + context.detach( + Boto3SQSInstrumentor.current_context_token + ) + Boto3SQSInstrumentor.current_context_token = ( + context.attach( + trace.set_span_in_context(started_span) + ) + ) + Boto3SQSInstrumentor.current_span_related_to_token = ( + started_span + ) + return retval + + def __iter__(self) -> Generator: + i = 0 + while i < len(self): + yield self[i] + i = i + 1 + def instrumentation_dependencies(self) -> Collection[str]: return _instruments @@ -122,6 +170,24 @@ def _enrich_span( if message_id: span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message_id) + @staticmethod + def _safe_end_processing_span(receipt_handle: str) -> None: + if started_span := Boto3SQSInstrumentor.received_messages_spans.pop( + receipt_handle, None + ): + if ( + Boto3SQSInstrumentor.current_span_related_to_token + == started_span + ): + context.detach(Boto3SQSInstrumentor.current_context_token) + Boto3SQSInstrumentor.current_context_token = None + started_span.end() + + @staticmethod + def _extract_queue_name_from_url(queue_url: str) -> str: + # A Queue name cannot have the `/` char, therefore we can return the part after the last / + return queue_url.split("/")[-1] + def _create_processing_span( self, queue_name: str, receipt_handle: str, message: Dict[str, Any] ) -> None: @@ -140,7 +206,7 @@ def _create_processing_span( ) with trace.use_span(span): message_id = message.get("MessageId", None) - self._received_messages_spans[receipt_handle] = span + Boto3SQSInstrumentor.received_messages_spans[receipt_handle] = span Boto3SQSInstrumentor._enrich_span( span, queue_name, @@ -148,15 +214,6 @@ def _create_processing_span( operation=MessagingOperationValues.PROCESS, ) - def _safe_end_processing_span(self, receipt_handle: str): - if receipt_handle in self._received_messages_spans: - self._received_messages_spans[receipt_handle].end() - - @staticmethod - def _extract_queue_name_from_url(queue_url: str) -> str: - # A Queue name cannot have the `/` char, therefore we can return the part after the last / - return queue_url.split("/")[-1] - def _wrap_send_message(self) -> None: def send_wrapper(wrapped, instance, args, kwargs): if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): @@ -261,14 +318,22 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): MessageAttributeNames=message_attribute_names, **kwargs, ) - messages = retval.get("Messages", []) + if not (messages := retval.get("Messages", [])): + return retval for message in messages: - if not (receipt_handle := message["ReceiptHandle"]): + if not ( + receipt_handle := message.get("ReceiptHandle", None) + ): continue - self._safe_end_processing_span(receipt_handle) + Boto3SQSInstrumentor._safe_end_processing_span( + receipt_handle + ) self._create_processing_span( queue_name, receipt_handle, message ) + retval["Messages"] = Boto3SQSInstrumentor.ContextableList( + messages + ) return retval wrapt.wrap_function_wrapper( @@ -278,7 +343,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): def _wrap_delete_message(self) -> None: def delete_message_wrapper(wrapped, instance, args, kwargs): if receipt_handle := kwargs.get("ReceiptHandle"): - self._safe_end_processing_span(receipt_handle) + Boto3SQSInstrumentor._safe_end_processing_span(receipt_handle) return wrapped(*args, **kwargs) wrapt.wrap_function_wrapper( @@ -289,8 +354,10 @@ def _wrap_delete_message_batch(self) -> None: def delete_message_wrapper_batch(wrapped, instance, args, kwargs): entries = kwargs.get("Entries") for entry in entries: - if receipt_handle := entry.get("ReceiptHandle"): - self._safe_end_processing_span(receipt_handle) + if receipt_handle := entry.get("ReceiptHandle", None): + Boto3SQSInstrumentor._safe_end_processing_span( + receipt_handle + ) return wrapped(*args, **kwargs) wrapt.wrap_function_wrapper( @@ -317,8 +384,9 @@ def client_wrapper(wrapped, instance, args, kwargs): def _decorate_sqs(self) -> None: """ Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base - class and defines SQS to wrap. + class and is SQS to wrap. """ + # We define SQS client as the only client that implements send_message_batch sqs_class = [ cls for cls in botocore.client.BaseClient.__subclasses__() @@ -344,7 +412,6 @@ def _un_decorate_sqs(self) -> None: def _instrument(self, **kwargs: Dict[str, Any]) -> None: self._did_decorate: bool = False - self._received_messages_spans: Dict[str, Span] = {} self._tracer_provider: Optional[TracerProvider] = kwargs.get( "tracer_provider" ) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py index e4f2caac09..ea6021bd63 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -52,9 +52,27 @@ def test_instrument_api_after_client_init(self) -> None: instrumentation.instrument() self.assertTrue(isinstance(boto3.client, FunctionWrapper)) - self.assertTrue(isinstance(self._boto_sqs_mock.send_message, BoundFunctionWrapper)) - self.assertTrue(isinstance(self._boto_sqs_mock.send_message_batch, BoundFunctionWrapper)) - self.assertTrue(isinstance(self._boto_sqs_mock.receive_message, BoundFunctionWrapper)) - self.assertTrue(isinstance(self._boto_sqs_mock.delete_message, BoundFunctionWrapper)) - self.assertTrue(isinstance(self._boto_sqs_mock.delete_message_batch, BoundFunctionWrapper)) + self.assertTrue( + isinstance(self._boto_sqs_mock.send_message, BoundFunctionWrapper) + ) + self.assertTrue( + isinstance( + self._boto_sqs_mock.send_message_batch, BoundFunctionWrapper + ) + ) + self.assertTrue( + isinstance( + self._boto_sqs_mock.receive_message, BoundFunctionWrapper + ) + ) + self.assertTrue( + isinstance( + self._boto_sqs_mock.delete_message, BoundFunctionWrapper + ) + ) + self.assertTrue( + isinstance( + self._boto_sqs_mock.delete_message_batch, BoundFunctionWrapper + ) + ) instrumentation.uninstrument() diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py index 4034950c50..5785147a4a 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py @@ -33,6 +33,8 @@ def test_simple(self): for k, v in carrier[original_key].items(): self.assertEqual(original_value[k], v) # Ensure the new key is added well - self.assertIn(f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys()) + self.assertIn( + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys() + ) new_value = carrier[f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] self.assertEqual(new_value["StringValue"], value) From e8c28ccf42390772c0b7e48c88566a883f212350 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 16:44:15 +0300 Subject: [PATCH 04/11] Fix linting --- CHANGELOG.md | 3 ++ .../instrumentation/boto3sqs/__init__.py | 35 +++++++++++-------- .../instrumentation/boto3sqs/package.py | 2 +- .../tests/test_boto3sqs_instrumentation.py | 6 +++- .../tests/test_getter.py | 3 +- .../tests/test_setter.py | 7 ++-- 6 files changed, 35 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c892e6af8..f07cb084cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrument` and `opentelemetry-bootstrap` now include a `--version` flag ([#1065](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1065)) +- `opentelemetry-instrumentation-boto3sqs` added AWS's SQS instrumentation. + ([#1081](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1081)) + ## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21 diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 3dfcefbfee..28baeefbc4 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -29,29 +29,33 @@ Boto3SQSInstrumentor().instrument() """ -import wrapt -import boto3 import logging +from typing import Any, Collection, Dict, Generator, List, Optional + +import boto3 import botocore.client -from typing import Dict, Collection, List, Optional, Any, Generator +import wrapt + from opentelemetry import context, propagate, trace -from opentelemetry.instrumentation.utils import unwrap -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.instrumentation.boto3sqs.version import __version__ from opentelemetry.instrumentation.boto3sqs.package import _instruments -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.propagators.textmap import Getter, Setter, CarrierT -from opentelemetry.trace import SpanKind, Tracer, Span, TracerProvider, Link +from opentelemetry.instrumentation.boto3sqs.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import ( + _SUPPRESS_INSTRUMENTATION_KEY, + unwrap, +) +from opentelemetry.propagators.textmap import CarrierT, Getter, Setter from opentelemetry.semconv.trace import ( + MessagingDestinationKindValues, MessagingOperationValues, SpanAttributes, - MessagingDestinationKindValues, ) +from opentelemetry.trace import Link, Span, SpanKind, Tracer, TracerProvider logger = logging.getLogger(__name__) # We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming # existing filters -OPENTELEMETRY_ATTRIBUTE_IDENTIFIER = "otel." +OPENTELEMETRY_ATTRIBUTE_IDENTIFIER: str = "otel." class Boto3SQSGetter(Getter): @@ -92,6 +96,7 @@ def set(self, carrier: CarrierT, key: str, value: str) -> None: boto3sqs_setter = Boto3SQSSetter() +# pylint: disable=attribute-defined-outside-init class Boto3SQSInstrumentor(BaseInstrumentor): received_messages_spans: Dict[str, Span] = {} current_span_related_to_token: Span = None @@ -133,10 +138,10 @@ def __getitem__( return retval def __iter__(self) -> Generator: - i = 0 - while i < len(self): - yield self[i] - i = i + 1 + index = 0 + while index < len(self): + yield self[index] + index = index + 1 def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py index 2e241cc488..981ab4c44c 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py @@ -13,4 +13,4 @@ # limitations under the License. from typing import Collection -_instruments: Collection[str] = ("boto3 >= 1.21.46",) +_instruments: Collection[str] = ("boto~=2.0",) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py index ea6021bd63..5e79feadf2 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -12,15 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from unittest import TestCase + import boto3 import botocore.client -from unittest import TestCase from wrapt import BoundFunctionWrapper, FunctionWrapper + from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor +# pylint: disable=attribute-defined-outside-init class TestBoto3SQSInstrumentor(TestCase): def define_sqs_mock(self) -> None: + # pylint: disable=R0201 class SQSClientMock(botocore.client.BaseClient): def send_message(self, *args, **kwargs): ... diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py index ec1f592c4a..41c53fd408 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. from unittest import TestCase + from opentelemetry.instrumentation.boto3sqs import ( - Boto3SQSGetter, OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, + Boto3SQSGetter, ) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py index 5785147a4a..7a971b118b 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. from unittest import TestCase + from opentelemetry.instrumentation.boto3sqs import ( - Boto3SQSSetter, OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, + Boto3SQSSetter, ) @@ -30,8 +31,8 @@ def test_simple(self): value = "value" self.setter.set(carrier, key, value) # Ensure the original value is not harmed - for k, v in carrier[original_key].items(): - self.assertEqual(original_value[k], v) + for dict_key, dict_val in carrier[original_key].items(): + self.assertEqual(original_value[dict_key], dict_val) # Ensure the new key is added well self.assertIn( f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys() From 3afb803b76990bf8c370f2bee8b67c82aa3f1d06 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 17:31:05 +0300 Subject: [PATCH 05/11] CR and lint fixes --- .../README.rst | 2 +- .../instrumentation/boto3sqs/__init__.py | 33 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst b/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst index 06901901f9..7013643620 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/README.rst @@ -1,5 +1,5 @@ OpenTelemetry Boto3 SQS Instrumentation -=========================== +======================================= |pypi| diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 28baeefbc4..bcbb349f9c 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -118,23 +118,22 @@ def __getitem__( retval = super( Boto3SQSInstrumentor.ContextableList, self ).__getitem__(*args, **kwargs) - if isinstance(retval, dict): - if receipt_handle := retval.get("ReceiptHandle", None): - if started_span := Boto3SQSInstrumentor.received_messages_spans.get( - receipt_handle, None - ): - if Boto3SQSInstrumentor.current_context_token: - context.detach( - Boto3SQSInstrumentor.current_context_token - ) - Boto3SQSInstrumentor.current_context_token = ( - context.attach( - trace.set_span_in_context(started_span) - ) - ) - Boto3SQSInstrumentor.current_span_related_to_token = ( - started_span - ) + if not isinstance(retval, dict): + return retval + if not (receipt_handle := retval.get("ReceiptHandle", None)): + return retval + if not ( + started_span := Boto3SQSInstrumentor.received_messages_spans.get( + receipt_handle, None + ) + ): + return retval + if Boto3SQSInstrumentor.current_context_token: + context.detach(Boto3SQSInstrumentor.current_context_token) + Boto3SQSInstrumentor.current_context_token = context.attach( + trace.set_span_in_context(started_span) + ) + Boto3SQSInstrumentor.current_span_related_to_token = started_span return retval def __iter__(self) -> Generator: From 080ae0cf45d4af82bf16f71dd7d23d9588f3dc78 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 17:32:02 +0300 Subject: [PATCH 06/11] Add newline --- .../opentelemetry-instrumentation-boto3sqs/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg index 4ed71bae67..25d38a0292 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.cfg @@ -55,4 +55,4 @@ where = src [options.entry_points] opentelemetry_instrumentor = - boto3sqs = opentelemetry.instrumentation.boto3sqs:Boto3SQSInstrumentation \ No newline at end of file + boto3sqs = opentelemetry.instrumentation.boto3sqs:Boto3SQSInstrumentation From 81a3a2c8b1634c8b6f18846c8bd503fdec553dc6 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 18:04:10 +0300 Subject: [PATCH 07/11] Run tox generate --- instrumentation/README.md | 1 + .../setup.py | 77 ++++++++++++++++--- .../setup.cfg | 1 + .../instrumentation/bootstrap_gen.py | 4 + 4 files changed, 73 insertions(+), 10 deletions(-) diff --git a/instrumentation/README.md b/instrumentation/README.md index 20caa144bb..9c85974c23 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -7,6 +7,7 @@ | [opentelemetry-instrumentation-asyncpg](./opentelemetry-instrumentation-asyncpg) | asyncpg >= 0.12.0 | | [opentelemetry-instrumentation-aws-lambda](./opentelemetry-instrumentation-aws-lambda) | aws_lambda | | [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | +| [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto~=2.0 | | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | | [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py index 6faafd8dd4..ee667abc81 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/setup.py @@ -11,14 +11,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json import os +from configparser import ConfigParser import setuptools +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extras_require section from setup.cfg. To support extras_require +# section in setup.cfg, we load it here and merge it with the extras_require param. +extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + VERSION_FILENAME = os.path.join( - # REPLACE ME: the path to the version file e.g - # BASE_DIR, "src", "opentelemetry", "instrumentation", "sqlalchemy", "version.py" BASE_DIR, "src", "opentelemetry", @@ -26,17 +46,54 @@ "boto3sqs", "version.py", ) -PACKAGE_INFO = {} with open(VERSION_FILENAME, encoding="utf-8") as f: exec(f.read(), PACKAGE_INFO) +PACKAGE_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "boto3sqs", + "package.py", +) +with open(PACKAGE_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. +extras_require["instruments"] = PACKAGE_INFO["_instruments"] +test_deps = extras_require.get("test", []) +for dep in extras_require["instruments"]: + test_deps.append(dep) + +extras_require["test"] = test_deps + + +class JSONMetadataCommand(distutils.cmd.Command): + + description = ( + "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", + "auto-generate code in other places", + ) + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + metadata = { + "name": config["metadata"]["name"], + "version": PACKAGE_INFO["__version__"], + "instruments": PACKAGE_INFO["_instruments"], + } + print(json.dumps(metadata)) + + setuptools.setup( + cmdclass={"meta": JSONMetadataCommand}, version=PACKAGE_INFO["__version__"], - entry_points={ - "opentelemetry_instrumentor": [ - # boto3sqs: the entrypoint for the instrumentor e.g - # "sqlalchemy = opentelemetry.instrumentation.sqlalchemy:SQLAlchemyInstrumentor" - "boto3sqs = opentelemetry.instrumentation.Boto3SQSInstrumentor" - ] - }, + extras_require=extras_require, ) diff --git a/opentelemetry-contrib-instrumentations/setup.cfg b/opentelemetry-contrib-instrumentations/setup.cfg index f921cfc2e3..44ad235035 100644 --- a/opentelemetry-contrib-instrumentations/setup.cfg +++ b/opentelemetry-contrib-instrumentations/setup.cfg @@ -34,6 +34,7 @@ install_requires = opentelemetry-instrumentation-asyncpg==0.30b1 opentelemetry-instrumentation-aws-lambda==0.30b1 opentelemetry-instrumentation-boto==0.30b1 + opentelemetry-instrumentation-boto3sqs==0.30b1 opentelemetry-instrumentation-botocore==0.30b1 opentelemetry-instrumentation-celery==0.30b1 opentelemetry-instrumentation-dbapi==0.30b1 diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 45dbb772b3..69f28e8d0e 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -36,6 +36,10 @@ "library": "boto~=2.0", "instrumentation": "opentelemetry-instrumentation-boto==0.30b1", }, + "boto": { + "library": "boto~=2.0", + "instrumentation": "opentelemetry-instrumentation-boto3sqs==0.30b1", + }, "botocore": { "library": "botocore ~= 1.0", "instrumentation": "opentelemetry-instrumentation-botocore==0.30b1", From f4ab349ea672daa33f23bc07b3fa28af0ee4da25 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 19:00:02 +0300 Subject: [PATCH 08/11] Change the dependency version --- instrumentation/README.md | 2 +- .../src/opentelemetry/instrumentation/boto3sqs/package.py | 2 +- .../src/opentelemetry/instrumentation/bootstrap_gen.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/README.md b/instrumentation/README.md index 9c85974c23..e4347f03ac 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -7,7 +7,7 @@ | [opentelemetry-instrumentation-asyncpg](./opentelemetry-instrumentation-asyncpg) | asyncpg >= 0.12.0 | | [opentelemetry-instrumentation-aws-lambda](./opentelemetry-instrumentation-aws-lambda) | aws_lambda | | [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | -| [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto~=2.0 | +| [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | | [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py index 981ab4c44c..1b7b62254a 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/package.py @@ -13,4 +13,4 @@ # limitations under the License. from typing import Collection -_instruments: Collection[str] = ("boto~=2.0",) +_instruments: Collection[str] = ("boto3 ~= 1.0",) diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 69f28e8d0e..23e2338a88 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -36,8 +36,8 @@ "library": "boto~=2.0", "instrumentation": "opentelemetry-instrumentation-boto==0.30b1", }, - "boto": { - "library": "boto~=2.0", + "boto3": { + "library": "boto3 ~= 1.0", "instrumentation": "opentelemetry-instrumentation-boto3sqs==0.30b1", }, "botocore": { From 5a9eaa97bb23b8ce77d7722e7b0fd48b627305a9 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 3 May 2022 19:36:14 +0300 Subject: [PATCH 09/11] Fix linting --- .../src/opentelemetry/instrumentation/boto3sqs/__init__.py | 6 +++--- .../tests/test_boto3sqs_instrumentation.py | 2 ++ .../tests/test_getter.py | 3 +++ .../tests/test_setter.py | 3 +++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index bcbb349f9c..1d4fa36bd6 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -28,7 +28,6 @@ Boto3SQSInstrumentor().instrument() """ - import logging from typing import Any, Collection, Dict, Generator, List, Optional @@ -37,8 +36,6 @@ import wrapt from opentelemetry import context, propagate, trace -from opentelemetry.instrumentation.boto3sqs.package import _instruments -from opentelemetry.instrumentation.boto3sqs.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -52,6 +49,9 @@ ) from opentelemetry.trace import Link, Span, SpanKind, Tracer, TracerProvider +from .package import _instruments +from .version import __version__ + logger = logging.getLogger(__name__) # We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming # existing filters diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py index 5e79feadf2..c6be9212ac 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=no-name-in-module + from unittest import TestCase import boto3 diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py index 41c53fd408..4310e9b0b2 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +# pylint: disable=no-name-in-module + from unittest import TestCase from opentelemetry.instrumentation.boto3sqs import ( diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py index 7a971b118b..32e20deb32 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +# pylint: disable=no-name-in-module + from unittest import TestCase from opentelemetry.instrumentation.boto3sqs import ( From 5d2995ae0cce185040bec97abaa3858e4ef43a20 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 16 May 2022 12:32:15 +0200 Subject: [PATCH 10/11] PR fixes. Remove walrus operator and use the `start_as_current_span` --- .../instrumentation/boto3sqs/__init__.py | 66 +++++++++-------- .../tests/test_boto3sqs_instrumentation.py | 70 ++++++++++++++++++- .../tests/test_getter.py | 63 ----------------- .../tests/test_setter.py | 44 ------------ 4 files changed, 101 insertions(+), 142 deletions(-) delete mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py delete mode 100644 instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 1d4fa36bd6..c92d8fc63a 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -60,11 +60,8 @@ class Boto3SQSGetter(Getter): def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: - if not ( - value := carrier.get( - f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {} - ) - ): + value = carrier.get(f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {}) + if not value: return None return [value.get("StringValue", None)] @@ -120,13 +117,13 @@ def __getitem__( ).__getitem__(*args, **kwargs) if not isinstance(retval, dict): return retval - if not (receipt_handle := retval.get("ReceiptHandle", None)): + receipt_handle = retval.get("ReceiptHandle", None) + if not receipt_handle: return retval - if not ( - started_span := Boto3SQSInstrumentor.received_messages_spans.get( - receipt_handle, None - ) - ): + started_span = Boto3SQSInstrumentor.received_messages_spans.get( + receipt_handle, None + ) + if not started_span: return retval if Boto3SQSInstrumentor.current_context_token: context.detach(Boto3SQSInstrumentor.current_context_token) @@ -176,9 +173,10 @@ def _enrich_span( @staticmethod def _safe_end_processing_span(receipt_handle: str) -> None: - if started_span := Boto3SQSInstrumentor.received_messages_spans.pop( + started_span = Boto3SQSInstrumentor.received_messages_spans.pop( receipt_handle, None - ): + ) + if started_span: if ( Boto3SQSInstrumentor.current_span_related_to_token == started_span @@ -197,18 +195,14 @@ def _create_processing_span( ) -> None: message_attributes = message.get("MessageAttributes", {}) links = [] - if ctx := propagate.extract( - message_attributes, getter=boto3sqs_getter - ): + ctx = propagate.extract(message_attributes, getter=boto3sqs_getter) + if ctx: for item in ctx.values(): if hasattr(item, "get_span_context"): links.append(Link(context=item.get_span_context())) - span = self._tracer.start_span( - name=f"{queue_name} process", - links=links, - kind=SpanKind.CONSUMER, - ) - with trace.use_span(span): + with self._tracer.start_as_current_span( + name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER + ) as span: message_id = message.get("MessageId", None) Boto3SQSInstrumentor.received_messages_spans[receipt_handle] = span Boto3SQSInstrumentor._enrich_span( @@ -228,16 +222,17 @@ def send_wrapper(wrapped, instance, args, kwargs): queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url( queue_url ) - span = self._tracer.start_span( + with self._tracer.start_as_current_span( name=f"{queue_name} send", kind=SpanKind.PRODUCER, - ) - Boto3SQSInstrumentor._enrich_span(span, queue_name) - with trace.use_span(span, end_on_exit=True): + end_on_exit=True, + ) as span: + Boto3SQSInstrumentor._enrich_span(span, queue_name) attributes = kwargs.pop("MessageAttributes", {}) propagate.inject(attributes, setter=boto3sqs_setter) retval = wrapped(*args, MessageAttributes=attributes, **kwargs) - if message_id := retval.get("MessageId"): + message_id = retval.get("MessageId") + if message_id: if span.is_recording(): span.set_attribute( SpanAttributes.MESSAGING_MESSAGE_ID, message_id @@ -283,7 +278,8 @@ def send_batch_wrapper(wrapped, instance, args, kwargs): retval = wrapped(*args, **kwargs) for successful_messages in retval["Successful"]: message_identifier = successful_messages["Id"] - if message_span := ids_to_spans.get(message_identifier): + message_span = ids_to_spans.get(message_identifier) + if message_span: if message_span.is_recording(): message_span.set_attribute( SpanAttributes.MESSAGING_MESSAGE_ID, @@ -322,12 +318,12 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): MessageAttributeNames=message_attribute_names, **kwargs, ) - if not (messages := retval.get("Messages", [])): + messages = retval.get("Messages", []) + if not messages: return retval for message in messages: - if not ( - receipt_handle := message.get("ReceiptHandle", None) - ): + receipt_handle = message.get("ReceiptHandle", None) + if not receipt_handle: continue Boto3SQSInstrumentor._safe_end_processing_span( receipt_handle @@ -346,7 +342,8 @@ def receive_message_wrapper(wrapped, instance, args, kwargs): def _wrap_delete_message(self) -> None: def delete_message_wrapper(wrapped, instance, args, kwargs): - if receipt_handle := kwargs.get("ReceiptHandle"): + receipt_handle = kwargs.get("ReceiptHandle") + if receipt_handle: Boto3SQSInstrumentor._safe_end_processing_span(receipt_handle) return wrapped(*args, **kwargs) @@ -358,7 +355,8 @@ def _wrap_delete_message_batch(self) -> None: def delete_message_wrapper_batch(wrapped, instance, args, kwargs): entries = kwargs.get("Entries") for entry in entries: - if receipt_handle := entry.get("ReceiptHandle", None): + receipt_handle = entry.get("ReceiptHandle", None) + if receipt_handle: Boto3SQSInstrumentor._safe_end_processing_span( receipt_handle ) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py index c6be9212ac..30788b78ee 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_boto3sqs_instrumentation.py @@ -20,7 +20,12 @@ import botocore.client from wrapt import BoundFunctionWrapper, FunctionWrapper -from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor +from opentelemetry.instrumentation.boto3sqs import ( + OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, + Boto3SQSGetter, + Boto3SQSInstrumentor, + Boto3SQSSetter, +) # pylint: disable=attribute-defined-outside-init @@ -82,3 +87,66 @@ def test_instrument_api_after_client_init(self) -> None: ) ) instrumentation.uninstrument() + + +class TestBoto3SQSGetter(TestCase): + def setUp(self) -> None: + self.getter = Boto3SQSGetter() + + def test_get_none(self) -> None: + carrier = {} + value = self.getter.get(carrier, "test") + self.assertIsNone(value) + + def test_get_value(self) -> None: + key = "test" + value = "value" + carrier = { + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}": { + "StringValue": value, + "DataType": "String", + } + } + val = self.getter.get(carrier, key) + self.assertEqual(val, [value]) + + def test_keys(self): + key1 = "test1" + value1 = "value1" + key2 = "test2" + value2 = "value2" + carrier = { + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key1}": { + "StringValue": value1, + "DataType": "String", + }, + key2: {"StringValue": value2, "DataType": "String"}, + } + keys = self.getter.keys(carrier) + self.assertEqual(keys, [key1, key2]) + + def test_keys_empty(self): + keys = self.getter.keys({}) + self.assertEqual(keys, []) + + +class TestBoto3SQSSetter(TestCase): + def setUp(self) -> None: + self.setter = Boto3SQSSetter() + + def test_simple(self): + original_key = "SomeHeader" + original_value = {"NumberValue": 1, "DataType": "Number"} + carrier = {original_key: original_value.copy()} + key = "test" + value = "value" + self.setter.set(carrier, key, value) + # Ensure the original value is not harmed + for dict_key, dict_val in carrier[original_key].items(): + self.assertEqual(original_value[dict_key], dict_val) + # Ensure the new key is added well + self.assertIn( + f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys() + ) + new_value = carrier[f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] + self.assertEqual(new_value["StringValue"], value) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py deleted file mode 100644 index 4310e9b0b2..0000000000 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_getter.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint: disable=no-name-in-module - -from unittest import TestCase - -from opentelemetry.instrumentation.boto3sqs import ( - OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, - Boto3SQSGetter, -) - - -class TestBoto3SQSGetter(TestCase): - def setUp(self) -> None: - self.getter = Boto3SQSGetter() - - def test_get_none(self) -> None: - carrier = {} - value = self.getter.get(carrier, "test") - self.assertIsNone(value) - - def test_get_value(self) -> None: - key = "test" - value = "value" - carrier = { - f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}": { - "StringValue": value, - "DataType": "String", - } - } - val = self.getter.get(carrier, key) - self.assertEqual(val, [value]) - - def test_keys(self): - key1 = "test1" - value1 = "value1" - key2 = "test2" - value2 = "value2" - carrier = { - f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key1}": { - "StringValue": value1, - "DataType": "String", - }, - key2: {"StringValue": value2, "DataType": "String"}, - } - keys = self.getter.keys(carrier) - self.assertEqual(keys, [key1, key2]) - - def test_keys_empty(self): - keys = self.getter.keys({}) - self.assertEqual(keys, []) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py deleted file mode 100644 index 32e20deb32..0000000000 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/tests/test_setter.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint: disable=no-name-in-module - -from unittest import TestCase - -from opentelemetry.instrumentation.boto3sqs import ( - OPENTELEMETRY_ATTRIBUTE_IDENTIFIER, - Boto3SQSSetter, -) - - -class TestBoto3SQSSetter(TestCase): - def setUp(self) -> None: - self.setter = Boto3SQSSetter() - - def test_simple(self): - original_key = "SomeHeader" - original_value = {"NumberValue": 1, "DataType": "Number"} - carrier = {original_key: original_value.copy()} - key = "test" - value = "value" - self.setter.set(carrier, key, value) - # Ensure the original value is not harmed - for dict_key, dict_val in carrier[original_key].items(): - self.assertEqual(original_value[dict_key], dict_val) - # Ensure the new key is added well - self.assertIn( - f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys() - ) - new_value = carrier[f"{OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] - self.assertEqual(new_value["StringValue"], value) From e7a66833c389bb205782204c42ba66fca3eb402d Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Thu, 19 May 2022 10:29:34 +0200 Subject: [PATCH 11/11] Run lint and generate --- .../src/opentelemetry/instrumentation/boto3sqs/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py index 5a48531d76..49909088b2 100644 --- a/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-boto3sqs/src/opentelemetry/instrumentation/boto3sqs/__init__.py @@ -199,7 +199,9 @@ def _create_processing_span( for item in ctx.values(): if hasattr(item, "get_span_context"): links.append(Link(context=item.get_span_context())) - span = self._tracer.start_span(name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER) + span = self._tracer.start_span( + name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER + ) with trace.use_span(span): message_id = message.get("MessageId") Boto3SQSInstrumentor.received_messages_spans[receipt_handle] = span