Permalink
Browse files

basic.return

  • Loading branch information...
1 parent c5ef4f1 commit e99bb170c3f306b669c8810c34d4ef9c36b43c29 @paolo-losi committed Jan 17, 2011
Showing with 33 additions and 2 deletions.
  1. +0 −1 TODO
  2. +4 −0 stormed/channel.py
  3. +11 −1 stormed/method/basic.py
  4. +18 −0 test/test_channel.py
View
1 TODO
@@ -1,5 +1,4 @@
* review of all methods
- - basic.return
- basic.recover?
- tx.select, tx.commit, tx.rollback
* check conn/channel status before sending frames
View
@@ -12,6 +12,7 @@ def __init__(self, channel_id, conn):
self.consumers = {}
self.status = status.CLOSED
self.on_error = None
+ self.on_return = None
self.flow_stopped = False
super(Channel, self).__init__(conn)
@@ -91,6 +92,9 @@ def publish(self, message, exchange, routing_key='', immediate=False,
mandatory=False):
if self.flow_stopped:
raise FlowStoppedException
+ if (immediate or mandatory) and self.on_return is None:
+ raise AmqpError("on_return callback must be set for "
+ "immediate or mandatory publishing")
self.send_method(basic.Publish(ticket = 0,
exchange = exchange,
routing_key = routing_key,
View
@@ -1,4 +1,4 @@
-from stormed.util import add_method
+from stormed.util import add_method, logger
from stormed.method.codegen.basic import *
@add_method(GetOk)
@@ -24,3 +24,13 @@ def handle(self, ch):
msg = ch.message
msg.rx_channel = ch
ch.consumers[self.consumer_tag].callback(msg)
+
+@add_method(Return)
+def handle(self, ch):
+ msg = ch.message
+ msg.rx_channel = ch
+ if ch.on_return:
+ try:
+ ch.on_return(msg)
+ except Exception:
+ logger.error('ERROR in on_return() callback', exc_info=True)
View
@@ -168,6 +168,24 @@ def purged(msg_count):
conn.connect(on_connect)
self.wait()
+ def test_basic_return(self):
+ test_msg = Message('test')
+ conn = Connection('localhost', io_loop=self.io_loop)
+
+ def on_connect():
+ ch = conn.channel()
+ ch.on_return = on_return
+ ch.exchange_declare('test_imm', durable=False)
+ ch.publish(test_msg, exchange='test_imm', immediate=True)
+
+ def on_return(msg):
+ rx = msg.rx_data
+ assert rx.reply_code == 313, rx.reply_code # NO_CONSUMERS
+ assert rx.exchange == 'test_imm', rx.exchange_declare
+ conn.close(self.stop)
+
+ conn.connect(on_connect)
+ self.wait()
if __name__ == '__main__':
unittest.main()

0 comments on commit e99bb17

Please sign in to comment.