
Commit

Emulation backend implemented with In-memory backend as the first example.
Ask Solem committed Jul 21, 2010
1 parent 61237a1 commit 2ff8ffc
Showing 6 changed files with 171 additions and 105 deletions.
1 change: 1 addition & 0 deletions kombu/backends/__init__.py
@@ -8,6 +8,7 @@
"amqplib": "kombu.backends.pyamqplib.Backend",
"pika": "kombu.backends.pypika.AsyncoreBackend",
"syncpika": "kombu.backends.pypika.SyncBackend",
"memory": "kombu.backends.memory.MemoryBackend",
}

_backend_cache = {}
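For orientation, the alias table above maps a short transport name to a dotted class path, which the module resolves and caches on demand (see _backend_cache). A rough sketch of that lookup follows; the dict and helper names are illustrative assumptions, not necessarily the real kombu.backends API.

# Illustrative sketch only: BACKEND_ALIASES and resolve_backend are
# assumed names, not necessarily kombu's actual helpers.
BACKEND_ALIASES = {
    "memory": "kombu.backends.memory.MemoryBackend",
}

def resolve_backend(name):
    """Turn an alias like "memory" into the backend class it points to."""
    path = BACKEND_ALIASES.get(name, name)
    module_name, _, class_name = path.rpartition(".")
    module = __import__(module_name, None, None, [class_name])
    return getattr(module, class_name)

MemoryBackend = resolve_backend("memory")   # -> kombu.backends.memory.MemoryBackend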
5 changes: 5 additions & 0 deletions kombu/backends/base.py
@@ -17,13 +17,16 @@ class BaseMessage(object):

def __init__(self, channel, body=None, delivery_tag=None,
content_type=None, content_encoding=None, delivery_info={},
properties=None, headers=None,
**kwargs):
self.channel = channel
self.body = body
self.delivery_tag = delivery_tag
self.content_type = content_type
self.content_encoding = content_encoding
self.delivery_info = delivery_info
self.headers = headers
self.properties = properties
self._decoded_cache = None
self._state = "RECEIVED"

@@ -93,6 +96,8 @@ def acknowledged(self):
class BaseBackend(object):
"""Base class for backends."""
default_port = None
connection_errors = ()
channel_errors = ()

def __init__(self, connection, **kwargs):
self.connection = connection
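To make the new keyword arguments concrete, constructing a message after this change might look as follows. All values are illustrative, and channel is left as None because the sketch only exercises the data holder.

from kombu.backends.base import BaseMessage

message = BaseMessage(channel=None,
                      body='{"hello": "world"}',
                      delivery_tag=1,
                      content_type="application/json",
                      content_encoding="utf-8",
                      delivery_info={"routing_key": "hello"},
                      headers={"retries": 0},          # new in this commit
                      properties={"delivery_tag": 1})  # new in this commit
assert message.headers == {"retries": 0}
assert message.properties == {"delivery_tag": 1}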
239 changes: 137 additions & 102 deletions kombu/backends/emulation.py
@@ -1,7 +1,7 @@
from carrot.backends.base import BaseBackend, BaseMessage
from anyjson import deserialize
from kombu.backends.base import BaseBackend, BaseMessage
from anyjson import deserialize, serialize
from itertools import count
from django.utils.datastructures import SortedDict
from collections import OrderedDict
import sys
import time
import atexit
@@ -55,11 +55,13 @@ def __repr__(self):

class QualityOfService(object):

def __init__(self, resource, prefetch_count=None, interval=None):
def __init__(self, resource, prefetch_count=None, interval=None,
do_restore=True):
self.resource = resource
self.prefetch_count = prefetch_count
self.interval = interval
self._delivered = SortedDict()
self._delivered = OrderedDict()
self.do_restore = do_restore
self._restored_once = False
atexit.register(self.restore_unacked_once)

@@ -74,7 +76,7 @@ def ack(self, delivery_tag):

def restore_unacked(self):
for message, queue_name in self._delivered.items():
self.resource.put(queue_name, message)
self.resource._put(queue_name, message)
self._delivered = SortedDict()

def requeue(self, delivery_tag):
@@ -85,159 +87,192 @@ def requeue(self, delivery_tag):
self.resource.put(queue_name, message)

def restore_unacked_once(self):
if not self._restored_once:
if self._delivered:
sys.stderr.write(
"Restoring unacknowledged messages: %s\n" % (
if self.do_restore:
if not self._restored_once:
if self._delivered:
sys.stderr.write(
"Restoring unacknowledged messages: %s\n" % (
self._delivered))
self.restore_unacked()
if self._delivered:
sys.stderr.write("UNRESTORED MESSAGES: %s\n" % (
self._delivered))
self.restore_unacked()
if self._delivered:
sys.stderr.write("UNRESTORED MESSAGES: %s\n" % (
self._delivered))


class Message(BaseMessage):

def __init__(self, backend, payload, **kwargs):
self.backend = backend

payload = deserialize(payload)
def __init__(self, channel, payload, **kwargs):
properties = payload["properties"]
kwargs["body"] = payload.get("body").encode("utf-8")
kwargs["delivery_tag"] = payload.get("delivery_tag")
kwargs["delivery_tag"] = properties["delivery_tag"]
kwargs["content_type"] = payload.get("content-type")
kwargs["content_encoding"] = payload.get("content-encoding")
kwargs["priority"] = payload.get("priority")
kwargs["headers"] = payload.get("headers")
kwargs["properties"] = properties
kwargs["delivery_info"] = properties.get("delivery_info")
self.destination = payload.get("destination")

super(Message, self).__init__(backend, **kwargs)
super(Message, self).__init__(channel, **kwargs)

def reject(self):
raise NotImplementedError(
"This backend does not implement basic.reject")


class EmulationBase(BaseBackend):
_exchanges = {}
_queues = {}
_consumers = {}
_callbacks = {}


class Channel(object):
Message = Message
default_port = None

interval = 1
_prefetch_count = None
do_restore = True

QueueSet = QueueSet
_next_delivery_tag = count(1).next
_prefetch_count = None

def __init__(self, connection, **kwargs):
self.connection = connection
self._consumers = {}
self._callbacks = {}
self._consumers = set()
self._qos_manager = None

def establish_connection(self):
return self # for drain events

def close_connection(self, connection):
pass

def queue_exists(self, queue):
return True

def queue_purge(self, queue, **kwargs):
return self._purge(queue, **kwargs)

def _poll(self, resource):
while True:
if self.qos_manager.can_consume():
try:
return resource.get()
except QueueEmpty:
pass
time.sleep(self.interval)
def _get(self, queue):
raise NotImplementedError("Emulations must implement _get")

def declare_consumer(self, queue, no_ack, callback, consumer_tag,
**kwargs):
self._consumers[consumer_tag] = queue
self._callbacks[queue] = callback
def _put(self, queue, message):
raise NotImplementedError("Emulations must implement _put")

def drain_events(self, timeout=None):
queueset = self.QueueSet(self._consumers.values())
payload, queue = self._poll(queueset)
def _purge(self, queue):
raise NotImplementedError("Emulations must implement _purge")

if not queue or queue not in self._callbacks:
return
def _new_queue(self, queue):
raise NotImplementedError("Emulations must implement _new_queue")

self._callbacks[queue](payload)
def exchange_declare(self, exchange, type="direct", durable=False,
auto_delete=False, arguments=None):
if exchange not in _exchanges:
_exchanges[exchange] = {"type": type,
"durable": durable,
"auto_delete": auto_delete,
"arguments": arguments or {},
"table": {}}

def consume(self, limit=None):
for total_message_count in count():
if limit and total_message_count >= limit:
raise StopIteration
def queue_declare(self, queue, **kwargs):
if queue not in _queues:
_queues[queue] = self._new_queue(queue, **kwargs)

self.drain_events()
def queue_bind(self, queue, exchange, routing_key, arguments=None):
table = _exchanges[exchange].setdefault("table", {})
table[routing_key] = queue

yield True
def queue_purge(self, queue, **kwargs):
return self._purge(queue, **kwargs)

def queue_declare(self, queue, *args, **kwargs):
def flow(self, active=True):
pass

def _get_many(self, queues):
raise NotImplementedError("Emulations must implement _get_many")

def _get(self, queue):
raise NotImplementedError("Emulations must implement _get")

def _put(self, queue, message):
raise NotImplementedError("Emulations must implement _put")

def _purge(self, queue, message):
raise NotImplementedError("Emulations must implement _purge")
def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
self._prefetch_count = prefetch_count

def get(self, queue, **kwargs):
def basic_get(self, queue, **kwargs):
try:
payload = self._get(queue)
return self._get(queue)
except QueueEmpty:
return None
else:
return self.message_to_python(payload)
pass

def ack(self, delivery_tag):
def basic_ack(self, delivery_tag):
self.qos_manager.ack(delivery_tag)

def requeue(self, delivery_tag):
self.qos_manager.requeue(delivery_tag)
def basic_reject(self, delivery_tag, requeue=False):
if requeue:
self.qos_manager.requeue(delivery_tag)

def basic_consume(self, queue, no_ack, callback, consumer_tag,
**kwargs):
_consumers[consumer_tag] = queue
_callbacks[queue] = callback
self._consumers.add(consumer_tag)

def basic_publish(self, message, exchange, routing_key, **kwargs):
message["destination"] = exchange
message["properties"]["delivery_tag"] = self._next_delivery_tag()
table = _exchanges[exchange]["table"]
if routing_key in table:
self._put(table[routing_key], message)

def basic_cancel(self, consumer_tag):
queue = _consumers.pop(consumer_tag, None)
self._consumers.remove(consumer_tag)
_callbacks.pop(queue, None)

def message_to_python(self, raw_message):
message = self.Message(backend=self, payload=raw_message)
message = self.Message(self, payload=raw_message)
self.qos_manager.append(message, message.destination,
message.delivery_tag)
return message

def prepare_message(self, message_data, delivery_mode, priority=0,
content_type=None, content_encoding=None):
def prepare_message(self, message_data, priority=None,
content_type=None, content_encoding=None, headers=None,
properties=None):
return {"body": message_data,
"priority": priority or 0,
"content-encoding": content_encoding,
"content-type": content_type}

def publish(self, message, exchange, routing_key, **kwargs):
message["destination"] = exchange
self._put(exchange, message)

def cancel(self, consumer_tag):
queue = self._consumers.pop(consumer_tag, None)
self._callbacks.pop(queue, None)

def close(self):
for consumer_tag in self._consumers.keys():
self.cancel(consumer_tag)

def basic_qos(self, prefetch_size, prefetch_count, apply_global=False):
self._prefetch_count = prefetch_count
"content-type": content_type,
"headers": headers or {},
"properties": properties or {}}

@property
def qos_manager(self):
if self._qos_manager is None:
self._qos_manager = QualityOfService(self)
self._qos_manager = QualityOfService(self,
do_restore=self.do_restore)

# Update prefetch count / interval
self._qos_manager.prefetch_count = self._prefetch_count
self._qos_manager.interval = self.interval

return self._qos_manager

def close(self):
map(self.basic_cancel, self._consumers)


class EmulationBase(BaseBackend):
Channel = Channel
QueueSet = QueueSet

default_port = None

def __init__(self, connection, **kwargs):
self.connection = connection

def create_channel(self, connection):
return self.Channel(connection)

def establish_connection(self):
return self # for drain events

def close_connection(self, connection):
pass

def _poll(self, resource):
while True:
if self.qos_manager.can_consume():
try:
return resource.get()
except QueueEmpty:
pass
time.sleep(self.interval)

def drain_events(self, timeout=None):
queueset = self.QueueSet(self._consumers.values())
payload, queue = self._poll(queueset)

if not queue or queue not in _callbacks:
return

_callbacks[queue](payload)
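The interesting part of the new Channel is how publishing routes without a broker: exchange_declare registers an exchange with an empty binding table, queue_bind maps a routing key to a queue, and basic_publish looks the key up and calls _put on the matching queue. Below is a standalone model of that routing, with plain local dicts standing in for the module-level _exchanges table; it mirrors the diff's logic but is not the commit's code.

# Toy model of the direct-exchange routing used by Channel.basic_publish.
_exchanges = {}

def exchange_declare(exchange, type="direct"):
    _exchanges.setdefault(exchange, {"type": type, "table": {}})

def queue_bind(queue, exchange, routing_key):
    _exchanges[exchange]["table"][routing_key] = queue

def route(exchange, routing_key):
    # basic_publish only delivers when the routing key is bound;
    # otherwise the message is silently dropped.
    return _exchanges[exchange]["table"].get(routing_key)

exchange_declare("tasks")
queue_bind("celery", "tasks", routing_key="celery")
assert route("tasks", "celery") == "celery"
assert route("tasks", "unknown") is None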
26 changes: 26 additions & 0 deletions kombu/backends/memory.py
@@ -0,0 +1,26 @@
from Queue import Queue

from kombu.backends import emulation


class MemoryChannel(emulation.Channel):
queues = {}
do_restore = False

def _new_queue(self, queue, **kwargs):
self.queues[queue] = Queue()

def _get(self, queue):
return self.queues[queue].get(block=False)

def _put(self, queue, message):
self.queues[queue].put(message)

def _purge(self, queue):
size = self.queues[queue].qsize()
self.queues[queue].queue.clear()
return size


class MemoryBackend(emulation.EmulationBase):
Channel = MemoryChannel
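A minimal sketch of driving the in-memory channel through its private hooks directly, bypassing the connection machinery; note that _new_queue has to run first, because _get and _put index straight into the shared queues dict.

from kombu.backends.memory import MemoryChannel

channel = MemoryChannel(connection=None)   # __init__ only records the connection
channel._new_queue("hello")
channel._put("hello", {"body": "world"})
assert channel._get("hello") == {"body": "world"}
assert channel._purge("hello") == 0        # nothing left after the _get above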
2 changes: 1 addition & 1 deletion kombu/compat.py
@@ -63,7 +63,7 @@ def __init__(self, connection, exchange=None, routing_key=None,
self.durable = durable

if not isinstance(self.exchange, entity.Exchange):
self.exchange = entity.Exchange(exchange=self.exchange,
self.exchange = entity.Exchange(name=self.exchange,
type=self.exchange_type,
routing_key=self.routing_key,
auto_delete=self.auto_delete,
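For reference, the compat publisher now passes the exchange's name via the name keyword instead of exchange. In isolation the constructed entity looks roughly like the sketch below; the import path and the durable value are assumptions, since those lines fall outside the visible hunk.

from kombu import entity

exchange = entity.Exchange(name="tasks",
                           type="direct",
                           routing_key="tasks",
                           auto_delete=False,
                           durable=True)   # assumed value; not shown in the hunk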
