Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

flow control + queue.unbind + queue.purge

  • Loading branch information...
commit 6e6a4a038b170385316b91a70fc93d4aa0b12696 1 parent 80470a1
@paolo-losi authored
View
3  TODO
@@ -1,6 +1,5 @@
* review of all methods
- - flow control
- - queue.bind and queue.purge
+ - queue.unbind and queue.purge
- refactor queue_declare callback
- basic.return
- basic.recover?
View
26 stormed/channel.py
@@ -1,15 +1,18 @@
-from stormed.util import Enum
-from stormed.method.channel import Open, Close
+from stormed.util import Enum, AmqpError
+from stormed.method.channel import Open, Close, Flow
from stormed.method import exchange as _exchange, basic, queue as _queue
from stormed.frame import FrameHandler, status
+class FlowStoppedException(AmqpError): pass
+
class Channel(FrameHandler):
def __init__(self, channel_id, conn):
self.channel_id = channel_id
self.consumers = {}
- self.status = status.CLOSED #FIXME is it needed?
+ self.status = status.CLOSED
self.on_error = None
+ self.flow_stopped = False
super(Channel, self).__init__(conn)
def open(self, callback=None):
@@ -66,6 +69,18 @@ def queue_bind(self, queue, exchange, routing_key='', callback=None):
nowait = False,
arguments = dict()), callback)
+ def queue_unbind(self, queue, exchange, routing_key='', callback=None):
+ self.send_method(_queue.Unind(ticket = 0,
+ queue = queue,
+ exchange = exchange,
+ routing_key = routing_key,
+ nowait = False,
+ arguments = dict()), callback)
+
+ def queue_purge(self, queue, callback=None):
+ self.send_method(_queue.Purge(ticket=0, queue=queue, nowait=False),
+ callback)
+
def qos(self, prefetch_size=0, prefetch_count=0, _global=False,
callback=None):
self.send_method(basic.Qos(prefetch_size = prefetch_size,
@@ -74,6 +89,8 @@ def qos(self, prefetch_size=0, prefetch_count=0, _global=False,
def publish(self, message, exchange, routing_key='', immediate=False,
mandatory=False):
+ if self.flow_stopped:
+ raise FlowStoppedException
self.send_method(basic.Publish(ticket = 0,
exchange = exchange,
routing_key = routing_key,
@@ -102,6 +119,9 @@ def set_consumer(consumer_tag):
arguments = dict())
self.send_method(_consume, set_consumer)
+ def flow(self, active, callback=None):
+ self.send_method(Flow(active=active), callback)
+
class Consumer(object):
def __init__(self, callback):
View
1  stormed/connection.py
@@ -5,6 +5,7 @@
from tornado.ioloop import IOLoop
from tornado import stack_context
+from stormed.util import logger
from stormed.frame import FrameReader, FrameHandler, status
from stormed import frame
from stormed.serialization import parse_method, table2str
View
7 stormed/method/channel.py
@@ -32,9 +32,12 @@ def handle(self, channel):
error_code = id2constant.get(self.reply_code, '')
if channel.on_error:
try:
- channel.on_error(ChannelError(error_code, self.reply_text, method))
+ channel.on_error(ChannelError(error_code, self.reply_text, method))
except Exception:
logger.error('ERROR in on_error() callback for channel %d',
channel.channel_id, exc_info=True)
-# TODO flow control
+@add_method(Flow)
+def handle(self, channel):
+ channel.flow_stopped = not self.active
+ self.send_method(FlowOk(active=self.active))
View
2  stormed/method/connection.py
@@ -1,4 +1,4 @@
-from stormed.util import add_method, AmqpError
+from stormed.util import add_method, AmqpError, logger
from stormed.serialization import table2str
from stormed.heartbeat import HeartbeatMonitor
from stormed.frame import status
View
4 stormed/method/queue.py
@@ -5,3 +5,7 @@
def handle(self, ch):
if ch.callback:
ch.invoke_callback(self.queue, self.message_count, self.consumer_count)
+
+@add_method(PurgeOk)
+def handle(self, channel):
+ channel.invoke_callback(self.message_count)
View
4 stormed/serialization.py
@@ -20,7 +20,9 @@ def parse_fields(fields, data):
parser = globals()['parse_%s' % f]
val, offset = parser(data, offset)
vals.append(val)
- assert offset == len(data), '%d %d' % (offset, len(data))
+
+ assert offset + int(bit_parser is not None) == len(data), \
+ '%d %d' % (offset, len(data))
return vals
def dump(o):
View
39 test/test_channel.py
@@ -130,5 +130,44 @@ def on_error(ch_error):
conn.connect(on_connect)
self.wait()
+ def test_channel_flow(self):
+
+ conn = Connection('localhost', io_loop=self.io_loop)
+
+ def on_connect():
+ self.ch = conn.channel()
+ self.ch.flow(active=False, callback=cleanup)
+
+ def cleanup():
+ conn.close(self.stop)
+
+ conn.connect(on_connect)
+ self.wait()
+
+ def test_purge_queue(self):
+
+ test_msg = Message('test')
+ conn = Connection('localhost', io_loop=self.io_loop)
+
+ def on_connect():
+ self.ch = conn.channel()
+ self.ch.queue_declare('test_purge_queue', auto_delete=True)
+ self.ch.exchange_declare('test_purge_exchange', durable=False)
+ self.ch.queue_bind(queue='test_purge_queue',
+ exchange='test_purge_exchange')
+
+ self.ch.queue_purge('test_purge_queue')
+ for _ in xrange(3):
+ self.ch.publish(test_msg, exchange='test_purge_exchange')
+ self.ch.queue_purge('test_purge_queue', purged)
+
+ def purged(msg_count):
+ assert msg_count==3, msg_count
+ conn.close(self.stop)
+
+ conn.connect(on_connect)
+ self.wait()
+
+
if __name__ == '__main__':
unittest.main()
Please sign in to comment.
Something went wrong with that request. Please try again.