This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Adds logging if Pulp is missing critical Celery services #1976

Merged

merged 1 commit on Jul 21, 2015
3 changes: 2 additions & 1 deletion common/pulp/common/constants.py
@@ -60,4 +60,5 @@
# this is used by both platform and plugins to find the default CA path
DEFAULT_CA_PATH = '/etc/pki/tls/certs/ca-bundle.crt'

SCHEDULER_WORKER_NAME = "scheduler"
SCHEDULER_WORKER_NAME = 'scheduler'
RESOURCE_MANAGER_WORKER_NAME = 'resource_manager'
84 changes: 54 additions & 30 deletions server/pulp/server/async/scheduler.py
@@ -10,9 +10,9 @@
from celery import beat
from celery.result import AsyncResult

from pulp.common.constants import SCHEDULER_WORKER_NAME
from pulp.common.constants import RESOURCE_MANAGER_WORKER_NAME, SCHEDULER_WORKER_NAME

from pulp.server.async.celery_instance import celery as app, RESOURCE_MANAGER_QUEUE
from pulp.server.async.celery_instance import celery as app
from pulp.server.async.tasks import _delete_worker
from pulp.server.async import worker_watcher
from pulp.server.db import connection as db_connection
@@ -162,9 +162,9 @@ def run(self):

def monitor_events(self):
"""
Process celery events.
Process Celery events.

Receives events from celery and matches each with the appropriate handler function. The
Receives events from Celery and matches each with the appropriate handler function. The
following events are monitored for: 'task-failed', 'task-succeeded', 'worker-heartbeat',
and 'worker-offline'. The call to capture() is blocking, and does not return.

@@ -186,57 +186,81 @@ def monitor_events(self):
recv.capture(limit=None, timeout=None, wakeup=True)
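
For orientation, the blocking call above is Celery's standard event-receiver pattern; the real handler functions are presumably the ones provided by the worker_watcher module imported above. Below is a minimal standalone sketch of the same pattern, not the PR's code: the broker URL, app name, and handler bodies are assumptions made purely for illustration.

from celery import Celery

app = Celery('sketch', broker='amqp://localhost')  # hypothetical broker URL


def handle_worker_heartbeat(event):
    # Every 'worker-heartbeat' event carries the sender's hostname and timestamp.
    print('heartbeat from %(hostname)s at %(timestamp)s' % event)


def handle_worker_offline(event):
    print('%(hostname)s went offline' % event)


def monitor_events():
    # capture() blocks for as long as the broker connection stays up, which is
    # why the PR runs it inside a dedicated daemon thread (EventMonitor).
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'worker-heartbeat': handle_worker_heartbeat,
            'worker-offline': handle_worker_offline,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)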


class WorkerTimeoutMonitor(threading.Thread):
class CeleryProcessTimeoutMonitor(threading.Thread):
"""
A thread dedicated to processing Celery events.
A thread dedicated to monitoring Celery processes that have stopped checking in.

This object is designed to wakeup periodically and look for workers who have gone missing.
Once a Celery process is determined to be missing it logs and handles cleanup appropriately.
"""

# The amount of time in seconds before a worker is considered missing
WORKER_TIMEOUT_SECONDS = 300
# The amount of time in seconds before a Celery process is considered missing
CELERY_TIMEOUT_SECONDS = 300

# The frequency in seconds with which this thread should look for missing workers.
# The frequency in seconds with which this thread should look for missing Celery processes.
FREQUENCY = 60

def run(self):
"""
The thread entry point. It sleeps for FREQUENCY seconds, and then calls check_workers()
The thread entry point. Sleep for FREQUENCY seconds, then call check_celery_processes()

This method has a try/except block around check_workers() to add durability to this
background thread.
This method has a try/except block around check_celery_processes() to add durability to
this background thread.
"""
_logger.info(_('Worker Timeout Monitor Started'))
while True:
time.sleep(self.FREQUENCY)
try:
self.check_workers()
self.check_celery_processes()
except Exception as e:
_logger.error(e)

def check_workers(self):
def check_celery_processes(self):
"""
Look for missing workers, and dispatch a cleanup task if one goes missing.
Look for missing Celery processes, log and cleanup as needed.

To find a missing worker, filter the Workers model for entries older than
To find a missing Celery process, filter the Workers model for entries older than
utcnow() - WORKER_TIMEOUT_SECONDS. The heartbeat times are stored in native UTC, so this is
a comparable datetime.

For each missing worker found, dispatch a _delete_worker task requesting that the resource
manager delete the Worker and cleanup any associated work.
a comparable datetime. For each missing worker found, call _delete_worker() synchronously
for cleanup.

This method logs at the debug and error levels.
This method also checks that at least one resource_manager and one scheduler process is
present. If there are zero of either, log at the error level that Pulp will not operate
correctly.
"""
msg = _('Looking for workers missing for more than %s seconds') % self.WORKER_TIMEOUT_SECONDS
msg = _('Checking if pulp_workers, pulp_celerybeat, or pulp_resource_manager '
'processes are missing for more than %d seconds') % self.CELERY_TIMEOUT_SECONDS
_logger.debug(msg)
oldest_heartbeat_time = datetime.utcnow() - timedelta(seconds=self.WORKER_TIMEOUT_SECONDS)
worker_criteria = Criteria(filters={'last_heartbeat': {'$lt': oldest_heartbeat_time}},
fields=('_id', 'last_heartbeat'))
worker_list = list(resources.filter_workers(worker_criteria))
oldest_heartbeat_time = datetime.utcnow() - timedelta(seconds=self.CELERY_TIMEOUT_SECONDS)
all_worker_criteria = Criteria(filters={}, fields=('_id', 'last_heartbeat'))
worker_list = list(resources.filter_workers(all_worker_criteria))
worker_count = 0
resource_manager_count = 0
scheduler_count = 0
for worker in worker_list:
msg = _("Workers '%s' has gone missing, removing from list of workers") % worker.name
if worker['last_heartbeat'] < oldest_heartbeat_time:
msg = _("Worker '%s' has gone missing, removing from list of workers") % worker.name
_logger.error(msg)
_delete_worker(worker.name)
elif worker['name'].startswith(SCHEDULER_WORKER_NAME):
scheduler_count = scheduler_count + 1
elif worker['name'].startswith(RESOURCE_MANAGER_WORKER_NAME):
resource_manager_count = resource_manager_count + 1
Contributor:

It may be worthwhile to add a debug-level logging statement here for total number of schedulers and resource managers.

Member Author:

This is a good idea; I'm doing it and an associated test.

else:
worker_count = worker_count + 1
if resource_manager_count == 0:
msg = _("There are 0 pulp_resource_manager processes running. Pulp will not operate "
"correctly without at least one pulp_resource_mananger process running.")
_logger.error(msg)
_delete_worker(worker.name)
if scheduler_count == 0:
msg = _("There are 0 pulp_celerybeat processes running. Pulp will not operate "
"correctly without at least one pulp_celerybeat process running.")
_logger.error(msg)
output_dict = {'workers': worker_count, 'celerybeat': scheduler_count,
'resource_manager': resource_manager_count}
msg = _("%(workers)d pulp_worker processes, %(celerybeat)d "
"pulp_celerybeat processes, and %(resource_manager)d "
"pulp_resource_manager processes") % output_dict
_logger.debug(msg)
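
The substantive change in check_celery_processes() is that stale workers are no longer filtered out in the database query; every Worker record is fetched and then classified in Python so that scheduler and resource manager processes can be counted at the same time. Below is a self-contained sketch of that classification logic, using plain dicts in place of Worker documents and invented worker names, both assumptions made for illustration.

from datetime import datetime, timedelta

SCHEDULER_WORKER_NAME = 'scheduler'
RESOURCE_MANAGER_WORKER_NAME = 'resource_manager'
CELERY_TIMEOUT_SECONDS = 300


def classify_workers(worker_list, now=None):
    """Return (counts_by_type, names_of_missing_workers) for a list of records."""
    now = now or datetime.utcnow()
    oldest_allowed_heartbeat = now - timedelta(seconds=CELERY_TIMEOUT_SECONDS)
    counts = {'workers': 0, 'celerybeat': 0, 'resource_manager': 0}
    missing = []
    for worker in worker_list:
        if worker['last_heartbeat'] < oldest_allowed_heartbeat:
            # Stale heartbeat: this is the branch where the real code calls _delete_worker().
            missing.append(worker['name'])
        elif worker['name'].startswith(SCHEDULER_WORKER_NAME):
            counts['celerybeat'] += 1
        elif worker['name'].startswith(RESOURCE_MANAGER_WORKER_NAME):
            counts['resource_manager'] += 1
        else:
            counts['workers'] += 1
    return counts, missing


# One stale worker, one healthy scheduler, and no resource manager at all.
records = [
    {'name': 'worker-0@host', 'last_heartbeat': datetime.utcnow() - timedelta(seconds=400)},
    {'name': 'scheduler@host', 'last_heartbeat': datetime.utcnow()},
]
counts, missing = classify_workers(records)
assert missing == ['worker-0@host']
assert counts['resource_manager'] == 0  # the condition that triggers the error log above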


class Scheduler(beat.Scheduler):
@@ -304,7 +328,7 @@ def spawn_pulp_monitor_threads(self):
event_monitor.daemon = True
event_monitor.start()
# start monitoring workers who may timeout
worker_timeout_monitor = WorkerTimeoutMonitor()
worker_timeout_monitor = CeleryProcessTimeoutMonitor()
worker_timeout_monitor.daemon = True
worker_timeout_monitor.start()
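
spawn_pulp_monitor_threads() is a method on the beat.Scheduler subclass shown above, and the pattern it uses is plain Python threading. A compressed, standalone sketch (assumed structure, not the PR's exact code) of what it sets up, and why the threads are marked as daemons:

import threading
import time


class MonitorSketch(threading.Thread):
    """Stand-in for EventMonitor and CeleryProcessTimeoutMonitor."""

    def run(self):
        # The real threads block in capture() or in a sleep-and-check loop.
        while True:
            time.sleep(60)


def spawn_pulp_monitor_threads():
    for monitor in (MonitorSketch(), MonitorSketch()):
        # Daemon threads cannot keep the process alive, so a hung monitor
        # never prevents the celerybeat process from shutting down.
        monitor.daemon = True
        monitor.start()


spawn_pulp_monitor_threads()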

119 changes: 85 additions & 34 deletions server/test/unit/server/async/test_scheduler.py
@@ -5,9 +5,9 @@
from celery.beat import ScheduleEntry
import mock

from pulp.common.constants import RESOURCE_MANAGER_WORKER_NAME, SCHEDULER_WORKER_NAME
from pulp.server.async import scheduler
from pulp.server.async.celery_instance import celery as app
from pulp.server.async.celery_instance import RESOURCE_MANAGER_QUEUE
from pulp.server.db.model import dispatch, resources
from pulp.server.db.model.criteria import Criteria
from pulp.server.managers.factory import initialize
@@ -234,10 +234,11 @@ def test__init__lazy_is_False(self, mock_event_monitor, mock_spawn_pulp_monitor_


class TestSchedulerSpawnPulpMonitorThreads(unittest.TestCase):

@mock.patch('celery.beat.Scheduler.__init__', new=mock.Mock())
@mock.patch('pulp.server.async.scheduler.EventMonitor')
@mock.patch('pulp.server.async.scheduler.WorkerTimeoutMonitor')
def test_spawn_pulp_monitor_threads(self, mock_worker_timeout_monitor, mock_event_monitor):
@mock.patch('pulp.server.async.scheduler.CeleryProcessTimeoutMonitor')
def test_spawn_pulp_monitor_threads(self, mock_celery_timeout_monitor, mock_event_monitor):
my_scheduler = scheduler.Scheduler()

my_scheduler.spawn_pulp_monitor_threads()
@@ -246,9 +247,9 @@ def test_spawn_pulp_monitor_threads(self, mock_worker_timeout_monitor, mock_even
self.assertTrue(mock_event_monitor.return_value.daemon)
mock_event_monitor.return_value.start.assert_called_once()

mock_worker_timeout_monitor.assert_called_once_with()
self.assertTrue(mock_worker_timeout_monitor.return_value.daemon)
mock_worker_timeout_monitor.return_value.start.assert_called_once()
mock_celery_timeout_monitor.assert_called_once_with()
self.assertTrue(mock_celery_timeout_monitor.return_value.daemon)
mock_celery_timeout_monitor.return_value.start.assert_called_once()


class TestSchedulerTick(unittest.TestCase):
@@ -533,83 +534,133 @@ def test_logs_exception(self, mock_sleep, mock_monitor_events, mock_log_error):
self.assertEqual(mock_log_error.call_count, 1)


class TestWorkerTimeoutMonitorRun(unittest.TestCase):
class TestCeleryProcessTimeoutMonitorRun(unittest.TestCase):

class SleepException(Exception):
pass

@mock.patch.object(scheduler.WorkerTimeoutMonitor, 'check_workers', spec_set=True)
@mock.patch.object(scheduler.CeleryProcessTimeoutMonitor, 'check_celery_processes',
spec_set=True)
@mock.patch.object(scheduler.time, 'sleep', spec_set=True)
def test_sleeps(self, mock_sleep, mock_check_workers):
def test_sleeps(self, mock_sleep, mock_check_celery_processes):
# raising an exception is the only way we have to break out of the
# infinite loop
mock_sleep.side_effect = self.SleepException

self.assertRaises(self.SleepException, scheduler.WorkerTimeoutMonitor().run)
self.assertRaises(self.SleepException, scheduler.CeleryProcessTimeoutMonitor().run)

# verify the frequency
mock_sleep.assert_called_once_with(60)

@mock.patch.object(scheduler._logger, 'error', spec_set=True)
@mock.patch.object(scheduler.WorkerTimeoutMonitor, 'check_workers', spec_set=True)
@mock.patch.object(scheduler.CeleryProcessTimeoutMonitor, 'check_celery_processes',
spec_set=True)
@mock.patch.object(scheduler.time, 'sleep', spec_set=True)
def test_checks_workers(self, mock_sleep, mock_check_workers, mock_log_error):
def test_checks_workers(self, mock_sleep, mock_check_celery_processes, mock_log_error):

# raising an exception is the only way we have to break out of the
# infinite loop
mock_check_workers.side_effect = self.SleepException
mock_check_celery_processes.side_effect = self.SleepException
mock_log_error.side_effect = self.SleepException

self.assertRaises(self.SleepException, scheduler.WorkerTimeoutMonitor().run)
self.assertRaises(self.SleepException, scheduler.CeleryProcessTimeoutMonitor().run)

mock_check_workers.assert_called_once_with()
mock_check_celery_processes.assert_called_once_with()

@mock.patch.object(scheduler._logger, 'error', spec_set=True)
@mock.patch.object(scheduler.WorkerTimeoutMonitor, 'check_workers', spec_set=True)
@mock.patch.object(scheduler.CeleryProcessTimeoutMonitor, 'check_celery_processes',
spec_set=True)
@mock.patch.object(scheduler.time, 'sleep', spec_set=True)
def test_logs_exception(self, mock_sleep, mock_check_workers, mock_log_error):
def test_logs_exception(self, mock_sleep, mock_check_celery_processes, mock_log_error):

# raising an exception is the only way we have to break out of the
# infinite loop
mock_check_workers.side_effect = self.SleepException
mock_check_celery_processes.side_effect = self.SleepException
mock_log_error.side_effect = self.SleepException

self.assertRaises(self.SleepException, scheduler.WorkerTimeoutMonitor().run)
self.assertRaises(self.SleepException, scheduler.CeleryProcessTimeoutMonitor().run)

self.assertEqual(mock_log_error.call_count, 1)


class TestWorkerTimeoutMonitorCheckWorkers(unittest.TestCase):
@mock.patch('pulp.server.managers.resources.filter_workers', spec_set=True)
def test_calls_filter(self, mock_filter):
class TestCeleryProcessTimeoutMonitorCheckCeleryProcesses(unittest.TestCase):

@mock.patch('pulp.server.async.scheduler.resources.filter_workers', spec_set=True)
def test_queries_all_workers(self, mock_filter):
mock_filter.return_value = []

scheduler.WorkerTimeoutMonitor().check_workers()
scheduler.CeleryProcessTimeoutMonitor().check_celery_processes()

# verify that filter was called with the right argument
self.assertEqual(mock_filter.call_count, 1)
self.assertEqual(len(mock_filter.call_args[0]), 1)
call_arg = mock_filter.call_args[0][0]
self.assertTrue(isinstance(call_arg, Criteria))

# make sure the timestamp being searched for is within the appropriate timeout,
# plus a 1-second grace period to account for execution time of test code.
timestamp = call_arg.filters['last_heartbeat']['$lt']
self.assertTrue(isinstance(timestamp, datetime))
self.assertTrue(datetime.utcnow() - timestamp <
timedelta(scheduler.WorkerTimeoutMonitor.WORKER_TIMEOUT_SECONDS + 1))
self.assertEqual(call_arg.filters, {})
self.assertEqual(call_arg.fields, ('_id', 'last_heartbeat'))

@mock.patch('pulp.server.async.scheduler._delete_worker', spec_set=True)
@mock.patch('pulp.server.managers.resources.filter_workers', spec_set=True)
@mock.patch('pulp.server.async.scheduler.resources.filter_workers', spec_set=True)
def test_deletes_workers(self, mock_filter, mock_delete_worker):
mock_filter.return_value = [
resources.Worker('name1', datetime.utcnow()),
resources.Worker('name1', datetime.utcnow() - timedelta(seconds=400)),
resources.Worker('name2', datetime.utcnow()),
]

scheduler.CeleryProcessTimeoutMonitor().check_celery_processes()

# make sure _delete_worker is only called for the old worker
mock_delete_worker.assert_has_calls([mock.call('name1')])

@mock.patch('pulp.server.async.scheduler._delete_worker', spec_set=True)
@mock.patch('pulp.server.async.scheduler.resources.filter_workers', spec_set=True)
@mock.patch('pulp.server.async.scheduler._logger', spec_set=True)
def test_logs_scheduler_missing(self, mock__logger, mock_filter, mock_delete_worker):
mock_filter.return_value = [
resources.Worker(RESOURCE_MANAGER_WORKER_NAME, datetime.utcnow()),
resources.Worker('name2', datetime.utcnow()),
]

scheduler.WorkerTimeoutMonitor().check_workers()
scheduler.CeleryProcessTimeoutMonitor().check_celery_processes()

mock__logger.error.assert_called_once_with(
'There are 0 pulp_celerybeat processes running. Pulp will not operate '
'correctly without at least one pulp_celerybeat process running.')

@mock.patch('pulp.server.async.scheduler._delete_worker', spec_set=True)
@mock.patch('pulp.server.async.scheduler.resources.filter_workers', spec_set=True)
@mock.patch('pulp.server.async.scheduler._logger', spec_set=True)
def test_logs_resource_manager_missing(self, mock__logger, mock_filter, mock_delete_worker):
mock_filter.return_value = [
resources.Worker(SCHEDULER_WORKER_NAME, datetime.utcnow()),
resources.Worker('name2', datetime.utcnow()),
]

scheduler.CeleryProcessTimeoutMonitor().check_celery_processes()

mock__logger.error.assert_called_once_with(
'There are 0 pulp_resource_manager processes running. Pulp will not operate '
'correctly without at least one pulp_resource_manager process running.')

@mock.patch('pulp.server.async.scheduler._delete_worker', spec_set=True)
@mock.patch('pulp.server.async.scheduler.resources.filter_workers', spec_set=True)
@mock.patch('pulp.server.async.scheduler._logger', spec_set=True)
def test_debug_logging(self, mock__logger, mock_filter, mock_delete_worker):
mock_filter.return_value = [
resources.Worker('name1', datetime.utcnow() - timedelta(seconds=400)),
resources.Worker('name2', datetime.utcnow()),
resources.Worker(RESOURCE_MANAGER_WORKER_NAME, datetime.utcnow()),
resources.Worker(SCHEDULER_WORKER_NAME, datetime.utcnow()),
]

# make sure _delete_worker is only called for the two expected calls
mock_delete_worker.assert_has_calls([mock.call('name1'), mock.call('name2')])
scheduler.CeleryProcessTimeoutMonitor().check_celery_processes()
mock__logger.debug.assert_has_calls([
mock.call('Checking if pulp_workers, pulp_celerybeat, or '
'pulp_resource_manager processes are missing for more than 300 seconds'),
mock.call('1 pulp_worker processes, 1 pulp_celerybeat processes, '
'and 1 pulp_resource_manager processes')
])
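
A note on reading the stacked mock.patch decorators in these tests: the decorator nearest the test method supplies the first mock argument, so the argument list is the reverse of the decorator order. A small, self-contained reminder of that rule, using the external mock package as these tests do; the patched targets are arbitrary stdlib functions chosen only for illustration.

import os.path
import unittest

import mock


class DecoratorOrderExample(unittest.TestCase):

    @mock.patch('os.path.isdir')    # outermost decorator -> last mock argument
    @mock.patch('os.path.isfile')   # innermost decorator -> first mock argument
    def test_argument_order(self, mock_isfile, mock_isdir):
        # While the patches are active, the patched attributes are exactly the
        # mock objects passed in, in bottom-up decorator order.
        self.assertIs(os.path.isfile, mock_isfile)
        self.assertIs(os.path.isdir, mock_isdir)


if __name__ == '__main__':
    unittest.main()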


SCHEDULES = [