Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Tutorial 6 added.

  • Loading branch information...
commit 0f60d6f5ecc19207a0d65916f6d8c752467477c5 1 parent b0c2dfe
@brimcfadden brimcfadden authored
Showing with 84 additions and 0 deletions.
  1. +44 −0 examples/tutorial6/rpc_client.py
  2. +40 −0 examples/tutorial6/rpc_server.py
View
44 examples/tutorial6/rpc_client.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env pythong
+
+import logging
+import sys
+import uuid
+from tornado.ioloop import IOLoop
+from stormed import Connection, Message
+
+class FibonacciRpcClient(object):
+ def __init__(self, n):
+ self.conn = Connection(host='localhost')
+ self.conn.connect(self.on_connect)
+ self.n = n
+
+ def on_connect(self):
+ self.ch = self.conn.channel()
+ self.ch.queue_declare(exclusive=True, callback=self.on_queue_declare)
+
+ def on_queue_declare(self, q_info):
+ callback_queue = q_info.queue
+ self.ch.consume(callback_queue, self.on_response)
+ self.corr_id = str(uuid.uuid4())
+ msg = Message(str(self.n), delivery_mode=2, reply_to=callback_queue,
+ correlation_id=self.corr_id)
+ self.ch.publish(msg, exchange='', routing_key='rpc_queue')
+
+ def on_response(self, msg):
+ if self.corr_id == msg.correlation_id:
+ print " [x] Received %r" % msg.body
+ self.conn.close(callback=IOLoop.instance().stop)
+ print 'Closing connection.'
+
+logging.basicConfig()
+try:
+ n = int(sys.argv[1])
+except:
+ n = 30
+io_loop = IOLoop.instance()
+fibonacci_rpc = FibonacciRpcClient(n)
+print ' [x] Requesting fib(%s)' % n
+try:
+ io_loop.start()
+except:
+ io_loop.stop()
View
40 examples/tutorial6/rpc_server.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+
+import logging
+from tornado.ioloop import IOLoop
+from stormed import Connection, Message
+
+def fib(n):
+ if n == 0:
+ return 0
+ elif n == 1:
+ return 1
+ else:
+ return fib(n-1) + fib(n-2)
+
+def on_connect():
+ global ch
+ ch = conn.channel()
+ ch.queue_declare(queue='task_queue', durable=True)
+ ch.qos(prefetch_count=1)
+ ch.consume('rpc_queue', on_request)
+
+def on_request(msg):
+ n = int(msg.body)
+ print " [.] fib(%s)" % n
+ response = str(fib(n))
+ response_msg = Message(response, delivery_mode=2,
+ correlation_id=msg.correlation_id)
+ ch.publish(response_msg, exchange='', routing_key=msg.reply_to)
+ msg.ack()
+
+logging.basicConfig()
+ch = None
+conn = Connection(host='localhost')
+conn.connect(on_connect)
+io_loop = IOLoop.instance()
+print ' [*] Waiting for messages. To exit press CTRL+C'
+try:
+ io_loop.start()
+except KeyboardInterrupt:
+ conn.close(io_loop.stop)
Please sign in to comment.
Something went wrong with that request. Please try again.