Skip to content
Browse files

puka: six.

  • Loading branch information...
1 parent 75863fb commit c5c1973f29c5664cbe5ca94f6f9d36fc8be563c3 @majek majek committed
Showing with 76 additions and 0 deletions.
  1. +5 −0 python-puka/README.md
  2. +35 −0 python-puka/rpc_client.py
  3. +36 −0 python-puka/rpc_server.py
View
5 python-puka/README.md
@@ -49,3 +49,8 @@ You may need to install `pip` first:
python receive_logs_topic.py
python emit_log_topic.py
+
+[Tutorial six: RPC](http://www.rabbitmq.com/tutorial-six-python.html):
+
+ python rpc_server.py
+ python rpc_client.py
View
35 python-puka/rpc_client.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+import puka
+import uuid
+
+class FibonacciRpcClient(object):
+ def __init__(self):
+ self.client = client = puka.Client("amqp://localhost/")
+ promise = client.connect()
+ client.wait(promise)
+
+ promise = client.queue_declare(exclusive=True)
+ self.callback_queue = client.wait(promise)['queue']
+
+ self.consume_promise = client.basic_consume(queue=self.callback_queue,
+ no_ack=True)
+
+ def call(self, n):
+ correlation_id = str(uuid.uuid4())
+ # We don't need to wait on promise from publish, let it happen async.
+ self.client.basic_publish(exchange='',
+ routing_key='rpc_queue',
+ headers={'reply_to': self.callback_queue,
+ 'correlation_id': correlation_id},
+ body=str(n))
+ while True:
+ msg_result = self.client.wait(self.consume_promise)
+ if msg_result['headers']['correlation_id'] == correlation_id:
+ return int(msg_result['body'])
+
+
+fibonacci_rpc = FibonacciRpcClient()
+
+print " [x] Requesting fib(30)"
+response = fibonacci_rpc.call(30)
+print " [.] Got %r" % (response,)
View
36 python-puka/rpc_server.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+import puka
+
+client = puka.Client("amqp://localhost/")
+promise = client.connect()
+client.wait(promise)
+
+promise = client.queue_declare(queue='rpc_queue')
+client.wait(promise)
+
+# The worlds worst algorithm:
+def fib(n):
+ if n == 0:
+ return 0
+ elif n == 1:
+ return 1
+ else:
+ return fib(n-1) + fib(n-2)
+
+
+print " [x] Awaiting RPC requests"
+consume_promise = client.basic_consume(queue='rpc_queue', prefetch_count=1)
+while True:
+ msg_result = client.wait(consume_promise)
+ n = int(msg_result['body'])
+
+ print " [.] fib(%s)" % (n,)
+ response = fib(n)
+
+ # This publish doesn't need to be synchronous.
+ client.basic_publish(exchange='',
+ routing_key=msg_result['headers']['reply_to'],
+ headers={'correlation_id':
+ msg_result['headers']['correlation_id']},
+ body=str(response))
+ client.basic_ack(msg_result)

0 comments on commit c5c1973

Please sign in to comment.
Something went wrong with that request. Please try again.