# Switch branches/tags
# Find file
# Fetching contributors…
# Cannot retrieve contributors at this time
# 155 lines (122 sloc) 4.59 KB
# -*- coding: utf-8 -*-
"""HotQueue is a Python library that allows you to use Redis as a message queue
within your Python programs.
"""

from functools import wraps

# Prefer the C-accelerated cPickle where it exists (Python 2) and fall back
# to the standard pickle module everywhere else.
try:
    import cPickle as pickle
except ImportError:
    import pickle

from redis import Redis

# Public API of the module.
__all__ = ['HotQueue']

__version__ = '0.2.5'
def key_for_name(name):
    """Return the key name used to store the given queue name in Redis.

    :param name: name of the queue
    :returns: the queue name prefixed with ``hotqueue:``
    """
    return 'hotqueue:%s' % name
class HotQueue(object):
    """Simple FIFO message queue stored in a Redis list. Example:

    >>> from hotqueue import HotQueue
    >>> queue = HotQueue("myqueue", host="localhost", port=6379, db=0)

    :param name: name of the queue
    :param serializer: the class or module to serialize msgs with, must have
        methods or functions named ``dumps`` and ``loads``,
        `pickle <https://docs.python.org/library/pickle.html>`_ will be used
        if ``None`` is given
    :param kwargs: additional kwargs to pass to :class:`Redis`, most commonly
        :attr:`host`, :attr:`port`, :attr:`db`
    """

    def __init__(self, name, serializer=None, **kwargs):
        """Store the queue name, pick the serializer and create the client."""
        self.name = name
        if serializer is not None:
            self.serializer = serializer
        else:
            # No serializer supplied: default to pickle.
            self.serializer = pickle
        self.__redis = Redis(**kwargs)

    def __len__(self):
        """Return the number of messages currently stored in the queue."""
        return self.__redis.llen(self.key)

    def __repr__(self):
        """Return a debugging representation with name and connection info."""
        # NOTE(review): relies on the client object exposing ``host``,
        # ``port`` and ``db`` attributes -- confirm against the installed
        # redis client version.
        return ('<HotQueue: \'%s\', host=\'%s\', port=%d, db=%d>' %
                (self.name, self.__redis.host, self.__redis.port,
                 self.__redis.db))

    @property
    def key(self):
        """Return the key name used to store this queue in Redis."""
        return key_for_name(self.name)

    def clear(self):
        """Clear the queue of all messages, deleting the Redis key."""
        self.__redis.delete(self.key)

    def consume(self, **kwargs):
        """Return a generator that yields whenever a message is waiting in the
        queue. Will block otherwise. Example:

        >>> for msg in queue.consume(timeout=1):
        ...     print(msg)
        my message
        another message

        :param kwargs: any arguments that :meth:`~hotqueue.HotQueue.get` can
            accept (:attr:`block` will default to ``True`` if not given)
        """
        kwargs.setdefault('block', True)
        try:
            while True:
                msg = self.get(**kwargs)
                # ``None`` means no message arrived (e.g. the timeout
                # expired), so stop iterating. A stored message that
                # deserializes to ``None`` ends the loop as well.
                if msg is None:
                    break
                yield msg
        except KeyboardInterrupt:
            # Emit a blank line after the interrupt and finish the generator
            # quietly; print('') behaves the same on Python 2 and Python 3.
            print('')
            return

    def get(self, block=False, timeout=None):
        """Return a message from the queue. Example:

        >>> queue.get()
        'my message'
        >>> queue.get()
        'another message'

        :param block: whether or not to wait until a msg is available in
            the queue before returning; ``False`` by default
        :param timeout: when using :attr:`block`, if no msg is available
            for :attr:`timeout` in seconds, give up and return ``None``
        :returns: the deserialized message, or ``None`` if none was available
        """
        if block:
            if timeout is None:
                # Redis BLPOP treats a timeout of 0 as "block indefinitely".
                timeout = 0
            msg = self.__redis.blpop(self.key, timeout=timeout)
            if msg is not None:
                # BLPOP replies with a (key, value) pair; keep the value only.
                msg = msg[1]
        else:
            msg = self.__redis.lpop(self.key)
        if msg is not None:
            # NOTE: with the default pickle serializer this unpickles whatever
            # is stored under the key; only use it with a trusted Redis.
            msg = self.serializer.loads(msg)
        return msg

    def put(self, *msgs):
        """Put one or more messages onto the queue. Example:

        >>> queue.put("my message")
        >>> queue.put("another message")

        :param msgs: the messages to serialize and append, in order
        """
        for msg in msgs:
            msg = self.serializer.dumps(msg)
            self.__redis.rpush(self.key, msg)

    def worker(self, *args, **kwargs):
        """Decorator for using a function as a queue worker. Example:

        >>> @queue.worker(timeout=1)
        ... def printer(msg):
        ...     print(msg)
        >>> printer()
        my message
        another message

        You can also use it without passing any keyword arguments:

        >>> @queue.worker
        ... def printer(msg):
        ...     print(msg)
        >>> printer()
        my message
        another message

        :param kwargs: any arguments that :meth:`~hotqueue.HotQueue.get` can
            accept (:attr:`block` will default to ``True`` if not given)
        """
        def decorator(worker):
            # Preserve the wrapped function's name and docstring.
            @wraps(worker)
            def wrapper(*args):
                # Call the worker once per consumed message, passing the
                # message as the last positional argument.
                for msg in self.consume(**kwargs):
                    worker(*args + (msg,))
            return wrapper
        if args:
            # Used as a bare ``@queue.worker``: args holds the function.
            return decorator(*args)
        return decorator