Reduced heartbeat/timeout intervals
Simplified and reduced the timings so that all failure detection occurs
within 30 seconds. Changed resource_manager to use a timestamp-based
locking mechanism.

closes #2509
https://pulp.plan.io/issues/2509
dralley committed Jan 13, 2017
1 parent dd0d013 commit f9355d2
Showing 15 changed files with 110 additions and 79 deletions.
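
The core of this change is the new lock-acquisition loop in server/pulp/server/async/app.py (diffed below). The following condensed sketch shows the pattern; it assumes a configured mongoengine connection and uses the ResourceManagerLock model and constants introduced in this commit, so treat it as an outline rather than the exact implementation.

import time
from datetime import datetime, timedelta

import mongoengine

from pulp.common import constants, dateutils
from pulp.server.db.model import ResourceManagerLock


def acquire_resource_manager_lock(name):
    """Block until this process owns the single resource manager lock (sketch)."""
    while True:
        now = dateutils.ensure_tz(datetime.utcnow())
        stale = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)

        # A lock whose timestamp has not been refreshed within the 25-second timeout
        # belongs to a dead resource manager; delete it so a spare can take over.
        ResourceManagerLock.objects(timestamp__lte=stale).delete()

        try:
            # The 'lock' field is unique, so at most one document can ever exist.
            ResourceManagerLock(name=name, timestamp=now).save()
            return
        except mongoengine.NotUniqueError:
            # Another instance holds the lock; sleep one heartbeat and retry.
            time.sleep(constants.PULP_PROCESS_HEARTBEAT_INTERVAL)

While an instance holds the lock, the heartbeat handler (see worker_watcher.py below) keeps the lock's timestamp fresh, so the stale-lock sweep only removes locks whose owners have actually stopped heartbeating.
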
30 changes: 11 additions & 19 deletions common/pulp/common/constants.py
@@ -50,29 +50,21 @@
# this is used by both platform and plugins to find the default CA path
DEFAULT_CA_PATH = '/etc/pki/tls/certs/ca-bundle.crt'

# celerybeat constants

# Scheduler worker name
SCHEDULER_WORKER_NAME = 'scheduler'
# Constant used as the default wait time for celerybeat instances with no lock
CELERY_TICK_DEFAULT_WAIT_TIME = 90
# Constant used to determine whether a CeleryBeatLock should be removed due to age
CELERYBEAT_LOCK_MAX_AGE = 200
# The amount of time in seconds before a Celery process is considered missing
CELERY_TIMEOUT_SECONDS = 300
# The interval in seconds for which the Celery Process monitor thread sleeps between
# checking for missing Celery processes.
CELERY_CHECK_INTERVAL = 60
# The maximum number of seconds that will ever elapse before the scheduler looks for
# new or changed schedules
CELERY_MAX_INTERVAL = 90

# Resource manager worker name
RESOURCE_MANAGER_WORKER_NAME = 'resource_manager'

# The amount of time (in seconds) between process wakeups to "heartbeat" and perform
# their tasks.
PULP_PROCESS_HEARTBEAT_INTERVAL = 5

# The amount of time (in seconds) after which a Celery process is considered missing.
PULP_PROCESS_TIMEOUT_INTERVAL = 25

# The amount of time the migration script will wait to confirm that no processes are running.
# This is the 90s CELERY_TICK_DEFAULT_WAIT_TIME used in Pulp version < 2.12 and a 2s buffer.
# This ensures that the process check feature works correctly even in cases where a user
# forgot to restart pulp_celerybeat while upgrading from Pulp 2.11 or earlier.
MIGRATION_WAIT_TIME = 92

# resource manager constants

# Resource manager worker name
RESOURCE_MANAGER_WORKER_NAME = 'resource_manager'
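
The failure-detection window quoted in the 2.12 release notes follows directly from the two new intervals. A small sanity check, assuming pulp.common.constants from this commit is importable:

from pulp.common import constants

# A process is declared missing once its heartbeat is older than 25 seconds,
# and the monitor only re-checks every 5 seconds, so detection takes at most ~30s.
worst_case = (constants.PULP_PROCESS_TIMEOUT_INTERVAL +
              constants.PULP_PROCESS_HEARTBEAT_INTERVAL)
assert worst_case == 30

# The migration wait is the pre-2.12 celerybeat tick (90s) plus a 2-second buffer.
assert constants.MIGRATION_WAIT_TIME == 90 + 2
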
18 changes: 13 additions & 5 deletions docs/user-guide/release-notes/2.12.x.rst
@@ -8,11 +8,19 @@ Pulp 2.12.0
New Features
------------

* Task profiling can now be enabled. This will use cProfiling on an individual task and write the profile to a directory for a given task. While this can impact performance, this can enable users to get some insight into what a task is doing or use the output to give to a developer for debugging.
* ``pulp-manage-db`` will not continue if pulp_celerybeat, pulp_resource_manager, or pulp_workers
processes are running. This will prevent the user from corrupting their Pulp installation by
migrating with running workers. This works in both standalone and clustered installations. Pulp
may wait up to 92 seconds to determine if workers are running.
* Task profiling can now be enabled. This will use `cProfile
<https://docs.python.org/2/library/profile.html#module-cProfile>`_ on an individual task and write
the profile to a directory for a given task. While this can impact performance, this enables users
to get some insight into what a task is doing or give the output to a developer for debugging.

* ``pulp-manage-db`` will not continue if pulp_celerybeat, pulp_resource_manager, or pulp_workers
processes are running. This prevents the user from corrupting their Pulp installation by applying
migrations while workers or pulp_celerybeat are running. This works in both standalone and clustered
installations. Pulp may wait up to 92 seconds to determine if workers are running.

* Worker failure detection and high-availability failover occur within 30 seconds. Pulp processes are
considered missing after 25 seconds, and heartbeats occur every 5 seconds.


Deprecation
-----------
10 changes: 5 additions & 5 deletions docs/user-guide/server.rst
@@ -18,12 +18,12 @@ For a recap of Pulp components and the work they are responsible for, read :ref:

* If a ``pulp_worker`` dies, the dispatched Pulp tasks destined for that worker (both the task
currently being worked on and queued/related tasks) will not be processed. They will stall for
at most six minutes before being canceled. Status of the tasks is marked as canceled after
5 minutes or in case the worker has been re-started, whichever action occurs first.
Cancellation after 5 minutes is dependent on ``pulp_celerybeat`` service running. A monitoring
at most 30 seconds before being canceled. The tasks are marked as canceled after 30 seconds
or when the worker has been restarted, whichever occurs first. Cancellation after 30 seconds
depends on the ``pulp_celerybeat`` service running. A monitoring
component inside of ``pulp_celerybeat`` monitors all workers' heartbeats. If a worker does not
heartbeat within five minutes, it is considered missing. This check occurs once a minute, causing
a maximum delay of six minutes before a worker is considered missing and tasks canceled by Pulp.
heartbeat within 25 seconds, it is considered missing. This check occurs every 5 seconds, causing
a maximum delay of 30 seconds before a worker is considered missing and tasks canceled by Pulp.

A missing worker has all tasks destined for it canceled, and no new work is assigned to the
missing worker. This causes new Pulp operations dispatched to continue normally with the other
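
The monitoring described above reduces to a periodic staleness query against the workers collection. A simplified sketch of that check (the real implementation lives in the scheduler.py monitor thread, diffed further down):

from datetime import datetime, timedelta

from pulp.common import constants, dateutils
from pulp.server.db.model import Worker


def missing_workers():
    """Return the names of workers whose heartbeat is older than 25 seconds (sketch)."""
    # The monitor thread runs this check once per 5-second heartbeat interval.
    now = dateutils.ensure_tz(datetime.utcnow())
    oldest_ok = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)
    return [worker.name for worker in Worker.objects.all()
            if worker.last_heartbeat < oldest_ok]
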
2 changes: 1 addition & 1 deletion server/etc/default/upstart_pulp_resource_manager
@@ -25,7 +25,7 @@ CELERYD_NODES="resource_manager"

# Set the concurrency of each worker node to 1, tell the worker to participate in event
# broadcasting, and subscribe to the resource_manager queue. DO NOT CHANGE THIS SETTING!
CELERYD_OPTS="-c 1 --events -Q resource_manager --umask=18 --heartbeat-interval=30"
CELERYD_OPTS="-c 1 --events -Q resource_manager --umask=18 --heartbeat-interval=5"

CELERYD_USER="apache"

2 changes: 1 addition & 1 deletion server/etc/default/upstart_pulp_workers
@@ -33,7 +33,7 @@ CELERY_CREATE_DIRS=1
CELERYD_NODES=""

# Set the concurrency of each worker node to 1. DO NOT CHANGE THE CONCURRENCY!
CELERYD_OPTS="-c 1 --events --umask=18 --heartbeat-interval=30"
CELERYD_OPTS="-c 1 --events --umask=18 --heartbeat-interval=5"

CELERYD_USER="apache"

20 changes: 12 additions & 8 deletions server/pulp/server/async/app.py
@@ -6,13 +6,13 @@

import logging
import time
from datetime import datetime
from datetime import datetime, timedelta
from gettext import gettext as _

import mongoengine
from celery.signals import celeryd_after_setup, worker_shutdown

from pulp.common import constants
from pulp.common import constants, dateutils
from pulp.server import initialization
from pulp.server.async import tasks
from pulp.server.db.model import ResourceManagerLock, Worker
@@ -122,10 +122,17 @@ def get_resource_manager_lock(name):
_first_check = True

while True:

now = dateutils.ensure_tz(datetime.utcnow())
old_timestamp = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)

ResourceManagerLock.objects(timestamp__lte=old_timestamp).delete()

# Create / update the worker record so that Pulp knows we exist
Worker.objects(name=name).update_one(set__last_heartbeat=datetime.utcnow(),
upsert=True)
try:
lock.timestamp = now
lock.save()

msg = _("Resource manager '%s' has acquired the resource manager lock") % name
@@ -134,11 +141,8 @@ def get_resource_manager_lock(name):
except mongoengine.NotUniqueError:
# Only log the message the first time
if _first_check:
msg = _("Resource manager '%(name)s' attempted to acquire the the resource manager "
"lock but was unable to do so. It will retry every %(interval)d seconds "
"until the lock can be acquired.") % \
{'name': name, 'interval': constants.CELERY_CHECK_INTERVAL}
_logger.info(msg)
_logger.info(_("Hot spare pulp_resource_manager instance '%(name)s' detected.")
% {'name': name})
_first_check = False

time.sleep(constants.CELERY_CHECK_INTERVAL)
time.sleep(constants.PULP_PROCESS_HEARTBEAT_INTERVAL)
2 changes: 1 addition & 1 deletion server/pulp/server/async/manage_workers.py
@@ -22,7 +22,7 @@
WorkingDirectory=/var/run/pulp/
ExecStart=/usr/bin/celery worker -n reserved_resource_worker-%(num)s@%%%%h -A pulp.server.async.app\
-c 1 --events --umask 18 --pidfile=/var/run/pulp/reserved_resource_worker-%(num)s.pid\
--heartbeat-interval=30 %(max_tasks_argument)s
--heartbeat-interval=5 %(max_tasks_argument)s
KillSignal=SIGQUIT
"""

27 changes: 15 additions & 12 deletions server/pulp/server/async/scheduler.py
@@ -97,7 +97,7 @@ def run(self):
"""
_logger.info(_('Worker Timeout Monitor Started'))
while True:
time.sleep(constants.CELERY_CHECK_INTERVAL)
time.sleep(constants.PULP_PROCESS_HEARTBEAT_INTERVAL)
try:
self.check_celery_processes()
except Exception as e:
@@ -117,10 +117,10 @@ def check_celery_processes(self):
correctly.
"""
msg = _('Checking if pulp_workers, pulp_celerybeat, or pulp_resource_manager processes '
'are missing for more than %d seconds') % constants.CELERY_TIMEOUT_SECONDS
'are missing for more than %d seconds') % constants.PULP_PROCESS_TIMEOUT_INTERVAL
_logger.debug(msg)
now = ensure_tz(datetime.utcnow())
oldest_heartbeat_time = now - timedelta(seconds=constants.CELERY_TIMEOUT_SECONDS)
oldest_heartbeat_time = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)
worker_list = Worker.objects.all()
worker_count = 0
resource_manager_count = 0
@@ -179,7 +179,7 @@ class Scheduler(beat.Scheduler):

# the superclass reads this attribute, which is the maximum number of seconds
# that will ever elapse before the scheduler looks for new or changed schedules.
max_interval = constants.CELERY_MAX_INTERVAL
max_interval = constants.PULP_PROCESS_HEARTBEAT_INTERVAL

# allows mongo initialization to occur exactly once during the first call to setup_schedule()
_mongo_initialized = False
@@ -194,6 +194,7 @@ def __init__(self, *args, **kwargs):
"""
self._schedule = None
self._loaded_from_db_count = 0
self._retry_msg_already_logged = False

# Force the use of the Pulp celery_instance when this custom Scheduler is used.
kwargs['app'] = app
@@ -255,10 +256,11 @@ def tick(self):

worker_watcher.handle_worker_heartbeat(scheduler_event)

old_timestamp = datetime.utcnow() - timedelta(seconds=constants.CELERYBEAT_LOCK_MAX_AGE)
now = ensure_tz(datetime.utcnow())
old_timestamp = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)

# Updating the current lock if lock is on this instance of celerybeat
result = CeleryBeatLock.objects(celerybeat_name=CELERYBEAT_NAME).\
result = CeleryBeatLock.objects(name=CELERYBEAT_NAME).\
update(set__timestamp=datetime.utcnow())

# If current instance has lock and updated lock_timestamp, call super
@@ -273,8 +275,7 @@ def tick(self):
lock_timestamp = datetime.utcnow()

# Insert new lock entry
new_lock = CeleryBeatLock(celerybeat_name=CELERYBEAT_NAME,
timestamp=lock_timestamp)
new_lock = CeleryBeatLock(name=CELERYBEAT_NAME, timestamp=lock_timestamp)
new_lock.save()
_logger.info(_("New lock acquired by %(celerybeat_name)s") %
{'celerybeat_name': CELERYBEAT_NAME})
@@ -283,10 +284,12 @@

except mongoengine.NotUniqueError:
# Setting a default wait time for celerybeat instances with no lock
ret = constants.CELERY_TICK_DEFAULT_WAIT_TIME
_logger.info(_("Duplicate or new celerybeat Instance, "
"ticking again in %(ret)s seconds.")
% {'ret': ret})
ret = constants.PULP_PROCESS_HEARTBEAT_INTERVAL

if not self._retry_msg_already_logged:
_logger.info(_("Hot spare celerybeat instance '%(celerybeat_name)s' detected.")
% {'celerybeat_name': CELERYBEAT_NAME})
self._retry_msg_already_logged = True
return ret

def setup_schedule(self):
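
On the celerybeat side, every pulp_celerybeat instance now competes for a single CeleryBeatLock document on each tick. A condensed sketch of that contest, under the same assumptions as the resource manager sketch above (CELERYBEAT_NAME stands in for the instance name that scheduler.py derives):

from datetime import datetime, timedelta

import mongoengine

from pulp.common import constants, dateutils
from pulp.server.db.model import CeleryBeatLock

CELERYBEAT_NAME = 'scheduler@example'  # hypothetical instance name for illustration


def try_to_hold_celerybeat_lock():
    """Return True if this instance holds (or just acquired) the lock (sketch)."""
    now = dateutils.ensure_tz(datetime.utcnow())
    stale = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)

    # If we already own the lock, refresh its timestamp and keep scheduling.
    updated = CeleryBeatLock.objects(name=CELERYBEAT_NAME).update(
        set__timestamp=datetime.utcnow())
    if updated:
        return True

    # Otherwise drop any lock whose owner stopped heartbeating and race to take it.
    CeleryBeatLock.objects(timestamp__lte=stale).delete()
    try:
        CeleryBeatLock(name=CELERYBEAT_NAME, timestamp=datetime.utcnow()).save()
        return True
    except mongoengine.NotUniqueError:
        # Hot spare: try again on the next 5-second tick.
        return False
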
9 changes: 7 additions & 2 deletions server/pulp/server/async/tasks.py
@@ -15,14 +15,15 @@
from celery.result import AsyncResult
from mongoengine.queryset import DoesNotExist

from pulp.common.constants import SCHEDULER_WORKER_NAME, RESOURCE_MANAGER_WORKER_NAME
from pulp.common.constants import RESOURCE_MANAGER_WORKER_NAME, SCHEDULER_WORKER_NAME
from pulp.common import constants, dateutils, tags
from pulp.server.async.celery_instance import celery, RESOURCE_MANAGER_QUEUE, \
DEDICATED_QUEUE_EXCHANGE
from pulp.server.exceptions import PulpException, MissingResource, \
PulpCodedException
from pulp.server.config import config
from pulp.server.db.model import Worker, ReservedResource, TaskStatus, ResourceManagerLock
from pulp.server.db.model import Worker, ReservedResource, TaskStatus, \
ResourceManagerLock, CeleryBeatLock
from pulp.server.exceptions import NoWorkers
from pulp.server.managers.repo import _common as common_utils
from pulp.server.managers import factory as managers
@@ -261,6 +262,10 @@ def _delete_worker(name, normal_shutdown=False):
if name.startswith(RESOURCE_MANAGER_WORKER_NAME):
ResourceManagerLock.objects(name=name).delete()

# If the worker is a scheduler, we also need to delete the associated lock
if name.startswith(SCHEDULER_WORKER_NAME):
CeleryBeatLock.objects(name=name).delete()

# Cancel all of the tasks that were assigned to this worker's queue
for task_status in TaskStatus.objects(worker_name=name,
state__in=constants.CALL_INCOMPLETE_STATES):
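
The practical effect of the _delete_worker() addition above is that a graceful shutdown (or a worker being declared missing) releases the relevant lock document immediately, so a hot spare does not have to wait out the 25-second staleness window. In outline:

from pulp.common.constants import RESOURCE_MANAGER_WORKER_NAME, SCHEDULER_WORKER_NAME
from pulp.server.db.model import CeleryBeatLock, ResourceManagerLock


def release_locks_for(name):
    """Drop any lock document held by a departing process (sketch)."""
    if name.startswith(RESOURCE_MANAGER_WORKER_NAME):
        ResourceManagerLock.objects(name=name).delete()
    if name.startswith(SCHEDULER_WORKER_NAME):
        CeleryBeatLock.objects(name=name).delete()
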
14 changes: 11 additions & 3 deletions server/pulp/server/async/worker_watcher.py
@@ -17,8 +17,9 @@
from gettext import gettext as _
import logging

from pulp.common import constants
from pulp.server.async.tasks import _delete_worker
from pulp.server.db.model import Worker
from pulp.server.db.model import Worker, ResourceManagerLock


_logger = logging.getLogger(__name__)
@@ -72,15 +73,22 @@ def handle_worker_heartbeat(event):
:type event: dict
"""
event_info = _parse_and_log_event(event)
worker = Worker.objects(name=event_info['worker_name']).first()
existing_worker = Worker.objects(name=event_info['worker_name']).first()

if not worker:
if not existing_worker:
msg = _("New worker '%(worker_name)s' discovered") % event_info
_logger.info(msg)

# Update the worker timestamp, and create a new worker record if the worker did not previously
# exist
Worker.objects(name=event_info['worker_name']).\
update_one(set__last_heartbeat=event_info['local_received'], upsert=True)

# If the worker is a resource manager, update the associated ResourceManagerLock timestamp
if event_info['worker_name'].startswith(constants.RESOURCE_MANAGER_WORKER_NAME):
ResourceManagerLock.objects(name=event_info['worker_name']).\
update_one(set__timestamp=datetime.utcnow(), upsert=False)


def handle_worker_offline(event):
"""
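
Each heartbeat event now refreshes two documents: the Worker record, which is upserted, and, for the resource manager only, the timestamp on its lock. Condensed from handle_worker_heartbeat() above, assuming the event has already been parsed into a worker name and receipt time:

from datetime import datetime

from pulp.common import constants
from pulp.server.db.model import ResourceManagerLock, Worker


def record_heartbeat(worker_name, received):
    """Refresh the heartbeat bookkeeping for one worker (sketch)."""
    # Upsert the worker record; an unknown name simply creates a new document.
    Worker.objects(name=worker_name).update_one(
        set__last_heartbeat=received, upsert=True)

    # The active resource manager also keeps its lock fresh on every heartbeat,
    # so the stale-lock sweep never removes a lock held by a live process.
    if worker_name.startswith(constants.RESOURCE_MANAGER_WORKER_NAME):
        ResourceManagerLock.objects(name=worker_name).update_one(
            set__timestamp=datetime.utcnow(), upsert=False)
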
9 changes: 6 additions & 3 deletions server/pulp/server/db/model/__init__.py
@@ -989,16 +989,16 @@ class CeleryBeatLock(AutoRetryDocument):
"""
Single document collection which gives information about the current celerybeat lock.
:ivar celerybeat_name: string representing the celerybeat instance name
:type celerybeat_name: basestring
:ivar name: string representing the celerybeat instance name
:type name: basestring
:ivar timestamp: The timestamp(UTC) at which lock is acquired
:type timestamp: datetime.datetime
:ivar lock: A unique key set to "locked" when lock is acquired.
:type lock: basestring
:ivar _ns: (Deprecated), Contains the name of the collection this model represents
:type _ns: mongoengine.StringField
"""
celerybeat_name = StringField(required=True)
name = StringField(required=True)
timestamp = UTCDateTimeField(required=True)
lock = StringField(required=True, default="locked", unique=True)

@@ -1012,12 +1012,15 @@ class ResourceManagerLock(AutoRetryDocument):
:ivar name: string representing the resource manager instance name
:type name: basestring
:ivar timestamp: The timestamp(UTC) at which lock is acquired
:type timestamp: datetime.datetime
:ivar lock: A unique key set to "locked" when lock is acquired.
:type lock: basestring
:ivar _ns: (Deprecated), Contains the name of the collection this model represents
:type _ns: mongoengine.StringField
"""
name = StringField(required=True)
timestamp = UTCDateTimeField(required=True)
lock = StringField(required=True, default="locked", unique=True)

# For backward compatibility
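
Both lock models rely on the unique, always-"locked" lock field for mutual exclusion rather than on an external locking service; the losing writer simply gets NotUniqueError. A toy demonstration of that behaviour (the database and instance names here are hypothetical, and a throwaway mongoengine connection is assumed):

from datetime import datetime

import mongoengine

from pulp.server.db.model import ResourceManagerLock

mongoengine.connect('pulp_lock_scratch')  # hypothetical scratch database

ResourceManagerLock(name='resource_manager@host-a', timestamp=datetime.utcnow()).save()
try:
    ResourceManagerLock(name='resource_manager@host-b', timestamp=datetime.utcnow()).save()
except mongoengine.NotUniqueError:
    # Only one lock document can exist at a time, so host-b remains a hot spare.
    print('lock already held')
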
4 changes: 2 additions & 2 deletions server/test/unit/server/async/test_app.py
@@ -8,7 +8,7 @@

import mock

from pulp.common.constants import RESOURCE_MANAGER_WORKER_NAME, CELERY_CHECK_INTERVAL
from pulp.common.constants import RESOURCE_MANAGER_WORKER_NAME, PULP_PROCESS_HEARTBEAT_INTERVAL
from pulp.server.async import app
from pulp.server.managers.factory import initialize

@@ -86,4 +86,4 @@ def test_get_resource_manager_lock(self, mock_rm_lock, mock_worker, mock_datetim
assert_called_with(set__last_heartbeat=mock_datetime.utcnow(), upsert=True)

self.assertEquals(2, len(mock_rm_lock().save.mock_calls))
mock_time.sleep.assert_called_once_with(CELERY_CHECK_INTERVAL)
mock_time.sleep.assert_called_once_with(PULP_PROCESS_HEARTBEAT_INTERVAL)
