Permalink
Browse files

Merge pull request #12 from tonybarbieri/alternative_serializers

Allows the use of alternative backend serializers.
  • Loading branch information...
2 parents 1fda432 + 26b13c3 commit dd8bfbad65a52922e71f17b0353ec91b1badca98 @mher committed May 21, 2013
Showing with 16 additions and 3 deletions.
  1. +16 −3 tcelery/producer.py
View
@@ -1,16 +1,17 @@
from __future__ import absolute_import
-import pickle
-
from functools import partial
from datetime import timedelta
+from kombu import serialization
+
from celery.app.amqp import TaskProducer
from celery.backends.amqp import AMQPBackend
from celery.utils import timeutils
from .result import AsyncResult
+is_py3k = sys.version_info >= (3, 0)
class NonBlockingTaskProducer(TaskProducer):
@@ -45,6 +46,12 @@ def publish(self, body, routing_key=None, delivery_mode=None,
body, serializer, content_type, content_encoding,
compression, headers)
+ self.serializer = self.app.backend.serializer
+
+ (self.content_type,
+ self.content_encoding,
+ self.encoder) = serialization.registry._encoders[self.serializer]
+
conn = self.conn_pool.connection()
publish = conn.publish
result = publish(body, priority=priority, content_type=content_type,
@@ -59,8 +66,14 @@ def publish(self, body, routing_key=None, delivery_mode=None,
x_expires=self.prepare_expires(type=int))
return result
+ def decode(self, payload):
+ payload = is_py3k and payload or str(payload)
+ return serialization.decode(payload,
+ content_type=self.content_type,
+ content_encoding=self.content_encoding)
+
def on_result(self, callback, method, channel, deliver, reply):
- reply = pickle.loads(reply)
+ reply = self.decode(reply)
callback(AsyncResult(**reply))
def prepare_expires(self, value=None, type=None):

0 comments on commit dd8bfba

Please sign in to comment.