Skip to content

Commit

Permalink
support for transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
paolo-losi committed Jan 17, 2011
1 parent 6eb1b7a commit adb18d5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
13 changes: 12 additions & 1 deletion stormed/channel.py
@@ -1,6 +1,6 @@
from stormed.util import Enum, AmqpError
from stormed.method.channel import Open, Close, Flow
from stormed.method import exchange as _exchange, basic, queue as _queue
from stormed.method import exchange as _exchange, basic, queue as _queue, tx
from stormed.frame import FrameHandler, status

class FlowStoppedException(AmqpError): pass
Expand Down Expand Up @@ -129,6 +129,17 @@ def recover(self, requeue=False, callback=None):
def flow(self, active, callback=None):
self.send_method(Flow(active=active), callback)

def select(self, callback=None):
if self.on_error is None:
raise AmqpError("Channel.on_error callback must be set for tx mode")
self.send_method(tx.Select(), callback)

def commit(self, callback=None):
self.send_method(tx.Commit(), callback)

def rollback(self, callback=None):
self.send_method(tx.Rollback(), callback)

class Consumer(object):

def __init__(self, callback):
Expand Down
16 changes: 16 additions & 0 deletions test/test_channel.py
Expand Up @@ -187,5 +187,21 @@ def on_return(msg):
conn.connect(on_connect)
self.wait()

def test_reliable_publishing(self):
test_msg = Message('test')
conn = Connection('localhost', io_loop=self.io_loop)

def on_connect():
ch = conn.channel()
ch.exchange_declare('test_imm', durable=False)
ch.on_error = lambda: None
ch.select()
ch.publish(test_msg, exchange='test_imm')
ch.commit(lambda: conn.close(self.stop))

conn.connect(on_connect)
self.wait()


if __name__ == '__main__':
unittest.main()

0 comments on commit adb18d5

Please sign in to comment.