Skip to content

Commit

Permalink
ext: task pre-loading fix
Browse files Browse the repository at this point in the history
* Fixes task pre-loading to only happen in workers.

Signed-off-by: Lars Holm Nielsen <lars.holm.nielsen@cern.ch>
  • Loading branch information
lnielsen committed Feb 3, 2016
1 parent 4fd5d31 commit ecbbf1f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 20 deletions.
19 changes: 15 additions & 4 deletions invenio_celery/ext.py
Expand Up @@ -30,6 +30,7 @@

import pkg_resources
from flask_celeryext import FlaskCeleryExt
from celery.signals import import_modules


class InvenioCelery(object):
Expand All @@ -47,20 +48,22 @@ def init_app(self, app, assets=None,
"""Initialize application object."""
self.init_config(app.config)
self.celery = FlaskCeleryExt(app).celery
self.entry_point_group = entry_point_group
app.extensions['invenio-celery'] = self

if entry_point_group:
def load_entry_points(self):
"""Load tasks from entry points."""
if self.entry_point_group:
task_packages = []
for item in pkg_resources.iter_entry_points(
group=entry_point_group):
group=self.entry_point_group):
task_packages.append(item.module_name)

if task_packages:
self.celery.autodiscover_tasks(
task_packages, related_name='', force=True
)

app.extensions['invenio-celery'] = self

def init_config(self, config):
"""Initialize configuration."""
config.setdefault('BROKER_URL', 'redis://localhost:6379/0')
Expand Down Expand Up @@ -94,3 +97,11 @@ def suspend_queues(self, active_queues, sleep_time=10.0):
self.disable_queue(queue)
while self.get_active_tasks():
time.sleep(sleep_time)


@import_modules.connect()
def celery_module_imports(sender, signal=None, **kwargs):
"""Load shared celery tasks."""
app = getattr(sender, 'flask_app', None)
if app:
app.extensions['invenio-celery'].load_entry_points()
30 changes: 14 additions & 16 deletions tests/conftest.py
Expand Up @@ -29,12 +29,12 @@

import sys

from celery import shared_task
from celery import shared_task, Celery
from flask import Flask
import pytest


@pytest.fixture()
@pytest.yield_fixture()
def app(request):
"""Flask app fixture."""
app = Flask("testapp")
Expand All @@ -50,19 +50,17 @@ def shared_compute():
"""Dummy function."""
pass

def teardown():
"""Clear global celery state."""
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery
)
celery._state._on_app_finalizers = set()
yield app

# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery._get_current_object()
)
celery._state._on_app_finalizers = set()
celery._state.set_default_app(Celery())

request.addfinalizer(teardown)
return app
# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']
12 changes: 12 additions & 0 deletions tests/test_invenio_celery.py
Expand Up @@ -72,6 +72,7 @@ def test1():
def test_enabled_autodiscovery(app):
"""Test shared task detection."""
ext = InvenioCelery(app)
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' in ext.celery.tasks.keys()
Expand All @@ -82,6 +83,7 @@ def test_enabled_autodiscovery(app):
def test_only_first_tasks(app):
"""Test loading different entrypoint group."""
ext = InvenioCelery(app, entry_point_group='only_first_tasks')
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
Expand All @@ -91,12 +93,22 @@ def test_only_first_tasks(app):
def test_disabled_autodiscovery(app):
"""Test disabled discovery."""
ext = InvenioCelery(app, entry_point_group=None)
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_b' not in ext.celery.tasks.keys()


@patch("pkg_resources.iter_entry_points", _mock_entry_points)
def test_worker_loading(app):
"""Test that tasks are only loaded on the worker."""
ext = InvenioCelery(app)
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
ext.celery.loader.import_default_modules()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()


def test_get_queues(app):
"""Test get queues."""
ext = InvenioCelery(app)
Expand Down

0 comments on commit ecbbf1f

Please sign in to comment.