Permalink
Browse files

Merge pull request #23 from willkg/celery-2.5-update

Celery 2.5 update
  • Loading branch information...
2 parents 7040835 + 275892b commit 22bfbe94b69b23f11a943d78d092f1e6e457533c @willkg willkg committed Oct 23, 2012
Showing 365 changed files with 39,253 additions and 11,888 deletions.
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.1.4','console_scripts','camqadm'
-__requires__ = 'celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.5.5','console_scripts','camqadm'
+__requires__ = 'celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('celery==2.1.4', 'console_scripts', 'camqadm')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('celery==2.5.5', 'console_scripts', 'camqadm')()
+ )
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.1.4','console_scripts','celerybeat'
-__requires__ = 'celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.5.5','console_scripts','celerybeat'
+__requires__ = 'celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('celery==2.1.4', 'console_scripts', 'celerybeat')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('celery==2.5.5', 'console_scripts', 'celerybeat')()
+ )
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.1.4','console_scripts','celeryctl'
-__requires__ = 'celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.5.5','console_scripts','celeryctl'
+__requires__ = 'celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('celery==2.1.4', 'console_scripts', 'celeryctl')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('celery==2.5.5', 'console_scripts', 'celeryctl')()
+ )
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.1.4','console_scripts','celeryd'
-__requires__ = 'celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.5.5','console_scripts','celeryd'
+__requires__ = 'celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('celery==2.1.4', 'console_scripts', 'celeryd')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('celery==2.5.5', 'console_scripts', 'celeryd')()
+ )
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.1.4','console_scripts','celeryd-multi'
-__requires__ = 'celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.5.5','console_scripts','celeryd-multi'
+__requires__ = 'celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('celery==2.1.4', 'console_scripts', 'celeryd-multi')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('celery==2.5.5', 'console_scripts', 'celeryd-multi')()
+ )
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.1.4','console_scripts','celeryev'
-__requires__ = 'celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'celery==2.5.5','console_scripts','celeryev'
+__requires__ = 'celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('celery==2.1.4', 'console_scripts', 'celeryev')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('celery==2.5.5', 'console_scripts', 'celeryev')()
+ )
View
@@ -1,9 +1,10 @@
#!/usr/bin/env python
-# EASY-INSTALL-ENTRY-SCRIPT: 'django-celery==2.1.4','console_scripts','djcelerymon'
-__requires__ = 'django-celery==2.1.4'
+# EASY-INSTALL-ENTRY-SCRIPT: 'django-celery==2.5.5','console_scripts','djcelerymon'
+__requires__ = 'django-celery==2.5.5'
import sys
from pkg_resources import load_entry_point
-sys.exit(
- load_entry_point('django-celery==2.1.4', 'console_scripts', 'djcelerymon')()
-)
+if __name__ == '__main__':
+ sys.exit(
+ load_entry_point('django-celery==2.5.5', 'console_scripts', 'djcelerymon')()
+ )
@@ -22,6 +22,7 @@
# Pull in the public items from the various sub-modules
#
from basic_message import *
+from channel import *
from connection import *
from exceptions import *
@@ -20,6 +20,12 @@
from serialization import AMQPWriter
+try:
+ bytes
+except NameError:
+ # Python 2.5 and lower
+ bytes = str
+
__all__ = [
'AbstractChannel',
]
@@ -58,7 +64,7 @@ def __exit__(self, type, value, traceback):
self.close()
- def _send_method(self, method_sig, args='', content=None):
+ def _send_method(self, method_sig, args=bytes(), content=None):
"""
Send a method for our channel.
@@ -88,6 +94,10 @@ def wait(self, allowed_methods=None):
method_sig, args, content = self.connection._wait_method(
self.channel_id, allowed_methods)
+ return self.dispatch_method(method_sig, args, content)
+
+
+ def dispatch_method(self, method_sig, args, content):
if content \
and self.auto_decode \
and hasattr(content, 'content_encoding'):
@@ -115,14 +115,8 @@ def __init__(self, body='', children=None, **properties):
application_headers={'foo': 7})
"""
- if isinstance(body, unicode):
- if properties.get('content_encoding', None) is None:
- properties['content_encoding'] = 'UTF-8'
- self.body = body.encode(properties['content_encoding'])
- else:
- self.body = body
-
super(Message, self).__init__(**properties)
+ self.body = body
def __eq__(self, other):
@@ -134,4 +128,5 @@ def __eq__(self, other):
which isn't compared.
"""
- return super(Message, self).__eq__(other) and (self.body == other.body)
+ return super(Message, self).__eq__(other) \
+ and hasattr(other, 'body') and (self.body == other.body)
@@ -61,8 +61,8 @@ def __init__(self, connection, channel_id=None, auto_decode=True):
whether the library should attempt to decode the body
of Messages to a Unicode string if there's a 'content_encoding'
property for the message. If there's no 'content_encoding'
- property, or the decode raises an Exception, the plain string
- is left as the message body.
+ property, or the decode raises an Exception, the message body
+ is left as plain bytes.
"""
if channel_id is None:
@@ -1104,6 +1104,93 @@ def _queue_bind_ok(self, args):
pass
+ def queue_unbind(self, queue, exchange, routing_key='',
+ nowait=False, arguments=None, ticket=None):
+ """
+ NOTE::::This is not part of AMQP 0-8, but RabbitMQ supports this as
+ an extension
+
+ unbind a queue from an exchange
+
+ This method unbinds a queue from an exchange.
+
+ RULE:
+
+ If a unbind fails, the server MUST raise a connection exception.
+
+ PARAMETERS:
+ queue: shortstr
+
+ Specifies the name of the queue to unbind.
+
+ RULE:
+
+ The client MUST either specify a queue name or have
+ previously declared a queue on the same channel
+
+ RULE:
+
+ The client MUST NOT attempt to unbind a queue that
+ does not exist.
+
+ exchange: shortstr
+
+ The name of the exchange to unbind from.
+
+ RULE:
+
+ The client MUST NOT attempt to unbind a queue from an
+ exchange that does not exist.
+
+ RULE:
+
+ The server MUST accept a blank exchange name to mean
+ the default exchange.
+
+ routing_key: shortstr
+
+ routing key of binding
+
+ Specifies the routing key of the binding to unbind.
+
+ arguments: table
+
+ arguments of binding
+
+ Specifies the arguments of the binding to unbind.
+
+ """
+ if arguments is None:
+ arguments = {}
+
+ args = AMQPWriter()
+ if ticket is not None:
+ args.write_short(ticket)
+ else:
+ args.write_short(self.default_ticket)
+ args.write_shortstr(queue)
+ args.write_shortstr(exchange)
+ args.write_shortstr(routing_key)
+ #args.write_bit(nowait)
+ args.write_table(arguments)
+ self._send_method((50, 50), args)
+
+ if not nowait:
+ return self.wait(allowed_methods=[
+ (50, 51), # Channel.queue_unbind_ok
+ ])
+
+
+ def _queue_unbind_ok(self, args):
+ """
+ confirm unbind successful
+
+ This method confirms that the unbind was successful.
+
+ """
+ pass
+
+
def queue_declare(self, queue='', passive=False, durable=False,
exclusive=False, auto_delete=True, nowait=False,
arguments=None, ticket=None):
@@ -2589,6 +2676,7 @@ def _tx_select_ok(self, args):
(50, 21): _queue_bind_ok,
(50, 31): _queue_purge_ok,
(50, 41): _queue_delete_ok,
+ (50, 51): _queue_unbind_ok,
(60, 11): _basic_qos_ok,
(60, 21): _basic_consume_ok,
(60, 31): _basic_cancel_ok,
@@ -2600,3 +2688,7 @@ def _tx_select_ok(self, args):
(90, 21): _tx_commit_ok,
(90, 31): _tx_rollback_ok,
}
+
+ _IMMEDIATE_METHODS = [
+ (60, 50), # basic_return
+ ]
@@ -36,7 +36,7 @@
#
LIBRARY_PROPERTIES = {
'library': 'Python amqplib',
- 'library_version': '0.6.1',
+ 'library_version': '1.0.2',
}
AMQP_LOGGER = logging.getLogger('amqplib')
@@ -84,6 +84,10 @@ def __init__(self,
If login_response is not specified, one is built up for you from
userid and password if they are present.
+ The 'ssl' parameter may be simply True/False, or for Python >= 2.6
+ a dictionary of options to pass to ssl.wrap_socket() such as
+ requiring certain certificates.
+
"""
if (login_response is None) \
and (userid is not None) \
@@ -204,6 +208,15 @@ def _wait_method(self, channel_id, allowed_methods):
return method_sig, args, content
#
+ # Certain methods like basic_return should be dispatched
+ # immediately rather than being queued, even if they're not
+ # one of the 'allowed_methods' we're looking for.
+ #
+ if (channel != 0) and (method_sig in Channel._IMMEDIATE_METHODS):
+ self.channels[channel].dispatch_method(method_sig, args, content)
+ continue
+
+ #
# Not the channel and/or method we were looking for. Queue
# this method for later
#
@@ -824,3 +837,6 @@ def _x_tune_ok(self, channel_max, frame_max, heartbeat):
(10, 60): _close,
(10, 61): _close_ok,
}
+
+
+ _IMMEDIATE_METHODS = []
@@ -22,6 +22,12 @@
from struct import pack, unpack
try:
+ bytes
+except NameError:
+ # Python 2.5 and lower
+ bytes = str
+
+try:
from collections import defaultdict
except:
class defaultdict(dict):
@@ -88,7 +94,7 @@ def add_payload(self, payload):
self.body_received += len(payload)
if self.body_received == self.body_size:
- self.msg.body = ''.join(self.body_parts)
+ self.msg.body = bytes().join(self.body_parts)
self.complete = True
@@ -230,15 +236,26 @@ def __init__(self, dest, frame_max):
def write_method(self, channel, method_sig, args, content=None):
payload = pack('>HH', method_sig[0], method_sig[1]) + args
+ if content:
+ # do this early, so we can raise an exception if there's a
+ # problem with the content properties, before sending the
+ # first frame
+ body = content.body
+ if isinstance(body, unicode):
+ coding = content.properties.get('content_encoding', None)
+ if coding is None:
+ coding = content.properties['content_encoding'] = 'UTF-8'
+
+ body = body.encode(coding)
+ properties = content._serialize_properties()
+
self.dest.write_frame(1, channel, payload)
if content:
- body = content.body
- payload = pack('>HHQ', method_sig[0], 0, len(body)) + \
- content._serialize_properties()
+ payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties
self.dest.write_frame(2, channel, payload)
- while body:
- payload, body = body[:self.frame_max - 8], body[self.frame_max -8:]
- self.dest.write_frame(3, channel, payload)
+ chunk_size = self.frame_max - 8
+ for i in xrange(0, len(body), chunk_size):
+ self.dest.write_frame(3, channel, body[i:i+chunk_size])
Oops, something went wrong.

0 comments on commit 22bfbe9

Please sign in to comment.