Merge pull request #8 from jcollado/blocks-and-flows
Blocks and flows
jcollado committed Jan 12, 2017
2 parents 1439186 + 85581df commit e26fd60
Showing 11 changed files with 601 additions and 359 deletions.
22 changes: 11 additions & 11 deletions docs/rabbithole.rst
@@ -4,6 +4,14 @@ rabbithole package
Submodules
----------

rabbithole.amqp module
----------------------

.. automodule:: rabbithole.amqp
:members:
:undoc-members:
:show-inheritance:

rabbithole.batcher module
-------------------------

@@ -20,18 +28,10 @@ rabbithole.cli module
:undoc-members:
:show-inheritance:

rabbithole.consumer module
--------------------------

.. automodule:: rabbithole.consumer
:members:
:undoc-members:
:show-inheritance:

rabbithole.db module
--------------------
rabbithole.sql module
---------------------

.. automodule:: rabbithole.db
.. automodule:: rabbithole.sql
:members:
:undoc-members:
:show-inheritance:
151 changes: 116 additions & 35 deletions docs/usage.rst
@@ -19,55 +19,136 @@ where *config.yml* is a YAML configuration file. For example:
size_limit: 5
time_limit: 15
amqp: '172.20.0.2'
sql: 'postgres://postgres@172.20.0.3/database'
output:
logs:
INSERT INTO logs (message, message_vector)
VALUES (:message, to_tsvector('english', :message))
events:
INSERT INTO events (message, message_vector)
VALUES (:message, to_tsvector('english', :message))
blocks:
- name: input
type: amqp
kwargs:
server: '172.20.0.2'
- name: output
type: sql
kwargs:
url: 'postgres://postgres@172.20.0.3/database'
flows:
- - name: input
kwargs:
exchange_name: logs
- name: output
kwargs:
query:
INSERT INTO logs (message, message_vector)
VALUES (:message, to_tsvector('english', :message))
- - name: input
kwargs:
exchange_name: events
- name: output
kwargs:
query:
INSERT INTO events (message, message_vector)
VALUES (:message, to_tsvector('english', :message))
where:
- *size_limit*: batcher size limit
- *time_limit*: batcher time limit
- *amqp*: AMQP server address
- *sql*: Database connection URL
- *output*: Mapping from AMQP exchange names to SQL queries
- *blocks*: list of building blocks to use in the flows
- *flows*: list of blocks connected to transfer information


Building blocks
===============
Blocks
======

Input
-----
A block is the name rabbithole gives to a small piece that can be added to a
flow to receive or send messages as needed to build the desired flow of
information. There are currently three different kinds of blocks:

AMQP is assumed to be the messages input. What can be configured is the IP
address of the server and the exchanges for which messages should be delivered
specified as the keys of the *output* field.
input

Batchers
--------
An input block is a block that receives messages from an external
source, such as an AMQP server, and transfers them as they are received
to the next block in the flow.

Rabbit Hole uses the concept of batchers that is also used in logstash_. A
batcher is just an in-memory queue whose goal is to output data more
efficiently by writing multiple messages at once.
batchers

The batcher keeps messages in memory until its capacity has been filled up or
until a time limit is exceeded. Both parameters can be set in the configuration
file.
rabbithole uses the concept of batchers that is also used in
logstash_. A batcher is just an in-memory queue whose goal is to output
data more efficiently by writing multiple messages at once. It keeps
messages in memory until its capacity has been filled up or until a
time limit is exceeded. Both parameters can be set in the configuration
file.

Output
------
Batchers are automatically added between blocks in a flow, so there's
no need to include them explicitly in the configuration file.
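
The batching behaviour described above can be sketched as follows. This is a minimal illustration under stated assumptions, not rabbithole's actual batcher: the class name ``Batcher``, the ``flush_cb`` callback, the ``tick`` method and the injectable ``clock`` are all hypothetical names chosen for the example.

```python
import time


class Batcher:
    """Hypothetical in-memory batcher (not rabbithole's real class)."""

    def __init__(self, size_limit, time_limit, flush_cb, clock=time.monotonic):
        self.size_limit = size_limit    # flush once this many messages are queued
        self.time_limit = time_limit    # flush once the oldest message is this old (seconds)
        self.flush_cb = flush_cb        # called with the list of queued messages
        self.clock = clock              # injectable so the time limit can be tested
        self.batch = []
        self.started_at = None

    def add(self, message):
        """Queue a message and flush if the size limit has been reached."""
        if not self.batch:
            self.started_at = self.clock()
        self.batch.append(message)
        if len(self.batch) >= self.size_limit:
            self.flush()

    def tick(self):
        """Flush if the oldest queued message has waited for time_limit."""
        if self.batch and self.clock() - self.started_at >= self.time_limit:
            self.flush()

    def flush(self):
        """Hand all queued messages to the callback in a single call."""
        batch, self.batch = self.batch, []
        self.started_at = None
        if batch:
            self.flush_cb(batch)
```

In this sketch something external (a timer or the event loop) is assumed to call ``tick`` periodically; how rabbithole itself schedules the time-based flush is not shown in this section.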

A SQL database is assumed to be the messages output. What can be configured is
the `database URL`_ and the queries_ to execute for the messages received for
each exchange. Note that the underlying implementation uses sqlalchemy_, so
please refer to its documentation for more information about their format.
output

An output block is a block that receives messages from the previous
block and sends them to an external output, such as a database.

Flow
====

A flow is a sequence of blocks that are connected to transfer information from
the initial input block to the final output one.
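
To make the relationship between *blocks* and *flows* concrete, here is a small sketch that checks that every step in a flow refers to a declared block. The configuration is written as a Python dictionary mirroring the YAML example, and the helper name ``undeclared_blocks`` is hypothetical; rabbithole's own validation, if any, may differ.

```python
# Configuration mirroring the YAML example, as it would look once parsed.
config = {
    'blocks': [
        {'name': 'input', 'type': 'amqp',
         'kwargs': {'server': '172.20.0.2'}},
        {'name': 'output', 'type': 'sql',
         'kwargs': {'url': 'postgres://postgres@172.20.0.3/database'}},
    ],
    'flows': [
        [
            {'name': 'input', 'kwargs': {'exchange_name': 'logs'}},
            {'name': 'output', 'kwargs': {
                'query': "INSERT INTO logs (message, message_vector) "
                         "VALUES (:message, to_tsvector('english', :message))",
            }},
        ],
    ],
}


def undeclared_blocks(config):
    """Return the sorted names used in flows but missing from blocks."""
    declared = {block['name'] for block in config['blocks']}
    used = {step['name'] for flow in config['flows'] for step in flow}
    return sorted(used - declared)
```

Each flow is a list of steps, and each step names a block plus the per-flow ``kwargs`` (such as the exchange name or the query) that specialise that block for the flow.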

Available blocks
================

The following blocks are available in rabbithole.

amqp
----

amqp is an input block that can receive data from AMQP servers.

.. code-block:: yaml
blocks:
- name: input
type: amqp
kwargs:
server: '172.20.0.2'
flows:
- - name: input
kwargs:
exchange_name: logs
where:

*server* is the IP address of the AMQP server to connect to
*exchange_name* is the name of the exchange for which messages will be
transferred in a given flow


sql
---

sql is an output block that can write data to SQL databases.

.. code-block:: yaml
blocks:
- name: output
type: sql
kwargs:
url: 'postgres://postgres@172.20.0.3/database'
flows:
- - name: output
kwargs:
query:
INSERT INTO logs (message, message_vector)
VALUES (:message, to_tsvector('english', :message))
where:

*url* is the `connection string`_ to the database.
*query* is the `query`_ to execute when a message is received in a given
flow.

Note that the underlying implementation uses sqlalchemy_, so please refer to
its documentation for more information about their format.


.. _logstash: https://www.elastic.co/products/logstash
.. _database URL: http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
.. _queries: http://docs.sqlalchemy.org/en/latest/core/sqlelement.html?highlight=text#sqlalchemy.sql.expression.text
.. _connection string: http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
.. _query: http://docs.sqlalchemy.org/en/latest/core/sqlelement.html?highlight=text#sqlalchemy.sql.expression.text
.. _sqlalchemy: http://www.sqlalchemy.org/
55 changes: 33 additions & 22 deletions rabbithole/amqp.py
@@ -26,13 +26,11 @@ class Consumer(object):
:param server: AMQP server IP address
:type server: str
:param exchange_names: Exchange names to bind to
:type exchange_names: list(str)
"""

def __init__(self, server, exchange_names):
"""Configure exchanges and queue."""
def __init__(self, server):
"""Configure queue."""
LOGGER.info('Connecting to %r...', server)
parameters = pika.ConnectionParameters(server)
connection = pika.BlockingConnection(parameters)
@@ -43,22 +41,38 @@ def __init__(self, server, exchange_names):
queue_name = result.method.queue
LOGGER.debug('Declared queue %r', queue_name)

for exchange_name in exchange_names:
channel.exchange_declare(
exchange=exchange_name,
exchange_type='fanout',
)
channel.queue_bind(
exchange=exchange_name,
queue=queue_name,
)
LOGGER.debug(
'Queue %r bound to exchange %r', queue_name, exchange_name)

channel.basic_consume(self.message_received_cb, queue=queue_name)

self.channel = channel
self.message_received = blinker.Signal()
self.queue_name = queue_name
self.signals = {}

def __call__(self, exchange_name):
"""Create signal to send when a message from an exchange is received.
:param exchange_name: Exchange name to bind to the queue
:type exchange_name: str
:returns: The signal that will be sent, so that it can be connected
:rtype: :class:`blinker.Signal`
"""
if exchange_name in self.signals:
return self.signals[exchange_name]

self.channel.exchange_declare(
exchange=exchange_name,
exchange_type='fanout',
)
self.channel.queue_bind(
exchange=exchange_name,
queue=self.queue_name,
)
LOGGER.debug(
'Queue %r bound to exchange %r', self.queue_name, exchange_name)

signal = blinker.Signal()
self.signals[exchange_name] = signal
return signal

def run(self):
"""Run ioloop and consume messages."""
@@ -94,8 +108,5 @@ def message_received_cb(self, channel, method_frame, header_frame, body):
except ValueError:
LOGGER.warning('Body decoding error: %r', body)
else:
self.message_received.send(
self,
exchange_name=exchange_name,
payload=payload,
)
signal = self.signals[exchange_name]
signal.send(self, payload=payload)
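
The per-exchange signal pattern introduced by this change can be illustrated without an AMQP server. The sketch below is an assumption-laden stand-in: ``Signal`` is a minimal replacement for ``blinker.Signal`` that only supports ``connect`` and ``send``, and ``Dispatcher`` and ``deliver`` are hypothetical names that play the roles of ``Consumer.__call__`` and ``message_received_cb``.

```python
class Signal:
    """Minimal stand-in for blinker.Signal (only connect/send are modelled)."""

    def __init__(self):
        self.receivers = []

    def connect(self, receiver):
        self.receivers.append(receiver)
        return receiver

    def send(self, sender, **kwargs):
        # Call every receiver and collect (receiver, result) pairs.
        return [(receiver, receiver(sender, **kwargs))
                for receiver in self.receivers]


class Dispatcher:
    """Hypothetical object that keeps one signal per exchange name."""

    def __init__(self):
        self.signals = {}

    def __call__(self, exchange_name):
        # Reuse the existing signal so repeated calls share receivers.
        if exchange_name not in self.signals:
            self.signals[exchange_name] = Signal()
        return self.signals[exchange_name]

    def deliver(self, exchange_name, payload):
        # Only receivers connected to this exchange's signal are notified.
        self.signals[exchange_name].send(self, payload=payload)
```

Compared with the previous single ``message_received`` signal, receivers no longer need to filter on ``exchange_name`` themselves, because each flow connects only to the signal for its own exchange.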
