diff --git a/instana/instrumentation/pika.py b/instana/instrumentation/pika.py index 550d9388..845ff9fe 100644 --- a/instana/instrumentation/pika.py +++ b/instana/instrumentation/pika.py @@ -32,7 +32,6 @@ def _extract_publisher_tags(span, conn, exchange, routing_key): def _extract_consumer_tags(span, conn, queue): _extract_broker_tags(span, conn) - span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port)) span.set_tag("sort", "consume") span.set_tag("queue", queue) @@ -119,7 +118,7 @@ def _cb_wrapper(channel, method, properties, body): with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: try: _extract_consumer_tags(scope.span, - conn=instance.connection, + conn=instance.connection._impl, queue=queue) except: logger.debug("basic_consume_with_instana: ", exc_info=True) diff --git a/instana/version.py b/instana/version.py index 1c6d4097..96afed3b 100644 --- a/instana/version.py +++ b/instana/version.py @@ -3,4 +3,4 @@ # Module version file. Used by setup.py and snapshot reporting. -VERSION = '1.37.0' +VERSION = '1.37.1' diff --git a/tests/clients/test_pika.py b/tests/clients/test_pika.py index 4a06d2eb..11dadacb 100644 --- a/tests/clients/test_pika.py +++ b/tests/clients/test_pika.py @@ -3,14 +3,12 @@ from __future__ import absolute_import -import os import pika import unittest import mock import threading import time -from ..helpers import testenv from instana.singletons import tracer @@ -375,3 +373,99 @@ def __consume(): # A new span has been started self.assertIsNotNone(rabbitmq_span.s) self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s) + + +class TestPikaBlockingChannelBlockingConnection(_TestPika): + @mock.patch('pika.adapters.blocking_connection.BlockingConnection', autospec=True) + def _create_connection(self, connection=None): + connection._impl = mock.create_autospec(pika.connection.Connection) + connection._impl.params = pika.connection.Parameters() + return connection + + @mock.patch('pika.channel.Channel', spec=pika.channel.Channel) + def _create_obj(self, channel_impl): + self.impl = channel_impl() + self.impl.channel_number = 1 + + return pika.adapters.blocking_connection.BlockingChannel(self.impl, self.connection) + + def _generate_delivery(self, method, properties, body): + from pika.adapters.blocking_connection import _ConsumerDeliveryEvt + evt = _ConsumerDeliveryEvt(method, properties, body) + self.obj._add_pending_event(evt) + self.obj._dispatch_events() + + def test_basic_consume(self): + consumer_tag = "test.consumer" + + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + + cb = mock.Mock() + + self.obj.basic_consume(queue="test.queue", on_message_callback=cb) + + body = "Hello!" + properties = pika.BasicProperties() + method = pika.spec.Basic.Deliver(consumer_tag) + self._generate_delivery(method, properties, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.t) + self.assertIsNone(rabbitmq_span.p) + self.assertIsNotNone(rabbitmq_span.s) + + # Error logging + self.assertIsNone(rabbitmq_span.ec) + + # Span tags + self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"]) + self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"]) + self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"]) + self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"]) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + cb.assert_called_once_with(self.obj, method, properties, body) + + def test_basic_consume_with_trace_context(self): + consumer_tag = "test.consumer" + + self.impl.basic_consume.return_value = consumer_tag + self.impl._generate_consumer_tag.return_value = consumer_tag + + cb = mock.Mock() + + self.obj.basic_consume(queue="test.queue", on_message_callback=cb) + + body = "Hello!" + properties = pika.BasicProperties(headers={ + "X-INSTANA-T": "0000000000000001", + "X-INSTANA-S": "0000000000000002", + "X-INSTANA-L": "1" + }) + method = pika.spec.Basic.Deliver(consumer_tag) + self._generate_delivery(method, properties, body) + + spans = self.recorder.queued_spans() + self.assertEqual(1, len(spans)) + + rabbitmq_span = spans[0] + + self.assertIsNone(tracer.active_span) + + # Trace context propagation + self.assertEqual("0000000000000001", rabbitmq_span.t) + self.assertEqual("0000000000000002", rabbitmq_span.p) + + # A new span has been started + self.assertIsNotNone(rabbitmq_span.s) + self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s)