Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.
/ pulp Public archive

Commit

Permalink
Perform all Worker setup in the same signal.
Browse files Browse the repository at this point in the history
We had two different Celery startup signal handlers registered, and it
seemed that the second was no longer being sent. This commit ensures
that all worker startup code happens in the same signal handler, and
that it occurs in the correct order.

https://pulp.plan.io/issues/988

fixed #988
  • Loading branch information
Randy Barlow committed Jun 2, 2015
1 parent 64436f6 commit 951cf3f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 50 deletions.
36 changes: 29 additions & 7 deletions server/pulp/server/async/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,53 @@
will see the "celery" module attribute and use it. This module also initializes the Pulp app after
Celery setup finishes.
"""
from celery.signals import celeryd_after_setup
from celery.signals import worker_init

# This import will load our configs
from pulp.server import config # noqa
from pulp.server import initialization
# We need this import so that the Celery setup_logging signal gets registered
from pulp.server import logs # noqa
from pulp.server.async import tasks
# This import is here so that Celery will find our application instance
from pulp.server.async.celery_instance import celery # noqa
from pulp.server.managers.repo import _common as common_utils


@celeryd_after_setup.connect
def initialize_pulp(sender, instance, **kwargs):
@worker_init.connect
def initialize_worker(sender, instance, **kwargs):
"""
This function makes the call to Pulp's initialization code. It uses the celeryd_after_setup
signal[0] so that it gets called by Celery after logging is initialized, but before Celery
starts to run tasks.
This function performs all the necessary initialization of the Celery worker.
It starts by cleaning up old state if this worker was previously running, but died unexpectedly.
In such cases, any Pulp tasks that were running or waiting on this worker will show incorrect
state. Any reserved_resource reservations associated with the previous worker will also be
removed along with the worker entry in the database itself. The working directory specified in
/etc/pulp/server.conf (/var/cache/pulp/<worker_name>) by default is removed and recreated. This
is called early in the worker start process, and later when it's fully online, pulp_celerybeat
will discover the worker as usual to allow new work to arrive at this worker. If there is no
previous work to cleanup, this method still runs, but has no effect on the database.
After cleaning up old state, it ensures the existence of the worker's working directory.
Lastly, this function makes the call to Pulp's initialization code.
It uses the celeryd_after_setup signal[0] so that it gets called by Celery after logging is
initialized, but before Celery starts to run tasks.
[0] http://celery.readthedocs.org/en/latest/userguide/signals.html#celeryd-after-setup
:param sender: The hostname of the worker (unused)
:param sender: The hostname of the worker
:type sender: basestring
:param instance: The Worker instance to be initialized (unused)
:type instance: celery.apps.worker.Worker
:param kwargs: Other params (unused)
:type kwargs: dict
"""
initialization.initialize()

name = sender
tasks._delete_worker(name, normal_shutdown=True)

# Create a new working directory for worker that is starting now
common_utils.create_worker_working_directory(name)
26 changes: 0 additions & 26 deletions server/pulp/server/async/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from celery import task, Task as CeleryTask, current_task
from celery.app import control, defaults
from celery.result import AsyncResult
from celery.signals import worker_init
from mongoengine.queryset import DoesNotExist

from pulp.common import constants, dateutils
Expand Down Expand Up @@ -559,28 +558,3 @@ def wrap_f(*args, **kwargs):
signal.signal(signal.SIGTERM, old_signal)

return wrap_f


@worker_init.connect
def cleanup_old_worker(*args, **kwargs):
"""
Cleans up old state if this worker was previously running, but died unexpectedly.
In those cases, any Pulp tasks that were running or waiting on this worker will show incorrect
state. Any reserved_resource reservations associated with the previous worker will also be
removed along with the worker entry in the database itself. The working directory specified in
/etc/pulp/server.conf (/var/cache/pulp/<worker_name>) by default is removed and recreated. This
is called early in the worker start process, and later when its fully online pulp_celerybeat
will discover the worker as usual to allow new work to arrive at this worker.
If there is no previous work to cleanup this method still runs, but has not effect on the
database.
:param args: For positional arguments; and not used otherwise
:param kwargs: For keyword arguments, and expected to have one named 'sender'
:return: None
"""
name = kwargs['sender'].hostname
_delete_worker(name, normal_shutdown=True)
# Recreate a new working directory for worker that is starting now
common_utils.create_worker_working_directory(name)
16 changes: 11 additions & 5 deletions server/test/unit/server/async/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@
from pulp.server.async import app


class InitializePulpTestCase(unittest.TestCase):
class InitializeWorkerTestCase(unittest.TestCase):
"""
This class contains tests for the initialize_pulp() function.
This class contains tests for the initialize_worker() function.
"""
@mock.patch('pulp.server.async.app.common_utils.create_worker_working_directory')
@mock.patch('pulp.server.async.app.initialization.initialize')
def test_initialize_pulp(self, initialize):
@mock.patch('pulp.server.async.app.tasks._delete_worker')
def test_initialize_worker(self, _delete_worker, initialize, create_worker_working_directory):
"""
Assert that initialize_pulp() calls Pulp's initialization code.
Assert that initialize_worker() calls Pulp's initialization code and the appropriate worker
monitoring code.
"""
sender = mock.MagicMock()
# The args aren't used and don't matter, so we'll just pass some mocks
app.initialize_pulp(mock.MagicMock(), mock.MagicMock())
app.initialize_worker(sender, mock.MagicMock())

initialize.assert_called_once_with()
_delete_worker.assert_called_once_with(sender, normal_shutdown=True)
create_worker_working_directory.assert_called_once_with(sender)
12 changes: 0 additions & 12 deletions server/test/unit/server/async/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,18 +792,6 @@ def test_get_task_id_not_in_task(self):
self.assertEquals(None, tasks.get_current_task_id())


class TestCleanupOldWorker(unittest.TestCase):

@mock.patch('pulp.server.managers.repo._common.create_worker_working_directory')
@mock.patch('pulp.server.async.tasks._delete_worker')
def test_assert_calls__delete_worker_synchronously(self, mock__delete_worker,
mock__create_worker_working_directory):
sender = mock.Mock()
tasks.cleanup_old_worker(sender=sender)
mock__delete_worker.assert_called_once_with(sender.hostname, normal_shutdown=True)
mock__create_worker_working_directory.assert_called_once_with(sender.hostname)


class TestScheduledTasks(unittest.TestCase):

@mock.patch('pulp.server.db.reaper.reap_expired_documents.apply_async')
Expand Down

0 comments on commit 951cf3f

Please sign in to comment.