diff --git a/stormed/channel.py b/stormed/channel.py index 8777914..5dcd8a4 100644 --- a/stormed/channel.py +++ b/stormed/channel.py @@ -1,6 +1,6 @@ from stormed.util import Enum, AmqpError from stormed.method.channel import Open, Close, Flow -from stormed.method import exchange as _exchange, basic, queue as _queue +from stormed.method import exchange as _exchange, basic, queue as _queue, tx from stormed.frame import FrameHandler, status class FlowStoppedException(AmqpError): pass @@ -129,6 +129,17 @@ def recover(self, requeue=False, callback=None): def flow(self, active, callback=None): self.send_method(Flow(active=active), callback) + def select(self, callback=None): + if self.on_error is None: + raise AmqpError("Channel.on_error callback must be set for tx mode") + self.send_method(tx.Select(), callback) + + def commit(self, callback=None): + self.send_method(tx.Commit(), callback) + + def rollback(self, callback=None): + self.send_method(tx.Rollback(), callback) + class Consumer(object): def __init__(self, callback): diff --git a/test/test_channel.py b/test/test_channel.py index 39b32d4..33b2686 100644 --- a/test/test_channel.py +++ b/test/test_channel.py @@ -187,5 +187,21 @@ def on_return(msg): conn.connect(on_connect) self.wait() + def test_reliable_publishing(self): + test_msg = Message('test') + conn = Connection('localhost', io_loop=self.io_loop) + + def on_connect(): + ch = conn.channel() + ch.exchange_declare('test_imm', durable=False) + ch.on_error = lambda: None + ch.select() + ch.publish(test_msg, exchange='test_imm') + ch.commit(lambda: conn.close(self.stop)) + + conn.connect(on_connect) + self.wait() + + if __name__ == '__main__': unittest.main()