diff --git a/CHANGELOG.md b/CHANGELOG.md index 95da07b6d7..939d528f95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#781](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/781)) - `opentelemetry-instrumentation-aws-lambda` Add instrumentation for AWS Lambda Service - Implementation (Part 2/2) ([#777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/777)) +- `opentelemetry-instrumentation-pymongo` Add `request_hook`, `response_hook` and `failed_hook` callbacks passed as arguments to the instrument method + ([#793](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/793)) - `opentelemetry-instrumentation-pymysql` Add support for PyMySQL 1.x series ([#792](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/792)) diff --git a/instrumentation/opentelemetry-instrumentation-pymongo/src/opentelemetry/instrumentation/pymongo/__init__.py b/instrumentation/opentelemetry-instrumentation-pymongo/src/opentelemetry/instrumentation/pymongo/__init__.py index a89747936d..1f3df6e76b 100644 --- a/instrumentation/opentelemetry-instrumentation-pymongo/src/opentelemetry/instrumentation/pymongo/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pymongo/src/opentelemetry/instrumentation/pymongo/__init__.py @@ -26,7 +26,6 @@ from pymongo import MongoClient from opentelemetry.instrumentation.pymongo import PymongoInstrumentor - PymongoInstrumentor().instrument() client = MongoClient() db = client["MongoDB_Database"] @@ -35,9 +34,47 @@ API --- -""" +The `instrument` method accepts the following keyword args: + +tracer_provider (TracerProvider) - an optional tracer provider +request_hook (Callable) - +a function with extra user-defined logic to be performed before querying mongodb +this function signature is: def request_hook(span: Span, event: CommandStartedEvent) -> None +response_hook (Callable) - +a function with extra user-defined logic to be performed after the query returns with a successful response +this function signature is: def response_hook(span: Span, event: CommandSucceededEvent) -> None +failed_hook (Callable) - +a function with extra user-defined logic to be performed after the query returns with a failed response +this function signature is: def failed_hook(span: Span, event: CommandFailedEvent) -> None + +for example: + +.. code: python + + from opentelemetry.instrumentation.pymongo import PymongoInstrumentor + from pymongo import MongoClient + + def request_hook(span, event): + # request hook logic -from typing import Collection + def response_hook(span, event): + # response hook logic + + def failed_hook(span, event): + # failed hook logic + + # Instrument pymongo with hooks + PymongoInstrumentor().instrument(request_hook=request_hook, response_hooks=response_hook, failed_hook=failed_hook) + + # This will create a span with pymongo specific attributes, including custom attributes added from the hooks + client = MongoClient() + db = client["MongoDB_Database"] + collection = db["MongoDB_Collection"] + collection.find_one() + +""" +from logging import getLogger +from typing import Callable, Collection from pymongo import monitoring @@ -48,14 +85,34 @@ from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes from opentelemetry.trace import SpanKind, get_tracer +from opentelemetry.trace.span import Span from opentelemetry.trace.status import Status, StatusCode +_LOG = getLogger(__name__) + +RequestHookT = Callable[[Span, monitoring.CommandStartedEvent], None] +ResponseHookT = Callable[[Span, monitoring.CommandSucceededEvent], None] +FailedHookT = Callable[[Span, monitoring.CommandFailedEvent], None] + + +def dummy_callback(span, event): + ... + class CommandTracer(monitoring.CommandListener): - def __init__(self, tracer): + def __init__( + self, + tracer, + request_hook: RequestHookT = dummy_callback, + response_hook: ResponseHookT = dummy_callback, + failed_hook: FailedHookT = dummy_callback, + ): self._tracer = tracer self._span_dict = {} self.is_enabled = True + self.start_hook = request_hook + self.success_hook = response_hook + self.failed_hook = failed_hook def started(self, event: monitoring.CommandStartedEvent): """ Method to handle a pymongo CommandStartedEvent """ @@ -85,6 +142,10 @@ def started(self, event: monitoring.CommandStartedEvent): span.set_attribute( SpanAttributes.NET_PEER_PORT, event.connection_id[1] ) + try: + self.start_hook(span, event) + except Exception as hook_exception: # noqa pylint: disable=broad-except + _LOG.exception(hook_exception) # Add Span to dictionary self._span_dict[_get_span_dict_key(event)] = span @@ -103,6 +164,11 @@ def succeeded(self, event: monitoring.CommandSucceededEvent): span = self._pop_span(event) if span is None: return + if span.is_recording(): + try: + self.success_hook(span, event) + except Exception as hook_exception: # noqa pylint: disable=broad-except + _LOG.exception(hook_exception) span.end() def failed(self, event: monitoring.CommandFailedEvent): @@ -116,6 +182,10 @@ def failed(self, event: monitoring.CommandFailedEvent): return if span.is_recording(): span.set_status(Status(StatusCode.ERROR, event.failure)) + try: + self.failed_hook(span, event) + except Exception as hook_exception: # noqa pylint: disable=broad-except + _LOG.exception(hook_exception) span.end() def _pop_span(self, event): @@ -150,12 +220,20 @@ def _instrument(self, **kwargs): """ tracer_provider = kwargs.get("tracer_provider") + request_hook = kwargs.get("request_hook", dummy_callback) + response_hook = kwargs.get("response_hook", dummy_callback) + failed_hook = kwargs.get("failed_hook", dummy_callback) # Create and register a CommandTracer only the first time if self._commandtracer_instance is None: tracer = get_tracer(__name__, __version__, tracer_provider) - self._commandtracer_instance = CommandTracer(tracer) + self._commandtracer_instance = CommandTracer( + tracer, + request_hook=request_hook, + response_hook=response_hook, + failed_hook=failed_hook, + ) monitoring.register(self._commandtracer_instance) # If already created, just enable it diff --git a/instrumentation/opentelemetry-instrumentation-pymongo/tests/test_pymongo.py b/instrumentation/opentelemetry-instrumentation-pymongo/tests/test_pymongo.py index ea6f9cc3d0..f51f58e6b3 100644 --- a/instrumentation/opentelemetry-instrumentation-pymongo/tests/test_pymongo.py +++ b/instrumentation/opentelemetry-instrumentation-pymongo/tests/test_pymongo.py @@ -29,6 +29,9 @@ class TestPymongo(TestBase): def setUp(self): super().setUp() self.tracer = self.tracer_provider.get_tracer(__name__) + self.start_callback = mock.MagicMock() + self.success_callback = mock.MagicMock() + self.failed_callback = mock.MagicMock() def test_pymongo_instrumentor(self): mock_register = mock.Mock() @@ -44,7 +47,9 @@ def test_started(self): command_attrs = { "command_name": "find", } - command_tracer = CommandTracer(self.tracer) + command_tracer = CommandTracer( + self.tracer, request_hook=self.start_callback + ) mock_event = MockEvent( command_attrs, ("test.com", "1234"), "test_request_id" ) @@ -66,10 +71,15 @@ def test_started(self): span.attributes[SpanAttributes.NET_PEER_NAME], "test.com" ) self.assertEqual(span.attributes[SpanAttributes.NET_PEER_PORT], "1234") + self.start_callback.assert_called_once_with(span, mock_event) def test_succeeded(self): mock_event = MockEvent({}) - command_tracer = CommandTracer(self.tracer) + command_tracer = CommandTracer( + self.tracer, + request_hook=self.start_callback, + response_hook=self.success_callback, + ) command_tracer.started(event=mock_event) command_tracer.succeeded(event=mock_event) spans_list = self.memory_exporter.get_finished_spans() @@ -77,6 +87,8 @@ def test_succeeded(self): span = spans_list[0] self.assertIs(span.status.status_code, trace_api.StatusCode.UNSET) self.assertIsNotNone(span.end_time) + self.start_callback.assert_called_once() + self.success_callback.assert_called_once() def test_not_recording(self): mock_tracer = mock.Mock() @@ -119,7 +131,11 @@ def test_suppression_key(self): def test_failed(self): mock_event = MockEvent({}) - command_tracer = CommandTracer(self.tracer) + command_tracer = CommandTracer( + self.tracer, + request_hook=self.start_callback, + failed_hook=self.failed_callback, + ) command_tracer.started(event=mock_event) command_tracer.failed(event=mock_event) @@ -132,6 +148,8 @@ def test_failed(self): ) self.assertEqual(span.status.description, "failure") self.assertIsNotNone(span.end_time) + self.start_callback.assert_called_once() + self.failed_callback.assert_called_once() def test_multiple_commands(self): first_mock_event = MockEvent({}, ("firstUrl", "123"), "first")