File tree Expand file tree Collapse file tree 5 files changed +80
-5
lines changed Expand file tree Collapse file tree 5 files changed +80
-5
lines changed Original file line number Diff line number Diff line change 33import aiohttp
44import asyncio
55
6- from instana .singletons import async_tracer , agent
6+ from instana .singletons import async_tracer
7+
78
89async def test ():
910 while True :
10- await asyncio .sleep (1 )
11+ await asyncio .sleep (2 )
1112 with async_tracer .start_active_span ('JobRunner' ):
1213 async with aiohttp .ClientSession () as session :
13- async with session .get ("http://localhost:5102/?secret=iloveyou" ) as response :
14+ # aioserver exposes /, /401, /500 & /publish (via asynqp)
15+ async with session .get ("http://localhost:5102/publish?secret=iloveyou" ) as response :
1416 print (response .status )
1517
1618
Original file line number Diff line number Diff line change 99else :
1010 RABBITMQ_HOST = "localhost"
1111
12- class RabbitUtil ():
1312
13+ class RabbitUtil ():
1414 def __init__ (self , loop ):
1515 self .loop = loop
1616 self .loop .run_until_complete (self .connect ())
Original file line number Diff line number Diff line change 1+ import xmlrpc .client
2+
3+ import time
4+ import opentracing
5+
6+ while True :
7+ time .sleep (2 )
8+ with opentracing .tracer .start_active_span ('RPCJobRunner' ) as rscope :
9+ rscope .span .set_tag ("span.kind" , "entry" )
10+ rscope .span .set_tag ("http.url" , "http://jobkicker.instana.com/runrpcjob" )
11+ rscope .span .set_tag ("http.method" , "GET" )
12+ rscope .span .set_tag ("http.params" , "secret=iloveyou" )
13+
14+ with opentracing .tracer .start_active_span ("RPCClient" ) as scope :
15+ scope .span .set_tag ("span.kind" , "exit" )
16+ scope .span .set_tag ("rpc.host" , "rpc-api.instana.com:8261" )
17+ scope .span .set_tag ("rpc.call" , "dance" )
18+
19+ carrier = dict ()
20+ opentracing .tracer .inject (scope .span .context , opentracing .Format .HTTP_HEADERS , carrier )
21+
22+ with xmlrpc .client .ServerProxy ("http://localhost:8261/" ) as proxy :
23+
24+ result = proxy .dance ("NOW!" , carrier )
25+ scope .span .set_tag ("result" , result )
26+
27+ rscope .span .set_tag ("http.status_code" , 200 )
Original file line number Diff line number Diff line change 1+ from xmlrpc .server import SimpleXMLRPCServer
2+
3+ import opentracing
4+
5+
6+ def dance (payload , carrier ):
7+ ctx = opentracing .tracer .extract (opentracing .Format .HTTP_HEADERS , carrier )
8+
9+ with opentracing .tracer .start_active_span ('RPCServer' , child_of = ctx ) as scope :
10+ scope .span .set_tag ("span.kind" , "entry" )
11+ scope .span .set_tag ("rpc.call" , "dance" )
12+ scope .span .set_tag ("rpc.host" , "rpc-api.instana.com:8261" )
13+
14+ return "♪┏(°.°)┛┗(°.°)┓%s┗(°.°)┛┏(°.°)┓ ♪" % str (payload )
15+
16+
17+ server = SimpleXMLRPCServer (("localhost" , 8261 ))
18+ print ("Listening on port 8261..." )
19+ server .register_function (dance , "dance" )
20+ server .serve_forever ()
Original file line number Diff line number Diff line change @@ -87,13 +87,39 @@ def test():
8787 self .assertTrue (type (rabbitmq_span .stack ) is list )
8888 self .assertGreater (len (rabbitmq_span .stack ), 0 )
8989
90+ def test_many_publishes (self ):
91+ @asyncio .coroutine
92+ def test ():
93+ @asyncio .coroutine
94+ def publish_a_bunch (msg ):
95+ for _ in range (20 ):
96+ self .exchange .publish (msg , 'routing.key' )
97+
98+ with async_tracer .start_active_span ('test' ):
99+ msg = asynqp .Message ({'hello' : 'world' })
100+ yield from publish_a_bunch (msg )
101+
102+ for _ in range (10 ):
103+ msg = yield from self .queue .get ()
104+ self .assertIsNotNone (msg )
105+
106+ self .loop .run_until_complete (test ())
107+
108+ spans = self .recorder .queued_spans ()
109+ self .assertEqual (31 , len (spans ))
110+
111+ trace_id = spans [0 ].t
112+ for span in spans :
113+ self .assertEqual (span .t , trace_id )
114+
115+ self .assertIsNone (async_tracer .active_span )
116+
90117 def test_get (self ):
91118 @asyncio .coroutine
92119 def publish ():
93120 with async_tracer .start_active_span ('test' ):
94121 msg1 = asynqp .Message ({'consume' : 'this' })
95122 self .exchange .publish (msg1 , 'routing.key' )
96- asyncio .sleep (0.5 )
97123 msg = yield from self .queue .get ()
98124 self .assertIsNotNone (msg )
99125
You can’t perform that action at this time.
0 commit comments