diff --git a/example/asyncio/aioclient.py b/example/asyncio/aioclient.py index 8439adb1..d1db3eca 100644 --- a/example/asyncio/aioclient.py +++ b/example/asyncio/aioclient.py @@ -3,14 +3,16 @@ import aiohttp import asyncio -from instana.singletons import async_tracer, agent +from instana.singletons import async_tracer + async def test(): while True: - await asyncio.sleep(1) + await asyncio.sleep(2) with async_tracer.start_active_span('JobRunner'): async with aiohttp.ClientSession() as session: - async with session.get("http://localhost:5102/?secret=iloveyou") as response: + # aioserver exposes /, /401, /500 & /publish (via asynqp) + async with session.get("http://localhost:5102/publish?secret=iloveyou") as response: print(response.status) diff --git a/example/asyncio/aioserver.py b/example/asyncio/aioserver.py index e3c0aed1..3b8cea81 100644 --- a/example/asyncio/aioserver.py +++ b/example/asyncio/aioserver.py @@ -9,8 +9,8 @@ else: RABBITMQ_HOST = "localhost" -class RabbitUtil(): +class RabbitUtil(): def __init__(self, loop): self.loop = loop self.loop.run_until_complete(self.connect()) diff --git a/example/xmlrpc/rpcclient.py b/example/xmlrpc/rpcclient.py new file mode 100644 index 00000000..99905332 --- /dev/null +++ b/example/xmlrpc/rpcclient.py @@ -0,0 +1,27 @@ +import xmlrpc.client + +import time +import opentracing + +while True: + time.sleep(2) + with opentracing.tracer.start_active_span('RPCJobRunner') as rscope: + rscope.span.set_tag("span.kind", "entry") + rscope.span.set_tag("http.url", "http://jobkicker.instana.com/runrpcjob") + rscope.span.set_tag("http.method", "GET") + rscope.span.set_tag("http.params", "secret=iloveyou") + + with opentracing.tracer.start_active_span("RPCClient") as scope: + scope.span.set_tag("span.kind", "exit") + scope.span.set_tag("rpc.host", "rpc-api.instana.com:8261") + scope.span.set_tag("rpc.call", "dance") + + carrier = dict() + opentracing.tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, carrier) + + with xmlrpc.client.ServerProxy("http://localhost:8261/") as proxy: + + result = proxy.dance("NOW!", carrier) + scope.span.set_tag("result", result) + + rscope.span.set_tag("http.status_code", 200) diff --git a/example/xmlrpc/rpcserver.py b/example/xmlrpc/rpcserver.py new file mode 100644 index 00000000..21d5f998 --- /dev/null +++ b/example/xmlrpc/rpcserver.py @@ -0,0 +1,20 @@ +from xmlrpc.server import SimpleXMLRPCServer + +import opentracing + + +def dance(payload, carrier): + ctx = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, carrier) + + with opentracing.tracer.start_active_span('RPCServer', child_of=ctx) as scope: + scope.span.set_tag("span.kind", "entry") + scope.span.set_tag("rpc.call", "dance") + scope.span.set_tag("rpc.host", "rpc-api.instana.com:8261") + + return "♪┏(°.°)┛┗(°.°)┓%s┗(°.°)┛┏(°.°)┓ ♪" % str(payload) + + +server = SimpleXMLRPCServer(("localhost", 8261)) +print("Listening on port 8261...") +server.register_function(dance, "dance") +server.serve_forever() \ No newline at end of file diff --git a/tests/test_asynqp.py b/tests/test_asynqp.py index 5ed81c4d..69d58590 100644 --- a/tests/test_asynqp.py +++ b/tests/test_asynqp.py @@ -87,13 +87,39 @@ def test(): self.assertTrue(type(rabbitmq_span.stack) is list) self.assertGreater(len(rabbitmq_span.stack), 0) + def test_many_publishes(self): + @asyncio.coroutine + def test(): + @asyncio.coroutine + def publish_a_bunch(msg): + for _ in range(20): + self.exchange.publish(msg, 'routing.key') + + with async_tracer.start_active_span('test'): + msg = asynqp.Message({'hello': 'world'}) + yield from publish_a_bunch(msg) + + for _ in range(10): + msg = yield from self.queue.get() + self.assertIsNotNone(msg) + + self.loop.run_until_complete(test()) + + spans = self.recorder.queued_spans() + self.assertEqual(31, len(spans)) + + trace_id = spans[0].t + for span in spans: + self.assertEqual(span.t, trace_id) + + self.assertIsNone(async_tracer.active_span) + def test_get(self): @asyncio.coroutine def publish(): with async_tracer.start_active_span('test'): msg1 = asynqp.Message({'consume': 'this'}) self.exchange.publish(msg1, 'routing.key') - asyncio.sleep(0.5) msg = yield from self.queue.get() self.assertIsNotNone(msg)