Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext: task pre-loading fix #21

Merged
merged 1 commit into from Feb 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 16 additions & 5 deletions invenio_celery/ext.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -30,6 +30,7 @@

import pkg_resources
from flask_celeryext import FlaskCeleryExt
from celery.signals import import_modules


class InvenioCelery(object):
Expand All @@ -47,20 +48,22 @@ def init_app(self, app, assets=None,
"""Initialize application object."""
self.init_config(app.config)
self.celery = FlaskCeleryExt(app).celery
self.entry_point_group = entry_point_group
app.extensions['invenio-celery'] = self

if entry_point_group:
def load_entry_points(self):
"""Load tasks from entry points."""
if self.entry_point_group:
task_packages = []
for item in pkg_resources.iter_entry_points(
group=entry_point_group):
group=self.entry_point_group):
task_packages.append(item.module_name)

if task_packages:
self.celery.autodiscover_tasks(
task_packages, related_name='', force=True
)

app.extensions['invenio-celery'] = self

def init_config(self, config):
"""Initialize configuration."""
config.setdefault('BROKER_URL', 'redis://localhost:6379/0')
Expand Down Expand Up @@ -94,3 +97,11 @@ def suspend_queues(self, active_queues, sleep_time=10.0):
self.disable_queue(queue)
while self.get_active_tasks():
time.sleep(sleep_time)


@import_modules.connect()
def celery_module_imports(sender, signal=None, **kwargs):
"""Load shared celery tasks."""
app = getattr(sender, 'flask_app', None)
if app:
app.extensions['invenio-celery'].load_entry_points()
4 changes: 2 additions & 2 deletions setup.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -59,7 +59,7 @@
setup_requires = []

install_requires = [
'Flask-CeleryExt>=0.1.0',
'Flask-CeleryExt>=0.2.0',
'redis>=2.10.0',
'msgpack-python>=0.4.6',
]
Expand Down
32 changes: 15 additions & 17 deletions tests/conftest.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -29,12 +29,12 @@

import sys

from celery import shared_task
from celery import shared_task, Celery
from flask import Flask
import pytest


@pytest.fixture()
@pytest.yield_fixture()
def app(request):
"""Flask app fixture."""
app = Flask("testapp")
Expand All @@ -50,19 +50,17 @@ def shared_compute():
"""Dummy function."""
pass

def teardown():
"""Clear global celery state."""
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery
)
celery._state._on_app_finalizers = set()
yield app

# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']
import celery._state
celery._state._apps.discard(
app.extensions['invenio-celery'].celery._get_current_object()
)
celery._state._on_app_finalizers = set()
celery._state.set_default_app(Celery())

request.addfinalizer(teardown)
return app
# Clear our modules to get them re-imported by Celery.
if 'first_tasks' in sys.modules:
del sys.modules['first_tasks']
if 'second_tasks' in sys.modules:
del sys.modules['second_tasks']
14 changes: 13 additions & 1 deletion tests/test_invenio_celery.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2015 CERN.
# Copyright (C) 2015, 2016 CERN.
#
# Invenio is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
Expand Down Expand Up @@ -72,6 +72,7 @@ def test1():
def test_enabled_autodiscovery(app):
"""Test shared task detection."""
ext = InvenioCelery(app)
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' in ext.celery.tasks.keys()
Expand All @@ -82,6 +83,7 @@ def test_enabled_autodiscovery(app):
def test_only_first_tasks(app):
"""Test loading different entrypoint group."""
ext = InvenioCelery(app, entry_point_group='only_first_tasks')
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
Expand All @@ -91,12 +93,22 @@ def test_only_first_tasks(app):
def test_disabled_autodiscovery(app):
"""Test disabled discovery."""
ext = InvenioCelery(app, entry_point_group=None)
ext.load_entry_points()
assert 'conftest.shared_compute' in ext.celery.tasks.keys()
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_a' not in ext.celery.tasks.keys()
assert 'second_tasks.second_task_b' not in ext.celery.tasks.keys()


@patch("pkg_resources.iter_entry_points", _mock_entry_points)
def test_worker_loading(app):
"""Test that tasks are only loaded on the worker."""
ext = InvenioCelery(app)
assert 'first_tasks.first_task' not in ext.celery.tasks.keys()
ext.celery.loader.import_default_modules()
assert 'first_tasks.first_task' in ext.celery.tasks.keys()


def test_get_queues(app):
"""Test get queues."""
ext = InvenioCelery(app)
Expand Down