Permalink
Browse files

documented public methods

  • Loading branch information...
1 parent 990f5c1 commit 152d979599fe6e020cc6f2aebbc372cab6070da0 @paolo-losi committed Jan 25, 2011
Showing with 118 additions and 0 deletions.
  1. +77 −0 stormed/channel.py
  2. +23 −0 stormed/connection.py
  3. +18 −0 stormed/message.py
View
@@ -6,6 +6,35 @@
class FlowStoppedException(AmqpError): pass
class Channel(FrameHandler):
+ """An AMQP Channel
+
+ And AMQP Channel represent a logical connection to the AMQP server.
+ Unless there are really specific needs, there is no reason to use
+ more than one Channel instance per process for a
+ standard stormed-amqp / tornadoweb application.
+
+ Then Channel class should be only instantiated by
+ stormed.Connection.channel method.
+
+ Channel.on_error callback is called in case of "Soft" AMQP Error with
+ a ChannelError instance as argument:
+
+ def on_channel_error(channel_error):
+ print channel_error.reply_code
+ print channel_error.reply_text
+ print channel_error.method
+
+ channel.on_error = on_channel_error
+
+ Channel.on_return is called when the AMQP server returns a
+ message published by the client ("basic.return").
+ the callback receives a stormed.Message as argument:
+
+ def on_msg_returned(msg):
+ print msg.rx_data.reply_code
+
+ channel.on_return = on_msg_returnedi
+ """
def __init__(self, channel_id, conn):
self.channel_id = channel_id
@@ -45,6 +74,18 @@ def exchange_delete(self, exchange, if_unused=False, callback=None):
def queue_declare(self, queue='', passive=False, durable=True,
exclusive=False, auto_delete=False, callback=None):
+ """implements "queue.declare" AMQP method
+
+ the callback receives as argument a queue.DeclareOk method instance:
+
+ def on_creation(qinfo):
+ print qinfo.queue # queue name
+ print qinfo.message_count
+ print qinfo.consumer_count
+
+ channel.queue_declare('queue_name', callback=on_creation)
+ """
+
self.send_method(_queue.Declare(ticket = 0,
queue = queue,
passive = passive,
@@ -79,6 +120,16 @@ def queue_unbind(self, queue, exchange, routing_key='', callback=None):
arguments = dict()), callback)
def queue_purge(self, queue, callback=None):
+ """implements "queue.purge" AMQP method
+
+ the callback receives as argument the number of purged messages:
+
+ def queue_purged(message_count):
+ print message_count
+
+ channel.queue_purge('queue_name')
+ """
+
self.send_method(_queue.Purge(ticket=0, queue=queue, nowait=False),
callback)
@@ -102,11 +153,30 @@ def publish(self, message, exchange, routing_key='', immediate=False,
immediate = immediate), message=message)
def get(self, queue, callback, no_ack=False):
+ """implements "basic.get" AMQP method
+
+ the callback receives as argument a stormed.Message instance
+ or None if the AMQP queue is empty:
+
+ def on_msg(msg):
+ if msg is not None:
+ print msg.body
+ else:
+ print "empty queue"
+
+ channel.get('queue_name', on_msg)
+ """
_get = basic.Get(ticket=0, queue=queue, no_ack=no_ack)
self.send_method(_get, callback)
def consume(self, queue, consumer, no_local=False, no_ack=False,
exclusive=False):
+ """implements "basic.consume" AMQP method
+
+ The consumer argument is either a callback or a Consumer instance.
+ The callback is called, with a Message instance as argument,
+ each time the client receives a message from the server.
+ """
if not isinstance(consumer, Consumer):
consumer = Consumer(consumer)
def set_consumer(consumer_tag):
@@ -141,12 +211,19 @@ def rollback(self, callback=None):
self.send_method(tx.Rollback(), callback)
class Consumer(object):
+ """AMQP Queue consumer
+
+ the Consumer can be used as Channel.consume() "consumer" argument
+ when the application must be able to stop a specific basic.consume message
+ flow from the server.
+ """
def __init__(self, callback):
self.tag = None
self.channel = None
self.callback = callback
def cancel(self, callback):
+ """implements "basic.cancel" AMQP method"""
_cancel = basic.Cancel(consumer_tag=self.tag, nowait=False)
self.channel.send_method(_cancel, callback)
View
@@ -12,6 +12,23 @@
TORNADO_1_2 = hasattr(IOStream, 'connect')
class Connection(FrameHandler):
+ """A "physical" TCP connection to the AMQP server
+
+ heartbeat: int, optional
+ the requested time interval in seconds for heartbeat frames.
+
+ Connection.on_error callback, when set, is called in case of
+ "hard" AMQP Error. It receives a ConnectionErrorinstance as argument:
+
+ def handle_error(conn_error):
+ print conn_error.method
+ print conn_error.reply_code
+
+ conn.on_error = handle_error
+
+ Connection.on_disconnect callback, when set, is called in case of
+ heartbeat timeout or TCP low level disconnection. It receives no args.
+ """
def __init__(self, host, username='guest', password='guest', vhost='/',
port=5672, heartbeat=0, io_loop=None):
@@ -36,6 +53,7 @@ def __init__(self, host, username='guest', password='guest', vhost='/',
super(Connection, self).__init__(connection=self)
def connect(self, callback):
+ """open the connection to the server"""
if self.status is not status.CLOSED:
raise Exception('Connection status is %s' % self.status)
self.status = status.OPENING
@@ -53,6 +71,10 @@ def connect(self, callback):
self._handshake()
def close(self, callback=None):
+ """cleanly closes the connection to the server.
+
+ all pending tasks are flushed before connection shutdown"""
+
if self.status != status.CLOSING:
self._close_callback = callback
self.status = status.CLOSING
@@ -70,6 +92,7 @@ def close(self, callback=None):
self.send_method(m, self._close_callback)
def channel(self, callback=None):
+ """get a Channel instance"""
if self.status == status.OPENED:
ch = Channel(channel_id=len(self.channels), conn=self)
self.channels.append(ch)
View
@@ -3,6 +3,22 @@
class Message(WithFields):
+ """An AMQP Message
+
+ The body parameter represents the message content. If the parameter
+ is a unicode object, it is encoded to UTF8.
+
+ The optional properties are those defined in the AMQP standard
+ (see stormed.method.codegen.basic.properties)
+
+ When the message is received from the server the rx_data attribute
+ contains the AMQP method instance (e.g. basic.GetOk, basic.Deliver).
+ This instance carries the server metadata (e.g. the redelivered bit).
+
+ A message received from the server can be acknowledged o rejected
+ with the Message.ack() and Message.reject() methods if required.
+ """
+
_fields = basic.properties
def __init__(self, body, **properties):
@@ -17,13 +33,15 @@ def __init__(self, body, **properties):
super(Message, self).__init__(**properties)
def ack(self, multiple=False):
+ """acknowledge the message"""
if self.rx_channel is None:
raise ValueError('cannot ack an unreceived message')
method = basic.Ack(delivery_tag=self.rx_data.delivery_tag,
multiple=multiple)
self.rx_channel.send_method(method)
def reject(self, requeue=True):
+ """reject the message"""
if self.rx_channel is None:
raise ValueError('cannot reject an unreceived message')
method = basic.Reject(delivery_tag=self.rx_data.delivery_tag,

0 comments on commit 152d979

Please sign in to comment.