Permalink
Browse files

Merge branch 'result_cls_option' of https://github.com/tonybarbieri/t…

…ornado-celery into tonybarbieri-result_cls_option
  • Loading branch information...
2 parents 1fda432 + f985979 commit ce323933e1782f53aab6e265fb2b38007885904d @mher committed May 21, 2013
Showing with 6 additions and 3 deletions.
  1. +4 −2 tcelery/__init__.py
  2. +2 −1 tcelery/producer.py
View
@@ -6,19 +6,21 @@
from .connection import ConnectionPool
from .producer import NonBlockingTaskProducer
-
+from .result import AsyncResult
VERSION = (0, 2, 1)
__version__ = '.'.join(map(str, VERSION))
def setup_nonblocking_producer(celery_app=None, io_loop=None,
- on_ready=None, limit=1):
+ on_ready=None, result_cls=AsyncResult,
+ limit=1):
celery_app = celery_app or celery.current_app
io_loop = io_loop or ioloop.IOLoop.instance()
NonBlockingTaskProducer.app = celery_app
NonBlockingTaskProducer.conn_pool = ConnectionPool(limit=limit)
+ NonBlockingTaskProducer.result_cls = result_cls
celery.app.amqp.AMQP.producer_cls = NonBlockingTaskProducer
def connect():
View
@@ -16,6 +16,7 @@ class NonBlockingTaskProducer(TaskProducer):
conn_pool = None
app = None
+ result_cls = AsyncResult
def __init__(self, channel=None, *args, **kwargs):
super(NonBlockingTaskProducer, self).__init__(
@@ -61,7 +62,7 @@ def publish(self, body, routing_key=None, delivery_mode=None,
def on_result(self, callback, method, channel, deliver, reply):
reply = pickle.loads(reply)
- callback(AsyncResult(**reply))
+ callback(self.result_cls(**reply))
def prepare_expires(self, value=None, type=None):
if value is None:

0 comments on commit ce32393

Please sign in to comment.