Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix a major reference cycle memory leak and make caching and connecti…

…on more configurable
  • Loading branch information...
commit a2ddd0c461a2dd4fc4fc9a6933522aa1f1373960 1 parent bb91ab6
Sean Talts authored
Showing with 31 additions and 32 deletions.
  1. +31 −32 sqskombu/transport.py
View
63 sqskombu/transport.py
@@ -1,45 +1,40 @@
from Queue import Empty
from kombu.transport import virtual
-from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message
+from boto.exception import SQSError
from anyjson import serialize, deserialize
import socket
import time
-try:
- import pylibmc
-except ImportError:
- pylibmc = None
-
-#TODO how to make this configurable?
-THROTTLE = 30 #only poll every 30 seconds
-
class Channel(virtual.Channel):
+ _instance = None
+
def __init__(self, *args, **kwargs):
- virtual.Channel.__init__(self, *args, **kwargs)
+ super(Channel, self).__init__(*args, **kwargs)
self.conninfo = self.connection.client
- if pylibmc:
- memcached_servers = self.conninfo.transport_options["memcached_servers"]
- self.cache = pylibmc.Client(memcached_servers)
- self.cache_set = lambda x: self.cache.set("celery:"+x, 1, time=3600) # 1hour
- print "self.cache:", self.cache
- else:
- print "Warning: pylibmc not found! Memcache will not be used for SQS deduplication."
- self.cache = {}
- self.cache_set = lambda x: self.cache.set(x, 1)
-
- def seen(self, message):
- """warning: deletes seen messages"""
- str_id = str(message.id) #message.id is not the same as the celery id
- if self.cache.get(str_id):
+ cache_class = self.conninfo.transport_options["cache_class"]
+ self.cache = cache_class and cache_class()
+ if not self.cache:
+ self.dedupe = lambda x, y: False
+ self.connclass = self.conninfo.transport_options["connection_class"]
+ self.throttle = self.conninfo.transport_options["throttle"]
+
+ def dedupe(self, message):
+ """woopdedupe
+
+ warning: deletes seen messages"""
+ #message.id is not the same as the celery id
+ str_id = "sqsdedupe:"+str(message.id)
+ if str_id in self.cache:
message.delete()
return True
- self.cache_set(str_id)
+ self.cache[str_id] = 1
return False
- def normalize_queue_name(self, queue):
+ @staticmethod
+ def normalize_queue_name(queue):
"""
A queue name must conform to the following::
@@ -68,14 +63,14 @@ def _put(self, queue, message, **kwargs):
def _get(self, queue):
q = self.get_or_create_queue(queue)
m = q.read()
- if m and not self.seen(m):
+ if m and not self.dedupe(m):
msg = deserialize(m.get_body())
q.delete_message(m)
return msg
else:
if getattr(self, '_last_get', None):
time_passed = time.time() - self._last_get
- time_to_sleep = THROTTLE - time_passed
+ time_to_sleep = self.throttle - time_passed
if time_to_sleep > 0:
time.sleep(time_to_sleep)
self._last_get = time.time()
@@ -92,7 +87,7 @@ def _purge(self, queue):
return count #CONSIDER this number may not be accurate
def _open(self):
- return SQSConnection(self.conninfo.userid, self.conninfo.password)
+ return self.connclass(self.conninfo.userid, self.conninfo.password)
@property
def client(self):
@@ -101,9 +96,13 @@ def client(self):
self._queues = dict()
return self._client
+ def close(self):
+ self.exchange_types = {} #remove circular references
+ super(Channel, self).close()
+
class SQSTransport(virtual.Transport):
- Channel = Channel
- connection_errors = (socket.error)
- channel_errors = ()
+ Channel = Channel
+ connection_errors = (socket.error,)
+ channel_errors = (SQSError,)
Please sign in to comment.
Something went wrong with that request. Please try again.