Skip to content

Commit

Permalink
Merge 393103e into 4fd5d31
Browse files Browse the repository at this point in the history
  • Loading branch information
lnielsen committed Feb 5, 2016
2 parents 4fd5d31 + 393103e commit 9d30cac
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 25 deletions.
21 changes: 16 additions & 5 deletions invenio_celery/ext.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -30,6 +30,7 @@

import pkg_resources
from flask_celeryext import FlaskCeleryExt
from celery.signals import import_modules


class InvenioCelery(object):
Expand All @@ -47,20 +48,22 @@ def init_app(self, app, assets=None,
"""Initialize application object."""
self.init_config(app.config)
self.celery = FlaskCeleryExt(app).celery
self.entry_point_group = entry_point_group
app.extensions['invenio-celery'] = self

if entry_point_group:
def load_entry_points(self):
"""Load tasks from entry points."""
if self.entry_point_group:
task_packages = []
for item in pkg_resources.iter_entry_points(
group=entry_point_group):
group=self.entry_point_group):
task_packages.append(item.module_name)

if task_packages:
self.celery.autodiscover_tasks(
task_packages, related_name='', force=True
)

app.extensions['invenio-celery'] = self

def init_config(self, config):
"""Initialize configuration."""
config.setdefault('BROKER_URL', 'redis://localhost:6379/0')
Expand Down Expand Up @@ -94,3 +97,11 @@ def suspend_queues(self, active_queues, sleep_time=10.0):
self.disable_queue(queue)
while self.get_active_tasks():
time.sleep(sleep_time)


@import_modules.connect()
def celery_module_imports(sender, signal=None, **kwargs):
"""Load shared celery tasks."""
app = getattr(sender, 'flask_app', None)
if app:
app.extensions['invenio-celery'].load_entry_points()
4 changes: 2 additions & 2 deletions setup.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -59,7 +59,7 @@
setup_requires = []

install_requires = [
'Flask-CeleryExt>=0.1.0',
'Flask-CeleryExt>=0.2.0',
'redis>=2.10.0',
'msgpack-python>=0.4.6',
]
Expand Down
32 changes: 15 additions & 17 deletions tests/conftest.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -29,12 +29,12 @@

import sys

from celery import shared_task
from celery import shared_task, Celery
from flask import Flask
import pytest


@pytest.fixture()
@pytest.yield_fixture()
def app(request):
"""Flask app fixture."""
app = Flask("testapp")
Expand All @@ -50,19 +50,17 @@ def shared_compute():
"""Dummy function."""
pass

def teardown():
"""Clear global celery state."""
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery
)
celery._state._on_app_finalizers = set()
yield app

# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery._get_current_object()
)
celery._state._on_app_finalizers = set()
celery._state.set_default_app(Celery())

request.addfinalizer(teardown)
return app
# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']
14 changes: 13 additions & 1 deletion tests/test_invenio_celery.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -72,6 +72,7 @@ def test1():
def test_enabled_autodiscovery(app):
"""Test shared task detection."""
ext = InvenioCelery(app)
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' in ext.celery.tasks.keys()
Expand All @@ -82,6 +83,7 @@ def test_enabled_autodiscovery(app):
def test_only_first_tasks(app):
"""Test loading different entrypoint group."""
ext = InvenioCelery(app, entry_point_group='only_first_tasks')
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
Expand All @@ -91,12 +93,22 @@ def test_only_first_tasks(app):
def test_disabled_autodiscovery(app):
"""Test disabled discovery."""
ext = InvenioCelery(app, entry_point_group=None)
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_b' not in ext.celery.tasks.keys()


@patch("pkg_resources.iter_entry_points", _mock_entry_points)
def test_worker_loading(app):
"""Test that tasks are only loaded on the worker."""
ext = InvenioCelery(app)
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
ext.celery.loader.import_default_modules()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()


def test_get_queues(app):
"""Test get queues."""
ext = InvenioCelery(app)
Expand Down

0 comments on commit 9d30cac

Please sign in to comment.