From 827a1ce5080941909dfdbd82797efdcfe1944514 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Fri, 11 Dec 2020 16:25:01 +0100 Subject: [PATCH 1/5] pika: instrument Channel.basic_publish --- instana/__init__.py | 1 + instana/agent/host.py | 2 +- instana/instrumentation/pika.py | 59 ++++++++++++++++++ instana/span.py | 2 +- tests/clients/test_pika.py | 102 ++++++++++++++++++++++++++++++++ tests/requirements.txt | 3 +- 6 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 instana/instrumentation/pika.py create mode 100644 tests/clients/test_pika.py diff --git a/instana/__init__.py b/instana/__init__.py index d0f9732a..39673179 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -157,6 +157,7 @@ def boot_agent(): from .instrumentation.tornado import client from .instrumentation.tornado import server from .instrumentation import logging + from .instrumentation import pika from .instrumentation import pymysql from .instrumentation import psycopg2 from .instrumentation import redis diff --git a/instana/agent/host.py b/instana/agent/host.py index 39c078f4..214bb6e4 100644 --- a/instana/agent/host.py +++ b/instana/agent/host.py @@ -253,7 +253,7 @@ def report_data_payload(self, payload): data=to_json(payload['profiles']), headers={"Content-Type": "application/json"}, timeout=0.8) - + if response is not None and 200 <= response.status_code <= 204: self.last_seen = datetime.now() diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py new file mode 100644 index 00000000..60301a6e --- /dev/null +++ b/instana/instrumentation/pika.py @@ -0,0 +1,59 @@ +# coding: utf-8 + +from __future__ import absolute_import + +import wrapt +import opentracing +import pdb + +from ..log import logger +from ..singletons import tracer + +try: + import pika + + def _bind_publish_args(exchange, routing_key, body, properties=None, *args, **kwargs): + return (exchange, routing_key, body, properties, args, kwargs) + + def basic_publish_with_instana(wrapped, instance, args, kwargs): + parent_span = tracer.active_span + + if parent_span is None: + return wrapped(*args, **kwargs) + + (exchange, routing_key, body, properties, args, kwargs) = (_bind_publish_args(*args, **kwargs)) + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + conn = instance.connection.params + + scope.span.set_tag("address", "%s:%d" % (conn.host, conn.port)) + scope.span.set_tag("sort", "publish") + scope.span.set_tag("exchange", exchange) + scope.span.set_tag("key", routing_key) + except: + logger.debug("basic_publish_with_instana: ", exc_info=True) + + # context propagation + properties = properties or pika.BasicProperties() + properties.headers = properties.headers or {} + + tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, properties.headers) + args = (exchange, routing_key, body, properties) + args + + try: + rv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + def _bind_get_args(queue, callback, *args, **kwargs): + return (queue, callback, args, kwargs) + + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_publish', basic_publish_with_instana) + + logger.debug("Instrumenting pika") +except ImportError: + pass diff --git a/instana/span.py b/instana/span.py index d1b72ae2..05f1ab13 100644 --- a/instana/span.py +++ b/instana/span.py @@ -110,7 +110,7 @@ def __init__(self, span, source, service_name, **kwargs): self.sy = span.synthetic self.__dict__.update(kwargs) - + def _validate_tags(self, tags): """ This method will loop through a set of tags to validate each key and value. diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py new file mode 100644 index 00000000..b54386aa --- /dev/null +++ b/tests/clients/test_pika.py @@ -0,0 +1,102 @@ +from __future__ import absolute_import + +import os +import pika +import unittest +import mock + +from ..helpers import testenv +from instana.singletons import tracer + +class TestPika(unittest.TestCase): + @staticmethod + @mock.patch('pika.connection.Connection') + def _create_connection(connection_class_mock=None): + return connection_class_mock() + + def setUp(self): + self.recorder = tracer.recorder + self.recorder.clear_spans() + + self.connection = self._create_connection() + self._on_openok_callback = mock.Mock() + self.obj = pika.channel.Channel(self.connection, 1, + self._on_openok_callback) + + def tearDown(self): + del self.connection + del self._on_openok_callback + del self.obj + + @mock.patch('pika.spec.Basic.Publish') + @mock.patch('pika.channel.Channel._send_method') + def test_Channel_basic_publish(self, send_method, _unused): + self.obj._set_state(self.obj.OPEN) + + with tracer.start_active_span("testing"): + self.obj.basic_publish("test.exchange", "test.queue", "Hello!") + + spans = self.recorder.queued_spans() + self.assertEqual(2, len(spans)) + + rabbitmq_span = spans[0] + test_span = spans[1] + + self.assertIsNone(tracer.active_span) + + # Same traceId + self.assertEqual(test_span.t, rabbitmq_span.t) + + # Parent relationships + self.assertEqual(rabbitmq_span.p, test_span.s) + + # Error logging + self.assertIsNone(test_span.ec) + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertEqual("test.exchange", rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual('publish', rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + send_method.assert_called_once_with( + pika.spec.Basic.Publish( + exchange="test.exchange", + routing_key="test.queue"), (pika.BasicProperties(headers={ + "X-Instana-T": rabbitmq_span.t, + "X-Instana-S": rabbitmq_span.s, + "X-Instana-L": "1" + }), b"Hello!")) + + @mock.patch('pika.spec.Basic.Publish') + @mock.patch('pika.channel.Channel._send_method') + def test_Channel_basic_publish_with_headers(self, send_method, _unused): + self.obj._set_state(self.obj.OPEN) + + with tracer.start_active_span("testing"): + self.obj.basic_publish("test.exchange", + "test.queue", + "Hello!", + pika.BasicProperties(headers={ + "X-Custom-1": "test" + })) + + spans = self.recorder.queued_spans() + self.assertEqual(2, len(spans)) + + rabbitmq_span = spans[0] + test_span = spans[1] + + send_method.assert_called_once_with( + pika.spec.Basic.Publish( + exchange="test.exchange", + routing_key="test.queue"), (pika.BasicProperties(headers={ + "X-Custom-1": "test", + "X-Instana-T": rabbitmq_span.t, + "X-Instana-S": rabbitmq_span.s, + "X-Instana-L": "1" + }), b"Hello!")) diff --git a/tests/requirements.txt b/tests/requirements.txt index 3e016f44..b01a6f86 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -17,6 +17,7 @@ nose>=1.0 PyMySQL[rsa]>=0.9.1 pyOpenSSL>=16.1.0;python_version<="2.7" psycopg2>=2.7.1 +pika>=1.1.0 pymongo>=3.7.0 pyramid>=1.2 pytest>=4.6 @@ -28,4 +29,4 @@ spyne>=2.9,<=2.12.14 suds-jurko>=0.6 tornado>=4.5.3,<6.0 uvicorn>=0.12.2;python_version>="3.6" -urllib3[secure]!=1.25.0,!=1.25.1,<1.26,>=1.21.1 \ No newline at end of file +urllib3[secure]!=1.25.0,!=1.25.1,<1.26,>=1.21.1 From 6694e6c1d67354fe248242a896d14d9f01398390 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Tue, 15 Dec 2020 21:04:02 +0100 Subject: [PATCH 2/5] pika: instrument Channel.basic_get --- instana/instrumentation/pika.py | 29 ++++++++++++- tests/clients/test_pika.py | 75 +++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py index 60301a6e..c963be3b 100644 --- a/instana/instrumentation/pika.py +++ b/instana/instrumentation/pika.py @@ -49,10 +49,37 @@ def basic_publish_with_instana(wrapped, instance, args, kwargs): else: return rv - def _bind_get_args(queue, callback, *args, **kwargs): + def _bind_get_args(queue, callback, *args, **kwargs): return (queue, callback, args, kwargs) + def basic_get_with_instana(wrapped, instance, args, kwargs): + (queue, callback, args, kwargs) = (_bind_get_args(*args, **kwargs)) + + def _cb_wrapper(channel, method, properties, body): + parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) + conn = channel.connection.params + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + conn = instance.connection.params + + scope.span.set_tag("address", "%s:%d" % (conn.host, conn.port)) + scope.span.set_tag("sort", "consume") + scope.span.set_tag("key", queue) + except: + logger.debug("basic_get_with_instana: ", exc_info=True) + + try: + callback(channel, method, properties, body) + except Exception as e: + scope.span.log_exception(e) + raise + + args = (queue, _cb_wrapper) + args + return wrapped(*args, **kwargs) + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_publish', basic_publish_with_instana) + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana) logger.debug("Instrumenting pika") except ImportError: diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py index b54386aa..fe11e174 100644 --- a/tests/clients/test_pika.py +++ b/tests/clients/test_pika.py @@ -100,3 +100,78 @@ def test_Channel_basic_publish_with_headers(self, send_method, _unused): "X-Instana-S": rabbitmq_span.s, "X-Instana-L": "1" }), b"Hello!")) + + @mock.patch('pika.spec.Basic.Get') + def test_Channel_basic_get(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.spec.BasicProperties() + + method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_get("test.queue", cb) + self.obj._on_getok(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + cb.assert_called_once_with(self.obj, pika.spec.Basic.GetOk, properties, body) + + @mock.patch('pika.spec.Basic.Get') + def test_Channel_basic_get_with_trace_context(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.spec.BasicProperties(headers={ + "X-Instana-T": "0000000000000001", + "X-Instana-S": "0000000000000002", + "X-Instana-L": "1" + }) + + method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_get("test.queue", cb) + self.obj._on_getok(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) From ccda25fe9e88067c72c7cb6ddf17ca8d6ed9934c Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Thu, 17 Dec 2020 16:15:54 +0100 Subject: [PATCH 3/5] pika: instrument Channel.basic_consume --- instana/instrumentation/pika.py | 63 +++++++++++++++++---------- tests/clients/test_pika.py | 75 +++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 23 deletions(-) diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py index c963be3b..c410c53a 100644 --- a/instana/instrumentation/pika.py +++ b/instana/instrumentation/pika.py @@ -12,27 +12,30 @@ try: import pika - def _bind_publish_args(exchange, routing_key, body, properties=None, *args, **kwargs): - return (exchange, routing_key, body, properties, args, kwargs) + def _extract_broker_tags(span, sort, conn, queue_or_routing_key, exchange=None): + try: + span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) + span.set_tag("sort", sort) + span.set_tag("key", queue_or_routing_key) + + if exchange is not None: + span.set_tag("exchange", exchange) + except: + logger.debug("_extract_consumer_tags: ", exc_info=True) def basic_publish_with_instana(wrapped, instance, args, kwargs): + def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs): + return (exchange, routing_key, body, properties, args, kwargs) + parent_span = tracer.active_span if parent_span is None: return wrapped(*args, **kwargs) - (exchange, routing_key, body, properties, args, kwargs) = (_bind_publish_args(*args, **kwargs)) + (exchange, routing_key, body, properties, args, kwargs) = (_bind_args(*args, **kwargs)) with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: - try: - conn = instance.connection.params - - scope.span.set_tag("address", "%s:%d" % (conn.host, conn.port)) - scope.span.set_tag("sort", "publish") - scope.span.set_tag("exchange", exchange) - scope.span.set_tag("key", routing_key) - except: - logger.debug("basic_publish_with_instana: ", exc_info=True) + _extract_broker_tags(scope.span, "publish", instance.connection, routing_key, exchange=exchange) # context propagation properties = properties or pika.BasicProperties() @@ -49,25 +52,38 @@ def basic_publish_with_instana(wrapped, instance, args, kwargs): else: return rv - def _bind_get_args(queue, callback, *args, **kwargs): - return (queue, callback, args, kwargs) - def basic_get_with_instana(wrapped, instance, args, kwargs): - (queue, callback, args, kwargs) = (_bind_get_args(*args, **kwargs)) + def _bind_args(queue, callback, *args, **kwargs): + return (queue, callback, args, kwargs) + + (queue, callback, args, kwargs) = (_bind_args(*args, **kwargs)) def _cb_wrapper(channel, method, properties, body): parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) - conn = channel.connection.params with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + _extract_broker_tags(scope.span, "consume", instance.connection, queue) + try: - conn = instance.connection.params + callback(channel, method, properties, body) + except Exception as e: + scope.span.log_exception(e) + raise + + args = (queue, _cb_wrapper) + args + return wrapped(*args, **kwargs) + + def basic_consume_with_instana(wrapped, instance, args, kwargs): + def _bind_args(queue, on_consume_callback, *args, **kwargs): + return (queue, on_consume_callback, args, kwargs) - scope.span.set_tag("address", "%s:%d" % (conn.host, conn.port)) - scope.span.set_tag("sort", "consume") - scope.span.set_tag("key", queue) - except: - logger.debug("basic_get_with_instana: ", exc_info=True) + (queue, on_consume_callback, args, kwargs) = (_bind_args(*args, **kwargs)) + + def _cb_wrapper(channel, method, properies, body): + parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + _extract_broker_tags(scope.span, "consume", instance.connection, queue) try: callback(channel, method, properties, body) @@ -80,6 +96,7 @@ def _cb_wrapper(channel, method, properties, body): wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_publish', basic_publish_with_instana) wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana) + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_consume', basic_get_with_instana) logger.debug("Instrumenting pika") except ImportError: diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py index fe11e174..e706fb4a 100644 --- a/tests/clients/test_pika.py +++ b/tests/clients/test_pika.py @@ -175,3 +175,78 @@ def test_Channel_basic_get_with_trace_context(self, _unused): # A new span has been started self.assertIsNotNone(rabbitmq_span.s) self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + + @mock.patch('pika.spec.Basic.Consume') + def test_Channel_basic_consume(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.spec.BasicProperties() + + method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_consume("test.queue", cb, consumer_tag="test") + self.obj._on_deliver(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + cb.assert_called_once_with(self.obj, method_frame.method, properties, body) + + @mock.patch('pika.spec.Basic.Consume') + def test_Channel_basic_consume_with_trace_context(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.spec.BasicProperties(headers={ + "X-Instana-T": "0000000000000001", + "X-Instana-S": "0000000000000002", + "X-Instana-L": "1" + }) + + method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_consume("test.queue", cb, consumer_tag="test") + self.obj._on_deliver(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) From 60222a64e88e7372560fe0d5a4746325eb66153c Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Fri, 18 Dec 2020 19:34:05 +0100 Subject: [PATCH 4/5] pika: instrument BlockingChannel.consume --- instana/instrumentation/pika.py | 98 ++++++++++++++++++--- tests/clients/test_pika.py | 151 ++++++++++++++++++++++++++++---- tests/requirements.txt | 2 +- 3 files changed, 220 insertions(+), 31 deletions(-) diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py index c410c53a..f4897bf3 100644 --- a/instana/instrumentation/pika.py +++ b/instana/instrumentation/pika.py @@ -4,7 +4,7 @@ import wrapt import opentracing -import pdb +import types from ..log import logger from ..singletons import tracer @@ -12,16 +12,22 @@ try: import pika - def _extract_broker_tags(span, sort, conn, queue_or_routing_key, exchange=None): - try: - span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) - span.set_tag("sort", sort) - span.set_tag("key", queue_or_routing_key) + def _extract_broker_tags(span, conn): + span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) - if exchange is not None: - span.set_tag("exchange", exchange) - except: - logger.debug("_extract_consumer_tags: ", exc_info=True) + def _extract_publisher_tags(span, conn, exchange, routing_key): + _extract_broker_tags(span, conn) + + span.set_tag("sort", "publish") + span.set_tag("key", routing_key) + span.set_tag("exchange", exchange) + + def _extract_consumer_tags(span, conn, queue): + _extract_broker_tags(span, conn) + + span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) + span.set_tag("sort", "consume") + span.set_tag("queue", queue) def basic_publish_with_instana(wrapped, instance, args, kwargs): def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs): @@ -35,7 +41,13 @@ def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs): (exchange, routing_key, body, properties, args, kwargs) = (_bind_args(*args, **kwargs)) with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: - _extract_broker_tags(scope.span, "publish", instance.connection, routing_key, exchange=exchange) + try: + _extract_publisher_tags(scope.span, + conn=instance.connection, + routing_key=routing_key, + exchange=exchange) + except: + logger.debug("publish_with_instana: ", exc_info=True) # context propagation properties = properties or pika.BasicProperties() @@ -62,7 +74,12 @@ def _cb_wrapper(channel, method, properties, body): parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: - _extract_broker_tags(scope.span, "consume", instance.connection, queue) + try: + _extract_consumer_tags(scope.span, + conn=instance.connection, + queue=queue) + except: + logger.debug("basic_get_with_instana: ", exc_info=True) try: callback(channel, method, properties, body) @@ -83,7 +100,12 @@ def _cb_wrapper(channel, method, properies, body): parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: - _extract_broker_tags(scope.span, "consume", instance.connection, queue) + try: + _extract_consumer_tags(scope.span, + conn=instance.connection, + queue=queue) + except: + logger.debug("basic_consume_with_instana: ", exc_info=True) try: callback(channel, method, properties, body) @@ -94,9 +116,59 @@ def _cb_wrapper(channel, method, properies, body): args = (queue, _cb_wrapper) + args return wrapped(*args, **kwargs) + def consume_with_instana(wrapped, instance, args, kwargs): + def _bind_args(queue, *args, **kwargs): + return (queue, args, kwargs) + + (queue, args, kwargs) = (_bind_args(*args, **kwargs)) + + def _consume(gen): + for yilded in gen: + # Bypass the delivery created due to inactivity timeout + if yilded is None or not any(yilded): + yield yilded + continue + + (method_frame, properties, body) = yilded + + parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + _extract_consumer_tags(scope.span, + conn=instance.connection._impl, + queue=queue) + except: + logger.debug("consume_with_instana: ", exc_info=True) + + try: + yield yilded + except Exception as e: + scope.span.log_exception(e) + raise + + args = (queue,) + args + res = wrapped(*args, **kwargs) + + if isinstance(res, types.GeneratorType): + return _consume(res) + else: + return res + + @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.__init__') + def _BlockingChannel___init__(wrapped, instance, args, kwargs): + ret = wrapped(*args, **kwargs) + impl = getattr(instance, '_impl', None) + + if impl and hasattr(impl.basic_consume, '__wrapped__'): + impl.basic_consume = impl.basic_consume.__wrapped__ + + return ret + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_publish', basic_publish_with_instana) wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana) wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_consume', basic_get_with_instana) + wrapt.wrap_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume', basic_get_with_instana) + wrapt.wrap_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.consume', consume_with_instana) logger.debug("Instrumenting pika") except ImportError: diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py index e706fb4a..8d56adcb 100644 --- a/tests/clients/test_pika.py +++ b/tests/clients/test_pika.py @@ -4,33 +4,41 @@ import pika import unittest import mock +import threading +import time from ..helpers import testenv from instana.singletons import tracer -class TestPika(unittest.TestCase): +class _TestPika(unittest.TestCase): @staticmethod @mock.patch('pika.connection.Connection') def _create_connection(connection_class_mock=None): return connection_class_mock() + def _create_obj(self): + raise NotImplementedError() + def setUp(self): self.recorder = tracer.recorder self.recorder.clear_spans() self.connection = self._create_connection() self._on_openok_callback = mock.Mock() - self.obj = pika.channel.Channel(self.connection, 1, - self._on_openok_callback) + self.obj = self._create_obj() def tearDown(self): del self.connection del self._on_openok_callback del self.obj +class TestPikaChannel(_TestPika): + def _create_obj(self): + return pika.channel.Channel(self.connection, 1, self._on_openok_callback) + @mock.patch('pika.spec.Basic.Publish') @mock.patch('pika.channel.Channel._send_method') - def test_Channel_basic_publish(self, send_method, _unused): + def test_basic_publish(self, send_method, _unused): self.obj._set_state(self.obj.OPEN) with tracer.start_active_span("testing"): @@ -66,7 +74,7 @@ def test_Channel_basic_publish(self, send_method, _unused): send_method.assert_called_once_with( pika.spec.Basic.Publish( exchange="test.exchange", - routing_key="test.queue"), (pika.BasicProperties(headers={ + routing_key="test.queue"), (pika.spec.BasicProperties(headers={ "X-Instana-T": rabbitmq_span.t, "X-Instana-S": rabbitmq_span.s, "X-Instana-L": "1" @@ -74,7 +82,7 @@ def test_Channel_basic_publish(self, send_method, _unused): @mock.patch('pika.spec.Basic.Publish') @mock.patch('pika.channel.Channel._send_method') - def test_Channel_basic_publish_with_headers(self, send_method, _unused): + def test_basic_publish_with_headers(self, send_method, _unused): self.obj._set_state(self.obj.OPEN) with tracer.start_active_span("testing"): @@ -94,7 +102,7 @@ def test_Channel_basic_publish_with_headers(self, send_method, _unused): send_method.assert_called_once_with( pika.spec.Basic.Publish( exchange="test.exchange", - routing_key="test.queue"), (pika.BasicProperties(headers={ + routing_key="test.queue"), (pika.spec.BasicProperties(headers={ "X-Custom-1": "test", "X-Instana-T": rabbitmq_span.t, "X-Instana-S": rabbitmq_span.s, @@ -102,11 +110,11 @@ def test_Channel_basic_publish_with_headers(self, send_method, _unused): }), b"Hello!")) @mock.patch('pika.spec.Basic.Get') - def test_Channel_basic_get(self, _unused): + def test_basic_get(self, _unused): self.obj._set_state(self.obj.OPEN) body = "Hello!" - properties = pika.spec.BasicProperties() + properties = pika.BasicProperties() method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) header_frame = pika.frame.Header(1, len(body), properties) @@ -135,7 +143,7 @@ def test_Channel_basic_get(self, _unused): self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) self.assertIsNotNone(rabbitmq_span.stack) self.assertTrue(type(rabbitmq_span.stack) is list) self.assertGreater(len(rabbitmq_span.stack), 0) @@ -143,11 +151,11 @@ def test_Channel_basic_get(self, _unused): cb.assert_called_once_with(self.obj, pika.spec.Basic.GetOk, properties, body) @mock.patch('pika.spec.Basic.Get') - def test_Channel_basic_get_with_trace_context(self, _unused): + def test_basic_get_with_trace_context(self, _unused): self.obj._set_state(self.obj.OPEN) body = "Hello!" - properties = pika.spec.BasicProperties(headers={ + properties = pika.BasicProperties(headers={ "X-Instana-T": "0000000000000001", "X-Instana-S": "0000000000000002", "X-Instana-L": "1" @@ -177,11 +185,11 @@ def test_Channel_basic_get_with_trace_context(self, _unused): self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) @mock.patch('pika.spec.Basic.Consume') - def test_Channel_basic_consume(self, _unused): + def test_basic_consume(self, _unused): self.obj._set_state(self.obj.OPEN) body = "Hello!" - properties = pika.spec.BasicProperties() + properties = pika.BasicProperties() method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) header_frame = pika.frame.Header(1, len(body), properties) @@ -210,7 +218,7 @@ def test_Channel_basic_consume(self, _unused): self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) - self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) self.assertIsNotNone(rabbitmq_span.stack) self.assertTrue(type(rabbitmq_span.stack) is list) self.assertGreater(len(rabbitmq_span.stack), 0) @@ -218,11 +226,11 @@ def test_Channel_basic_consume(self, _unused): cb.assert_called_once_with(self.obj, method_frame.method, properties, body) @mock.patch('pika.spec.Basic.Consume') - def test_Channel_basic_consume_with_trace_context(self, _unused): + def test_basic_consume_with_trace_context(self, _unused): self.obj._set_state(self.obj.OPEN) body = "Hello!" - properties = pika.spec.BasicProperties(headers={ + properties = pika.BasicProperties(headers={ "X-Instana-T": "0000000000000001", "X-Instana-S": "0000000000000002", "X-Instana-L": "1" @@ -250,3 +258,112 @@ def test_Channel_basic_consume_with_trace_context(self, _unused): # A new span has been started self.assertIsNotNone(rabbitmq_span.s) self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + +class TestPikaBlockingChannel(_TestPika): + @mock.patch('pika.channel.Channel', spec=pika.channel.Channel) + def _create_obj(self, channel_impl): + self.impl = channel_impl() + self.impl.channel_number = 1 + + return pika.adapters.blocking_connection.BlockingChannel(self.impl, self.connection) + + def _generate_delivery(self, consumer_tag, properties, body): + from pika.adapters.blocking_connection import _ConsumerDeliveryEvt + + # Wait until queue consumer is initialized + while self.obj._queue_consumer_generator is None: + time.sleep(0.25) + + method = pika.spec.Basic.Deliver(consumer_tag=consumer_tag) + self.obj._on_consumer_generator_event(_ConsumerDeliveryEvt(method, properties, body)) + + def test_consume(self): + consumed_deliveries = [] + def __consume(): + for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): + # Skip deliveries generated due to inactivity + if delivery is not None and any(delivery): + consumed_deliveries.append(delivery) + + break + + consumer_tag = "test.consumer" + + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + self.impl._consumers = {} + + t = threading.Thread(target=__consume) + t.start() + + self._generate_delivery(consumer_tag, pika.BasicProperties(), "Hello!") + + t.join(timeout=5.0) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + self.assertEqual(1, len(consumed_deliveries)) + + def test_consume_with_trace_context(self): + consumed_deliveries = [] + def __consume(): + for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): + # Skip deliveries generated due to inactivity + if delivery is not None and any(delivery): + consumed_deliveries.append(delivery) + + break + + consumer_tag = "test.consumer" + + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + self.impl._consumers = {} + + t = threading.Thread(target=__consume) + t.start() + + self._generate_delivery(consumer_tag, pika.BasicProperties(headers={ + "X-Instana-T": "0000000000000001", + "X-Instana-S": "0000000000000002", + "X-Instana-L": "1" + }), "Hello!") + + t.join(timeout=5.0) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) diff --git a/tests/requirements.txt b/tests/requirements.txt index b01a6f86..a3090981 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -17,7 +17,7 @@ nose>=1.0 PyMySQL[rsa]>=0.9.1 pyOpenSSL>=16.1.0;python_version<="2.7" psycopg2>=2.7.1 -pika>=1.1.0 +pika>=1.0.0 pymongo>=3.7.0 pyramid>=1.2 pytest>=4.6 From 8d4289cb21e10dcc37d07604661d2d9b41f85ca7 Mon Sep 17 00:00:00 2001 From: Andrey Slotin Date: Mon, 21 Dec 2020 16:33:42 +0100 Subject: [PATCH 5/5] pika: use decorators to wrap instrumented methods --- instana/instrumentation/pika.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py index f4897bf3..a78a3572 100644 --- a/instana/instrumentation/pika.py +++ b/instana/instrumentation/pika.py @@ -29,6 +29,7 @@ def _extract_consumer_tags(span, conn, queue): span.set_tag("sort", "consume") span.set_tag("queue", queue) + @wrapt.patch_function_wrapper('pika.channel', 'Channel.basic_publish') def basic_publish_with_instana(wrapped, instance, args, kwargs): def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs): return (exchange, routing_key, body, properties, args, kwargs) @@ -90,6 +91,7 @@ def _cb_wrapper(channel, method, properties, body): args = (queue, _cb_wrapper) + args return wrapped(*args, **kwargs) + @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume') def basic_consume_with_instana(wrapped, instance, args, kwargs): def _bind_args(queue, on_consume_callback, *args, **kwargs): return (queue, on_consume_callback, args, kwargs) @@ -116,6 +118,7 @@ def _cb_wrapper(channel, method, properies, body): args = (queue, _cb_wrapper) + args return wrapped(*args, **kwargs) + @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.consume') def consume_with_instana(wrapped, instance, args, kwargs): def _bind_args(queue, *args, **kwargs): return (queue, args, kwargs) @@ -164,11 +167,9 @@ def _BlockingChannel___init__(wrapped, instance, args, kwargs): return ret - wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_publish', basic_publish_with_instana) wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana) wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_consume', basic_get_with_instana) - wrapt.wrap_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume', basic_get_with_instana) - wrapt.wrap_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.consume', consume_with_instana) + logger.debug("Instrumenting pika") except ImportError: