@@ -12,26 +12,30 @@ def __init__(self):
1212 result = self .channel .queue_declare (auto_delete = True )
1313 self .callback_queue = result .queue
1414
15+ self .requests = {}
16+ self .channel .basic_consume (self .on_response , no_ack = True ,
17+ queue = self .callback_queue )
18+
19+ def on_response (self , ch , method , props , body ):
20+ corr_id = props .correlation_id
21+ if corr_id in self .requests :
22+ self .requests [corr_id ] = body
23+
1524 def call (self , n ):
16- correlation_id = str (uuid .uuid4 ())
25+ corr_id = str (uuid .uuid4 ())
26+ self .requests [corr_id ] = None
1727 self .channel .basic_publish (exchange = '' ,
1828 routing_key = 'rpc_queue' ,
1929 properties = pika .BasicProperties (
2030 reply_to = self .callback_queue ,
21- correlation_id = correlation_id ,
31+ correlation_id = corr_id ,
2232 ),
2333 body = str (n ))
24- response = []
25- def on_basic_deliver (ch , method , props , body ):
26- if props .correlation_id == correlation_id :
27- response .append (body )
28- self .channel .basic_consume (on_basic_deliver ,
29- queue = self .callback_queue ,
30- no_ack = True )
31- while not response :
34+ while self .requests [corr_id ] is None :
3235 pika .asyncore_loop (count = 1 )
33-
34- return int (response [0 ])
36+ response = self .requests [corr_id ]
37+ del self .requests [corr_id ]
38+ return int (response )
3539
3640
3741fibonacci_rpc = FibonacciClient ()
0 commit comments