|
| 1 | +#!/usr/bin/env python |
| 2 | +import pika |
| 3 | +import uuid |
| 4 | + |
| 5 | +class FibonacciClient(object): |
| 6 | + def __init__(self): |
| 7 | + self.connection = pika.AsyncoreConnection(pika.ConnectionParameters( |
| 8 | + host='127.0.0.1', |
| 9 | + credentials=pika.PlainCredentials('guest', 'guest'))) |
| 10 | + self.channel = self.connection.channel() |
| 11 | + |
| 12 | + result = self.channel.queue_declare(auto_delete=True) |
| 13 | + self.callback_queue = result.queue |
| 14 | + |
| 15 | + self.requests = {} |
| 16 | + self.channel.basic_consume(self.on_response, no_ack=True, |
| 17 | + queue=self.callback_queue) |
| 18 | + |
| 19 | + def on_response(self, ch, method, props, body): |
| 20 | + corr_id = props.correlation_id |
| 21 | + if corr_id in self.requests: |
| 22 | + self.requests[corr_id] = body |
| 23 | + |
| 24 | + def call(self, n): |
| 25 | + corr_id = str(uuid.uuid4()) |
| 26 | + self.requests[corr_id] = None |
| 27 | + self.channel.basic_publish(exchange='', |
| 28 | + routing_key='rpc_queue', |
| 29 | + properties=pika.BasicProperties( |
| 30 | + reply_to = self.callback_queue, |
| 31 | + correlation_id = corr_id, |
| 32 | + ), |
| 33 | + body=str(n)) |
| 34 | + while self.requests[corr_id] is None: |
| 35 | + pika.asyncore_loop(count=1) |
| 36 | + response = self.requests[corr_id] |
| 37 | + del self.requests[corr_id] |
| 38 | + return int(response) |
| 39 | + |
| 40 | + |
| 41 | +fibonacci_rpc = FibonacciClient() |
| 42 | + |
| 43 | +print " [x] Requesting fib(30)" |
| 44 | +response = fibonacci_rpc.call(30) |
| 45 | +print " [.] Got %r" % (response,) |
| 46 | + |
0 commit comments