diff --git a/.env-test b/.env-test new file mode 100644 index 00000000..5a88cbdb --- /dev/null +++ b/.env-test @@ -0,0 +1 @@ +export RABBITMQ_HOST="192.168.201.129" diff --git a/.travis.yml b/.travis.yml index fa4d6b20..aa754e99 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,5 +13,9 @@ before_install: install: "pip install -r requirements-test.txt" +sudo: required -script: nosetests -v +services: + - rabbitmq + +script: python runtests.py diff --git a/instana/__init__.py b/instana/__init__.py index 841a561c..c17c0284 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -59,6 +59,7 @@ def load(module): def load_instrumentation(): if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ: # Import & initialize instrumentation + from .instrumentation import asynqp # noqa from .instrumentation import urllib3 # noqa from .instrumentation import sudsjurko # noqa from .instrumentation import mysqlpython # noqa diff --git a/instana/http_propagator.py b/instana/http_propagator.py index 00397c71..9646955c 100644 --- a/instana/http_propagator.py +++ b/instana/http_propagator.py @@ -58,7 +58,7 @@ def extract(self, carrier): # noqa raise ot.SpanContextCorruptedException() # Look for standard X-Instana-T/S format - if self.HEADER_KEY_T in dc and self.header_key_s in dc: + if self.HEADER_KEY_T in dc and self.HEADER_KEY_S in dc: trace_id = header_to_id(dc[self.HEADER_KEY_T]) span_id = header_to_id(dc[self.HEADER_KEY_S]) diff --git a/instana/instrumentation/asynqp.py b/instana/instrumentation/asynqp.py new file mode 100644 index 00000000..37468e5c --- /dev/null +++ b/instana/instrumentation/asynqp.py @@ -0,0 +1,97 @@ +from __future__ import absolute_import + +import opentracing +import opentracing.ext.tags as ext +import wrapt + +from ..log import logger +from ..singletons import tracer + +try: + import asyncio + import asynqp + + @wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish') + def publish_with_instana(wrapped, instance, args, kwargs): + parent_span = tracer.active_span + + # If we're not tracing, just return + if parent_span is None: + return wrapped(*args, **kwargs) + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + host, port = instance.sender.protocol.transport._sock.getsockname() + + msg = args[0] + if msg.headers is None: + msg.headers = {} + tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers) + + try: + scope.span.set_tag("exchange", instance.name) + scope.span.set_tag("sort", "publish") + scope.span.set_tag("address", host + ":" + str(port) ) + scope.span.set_tag("key", args[1]) + + rv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_kv({'message': e}) + scope.span.set_tag("error", True) + ec = scope.span.tags.get('ec', 0) + scope.span.set_tag("ec", ec+1) + raise + else: + return rv + + @wrapt.patch_function_wrapper('asynqp.queue','Queue.get') + def get_with_instana(wrapped, instance, args, kwargs): + parent_span = tracer.active_span + + with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope: + host, port = instance.sender.protocol.transport._sock.getsockname() + + try: + scope.span.set_tag("queue", instance.name) + scope.span.set_tag("sort", "consume") + scope.span.set_tag("address", host + ":" + str(port) ) + + rv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_kv({'message': e}) + scope.span.set_tag("error", True) + ec = scope.span.tags.get('ec', 0) + scope.span.set_tag("ec", ec+1) + raise + else: + return rv + + @wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver') + def deliver_with_instana(wrapped, instance, args, kwargs): + + ctx = None + msg = args[1] + if 'X-Instana-T' in msg.headers and 'X-Instana-S' in msg.headers: + ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers)) + + with tracer.start_active_span("rabbitmq", child_of=ctx) as scope: + host, port = args[1].sender.protocol.transport._sock.getsockname() + + try: + scope.span.set_tag("exchange", msg.exchange_name) + scope.span.set_tag("sort", "consume") + scope.span.set_tag("address", host + ":" + str(port) ) + scope.span.set_tag("key", msg.routing_key) + + rv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_kv({'message': e}) + scope.span.set_tag("error", True) + ec = scope.span.tags.get('ec', 0) + scope.span.set_tag("ec", ec+1) + raise + else: + return rv + + logger.debug("Instrumenting asynqp") +except ImportError: + pass diff --git a/instana/json_span.py b/instana/json_span.py index f1186d2d..15350662 100644 --- a/instana/json_span.py +++ b/instana/json_span.py @@ -17,6 +17,14 @@ def __init__(self, **kwds): self.__dict__[key] = kwds[key] +class CustomData(object): + tags = None + logs = None + + def __init__(self, **kwds): + self.__dict__.update(kwds) + + class Data(object): service = None http = None @@ -24,43 +32,47 @@ class Data(object): custom = None sdk = None soap = None + rabbitmq = None def __init__(self, **kwds): self.__dict__.update(kwds) -class MySQLData(object): - db = None +class HttpData(object): host = None - user = None - stmt = None + url = None + status = 0 + method = None error = None def __init__(self, **kwds): self.__dict__.update(kwds) -class HttpData(object): +class MySQLData(object): + db = None host = None - url = None - status = 0 - method = None + user = None + stmt = None error = None def __init__(self, **kwds): self.__dict__.update(kwds) -class SoapData(object): - action = None +class RabbitmqData(object): + exchange = None + queue = None + sort = None + address = None + key = None def __init__(self, **kwds): self.__dict__.update(kwds) -class CustomData(object): - tags = None - logs = None +class SoapData(object): + action = None def __init__(self, **kwds): self.__dict__.update(kwds) diff --git a/instana/recorder.py b/instana/recorder.py index 09b234a5..cb411f9c 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -12,7 +12,7 @@ import instana.singletons from .json_span import (CustomData, Data, HttpData, JsonSpan, MySQLData, - SDKData, SoapData) + RabbitmqData, SDKData, SoapData) from .log import logger if sys.version_info.major is 2: @@ -22,12 +22,12 @@ class InstanaRecorder(SpanRecorder): - registered_spans = ("django", "memcache", "mysql", "rpc-client", + registered_spans = ("django", "memcache", "mysql", "rabbitmq", "rpc-client", "rpc-server", "soap", "urllib3", "wsgi") http_spans = ("django", "wsgi", "urllib3", "soap") - exit_spans = ("memcache", "mysql", "rpc-client", "soap", "urllib3") - entry_spans = ("django", "wsgi", "rpc-server") + exit_spans = ("memcache", "mysql", "rabbitmq", "rpc-client", "soap", "urllib3") + entry_spans = ("django", "wsgi", "rabbitmq", "rpc-server") entry_kind = ["entry", "server", "consumer"] exit_kind = ["exit", "client", "producer"] @@ -91,9 +91,13 @@ def record_span(self, span): def build_registered_span(self, span): """ Takes a BasicSpan and converts it into a registered JsonSpan """ - data = Data(baggage=span.context.baggage, - custom=CustomData(tags=span.tags, - logs=self.collect_logs(span))) + data = Data(baggage=span.context.baggage) + + logs = self.collect_logs(span) + if len(logs) > 0: + if data.custom is None: + data.custom = CustomData() + data.custom.logs = logs if span.operation_name in self.http_spans: data.http = HttpData(host=self.get_http_host_name(span), @@ -102,6 +106,13 @@ def build_registered_span(self, span): status=span.tags.pop(ext.HTTP_STATUS_CODE, None), error=span.tags.pop('http.error', None)) + if span.operation_name == "rabbitmq": + data.rabbitmq = RabbitmqData(exchange=span.tags.pop('exchange', None), + queue=span.tags.pop('queue', None), + sort=span.tags.pop('sort', None), + address=span.tags.pop('address', None), + key=span.tags.pop('key', None)) + if span.operation_name == "soap": data.soap = SoapData(action=span.tags.pop('soap.action', None)) @@ -110,10 +121,15 @@ def build_registered_span(self, span): db=span.tags.pop(ext.DATABASE_INSTANCE, None), user=span.tags.pop(ext.DATABASE_USER, None), stmt=span.tags.pop(ext.DATABASE_STATEMENT, None)) - if len(data.custom.logs.keys()): + if (data.custom is not None) and (data.custom.logs is not None) and len(data.custom.logs): tskey = list(data.custom.logs.keys())[0] data.mysql.error = data.custom.logs[tskey]['message'] + if len(span.tags) > 0: + if data.custom is None: + data.custom = CustomData() + data.custom.tags = span.tags + entityFrom = {'e': instana.singletons.agent.from_.pid, 'h': instana.singletons.agent.from_.agentUuid} diff --git a/instana/tracer.py b/instana/tracer.py index 33d98e29..53020daf 100644 --- a/instana/tracer.py +++ b/instana/tracer.py @@ -93,13 +93,13 @@ def start_span(self, tags=tags, start_time=start_time) - if operation_name in self.recorder.entry_spans: - # For entry spans, add only a backtrace fingerprint - self.__add_stack(span, limit=2) - if operation_name in self.recorder.exit_spans: self.__add_stack(span) + elif operation_name in self.recorder.entry_spans: + # For entry spans, add only a backtrace fingerprint + self.__add_stack(span, limit=2) + return span def inject(self, span_context, format, carrier): diff --git a/runtests.py b/runtests.py new file mode 100644 index 00000000..5b59260f --- /dev/null +++ b/runtests.py @@ -0,0 +1,10 @@ +import sys +import nose +from distutils.version import LooseVersion + +args = ['nosetests', '-v'] + +if (LooseVersion(sys.version) <= LooseVersion('3.5')): + args.extend(['-e', 'asynqp']) + +result = nose.run(argv=args) diff --git a/setup.py b/setup.py index df9504ae..378845c9 100644 --- a/setup.py +++ b/setup.py @@ -47,12 +47,15 @@ def check_setuptools(): }, extras_require={ 'test': [ + 'asynqp>=0.4', 'django>=1.11', 'nose>=1.0', 'flask>=0.12.2', 'lxml>=3.4', + 'mock>=2.0.0', 'MySQL-python>=1.2.5;python_version<="2.7"', 'pyOpenSSL>=16.1.0;python_version<="2.7"', + 'pytest>=3.0.1', 'requests>=2.17.1', 'urllib3[secure]>=1.15', 'spyne>=2.9', diff --git a/tests/test_asynqp.py b/tests/test_asynqp.py new file mode 100644 index 00000000..53e01aa4 --- /dev/null +++ b/tests/test_asynqp.py @@ -0,0 +1,182 @@ +from __future__ import absolute_import + +import asyncio +import os +import sys +import unittest + +import asynqp + +from instana.singletons import tracer + +rabbitmq_host = "" +if "RABBITMQ_HOST" in os.environ: + rabbitmq_host = os.environ["RABBITMQ_HOST"] +else: + rabbitmq_host = "localhost" + +class TestAsynqp(unittest.TestCase): + @asyncio.coroutine + def connect(self): + # connect to the RabbitMQ broker + self.connection = yield from asynqp.connect(rabbitmq_host, 5672, username='guest', password='guest') + + # Open a communications channel + self.channel = yield from self.connection.open_channel() + + # Create a queue and an exchange on the broker + self.exchange = yield from self.channel.declare_exchange('test.exchange', 'direct') + self.queue = yield from self.channel.declare_queue('test.queue') + + # Bind the queue to the exchange, so the queue will get messages published to the exchange + yield from self.queue.bind(self.exchange, 'routing.key') + yield from self.queue.purge() + + def setUp(self): + """ Clear all spans before a test run """ + self.recorder = tracer.recorder + self.recorder.clear_spans() + + # New event loop for every test + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + self.loop.run_until_complete(self.connect()) + + def tearDown(self): + """ Purge the queue """ + self.queue.purge() + self.queue.delete(if_unused=False, if_empty=False) + + def test_publish(self): + @asyncio.coroutine + def test(): + with tracer.start_active_span('test'): + msg = asynqp.Message({'hello': 'world'}) + self.exchange.publish(msg, 'routing.key') + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(2, len(spans)) + + rabbitmq_span = spans[0] + test_span = spans[1] + + self.assertIsNone(tracer.active_span) + + # Same traceId + self.assertEqual(test_span.t, rabbitmq_span.t) + + # Parent relationships + self.assertEqual(rabbitmq_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(rabbitmq_span.error) + self.assertIsNone(rabbitmq_span.ec) + + # Rabbitmq + self.assertEqual('test.exchange', rabbitmq_span.data.rabbitmq.exchange) + self.assertEqual('publish', rabbitmq_span.data.rabbitmq.sort) + self.assertIsNotNone(rabbitmq_span.data.rabbitmq.address) + self.assertEqual('routing.key', rabbitmq_span.data.rabbitmq.key) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + def test_get(self): + @asyncio.coroutine + def test(): + with tracer.start_active_span('test'): + received_message = yield from self.queue.get() + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(2, len(spans)) + + rabbitmq_span = spans[0] + test_span = spans[1] + + self.assertIsNone(tracer.active_span) + + # Same traceId + self.assertEqual(test_span.t, rabbitmq_span.t) + + # Parent relationships + self.assertEqual(rabbitmq_span.p, test_span.s) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(rabbitmq_span.error) + self.assertIsNone(rabbitmq_span.ec) + + # Rabbitmq + self.assertEqual('test.queue', rabbitmq_span.data.rabbitmq.queue) + self.assertEqual('consume', rabbitmq_span.data.rabbitmq.sort) + self.assertIsNotNone(rabbitmq_span.data.rabbitmq.address) + self.assertIsNotNone(rabbitmq_span.stack) + self.assertTrue(type(rabbitmq_span.stack) is list) + self.assertGreater(len(rabbitmq_span.stack), 0) + + + def test_consume(self): + def handle_message(msg): + # print('>> {}'.format(msg.body)) + msg.ack() + + @asyncio.coroutine + def test(): + with tracer.start_active_span('test'): + msg1 = asynqp.Message({'consume': 'this'}) + self.exchange.publish(msg1, 'routing.key') + + yield from self.queue.consume(handle_message) + yield from asyncio.sleep(0.5) + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + publish_span = spans[0] + test_span = spans[1] + consume_span = spans[2] + + self.assertIsNone(tracer.active_span) + + # Same traceId + self.assertEqual(test_span.t, publish_span.t) + self.assertEqual(test_span.t, consume_span.t) + + # Parent relationships + self.assertEqual(publish_span.p, test_span.s) + self.assertEqual(consume_span.p, publish_span.s) + + # publish + self.assertEqual('test.exchange', publish_span.data.rabbitmq.exchange) + self.assertEqual('publish', publish_span.data.rabbitmq.sort) + self.assertIsNotNone(publish_span.data.rabbitmq.address) + self.assertEqual('routing.key', publish_span.data.rabbitmq.key) + self.assertIsNotNone(publish_span.stack) + self.assertTrue(type(publish_span.stack) is list) + self.assertGreater(len(publish_span.stack), 0) + + # consume + self.assertEqual('test.exchange', consume_span.data.rabbitmq.exchange) + self.assertEqual('consume', consume_span.data.rabbitmq.sort) + self.assertIsNotNone(consume_span.data.rabbitmq.address) + self.assertEqual('routing.key', consume_span.data.rabbitmq.key) + self.assertIsNotNone(consume_span.stack) + self.assertTrue(type(consume_span.stack) is list) + self.assertGreater(len(consume_span.stack), 0) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(consume_span.error) + self.assertIsNone(consume_span.ec) + self.assertFalse(publish_span.error) + self.assertIsNone(publish_span.ec)