Skip to content

Commit

Permalink
Implemented RabbitMQSubscriber.
Browse files Browse the repository at this point in the history
Also simplified RabbitMQHandler by using SimpleQueues from kombu.
  • Loading branch information
vals committed Oct 5, 2012
1 parent a4678a3 commit 2b765e8
Showing 1 changed file with 74 additions and 15 deletions.
89 changes: 74 additions & 15 deletions logbook/queues.py
Expand Up @@ -25,38 +25,31 @@ class RabbitMQHandler(Handler):
Example setup::
handler = RabbitMQHandler('amqp://127.0.0.1', exchange='logging',
queue='my_application')
handler = RabbitMQHandler('amqp://guest:guest@localhost//', queue='my_log')
"""
def __init__(self, uri=None, exchange='logging', queue='log', level=NOTSET,
def __init__(self, uri=None, queue='logging', level=NOTSET,
filter=None, bubble=False, context=None):
Handler.__init__(self, level, filter, bubble)
try:
import kombu
except ImportError:
raise RuntimeError('The kombu library is required for '
'the RabbitMQHandler.')

self.exchange = kombu.Exchange(exchange, 'direct', durable=True)
self.queue = kombu.Queue(queue, exchange=self.exchange, routing_key=queue)

'the RabbitMQSubscriber.')
if uri:
self.connection = kombu.Connection(uri)
connection = kombu.Connection(uri)

self.queue = connection.SimpleQueue(queue)

def export_record(self, record):
"""Exports the record into a dictionary ready for JSON dumping.
"""
return record.to_dict(json_safe=True)

def emit(self, record):
with self.connection.Producer(serializer='json') as producer:
producer.publish(self.export_record(record), \
exchange=self.exchange, \
routing_key=self.queue.routing_key, \
declare=[self.queue])
self.queue.put(self.export_record(record))

def close(self):
self.connection.close()
self.queue.close()


class ZeroMQHandler(Handler):
Expand Down Expand Up @@ -177,6 +170,72 @@ def dispatch_in_background(self, setup=None):
return controller


class RabbitMQSubscriber(SubscriberBase):
"""A helper that acts as RabbitMQ subscriber and will dispatch received
log records to the active handler setup. There are multiple ways to
use this class.
It can be used to receive log records from a queue::
subscriber = RabbitMQSubscriber('amqp://guest:guest@localhost//')
record = subscriber.recv()
But it can also be used to receive and dispatch these in one go::
with target_handler:
subscriber = RabbitMQSubscriber('amqp://guest:guest@localhost//')
subscriber.dispatch_forever()
This will take all the log records from that queue and dispatch them
over to `target_handler`. If you want you can also do that in the
background::
subscriber = RabbitMQSubscriber('amqp://guest:guest@localhost//')
controller = subscriber.dispatch_in_background(target_handler)
The controller returned can be used to shut down the background
thread::
controller.stop()
"""

def __init__(self, uri=None, queue='logging'):
try:
import kombu
except ImportError:
raise RuntimeError('The kombu library is required for '
'the RabbitMQSubscriber.')
if uri:
connection = kombu.Connection(uri)

self.queue = connection.SimpleQueue(queue)

def __del__(self):
try:
self.close()
except AttributeError:
# subscriber partially created
pass

def close(self):
self.queue.close()

def recv(self, timeout=None):
"""Receives a single record from the socket. Timeout of 0 means nonblocking,
`None` means blocking and otherwise it's a timeout in seconds after which
the function just returns with `None`.
"""
if timeout == 0:
try:
rv = self.queue.get(block=False)
except Exception:
return
else:
rv = self.queue.get(timeout=timeout)

return LogRecord.from_dict(rv.payload)


class ZeroMQSubscriber(SubscriberBase):
"""A helper that acts as ZeroMQ subscriber and will dispatch received
log records to the active handler setup. There are multiple ways to
Expand Down

0 comments on commit 2b765e8

Please sign in to comment.