From 0f747eb4cb5772658689b57fbe92846e499b97d0 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 15:01:40 +0200 Subject: [PATCH 1/6] Carry context across ensure_future calls --- instana/__init__.py | 1 + instana/instrumentation/asyncio.py | 24 ++++++++++++ runtests.py | 2 +- tests/test_asyncio.py | 59 ++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 instana/instrumentation/asyncio.py create mode 100644 tests/test_asyncio.py diff --git a/instana/__init__.py b/instana/__init__.py index 9c2b4851..c1e4fd19 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -62,6 +62,7 @@ def boot_agent(): if "INSTANA_DISABLE_AUTO_INSTR" not in os.environ: # Import & initialize instrumentation if sys.version_info >= (3, 5, 3): + from .instrumentation import asyncio # noqa from .instrumentation.aiohttp import client # noqa from .instrumentation.aiohttp import server # noqa from .instrumentation import asynqp # noqa diff --git a/instana/instrumentation/asyncio.py b/instana/instrumentation/asyncio.py new file mode 100644 index 00000000..0f563789 --- /dev/null +++ b/instana/instrumentation/asyncio.py @@ -0,0 +1,24 @@ +from __future__ import absolute_import + +import wrapt + +from ..log import logger +from ..singletons import async_tracer + +try: + import asyncio + + @wrapt.patch_function_wrapper('asyncio','ensure_future') + def ensure_future_with_instana(wrapped, instance, argv, kwargs): + + scope = async_tracer.scope_manager.active + task = wrapped(*argv, **kwargs) + + if scope is not None: + async_tracer.scope_manager._set_task_scope(scope, task=task) + + return task + + logger.debug("Instrumenting asyncio") +except ImportError: + pass diff --git a/runtests.py b/runtests.py index 7737f98e..56ae121d 100644 --- a/runtests.py +++ b/runtests.py @@ -5,7 +5,7 @@ command_line = [__file__, '--verbose'] if (LooseVersion(sys.version) < LooseVersion('3.5.3')): - command_line.extend(['-e', 'asynqp', '-e', 'aiohttp']) + command_line.extend(['-e', 'asynqp', '-e', 'aiohttp', '-e', 'async']) if (LooseVersion(sys.version) >= LooseVersion('3.7.0')): command_line.extend(['-e', 'sudsjurko']) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py new file mode 100644 index 00000000..24920df9 --- /dev/null +++ b/tests/test_asyncio.py @@ -0,0 +1,59 @@ +from __future__ import absolute_import + +import asyncio +import unittest + +import aiohttp + +from instana.singletons import async_tracer + +from .helpers import testenv + + +class TestAsyncio(unittest.TestCase): + def setUp(self): + """ Clear all spans before a test run """ + self.recorder = async_tracer.recorder + self.recorder.clear_spans() + + # New event loop for every test + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(None) + + def tearDown(self): + """ Purge the queue """ + pass + + async def fetch(self, session, url, headers=None): + try: + async with session.get(url, headers=headers) as response: + return response + except aiohttp.web_exceptions.HTTPException: + pass + + def test_ensure_future(self): + async def run_later(msg="Hello"): + print("run_later: %s" % async_tracer.active_span.operation_name) + async with aiohttp.ClientSession() as session: + return await self.fetch(session, testenv["wsgi_server"] + "/") + + async def test(): + with async_tracer.start_active_span('test'): + asyncio.ensure_future(run_later("Hello")) + await asyncio.sleep(1) + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + test_span = spans[0] + wsgi_span = spans[1] + aioclient_span = spans[2] + + self.assertEqual(test_span.t, wsgi_span.t) + self.assertEqual(test_span.t, aioclient_span.t) + + self.assertEqual(test_span.p, None) + self.assertEqual(wsgi_span.p, aioclient_span.s) + self.assertEqual(aioclient_span.p, test_span.s) From 112b6e06dce571889cb797cb7842b91129c2c392 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 16:33:50 +0200 Subject: [PATCH 2/6] Add test without context --- tests/test_asyncio.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index 24920df9..36b4c9e4 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -33,14 +33,14 @@ async def fetch(self, session, url, headers=None): def test_ensure_future(self): async def run_later(msg="Hello"): - print("run_later: %s" % async_tracer.active_span.operation_name) + # print("run_later: %s" % async_tracer.active_span.operation_name) async with aiohttp.ClientSession() as session: return await self.fetch(session, testenv["wsgi_server"] + "/") async def test(): with async_tracer.start_active_span('test'): asyncio.ensure_future(run_later("Hello")) - await asyncio.sleep(1) + await asyncio.sleep(0.5) self.loop.run_until_complete(test()) @@ -57,3 +57,21 @@ async def test(): self.assertEqual(test_span.p, None) self.assertEqual(wsgi_span.p, aioclient_span.s) self.assertEqual(aioclient_span.p, test_span.s) + + def test_ensure_future_without_context(self): + async def run_later(msg="Hello"): + # print("run_later: %s" % async_tracer.active_span.operation_name) + async with aiohttp.ClientSession() as session: + return await self.fetch(session, testenv["wsgi_server"] + "/") + + async def test(): + asyncio.ensure_future(run_later("Hello")) + await asyncio.sleep(0.5) + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + + # Only the WSGI webserver generated a span (entry span) + self.assertEqual(1, len(spans)) + self.assertEqual("wsgi", spans[0].n) From af60ee518e9d43fac1f4e5f757dbde7f745ef6dd Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 2 Apr 2019 16:55:20 +0200 Subject: [PATCH 3/6] Add support for create_task (with tests) --- instana/instrumentation/asyncio.py | 12 +++++++- tests/test_asyncio.py | 46 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/instana/instrumentation/asyncio.py b/instana/instrumentation/asyncio.py index 0f563789..f0c36015 100644 --- a/instana/instrumentation/asyncio.py +++ b/instana/instrumentation/asyncio.py @@ -10,7 +10,6 @@ @wrapt.patch_function_wrapper('asyncio','ensure_future') def ensure_future_with_instana(wrapped, instance, argv, kwargs): - scope = async_tracer.scope_manager.active task = wrapped(*argv, **kwargs) @@ -19,6 +18,17 @@ def ensure_future_with_instana(wrapped, instance, argv, kwargs): return task + if hasattr(asyncio, "create_task"): + @wrapt.patch_function_wrapper('asyncio','create_task') + def create_task_with_instana(wrapped, instance, argv, kwargs): + scope = async_tracer.scope_manager.active + task = wrapped(*argv, **kwargs) + + if scope is not None: + async_tracer.scope_manager._set_task_scope(scope, task=task) + + return task + logger.debug("Instrumenting asyncio") except ImportError: pass diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index 36b4c9e4..cb9a4c82 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -75,3 +75,49 @@ async def test(): # Only the WSGI webserver generated a span (entry span) self.assertEqual(1, len(spans)) self.assertEqual("wsgi", spans[0].n) + + if hasattr(asyncio, "create_task"): + def test_create_task(self): + async def run_later(msg="Hello"): + # print("run_later: %s" % async_tracer.active_span.operation_name) + async with aiohttp.ClientSession() as session: + return await self.fetch(session, testenv["wsgi_server"] + "/") + + async def test(): + with async_tracer.start_active_span('test'): + asyncio.create_task(run_later("Hello")) + await asyncio.sleep(0.5) + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(3, len(spans)) + + test_span = spans[0] + wsgi_span = spans[1] + aioclient_span = spans[2] + + self.assertEqual(test_span.t, wsgi_span.t) + self.assertEqual(test_span.t, aioclient_span.t) + + self.assertEqual(test_span.p, None) + self.assertEqual(wsgi_span.p, aioclient_span.s) + self.assertEqual(aioclient_span.p, test_span.s) + + def test_create_task_without_context(self): + async def run_later(msg="Hello"): + # print("run_later: %s" % async_tracer.active_span.operation_name) + async with aiohttp.ClientSession() as session: + return await self.fetch(session, testenv["wsgi_server"] + "/") + + async def test(): + asyncio.create_task(run_later("Hello")) + await asyncio.sleep(0.5) + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + + # Only the WSGI webserver generated a span (entry span) + self.assertEqual(1, len(spans)) + self.assertEqual("wsgi", spans[0].n) From 919ca91ea66ecfab08787b4eea1f59c17b4eeec4 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 4 Apr 2019 11:42:16 +0200 Subject: [PATCH 4/6] Add package configurator module --- instana/configurator.py | 28 ++++++++++++++++++++++++++++ instana/instrumentation/asyncio.py | 7 +++++++ tests/test_configurator.py | 16 ++++++++++++++++ 3 files changed, 51 insertions(+) create mode 100644 instana/configurator.py create mode 100644 tests/test_configurator.py diff --git a/instana/configurator.py b/instana/configurator.py new file mode 100644 index 00000000..ba477ba6 --- /dev/null +++ b/instana/configurator.py @@ -0,0 +1,28 @@ +from __future__ import absolute_import +from collections import defaultdict + +# This file contains a config object that will hold configuration options for the package. +# Defaults are set and can be overridden after package load. + + +# Simple implementation of a nested dictionary. +# +# Same as: +# stan_dictionary = lambda: defaultdict(stan_dictionary) +# but we use the function form because of PEP 8 +# +def stan_dictionary(): + return defaultdict(stan_dictionary) + + +# La Protagonista +config = stan_dictionary() + + +# This option determines if tasks created via asyncio (with ensure_future or create_task) will +# automatically carry existing context into the created task. +config['asyncio_task_context_propagation']['enabled'] = False + + + + diff --git a/instana/instrumentation/asyncio.py b/instana/instrumentation/asyncio.py index f0c36015..62ed670d 100644 --- a/instana/instrumentation/asyncio.py +++ b/instana/instrumentation/asyncio.py @@ -4,12 +4,16 @@ from ..log import logger from ..singletons import async_tracer +from ..configurator import config try: import asyncio @wrapt.patch_function_wrapper('asyncio','ensure_future') def ensure_future_with_instana(wrapped, instance, argv, kwargs): + if config['asyncio_task_context_propagation']['enabled'] is False: + return wrapped(*argv, **kwargs) + scope = async_tracer.scope_manager.active task = wrapped(*argv, **kwargs) @@ -21,6 +25,9 @@ def ensure_future_with_instana(wrapped, instance, argv, kwargs): if hasattr(asyncio, "create_task"): @wrapt.patch_function_wrapper('asyncio','create_task') def create_task_with_instana(wrapped, instance, argv, kwargs): + if config['asyncio_task_context_propagation']['enabled'] is False: + return wrapped(*argv, **kwargs) + scope = async_tracer.scope_manager.active task = wrapped(*argv, **kwargs) diff --git a/tests/test_configurator.py b/tests/test_configurator.py new file mode 100644 index 00000000..6c538d27 --- /dev/null +++ b/tests/test_configurator.py @@ -0,0 +1,16 @@ +from __future__ import absolute_import + +import unittest + +from instana.configurator import config + + +class TestRedis(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_has_default_config(self): + self.assertEqual(config['asyncio_task_context_propagation']['enabled'], False) \ No newline at end of file From 965fe3484c37fd50fe79561eddeae211073ca766 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 4 Apr 2019 11:51:52 +0200 Subject: [PATCH 5/6] Add section describing configuratior --- Configuration.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Configuration.md b/Configuration.md index 7e9f731c..1ef749f8 100644 --- a/Configuration.md +++ b/Configuration.md @@ -25,6 +25,22 @@ or instana.service_name = "myservice" ``` +## Package Configuration + +The Instana package includes a runtime configuration module that manages the configuration of various components. + +_Note: as the package evolves, more options will be added here_ + +```python +from instana.configurator import config + +# To enable tracing context propagation across Asyncio ensure_future and create_task calls +# Default is false +config['asyncio_task_context_propagation']['enabled'] = True + +``` + + ## Debugging & More Verbosity Setting `INSTANA_DEV` to a non nil value will enable extra logging output generally useful From 635a79323a4a37ea842ec84e14fb5a8c088e4875 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 4 Apr 2019 12:16:39 +0200 Subject: [PATCH 6/6] Use configurator in tests --- tests/test_asyncio.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index cb9a4c82..c826109e 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -6,6 +6,7 @@ import aiohttp from instana.singletons import async_tracer +from instana.configurator import config from .helpers import testenv @@ -20,6 +21,9 @@ def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) + # Restore default + config['asyncio_task_context_propagation']['enabled'] = False + def tearDown(self): """ Purge the queue """ pass @@ -31,7 +35,7 @@ async def fetch(self, session, url, headers=None): except aiohttp.web_exceptions.HTTPException: pass - def test_ensure_future(self): + def test_ensure_future_with_context(self): async def run_later(msg="Hello"): # print("run_later: %s" % async_tracer.active_span.operation_name) async with aiohttp.ClientSession() as session: @@ -42,6 +46,9 @@ async def test(): asyncio.ensure_future(run_later("Hello")) await asyncio.sleep(0.5) + # Override default task context propagation + config['asyncio_task_context_propagation']['enabled'] = True + self.loop.run_until_complete(test()) spans = self.recorder.queued_spans() @@ -65,19 +72,23 @@ async def run_later(msg="Hello"): return await self.fetch(session, testenv["wsgi_server"] + "/") async def test(): - asyncio.ensure_future(run_later("Hello")) + with async_tracer.start_active_span('test'): + asyncio.ensure_future(run_later("Hello")) await asyncio.sleep(0.5) self.loop.run_until_complete(test()) spans = self.recorder.queued_spans() - # Only the WSGI webserver generated a span (entry span) - self.assertEqual(1, len(spans)) - self.assertEqual("wsgi", spans[0].n) + self.assertEqual(2, len(spans)) + self.assertEqual("sdk", spans[0].n) + self.assertEqual("wsgi", spans[1].n) + + # Without the context propagated, we should get two separate traces + self.assertNotEqual(spans[0].t, spans[1].t) if hasattr(asyncio, "create_task"): - def test_create_task(self): + def test_create_task_with_context(self): async def run_later(msg="Hello"): # print("run_later: %s" % async_tracer.active_span.operation_name) async with aiohttp.ClientSession() as session: @@ -88,6 +99,9 @@ async def test(): asyncio.create_task(run_later("Hello")) await asyncio.sleep(0.5) + # Override default task context propagation + config['asyncio_task_context_propagation']['enabled'] = True + self.loop.run_until_complete(test()) spans = self.recorder.queued_spans() @@ -111,13 +125,17 @@ async def run_later(msg="Hello"): return await self.fetch(session, testenv["wsgi_server"] + "/") async def test(): - asyncio.create_task(run_later("Hello")) + with async_tracer.start_active_span('test'): + asyncio.create_task(run_later("Hello")) await asyncio.sleep(0.5) self.loop.run_until_complete(test()) spans = self.recorder.queued_spans() - # Only the WSGI webserver generated a span (entry span) - self.assertEqual(1, len(spans)) - self.assertEqual("wsgi", spans[0].n) + self.assertEqual(2, len(spans)) + self.assertEqual("sdk", spans[0].n) + self.assertEqual("wsgi", spans[1].n) + + # Without the context propagated, we should get two separate traces + self.assertNotEqual(spans[0].t, spans[1].t)