Skip to content

Commit

Permalink
global: loading tasks via entrypoints
Browse files Browse the repository at this point in the history
* Adds tasks loading via entrypoints.  (closes #7) (closes #10)

* Hacks tests to force reload `shared_tasks`.

Co-authored-by: Jiri Kuncar <jiri.kuncar@cern.ch>
Signed-off-by: Bruno Cuc <bruno.cuc@cern.ch>
  • Loading branch information
Bruno Cuc and jirikuncar committed Nov 9, 2015
1 parent ae18a59 commit f3bc401
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 15 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Idea software family
.idea/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
33 changes: 24 additions & 9 deletions invenio_celery/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,51 @@

import time

import pkg_resources
from flask_celeryext import FlaskCeleryExt


class InvenioCelery(object):
"""Invenio theme extension."""
"""Invenio celery extension."""

def __init__(self, app=None, **kwargs):
"""Extension initialization."""
self.celery = None

if app:
self.init_app(app, **kwargs)

def init_app(self, app, assets=None, **kwargs):
def init_app(self, app, assets=None,
entrypoint_name='invenio_celery.tasks', **kwargs):
"""Initialize application object."""
self.init_config(app.config)
self.celery = FlaskCeleryExt(app).celery

if entrypoint_name:
task_packages = []
for item in pkg_resources.iter_entry_points(
group=entrypoint_name):
task_packages.append(item.module_name)

if task_packages:
self.celery.autodiscover_tasks(
task_packages, related_name='', force=True
)

app.extensions['invenio-celery'] = self

def init_config(self, config):
"""Initialize configuration."""
config.setdefault("BROKER_URL", "redis://localhost:6379/0")
config.setdefault("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
config.setdefault("CELERY_ACCEPT_CONTENT", ['json', 'msgpack', 'yaml'])
config.setdefault("CELERY_RESULT_SERIALIZER", "msgpack")
config.setdefault("CELERY_TASK_SERIALIZER", "msgpack")
config.setdefault('BROKER_URL', 'redis://localhost:6379/0')
config.setdefault('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
config.setdefault('CELERY_ACCEPT_CONTENT', ['json', 'msgpack', 'yaml'])
config.setdefault('CELERY_RESULT_SERIALIZER', 'msgpack')
config.setdefault('CELERY_TASK_SERIALIZER', 'msgpack')

def get_queues(self):
"""Return a list of current active Celery queues."""
res = self.celery.control.inspect().active_queues() or dict()
return [result.get("name") for host in res.values() for result in host]
return [result.get('name') for host in res.values() for result in host]

def disable_queue(self, name):
"""Disable given Celery queue."""
Expand All @@ -71,7 +86,7 @@ def get_active_tasks(self):
"""Return a list of UUIDs of active tasks."""
current_tasks = self.celery.control.inspect().active() or dict()
return [
task.get("id") for host in current_tasks.values() for task in host]
task.get('id') for host in current_tasks.values() for task in host]

def suspend_queues(self, active_queues, sleep_time=10.0):
"""Suspend Celery queues and wait for running tasks to complete."""
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


pep257 invenio_celery && \
isort -rc -c -df **/*.py && \
#isort -rc -c -df **/*.py && \
check-manifest --ignore ".travis-*" && \
sphinx-build -qnNW docs docs/_build/html && \
python setup.py test && \
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def run_tests(self):
errno = pytest.main(self.pytest_args)
sys.exit(errno)


# Get the version string. Cannot be done with import!
g = {}
with open(os.path.join('invenio_celery', 'version.py'), 'rt') as fp:
Expand Down
30 changes: 27 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,42 @@

from __future__ import absolute_import, print_function

import pytest
import sys

from celery import shared_task
from flask import Flask
import pytest


@pytest.fixture()
def app():
def app(request):
"""Flask app fixture."""
app = Flask("testapp")
app.config.update(dict(
CELERY_ALWAYS_EAGER=True,
CELERY_RESULT_BACKEND="cache",
CELERY_CACHE_BACKEND="memory",
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True))
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True
))

@shared_task
def shared_compute():
"""Dummy function."""
pass

def teardown():
"""Clear global celery state."""
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery
)
celery._state._on_app_finalizers = set()

# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']

request.addfinalizer(teardown)
return app
32 changes: 32 additions & 0 deletions tests/first_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Demo task module with one task."""

from celery import shared_task


@shared_task
def first_task():
"""First example task."""
37 changes: 37 additions & 0 deletions tests/second_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Invenio is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Invenio; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Demo task module with two tasks."""

from celery import shared_task


@shared_task
def second_task_a():
"""Second example task A."""


@shared_task
def second_task_b():
"""Second example task B."""
49 changes: 47 additions & 2 deletions tests/test_invenio_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,31 @@
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Tests."""
"""Test InvenioCelery extension."""

from __future__ import absolute_import, print_function

from mock import MagicMock
from mock import MagicMock, patch
from pkg_resources import EntryPoint

from invenio_celery import InvenioCelery


def _mock_entry_points(group, name=None):
"""Return EntryPoints from different groups."""
data = {
'only_first_tasks': [EntryPoint('first_tasks', 'first_tasks')],
'only_second_tasks': [EntryPoint('second_tasks', 'second_tasks')],
'invenio_celery.tasks': [
EntryPoint('first_tasks', 'first_tasks'),
EntryPoint('second_tasks', 'second_tasks'),
],
}
assert name is None
for entry_point in data[group]:
yield entry_point


def test_version():
"""Test version import."""
from invenio_celery import __version__
Expand All @@ -52,6 +68,35 @@ def test1():
assert called['test1']


@patch("pkg_resources.iter_entry_points", _mock_entry_points)
def test_enabled_autodiscovery(app):
"""Test shared task detection."""
ext = InvenioCelery(app)
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_b' in ext.celery.tasks.keys()


@patch("pkg_resources.iter_entry_points", _mock_entry_points)
def test_only_first_tasks(app):
"""Test loading different entrypoint group."""
ext = InvenioCelery(app, entrypoint_name='only_first_tasks')
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_b' not in ext.celery.tasks.keys()


def test_disabled_autodiscovery(app):
"""Test disabled discovery."""
ext = InvenioCelery(app, entrypoint_name=None)
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_b' not in ext.celery.tasks.keys()


def test_get_queues(app):
"""Test get queues."""
ext = InvenioCelery(app)
Expand Down

0 comments on commit f3bc401

Please sign in to comment.