Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add pika instrumentation #680

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6acb1c5
Added initial code
oxeye-nikolay Sep 13, 2021
b561657
Add all needed spans, and add support of instrumentation and uninstru…
oxeye-nikolay Sep 13, 2021
315b438
Added tests. Ready for PR
oxeye-nikolay Sep 14, 2021
d73b88f
Rename RequestsInstrumentation to RequestsInstrumentor to follow conv…
oxeye-nikolay Sep 19, 2021
15bf875
Add suppress_instrumentation functionality
oxeye-nikolay Sep 19, 2021
12a4746
Fix suppress_instrumentation functionality
oxeye-nikolay Sep 19, 2021
fedd768
Fix CR comments and lint test failures
oxeye-nikolay Sep 19, 2021
e37b5fa
Add usage of wrapt according to CR comments
oxeye-nikolay Sep 20, 2021
a7f118a
Fix according to CR Comments
oxeye-nikolay Sep 22, 2021
76368f2
Move the tracer to be an attribute of the instrumentor instead of the…
oxeye-nikolay Sep 22, 2021
7339ecb
Fix Tests
oxeye-nikolay Sep 22, 2021
c062b0f
Merge branch 'main' into feature/add_pika_instrumentation
lzchen Sep 22, 2021
adad454
Update Changelog and fix failing test
oxeye-nikolay Sep 23, 2021
b59f385
Merge remote-tracking branch 'origin/feature/add_pika_instrumentation…
oxeye-nikolay Sep 23, 2021
da90f11
update code using tox -e generate
oxeye-nikolay Sep 29, 2021
a0e4049
Merge branch 'main' into feature/add_pika_instrumentation
ocelotl Sep 29, 2021
51dc2c3
Update the name of the variable to store the tracer provider.
oxeye-nikolay Oct 6, 2021
1d68aab
Merge branch 'main' into feature/add_pika_instrumentation
owais Oct 6, 2021
d59b5fd
Update the core repo hash in the workflow
oxeye-nikolay Oct 6, 2021
dd0525c
Update the core repo hash in the workflow
oxeye-nikolay Oct 6, 2021
d308ad8
Merge branch 'main' into feature/add_pika_instrumentation
lzchen Oct 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-sdk-extension-aws` Add AWS resource detectors to extension package
([#586](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/586))
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`,
`opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks
`opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3`,
`opentelemetry-instrumentation-pika` Added `request_hook` and `response_hook` callbacks
owais marked this conversation as resolved.
Show resolved Hide resolved
([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576))

### Changed
Expand Down
70 changes: 70 additions & 0 deletions instrumentation/opentelemetry-instrumentation-pika/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
OpenTelemetry pika Instrumentation
==================================

|pypi|

.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg
:target: https://pypi.org/project/opentelemetry-instrumentation-pika/

This library allows tracing requests made by the pika library.

Installation
------------

::

pip install opentelemetry-instrumentation-pika

Usage
-----

* Start broker backend

.. code-block:: python

docker run -p 5672:5672 rabbitmq

* Run instrumented task

.. code-block:: python

import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor

PikaInstrumentor().instrument()

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')

* PikaInstrumentor also supports instrumentation of a single channel

.. code-block:: python

import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=channel)


channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')

pika_instrumentation.uninstrument_channel(channel=channel)

* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider

.. code-block:: python

PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)

References
----------

* `OpenTelemetry pika/ Tracing <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/pika/pika.html>`_
* `OpenTelemetry Project <https://opentelemetry.io/>`_
57 changes: 57 additions & 0 deletions instrumentation/opentelemetry-instrumentation-pika/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[metadata]
name = opentelemetry-instrumentation-pika
description = OpenTelemetry pika instrumentation
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
author_email = cncf-opentelemetry-contributors@lists.cncf.io
url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-pika
platforms = any
license = Apache-2.0
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8

[options]
python_requires = >=3.6
package_dir=
=src
packages=find_namespace:

install_requires =
opentelemetry-api ~= 1.5
wrapt >= 1.0.0, < 2.0.0
pika >= 1.1.0
lzchen marked this conversation as resolved.
Show resolved Hide resolved

[options.extras_require]
test =
pytest
wrapt >= 1.0.0, < 2.0.0
opentelemetry-test == 0.24b0

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
pika = opentelemetry.instrumentation.pika:PikaInstrumentor
33 changes: 33 additions & 0 deletions instrumentation/opentelemetry-instrumentation-pika/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import setuptools

BASE_DIR = os.path.dirname(__file__)
VERSION_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py",
)
PACKAGE_INFO = {}
with open(VERSION_FILENAME) as f:
exec(f.read(), PACKAGE_INFO)

setuptools.setup(
version=PACKAGE_INFO["__version__"],
entry_points={
"opentelemetry_instrumentor": [
"pika = opentelemetry.instrumentation.pika:PikaInstrumentor"
]
},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `pika` to trace RabbitMQ applications.

Usage
-----

* Start broker backend

.. code-block:: python

docker run -p 5672:5672 rabbitmq

* Run instrumented task

.. code-block:: python

import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor

PikaInstrumentor().instrument()

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')

* PikaInstrumentor also supports instrumentation of a single channel

.. code-block:: python

import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=channel)


channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')

pika_instrumentation.uninstrument_channel(channel=channel)

* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider

.. code-block:: python

PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)

API
---
"""

# pylint: disable=unused-argument
from .pika_instrumentor import PikaInstrumentor
from .version import __version__
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Collection

_instruments: Collection[str] = ("pika >= 1.1.0",)
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import getLogger
from typing import Any, Callable, Collection, Dict, Optional

import wrapt
from pika.adapters import BlockingConnection
from pika.channel import Channel

from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pika import utils
from opentelemetry.instrumentation.pika.package import _instruments
from opentelemetry.instrumentation.pika.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import Tracer, TracerProvider

_LOG = getLogger(__name__)
_CTX_KEY = "__otel_task_span"

_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"]


class PikaInstrumentor(BaseInstrumentor): # type: ignore
# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_consumers(
consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer
) -> Any:
for key, callback in consumers_dict.items():
decorated_callback = utils._decorate_callback(
callback, tracer, key
)
setattr(decorated_callback, "_original_callback", callback)
consumers_dict[key] = decorated_callback

@staticmethod
def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None:
original_function = getattr(channel, "basic_publish")
decorated_function = utils._decorate_basic_publish(
original_function, channel, tracer
)
setattr(decorated_function, "_original_function", original_function)
channel.__setattr__("basic_publish", decorated_function)
channel.basic_publish = decorated_function

@staticmethod
def _instrument_channel_functions(
channel: Channel, tracer: Tracer
) -> None:
if hasattr(channel, "basic_publish"):
PikaInstrumentor._instrument_basic_publish(channel, tracer)

@staticmethod
def _uninstrument_channel_functions(channel: Channel) -> None:
for function_name in _FUNCTIONS_TO_UNINSTRUMENT:
if not hasattr(channel, function_name):
continue
function = getattr(channel, function_name)
if hasattr(function, "_original_function"):
channel.__setattr__(function_name, function._original_function)

@staticmethod
def instrument_channel(
channel: Channel, tracer_provider: Optional[TracerProvider] = None,
) -> None:
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
lzchen marked this conversation as resolved.
Show resolved Hide resolved
channel.__setattr__("__opentelemetry_tracer", tracer)
if not hasattr(channel, "_impl"):
_LOG.error("Could not find implementation for provided channel!")
return
if channel._impl._consumers:
PikaInstrumentor._instrument_consumers(
channel._impl._consumers, tracer
)
PikaInstrumentor._instrument_channel_functions(channel, tracer)

@staticmethod
def uninstrument_channel(channel: Channel) -> None:
if not hasattr(channel, "_impl"):
_LOG.error("Could not find implementation for provided channel!")
return
for key, callback in channel._impl._consumers.items():
if hasattr(callback, "_original_callback"):
channel._impl._consumers[key] = callback._original_callback
PikaInstrumentor._uninstrument_channel_functions(channel)
if hasattr(channel, "__opentelemetry_tracer"):
delattr(channel, "__opentelemetry_tracer")

def _decorate_channel_function(
self, tracer_provider: Optional[TracerProvider]
) -> None:
def wrapper(wrapped, instance, args, kwargs):
channel = wrapped(*args, **kwargs)
self.instrument_channel(channel, tracer_provider=tracer_provider)
owais marked this conversation as resolved.
Show resolved Hide resolved
return channel

wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
self._decorate_channel_function(tracer_provider)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
unwrap(BlockingConnection, "channel")

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Loading