Permalink
Browse files

Merge pull request #30 from jianingy/auto-reconnect

make on_close event more convenient for users
  • Loading branch information...
2 parents 90b7dc7 + 0367b5c commit 52c4e42ddcb5243ec7d8caa9fbda8f6002259809 @paolo-losi committed Mar 9, 2016
Showing with 67 additions and 10 deletions.
  1. +30 −0 examples/tutorial7/auto_reconnect_receiver.py
  2. +23 −0 examples/tutorial7/loop_sender.py
  3. +14 −10 stormed/connection.py
@@ -0,0 +1,30 @@
+#! /usr/bin/env python
+
+import logging
+from tornado.ioloop import IOLoop
+from stormed import Connection
+
+
+def on_connect():
+ ch = conn.channel()
+ ch.queue_declare(queue='hello')
+ ch.consume('hello', callback, no_ack=True)
+
+
+def callback(msg):
+ print "[x] Received %r" % msg.body
+
+
+def on_close():
+ print "[*] reconnecting ..."
+ conn.connect(on_connect, on_close)
+
+logging.basicConfig()
+conn = Connection(host='localhost')
+conn.connect(on_connect, on_close)
+io_loop = IOLoop.instance()
+print ' [*] Waiting for messages. To exit press CTRL+C'
+try:
+ io_loop.start()
+except KeyboardInterrupt:
+ conn.close(io_loop.stop)
@@ -0,0 +1,23 @@
+#! /usr/bin/env python
+
+from tornado.ioloop import IOLoop, PeriodicCallback
+from stormed import Connection, Message
+
+msg = Message('Hello World!')
+
+
+def on_connect():
+ ch = conn.channel()
+ ch.queue_declare(queue='hello')
+
+ def _send():
+ ch.publish(msg, exchange='', routing_key='hello')
+ print '[*] Message sent'
+
+ cb = PeriodicCallback(_send, 1000)
+ cb.start()
+
+conn = Connection(host='localhost')
+conn.connect(on_connect)
+io_loop = IOLoop.instance()
+io_loop.start()
View
@@ -11,6 +11,7 @@
TORNADO_1_2 = hasattr(IOStream, 'connect')
+
class Connection(FrameHandler):
"""A "physical" TCP connection to the AMQP server
@@ -31,7 +32,7 @@ def handle_error(conn_error):
"""
def __init__(self, host, username='guest', password='guest', vhost='/',
- port=5672, heartbeat=0, io_loop=None):
+ port=5672, heartbeat=0, io_loop=None):
self.host = host
self.port = port
self.username = username
@@ -52,14 +53,15 @@ def __init__(self, host, username='guest', password='guest', vhost='/',
self._frame_count = 0
super(Connection, self).__init__(connection=self)
- def connect(self, callback):
+ def connect(self, callback, close_callback=None):
"""open the connection to the server"""
if self.status is not status.CLOSED:
raise AmqpStatusError('Connection status is %s' % self.status)
self.status = status.OPENING
sock = socket.socket()
sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
self.on_connect = callback
+ self.on_disconnect = close_callback
if TORNADO_1_2:
self.stream = IOStream(sock, io_loop=self.io_loop)
self.stream.set_close_callback(self.on_closed_stream)
@@ -148,17 +150,19 @@ def close_stream(self):
try:
self.stream.close()
finally:
- self.status = status.CLOSED
self.stream = None
def on_closed_stream(self):
- if self.status != status.CLOSED:
- if self.on_disconnect:
- try:
- self.on_disconnect()
- except Exception:
- logger.error('ERROR in on_disconnect() callback',
- exc_info=True)
+ if self.status == status.CLOSED:
+ return
+
+ self.status = status.CLOSED
+ if self.on_disconnect:
+ try:
+ self.on_disconnect()
+ except Exception:
+ logger.error('ERROR in on_disconnect() callback',
+ exc_info=True)
def reset(self):
for c in self.channels.values():

0 comments on commit 52c4e42

Please sign in to comment.