Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Simplify worker 'online' state tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
dralley committed Mar 22, 2018
1 parent 1cb4e19 commit f852da1
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 71 deletions.
55 changes: 43 additions & 12 deletions pulpcore/pulpcore/app/models/task.py
Expand Up @@ -69,19 +69,35 @@ def online_workers(self):
"""
Returns a queryset of workers meeting the criteria to be considered 'online'
To be considered 'online', a worker should both have its 'online' field set to True
and have a recent heartbeat. "Recent" is defined here as "within the pulp process timeout
interval".
To be considered 'online', a worker must have a recent heartbeat timestamp and must not
have the 'gracefully_stopped' flag set to True. "Recent" is defined here as "within
the pulp process timeout interval".
Returns:
:class:`django.db.models.query.QuerySet`: A query set of the Worker objects which
are considered by Pulp to be 'online'.
"""
now = timezone.now()
age_threshold = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)
return self.filter(name__startswith='reserved',
last_heartbeat__gte=age_threshold,
online=True)

return self.filter(last_heartbeat__gte=age_threshold, gracefully_stopped=False)

def missing_workers(self):
"""
Returns a queryset of workers meeting the criteria to be considered 'missing'
To be considered missing, a worker must have a stale timestamp while also having
cleaned_up=False, meaning that it has not been cleaned up after an improper shutdown.
Stale is defined here as "beyond the pulp process timeout interval".
Returns:
:class:`django.db.models.query.QuerySet`: A query set of the Worker objects which
are considered by Pulp to be 'missing'.
"""
now = timezone.now()
age_threshold = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)

return self.filter(last_heartbeat__lt=age_threshold, cleaned_up=False)

def with_reservations(self, resources):
"""
Expand Down Expand Up @@ -117,16 +133,31 @@ class Worker(Model):

name = models.TextField(db_index=True, unique=True)
last_heartbeat = models.DateTimeField(auto_now=True)
online = models.BooleanField(default=True)
gracefully_stopped = models.BooleanField(default=True)
cleaned_up = models.BooleanField(default=False)

def save_heartbeat(self):
"""Save a worker heartbeat
@property
def online(self):
"""
Whether a worker can be considered 'online'
Update the last_heartbeat field to now and save it.
To be considered 'online', a worker must have a recent heartbeat timestamp and must not
have the 'gracefully_stopped' flag set to True. "Recent" is defined here as "within
the pulp process timeout interval".
Warnings:
Returns:
bool: True if the worker is considered online, otherwise False
"""
now = timezone.now()
age_threshold = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)

return not self.gracefully_stopped and self.last_heartbeat >= age_threshold

def save_heartbeat(self):
"""
Update the last_heartbeat field to now and save it.
1) Only the last_heartbeat field will be saved. No other changes will be saved.
Only the last_heartbeat field will be saved. No other changes will be saved.
Raises:
ValueError: When the model instance has never been saved before. This method can
Expand Down
11 changes: 9 additions & 2 deletions pulpcore/pulpcore/app/serializers/task.py
Expand Up @@ -115,10 +115,17 @@ class WorkerSerializer(ModelSerializer):
)

online = serializers.BooleanField(
help_text='Whether the worker is online or not. Defaults to True.',
help_text=_('True if the worker is considered online, otherwise False'),
read_only=True
)

gracefully_stopped = serializers.BooleanField(
help_text=_('True when the worker was shut down properly, False when the worker is \
online, or if it crashed (determined by timestamp).'),
read_only=True
)

class Meta:
model = models.Worker
fields = ModelSerializer.Meta.fields + ('name', 'last_heartbeat', 'online')
fields = ModelSerializer.Meta.fields + ('name', 'last_heartbeat',
'online', 'gracefully_stopped')
43 changes: 17 additions & 26 deletions pulpcore/pulpcore/tasking/celery_app.py
Expand Up @@ -11,7 +11,7 @@
from gettext import gettext as _

from celery import bootsteps
from celery.signals import celeryd_after_setup, worker_shutdown
from celery.signals import celeryd_after_setup
from django.db.utils import IntegrityError
from django.utils import timezone

Expand Down Expand Up @@ -68,13 +68,15 @@ def start(self, consumer):
:param worker: The consumer instance
:type worker: celery.worker.consumer.Consumer
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_online

self.timer_ref = consumer.timer.call_repeatedly(
TASKING_CONSTANTS.PULP_PROCESS_HEARTBEAT_INTERVAL,
self._on_tick,
(consumer, ),
priority=10,
)
mark_worker_online(consumer.hostname)
self._on_tick(consumer)

def stop(self, consumer):
Expand All @@ -91,6 +93,19 @@ def stop(self, consumer):
self.timer_ref.cancel()
self.timer_ref = None

def shutdown(self, consumer):
"""
Called when a worker is shutdown.
So far, this just marks the worker as offline.
:param consumer: The consumer instance
:type consumer: celery.worker.consumer.Consumer
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_offline

# Mark the worker as offline
mark_worker_offline(consumer.hostname, normal_shutdown=True)

def _on_tick(self, consumer):
"""
This method regularly checks for offline workers and records worker heartbeats
Expand Down Expand Up @@ -171,26 +186,6 @@ def initialize_worker(sender, instance, **kwargs):
get_resource_manager_lock(sender)


@worker_shutdown.connect
def shutdown_worker(signal, sender, **kwargs):
"""
Called when a worker is shutdown.
So far, this just marks the worker as offline.
:param signal: The signal being sent to the workerTaskLock
:param type: int
:param instance: The hostname of the worker
:type instance: celery.apps.worker.Worker
:param kwargs: Other params (unused)
:type kwargs: dict
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_offline

# Mark the worker as offline
mark_worker_offline(sender.hostname, normal_shutdown=True)


def get_resource_manager_lock(name):
"""Block until the the resource manager lock is acquired.
Tries to acquire the resource manager lock.
Expand All @@ -211,11 +206,7 @@ def get_resource_manager_lock(name):

lock = TaskLock(name=name, lock=TaskLock.RESOURCE_MANAGER)
worker, created = Worker.objects.get_or_create(name=name)

if not created:
worker.online = True
# Because the method save_heartbeat() can't save new models we have to save it now
worker.save()
worker.save_heartbeat()

# Whether this is the first lock availability check for this instance
_first_check = True
Expand Down
4 changes: 3 additions & 1 deletion pulpcore/pulpcore/tasking/constants.py
@@ -1,7 +1,9 @@
from types import SimpleNamespace

TASKING_CONSTANTS = SimpleNamespace(
# The name of the resource manager entry in the workers table
# The prefix provided to normal worker entries in the workers table
WORKER_PREFIX='reserved_resource_worker',
# The name of resource manager entries in the workers table
RESOURCE_MANAGER_WORKER_NAME='resource_manager',
# The amount of time (in seconds) between process wakeups to "heartbeat" and perform
# their tasks.
Expand Down
69 changes: 39 additions & 30 deletions pulpcore/pulpcore/tasking/services/worker_watcher.py
Expand Up @@ -27,6 +27,18 @@
_logger = logging.getLogger(__name__)


def mark_worker_online(worker_name):
""" Sets some bookkeeping values on the worker record for tracking worker state
Args:
worker_name (str): The hostname of the worker
"""
worker, created = Worker.objects.get_or_create(name=worker_name)
worker.gracefully_stopped = False
worker.cleaned_up = False
worker.save()


def handle_worker_heartbeat(worker_name):
"""
This is a generic function for updating worker heartbeat records.
Expand All @@ -37,20 +49,16 @@ def handle_worker_heartbeat(worker_name):
:param worker_name: The hostname of the worker
:type worker_name: basestring
"""
existing_worker, created = Worker.objects.get_or_create(name=worker_name)
worker, created = Worker.objects.get_or_create(name=worker_name)
if created:
msg = _("New worker '{name}' discovered").format(name=worker_name)
_logger.info(msg)
elif existing_worker.online is False:
msg = _("Worker '{name}' is back online.").format(name=worker_name)
_logger.info(msg)
existing_worker.online = True
existing_worker.save()
else:
existing_worker.save_heartbeat()
_logger.info(_("New worker '{name}' discovered").format(name=worker_name))
elif worker.online is False:
_logger.info(_("Worker '{name}' is back online.").format(name=worker_name))

worker.save_heartbeat()

msg = _("Worker heartbeat from '{name}' at time {timestamp}").format(
timestamp=existing_worker.last_heartbeat,
timestamp=worker.last_heartbeat,
name=worker_name
)

Expand All @@ -73,20 +81,18 @@ def check_celery_processes():
msg = _('Checking if pulp_workers or pulp_resource_manager processes are '
'missing for more than %d seconds') % TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL
_logger.debug(msg)
now = timezone.now()
oldest_heartbeat_time = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)

for worker in Worker.objects.filter(last_heartbeat__lt=oldest_heartbeat_time, online=True):
for worker in Worker.objects.missing_workers():
msg = _("Worker '%s' has gone missing, removing from list of workers") % worker.name
_logger.error(msg)

mark_worker_offline(worker.name)

worker_count = Worker.objects.exclude(
name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).filter(online=True).count()
worker_count = Worker.objects.online_workers().filter(
name__startswith=TASKING_CONSTANTS.WORKER_PREFIX).count()

resource_manager_count = Worker.objects.filter(
name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).filter(online=True).count()
resource_manager_count = Worker.objects.online_workers().filter(
name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).count()

if resource_manager_count == 0:
msg = _("There are 0 pulp_resource_manager processes running. Pulp will not operate "
Expand Down Expand Up @@ -119,7 +125,7 @@ def handle_worker_offline(worker_name):
mark_worker_offline(worker_name, normal_shutdown=True)


def mark_worker_offline(name, normal_shutdown=False):
def mark_worker_offline(worker_name, normal_shutdown=False):
"""
Mark the :class:`~pulpcore.app.models.Worker` as offline and cancel associated tasks.
Expand All @@ -130,32 +136,35 @@ def mark_worker_offline(name, normal_shutdown=False):
Any tasks associated with this worker are explicitly canceled.
:param name: The name of the worker you wish to be marked as offline.
:type name: basestring
:param normal_shutdown: True if the worker shutdown normally, False otherwise. Defaults to
False.
:type normal_shutdown: bool
Args:
worker_name (str) The name of the worker
normal_shutdown (bool): True if the worker shutdown normally, False otherwise. Defaults to
False.
"""
is_resource_manager = name.startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME)
is_resource_manager = worker_name.startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME)

if not normal_shutdown:
msg = _('The worker named %(name)s is missing. Canceling the tasks in its queue.')
msg = msg % {'name': name}
msg = msg % {'name': worker_name}
_logger.error(msg)
else:
msg = _("Cleaning up shutdown worker '%s'.") % name
msg = _("Cleaning up shutdown worker '%s'.") % worker_name
_logger.info(msg)

try:
worker = Worker.objects.get(name=name, online=True)
worker = Worker.objects.get(name=worker_name, gracefully_stopped=False, cleaned_up=False)
except Worker.DoesNotExist:
pass
else:
# Cancel all of the tasks that were assigned to this worker's queue
for task_status in worker.tasks.filter(state__in=TASK_INCOMPLETE_STATES):
cancel(task_status.pk)
worker.online = False

if normal_shutdown:
worker.gracefully_stopped = True

worker.cleaned_up = True
worker.save()

if is_resource_manager:
TaskLock.objects.filter(name=name).delete()
TaskLock.objects.filter(name=worker_name).delete()

0 comments on commit f852da1

Please sign in to comment.