diff --git a/instana/__init__.py b/instana/__init__.py index d0f9732a..39673179 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -157,6 +157,7 @@ def boot_agent(): from .instrumentation.tornado import client from .instrumentation.tornado import server from .instrumentation import logging + from .instrumentation import pika from .instrumentation import pymysql from .instrumentation import psycopg2 from .instrumentation import redis diff --git a/instana/agent/host.py b/instana/agent/host.py index 39c078f4..214bb6e4 100644 --- a/instana/agent/host.py +++ b/instana/agent/host.py @@ -253,7 +253,7 @@ def report_data_payload(self, payload): data=to_json(payload['profiles']), headers={"Content-Type": "application/json"}, timeout=0.8) - + if response is not None and 200 <= response.status_code <= 204: self.last_seen = datetime.now() diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py new file mode 100644 index 00000000..a78a3572 --- /dev/null +++ b/instana/instrumentation/pika.py @@ -0,0 +1,176 @@ +# coding: utf-8 + +from __future__ import absolute_import + +import wrapt +import opentracing +import types + +from ..log import logger +from ..singletons import tracer + +try: + import pika + + def _extract_broker_tags(span, conn): + span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) + + def _extract_publisher_tags(span, conn, exchange, routing_key): + _extract_broker_tags(span, conn) + + span.set_tag("sort", "publish") + span.set_tag("key", routing_key) + span.set_tag("exchange", exchange) + + def _extract_consumer_tags(span, conn, queue): + _extract_broker_tags(span, conn) + + span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) + span.set_tag("sort", "consume") + span.set_tag("queue", queue) + + @wrapt.patch_function_wrapper('pika.channel', 'Channel.basic_publish') + def basic_publish_with_instana(wrapped, instance, args, kwargs): + def _bind_args(exchange, routing_key, body, properties=None, *args, **kwargs): + return (exchange, routing_key, body, properties, args, kwargs) + + parent_span = tracer.active_span + + if parent_span is None: + return wrapped(*args, **kwargs) + + (exchange, routing_key, body, properties, args, kwargs) = (_bind_args(*args, **kwargs)) + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + _extract_publisher_tags(scope.span, + conn=instance.connection, + routing_key=routing_key, + exchange=exchange) + except: + logger.debug("publish_with_instana: ", exc_info=True) + + # context propagation + properties = properties or pika.BasicProperties() + properties.headers = properties.headers or {} + + tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, properties.headers) + args = (exchange, routing_key, body, properties) + args + + try: + rv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return rv + + def basic_get_with_instana(wrapped, instance, args, kwargs): + def _bind_args(queue, callback, *args, **kwargs): + return (queue, callback, args, kwargs) + + (queue, callback, args, kwargs) = (_bind_args(*args, **kwargs)) + + def _cb_wrapper(channel, method, properties, body): + parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + _extract_consumer_tags(scope.span, + conn=instance.connection, + queue=queue) + except: + logger.debug("basic_get_with_instana: ", exc_info=True) + + try: + callback(channel, method, properties, body) + except Exception as e: + scope.span.log_exception(e) + raise + + args = (queue, _cb_wrapper) + args + return wrapped(*args, **kwargs) + + @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.basic_consume') + def basic_consume_with_instana(wrapped, instance, args, kwargs): + def _bind_args(queue, on_consume_callback, *args, **kwargs): + return (queue, on_consume_callback, args, kwargs) + + (queue, on_consume_callback, args, kwargs) = (_bind_args(*args, **kwargs)) + + def _cb_wrapper(channel, method, properies, body): + parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + _extract_consumer_tags(scope.span, + conn=instance.connection, + queue=queue) + except: + logger.debug("basic_consume_with_instana: ", exc_info=True) + + try: + callback(channel, method, properties, body) + except Exception as e: + scope.span.log_exception(e) + raise + + args = (queue, _cb_wrapper) + args + return wrapped(*args, **kwargs) + + @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.consume') + def consume_with_instana(wrapped, instance, args, kwargs): + def _bind_args(queue, *args, **kwargs): + return (queue, args, kwargs) + + (queue, args, kwargs) = (_bind_args(*args, **kwargs)) + + def _consume(gen): + for yilded in gen: + # Bypass the delivery created due to inactivity timeout + if yilded is None or not any(yilded): + yield yilded + continue + + (method_frame, properties, body) = yilded + + parent_span = tracer.extract(opentracing.Format.HTTP_HEADERS, properties.headers) + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + try: + _extract_consumer_tags(scope.span, + conn=instance.connection._impl, + queue=queue) + except: + logger.debug("consume_with_instana: ", exc_info=True) + + try: + yield yilded + except Exception as e: + scope.span.log_exception(e) + raise + + args = (queue,) + args + res = wrapped(*args, **kwargs) + + if isinstance(res, types.GeneratorType): + return _consume(res) + else: + return res + + @wrapt.patch_function_wrapper('pika.adapters.blocking_connection', 'BlockingChannel.__init__') + def _BlockingChannel___init__(wrapped, instance, args, kwargs): + ret = wrapped(*args, **kwargs) + impl = getattr(instance, '_impl', None) + + if impl and hasattr(impl.basic_consume, '__wrapped__'): + impl.basic_consume = impl.basic_consume.__wrapped__ + + return ret + + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_get', basic_get_with_instana) + wrapt.wrap_function_wrapper('pika.channel', 'Channel.basic_consume', basic_get_with_instana) + + + logger.debug("Instrumenting pika") +except ImportError: + pass diff --git a/instana/span.py b/instana/span.py index d1b72ae2..05f1ab13 100644 --- a/instana/span.py +++ b/instana/span.py @@ -110,7 +110,7 @@ def __init__(self, span, source, service_name, **kwargs): self.sy = span.synthetic self.__dict__.update(kwargs) - + def _validate_tags(self, tags): """ This method will loop through a set of tags to validate each key and value. diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py new file mode 100644 index 00000000..8d56adcb --- /dev/null +++ b/tests/clients/test_pika.py @@ -0,0 +1,369 @@ +from __future__ import absolute_import + +import os +import pika +import unittest +import mock +import threading +import time + +from ..helpers import testenv +from instana.singletons import tracer + +class _TestPika(unittest.TestCase): + @staticmethod + @mock.patch('pika.connection.Connection') + def _create_connection(connection_class_mock=None): + return connection_class_mock() + + def _create_obj(self): + raise NotImplementedError() + + def setUp(self): + self.recorder = tracer.recorder + self.recorder.clear_spans() + + self.connection = self._create_connection() + self._on_openok_callback = mock.Mock() + self.obj = self._create_obj() + + def tearDown(self): + del self.connection + del self._on_openok_callback + del self.obj + +class TestPikaChannel(_TestPika): + def _create_obj(self): + return pika.channel.Channel(self.connection, 1, self._on_openok_callback) + + @mock.patch('pika.spec.Basic.Publish') + @mock.patch('pika.channel.Channel._send_method') + def test_basic_publish(self, send_method, _unused): + self.obj._set_state(self.obj.OPEN) + + with tracer.start_active_span("testing"): + self.obj.basic_publish("test.exchange", "test.queue", "Hello!") + + spans = self.recorder.queued_spans() + self.assertEqual(2, len(spans)) + + rabbitmq_span = spans[0] + test_span = spans[1] + + self.assertIsNone(tracer.active_span) + + # Same traceId + self.assertEqual(test_span.t, rabbitmq_span.t) + + # Parent relationships + self.assertEqual(rabbitmq_span.p, test_span.s) + + # Error logging + self.assertIsNone(test_span.ec) + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertEqual("test.exchange", rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual('publish', rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["key"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + send_method.assert_called_once_with( + pika.spec.Basic.Publish( + exchange="test.exchange", + routing_key="test.queue"), (pika.spec.BasicProperties(headers={ + "X-Instana-T": rabbitmq_span.t, + "X-Instana-S": rabbitmq_span.s, + "X-Instana-L": "1" + }), b"Hello!")) + + @mock.patch('pika.spec.Basic.Publish') + @mock.patch('pika.channel.Channel._send_method') + def test_basic_publish_with_headers(self, send_method, _unused): + self.obj._set_state(self.obj.OPEN) + + with tracer.start_active_span("testing"): + self.obj.basic_publish("test.exchange", + "test.queue", + "Hello!", + pika.BasicProperties(headers={ + "X-Custom-1": "test" + })) + + spans = self.recorder.queued_spans() + self.assertEqual(2, len(spans)) + + rabbitmq_span = spans[0] + test_span = spans[1] + + send_method.assert_called_once_with( + pika.spec.Basic.Publish( + exchange="test.exchange", + routing_key="test.queue"), (pika.spec.BasicProperties(headers={ + "X-Custom-1": "test", + "X-Instana-T": rabbitmq_span.t, + "X-Instana-S": rabbitmq_span.s, + "X-Instana-L": "1" + }), b"Hello!")) + + @mock.patch('pika.spec.Basic.Get') + def test_basic_get(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.BasicProperties() + + method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_get("test.queue", cb) + self.obj._on_getok(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + cb.assert_called_once_with(self.obj, pika.spec.Basic.GetOk, properties, body) + + @mock.patch('pika.spec.Basic.Get') + def test_basic_get_with_trace_context(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.BasicProperties(headers={ + "X-Instana-T": "0000000000000001", + "X-Instana-S": "0000000000000002", + "X-Instana-L": "1" + }) + + method_frame = pika.frame.Method(1, pika.spec.Basic.GetOk) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_get("test.queue", cb) + self.obj._on_getok(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + + @mock.patch('pika.spec.Basic.Consume') + def test_basic_consume(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.BasicProperties() + + method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_consume("test.queue", cb, consumer_tag="test") + self.obj._on_deliver(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + cb.assert_called_once_with(self.obj, method_frame.method, properties, body) + + @mock.patch('pika.spec.Basic.Consume') + def test_basic_consume_with_trace_context(self, _unused): + self.obj._set_state(self.obj.OPEN) + + body = "Hello!" + properties = pika.BasicProperties(headers={ + "X-Instana-T": "0000000000000001", + "X-Instana-S": "0000000000000002", + "X-Instana-L": "1" + }) + + method_frame = pika.frame.Method(1, pika.spec.Basic.Deliver(consumer_tag="test")) + header_frame = pika.frame.Header(1, len(body), properties) + + cb = mock.Mock() + + self.obj.basic_consume("test.queue", cb, consumer_tag="test") + self.obj._on_deliver(method_frame, header_frame, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + +class TestPikaBlockingChannel(_TestPika): + @mock.patch('pika.channel.Channel', spec=pika.channel.Channel) + def _create_obj(self, channel_impl): + self.impl = channel_impl() + self.impl.channel_number = 1 + + return pika.adapters.blocking_connection.BlockingChannel(self.impl, self.connection) + + def _generate_delivery(self, consumer_tag, properties, body): + from pika.adapters.blocking_connection import _ConsumerDeliveryEvt + + # Wait until queue consumer is initialized + while self.obj._queue_consumer_generator is None: + time.sleep(0.25) + + method = pika.spec.Basic.Deliver(consumer_tag=consumer_tag) + self.obj._on_consumer_generator_event(_ConsumerDeliveryEvt(method, properties, body)) + + def test_consume(self): + consumed_deliveries = [] + def __consume(): + for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): + # Skip deliveries generated due to inactivity + if delivery is not None and any(delivery): + consumed_deliveries.append(delivery) + + break + + consumer_tag = "test.consumer" + + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + self.impl._consumers = {} + + t = threading.Thread(target=__consume) + t.start() + + self._generate_delivery(consumer_tag, pika.BasicProperties(), "Hello!") + + t.join(timeout=5.0) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + self.assertEqual(1, len(consumed_deliveries)) + + def test_consume_with_trace_context(self): + consumed_deliveries = [] + def __consume(): + for delivery in self.obj.consume("test.queue", inactivity_timeout=3.0): + # Skip deliveries generated due to inactivity + if delivery is not None and any(delivery): + consumed_deliveries.append(delivery) + + break + + consumer_tag = "test.consumer" + + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + self.impl._consumers = {} + + t = threading.Thread(target=__consume) + t.start() + + self._generate_delivery(consumer_tag, pika.BasicProperties(headers={ + "X-Instana-T": "0000000000000001", + "X-Instana-S": "0000000000000002", + "X-Instana-L": "1" + }), "Hello!") + + t.join(timeout=5.0) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) diff --git a/tests/requirements.txt b/tests/requirements.txt index 3e016f44..a3090981 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -17,6 +17,7 @@ nose>=1.0 PyMySQL[rsa]>=0.9.1 pyOpenSSL>=16.1.0;python_version<="2.7" psycopg2>=2.7.1 +pika>=1.0.0 pymongo>=3.7.0 pyramid>=1.2 pytest>=4.6 @@ -28,4 +29,4 @@ spyne>=2.9,<=2.12.14 suds-jurko>=0.6 tornado>=4.5.3,<6.0 uvicorn>=0.12.2;python_version>="3.6" -urllib3[secure]!=1.25.0,!=1.25.1,<1.26,>=1.21.1 \ No newline at end of file +urllib3[secure]!=1.25.0,!=1.25.1,<1.26,>=1.21.1