From 4a58dc1a9f0b7b9b97fc19ba93629b3f01c40ab9 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 10:57:48 +0200 Subject: [PATCH 1/4] Better async tracer initialization --- instana/singletons.py | 2 +- instana/tracer.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/instana/singletons.py b/instana/singletons.py index d775b3e2..0e855545 100644 --- a/instana/singletons.py +++ b/instana/singletons.py @@ -20,7 +20,7 @@ if sys.version_info >= (3,4): from opentracing.scope_managers.asyncio import AsyncioScopeManager - async_tracer = InstanaTracer(AsyncioScopeManager()) + async_tracer = InstanaTracer(scope_manager=AsyncioScopeManager()) # Set ourselves as the tracer. opentracing.tracer = tracer diff --git a/instana/tracer.py b/instana/tracer.py index 53020daf..a5152cec 100644 --- a/instana/tracer.py +++ b/instana/tracer.py @@ -18,9 +18,9 @@ class InstanaTracer(BasicTracer): - def __init__(self, options=Options()): + def __init__(self, options=Options(), scope_manager=None): super(InstanaTracer, self).__init__( - InstanaRecorder(), InstanaSampler()) + InstanaRecorder(), InstanaSampler(), scope_manager) self._propagators[ot.Format.HTTP_HEADERS] = HTTPPropagator() self._propagators[ot.Format.TEXT_MAP] = TextPropagator() From af500ce7ea28d4802525468693857a4b732ffd83 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 11:19:32 +0200 Subject: [PATCH 2/4] Syntax --- instana/log.py | 1 + 1 file changed, 1 insertion(+) diff --git a/instana/log.py b/instana/log.py index 3ae467ad..4dc151d1 100644 --- a/instana/log.py +++ b/instana/log.py @@ -3,6 +3,7 @@ logger = log.getLogger('instana') + def init(level): ch = log.StreamHandler() f = log.Formatter('%(asctime)s: %(process)d %(levelname)s %(name)s: %(message)s') From a703db6d9986c1281786b674237c326862e6186f Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 11:20:02 +0200 Subject: [PATCH 3/4] Add test showing ensure_future usage --- tests/test_asynqp.py | 98 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/tests/test_asynqp.py b/tests/test_asynqp.py index 69d58590..b232aa0a 100644 --- a/tests/test_asynqp.py +++ b/tests/test_asynqp.py @@ -5,9 +5,14 @@ import unittest import asynqp +import aiohttp +import opentracing from instana.singletons import async_tracer +from .helpers import testenv + + rabbitmq_host = "" if "RABBITMQ_HOST" in os.environ: rabbitmq_host = os.environ["RABBITMQ_HOST"] @@ -49,6 +54,14 @@ def tearDown(self): """ Purge the queue """ self.loop.run_until_complete(self.reset()) + async def fetch(self, session, url, headers=None): + try: + async with session.get(url, headers=headers) as response: + return response + except aiohttp.web_exceptions.HTTPException: + pass + + def test_publish(self): @asyncio.coroutine def test(): @@ -298,3 +311,88 @@ def test(): self.assertIsNone(publish1_span.ec) self.assertFalse(publish2_span.error) self.assertIsNone(publish2_span.ec) + + def test_consume_with_ensure_future(self): + async def run_later(msg): + # Extract the context from the message (if there is any) + ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers)) + + # Start a new span to track work that is done processing this message + with async_tracer.start_active_span("run_later", child_of=ctx) as scope: + scope.span.set_tag("exchange", msg.exchange_name) + # print("") + # print("run_later active scope: %s" % async_tracer.scope_manager.active) + # print("") + async with aiohttp.ClientSession() as session: + return await self.fetch(session, testenv["wsgi_server"] + "/") + + def handle_message(msg): + # print("") + # print("handle_message active scope: %s" % async_tracer.scope_manager.active) + # print("") + async_tracer.inject(async_tracer.active_span.context, opentracing.Format.HTTP_HEADERS, msg.headers) + asyncio.ensure_future(run_later(msg)) + msg.ack() + + @asyncio.coroutine + def test(): + with async_tracer.start_active_span('test'): + msg1 = asynqp.Message({'consume': 'this'}) + self.exchange.publish(msg1, 'routing.key') + + self.consumer = yield from self.queue.consume(handle_message) + yield from asyncio.sleep(0.5) + self.consumer.cancel() + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(6, len(spans)) + + publish_span = spans[0] + test_span = spans[1] + consume_span = spans[2] + wsgi_span = spans[3] + aioclient_span = spans[4] + run_later_span = spans[5] + + self.assertIsNone(async_tracer.active_span) + + # Same traceId + self.assertEqual(test_span.t, publish_span.t) + self.assertEqual(test_span.t, consume_span.t) + self.assertEqual(test_span.t, aioclient_span.t) + self.assertEqual(test_span.t, wsgi_span.t) + + # Parent relationships + self.assertEqual(publish_span.p, test_span.s) + self.assertEqual(consume_span.p, publish_span.s) + self.assertEqual(aioclient_span.p, run_later_span.s) + self.assertEqual(run_later_span.p, consume_span.s) + self.assertEqual(wsgi_span.p, aioclient_span.s) + + # publish + self.assertEqual('test.exchange', publish_span.data.rabbitmq.exchange) + self.assertEqual('publish', publish_span.data.rabbitmq.sort) + self.assertIsNotNone(publish_span.data.rabbitmq.address) + self.assertEqual('routing.key', publish_span.data.rabbitmq.key) + self.assertIsNotNone(publish_span.stack) + self.assertTrue(type(publish_span.stack) is list) + self.assertGreater(len(publish_span.stack), 0) + + # consume + self.assertEqual('test.exchange', consume_span.data.rabbitmq.exchange) + self.assertEqual('consume', consume_span.data.rabbitmq.sort) + self.assertIsNotNone(consume_span.data.rabbitmq.address) + self.assertEqual('routing.key', consume_span.data.rabbitmq.key) + self.assertIsNotNone(consume_span.stack) + self.assertTrue(type(consume_span.stack) is list) + self.assertGreater(len(consume_span.stack), 0) + + # Error logging + self.assertFalse(test_span.error) + self.assertIsNone(test_span.ec) + self.assertFalse(consume_span.error) + self.assertIsNone(consume_span.ec) + self.assertFalse(publish_span.error) + self.assertIsNone(publish_span.ec) From 3d90662b04d8c1cb5f163b79ce7c3d8b96f3b061 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 11:42:57 +0200 Subject: [PATCH 4/4] Limit Django version until 2.2 is validated --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 847590a9..681be0c3 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ def check_setuptools(): 'test': [ 'aiohttp>=3.5.4;python_version>="3.5"', 'asynqp>=0.4;python_version>="3.5"', - 'django>=1.11', + 'django>=1.11,<2.2', 'nose>=1.0', 'flask>=0.12.2', 'lxml>=3.4',