Permalink
Browse files

Fixes x_expires handling #9

  • Loading branch information...
1 parent ab24794 commit ff7223d7def39e6e4bfb873e91bffd4916ba6e54 @mher committed May 15, 2013
Showing with 12 additions and 3 deletions.
  1. +12 −3 tcelery/producer.py
View
@@ -1,11 +1,13 @@
from __future__ import absolute_import
import pickle
+import timedelta
from functools import partial
from celery.app.amqp import TaskProducer
from celery.backends.amqp import AMQPBackend
+from celery.utils import timeutils
from .result import AsyncResult
@@ -52,16 +54,23 @@ def publish(self, body, routing_key=None, delivery_mode=None,
exchange=exchange, declare=declare)
if callback:
- x_expires = self.app.conf.CELERY_TASK_RESULT_EXPIRES
- x_expires = int(x_expires.total_seconds() * 1000)
conn.consume(task_id.replace('-', ''),
partial(self.on_result, callback),
- x_expires=x_expires)
+ x_expires=self.prepare_expires(type=int))
return result
def on_result(self, callback, method, channel, deliver, reply):
reply = pickle.loads(reply)
callback(AsyncResult(**reply))
+ def prepare_expires(self, value, type=None):
+ if value is None:
+ value = self.app.conf.CELERY_TASK_RESULT_EXPIRES
+ if isinstance(value, timedelta):
+ value = timeutils.timedelta_seconds(value)
+ if value is not None and type:
+ return type(value)
+ return value
+
def __repr__(self):
return '<NonBlockingTaskProducer: {0.channel}>'.format(self)

0 comments on commit ff7223d

Please sign in to comment.