Skip to content

Commit

Permalink
modifications: to be squashed once reviewed
Browse files Browse the repository at this point in the history
  • Loading branch information
frankois authored and slint committed Mar 25, 2020
1 parent 3cb8845 commit 9ea792a
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 83 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Expand Up @@ -22,6 +22,9 @@ addons:
packages:
- rabbitmq-server

service:
- redis-server

env:
- REQUIREMENTS=lowest
- REQUIREMENTS=release DEPLOY=true
Expand Down
22 changes: 3 additions & 19 deletions docs/usage.rst
@@ -1,25 +1,9 @@
..
This file is part of Invenio.
Copyright (C) 2017 CERN.
Copyright (C) 2017-2020 CERN.
Invenio is free software; you can redistribute it
and/or modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version.

Invenio is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.

You should have received a copy of the GNU General Public License
along with Invenio; if not, write to the
Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
MA 02111-1307, USA.

In applying this license, CERN does not
waive the privileges and immunities granted to it by virtue of its status
as an Intergovernmental Organization or submit itself to any jurisdiction.
Invenio is free software; you can redistribute it and/or modify it
under the terms of the MIT License; see LICENSE file for more details.


Usage
Expand Down
112 changes: 59 additions & 53 deletions invenio_queues/__init__.py
Expand Up @@ -18,100 +18,95 @@
---------------
You will learn how to register queues and to interact with it. To begin with,
you need to setup your virtual environment and install this module.
we assume that you already have setup your virtual environment and install
the module:
You need an application to work with, that can be created with the following
commands in a Python shell:
You now need an application to work with, that can be created with the
following commands in a Python shell:
>>> from flask import current_app
.. code-block:: python
from flask import Flask
app = Flask('myapp')
You can then initialize the module:
>>> from invenio_queues.ext import InvenioQueues
>>> ext_queues = InvenioQueues(app)
.. code-block:: python
from invenio_queues.ext import InvenioQueues
ext_queues = InvenioQueues(app)
app.app_context().push()
In our example, we are using RabbitMQ as a broker, which can be configured
as follow:
as follow::
>>> current_app.config['QUEUES_BROKER_URL'] = 'amqp://localhost:5672'
app.config['QUEUES_BROKER_URL'] = 'amqp://localhost:5672'
Register queues
^^^^^^^^^^^^^^^
To register queues, you need to start by creating an exchange for the queues:
To register queues, need to add it to the configuration.
.. code-block:: python
from kombu import Exchange
default_exchange = Exchange(
'example',
type='direct',
delivery_mode='transient', # in-memory queue
)
You can now configure the queues as followed:
.. code-block:: python
from invenio_queues.proxies import current_queues
from invenio_queues.queue import Queue
app.config['QUEUES_DEFINITIONS'] = [[
{
'name': 'notifications',
'exchange': Exchange(
'example',
type='direct',
delivery_mode='transient', # in-memory queue
),
},
]]
current_queues.queues = dict()
connection_pool = current_app.config.get('QUEUES_CONNECTION_POOL')
current_queues.queues['notifications'] = Queue(
default_exchange,
'notifications',
connection_pool
)
current_queues.queues['jobs'] = Queue(
default_exchange,
'jobs',
connection_pool
)
For more information about how to set an Exchange:
(https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.html#exchange)
Create queues
^^^^^^^^^^^^^
Now that the queues are configured, you can create them:
Now that the queues are configured, you can create them::
>>> current_queues.declare()
ext_queues.declare()
If you want to delete them, this can be done in the same way:
If you want to delete them, this can be done in the same way::
>>> current_queues.delete()
ext_queues.delete()
Access queues
^^^^^^^^^^^^^
You can list the available queues by using the command line interface
>>> invenio queues list
or programmatically
You can list the available queues by using::
>>> from invenio_queues.proxies import current_queues
>>> current_queues.queues.key()
ext_queues.queues
Suppose you have a queue with name "my_queue" you can directly access it by name
Suppose you have a queue with name "notifications" you can directly access it
by name::
>>> my_queue = current_queues.queues["my_queue"]
notifications_queue = ext_queues.queues["notifications"]
Publish events
^^^^^^^^^^^^^^
After we have defined and instantiated (declare) our Queue we can start using it.
After we have defined and instantiated (declare) our Queue we can start using
it.
This operation pushes an event or events to the queue:
.. code-block:: python
# NOTE: publish expects and array of events
events = [
{
'user_id': 123,
'type': 'record-published',
'record_id': '1234-5678'
}
]
events = [1, 2, 3]
current_queues.queues["my_queue"].publish(events)
notifications_queue.publish(events)
Comsume events
Expand All @@ -122,9 +117,20 @@
.. code-block:: python
queue_gen = current_queues.queues["my_queue"].consume()
queue_gen = notifications_queue.consume()
list(queue_gen)
You can as well add this in a task like:
.. code-block:: python
for msg in notifications_queue.consume():
if msg['type'] == 'record-published':
user = fetch_user(msg['user_id'])
send_email(
user.email, "New record {record_id} was published".format(**msg)
)
"""

from __future__ import absolute_import, print_function
Expand Down
10 changes: 7 additions & 3 deletions invenio_queues/config.py
Expand Up @@ -11,11 +11,15 @@
from .utils import get_connection_pool

QUEUES_BROKER_URL = None
"""Provide a specific broker_url for queues.
"""Broker URL for queues.
If the variable is not configured it falls back to the default BROKER_URL of
our application.
If the variable is not configured it falls back to the default ``BROKER_URL``
of our application. For more information about how to define your broker here:
https://kombu.readthedocs.io/en/latest/reference/kombu.connection.html#connection
"""

QUEUES_CONNECTION_POOL = get_connection_pool
"""Default queues connection pool."""

QUEUES_DEFINITIONS = []
"""Static queue definitions."""
21 changes: 15 additions & 6 deletions invenio_queues/ext.py
Expand Up @@ -10,6 +10,8 @@

from __future__ import absolute_import, print_function

from itertools import chain

from werkzeug.utils import cached_property

from . import config
Expand All @@ -32,12 +34,17 @@ def queues(self):
from pkg_resources import iter_entry_points
if self._queues is None:
self._queues = dict()
for ep in iter_entry_points(group=self.entry_point_group):
for cfg in ep.load()():
from_entry_point = [
ep.load()()
for ep in iter_entry_points(group=self.entry_point_group)
]

for queue in chain(
from_entry_point, [self.app.config['QUEUES_DEFINITIONS']]):
for cfg in queue:
if cfg['name'] in self._queues:
raise DuplicateQueueError(
'Duplicate queue {0} in entry point '
'{1}'.format(cfg['name'], ep.name))
'Duplicate queue {0} found'.format(cfg['name']))

self._queues[cfg['name']] = Queue(
cfg['exchange'], cfg['name'], self.connection_pool
Expand Down Expand Up @@ -76,16 +83,18 @@ def __init__(self, app=None, **kwargs):

def __getattr__(self, name):
"""Proxy to state object."""
return getattr(self._state, name, None)
return getattr(object.__getattribute__(self, "_state"), name)

def init_app(self, app, entry_point_group='invenio_queues.queues'):
"""Initialize application."""
self.init_config(app)
app.extensions['invenio-queues'] = _InvenioQueuesState(
state = _InvenioQueuesState(
app,
app.config['QUEUES_CONNECTION_POOL'],
entry_point_group=entry_point_group
)
self._state = state
app.extensions['invenio-queues'] = state
return app

def init_config(self, app):
Expand Down
1 change: 0 additions & 1 deletion invenio_queues/utils.py
Expand Up @@ -15,7 +15,6 @@

def get_connection_pool():
"""Retrieve the broker connection pool."""
# NOTE: Allow invenio-queues to have a different broker than default.
broker_url = current_app.config.get('QUEUES_BROKER_URL') or \
current_app.config.get('BROKER_URL', 'amqp://')
return connections[Connection(broker_url)]
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -43,7 +43,7 @@
install_requires = [
'invenio-base>=1.2.2',
'invenio-celery>=1.2.0',
'redis>=3.2.0',
'redis>=3.2.1',
]

packages = find_packages()
Expand Down

0 comments on commit 9ea792a

Please sign in to comment.