Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3369 from dralley/3121-worker-online
Browse files Browse the repository at this point in the history
Simplify worker 'online' state tracking
  • Loading branch information
dralley committed Mar 22, 2018
2 parents 99cb168 + 501cdd8 commit d4466ce
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 113 deletions.
75 changes: 58 additions & 17 deletions pulpcore/pulpcore/app/models/task.py
Expand Up @@ -69,19 +69,35 @@ def online_workers(self):
"""
Returns a queryset of workers meeting the criteria to be considered 'online'
To be considered 'online', a worker should both have its 'online' field set to True
and have a recent heartbeat. "Recent" is defined here as "within the pulp process timeout
interval".
To be considered 'online', a worker must have a recent heartbeat timestamp and must not
have the 'gracefully_stopped' flag set to True. "Recent" is defined here as "within
the pulp process timeout interval".
Returns:
:class:`django.db.models.query.QuerySet`: A query set of the Worker objects which
are considered by Pulp to be 'online'.
"""
now = timezone.now()
age_threshold = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)
return self.filter(name__startswith='reserved',
last_heartbeat__gte=age_threshold,
online=True)

return self.filter(last_heartbeat__gte=age_threshold, gracefully_stopped=False)

def missing_workers(self):
"""
Returns a queryset of workers meeting the criteria to be considered 'missing'
To be considered missing, a worker must have a stale timestamp while also having
cleaned_up=False, meaning that it has not been cleaned up after an improper shutdown.
Stale is defined here as "beyond the pulp process timeout interval".
Returns:
:class:`django.db.models.query.QuerySet`: A query set of the Worker objects which
are considered by Pulp to be 'missing'.
"""
now = timezone.now()
age_threshold = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)

return self.filter(last_heartbeat__lt=age_threshold, cleaned_up=False)

def with_reservations(self, resources):
"""
Expand Down Expand Up @@ -117,16 +133,31 @@ class Worker(Model):

name = models.TextField(db_index=True, unique=True)
last_heartbeat = models.DateTimeField(auto_now=True)
online = models.BooleanField(default=True)
gracefully_stopped = models.BooleanField(default=True)
cleaned_up = models.BooleanField(default=False)

def save_heartbeat(self):
"""Save a worker heartbeat
@property
def online(self):
"""
Whether a worker can be considered 'online'
Update the last_heartbeat field to now and save it.
To be considered 'online', a worker must have a recent heartbeat timestamp and must not
have the 'gracefully_stopped' flag set to True. "Recent" is defined here as "within
the pulp process timeout interval".
Warnings:
Returns:
bool: True if the worker is considered online, otherwise False
"""
now = timezone.now()
age_threshold = now - timedelta(seconds=TASKING_CONSTANTS.PULP_PROCESS_TIMEOUT_INTERVAL)

return not self.gracefully_stopped and self.last_heartbeat >= age_threshold

def save_heartbeat(self):
"""
Update the last_heartbeat field to now and save it.
1) Only the last_heartbeat field will be saved. No other changes will be saved.
Only the last_heartbeat field will be saved. No other changes will be saved.
Raises:
ValueError: When the model instance has never been saved before. This method can
Expand Down Expand Up @@ -161,21 +192,31 @@ class TaskLock(Model):
Fields:
name (models.TextField): The name of the item that has the lock
timestamp (models.DateTimeField): The time the lock was acquired
timestamp (models.DateTimeField): The time the lock was last updated
lock (models.TextField): The name of the lock acquired
"""
CELERY_BEAT = 'CeleryBeat'
RESOURCE_MANAGER = 'ResourceManager'
LOCK_STRINGS = (
(CELERY_BEAT, 'Celery Beat Lock'),
(RESOURCE_MANAGER, 'Resource Manager Lock')
(RESOURCE_MANAGER, 'Resource Manager Lock'),
)

name = models.TextField(db_index=True, unique=True)
timestamp = models.DateTimeField(auto_now_add=True)
timestamp = models.DateTimeField(auto_now=True)
lock = models.TextField(unique=True, null=False, choices=LOCK_STRINGS)

def update_timestamp(self):
"""
Update the timestamp field to now and save it.
Only the timestamp field will be saved. No other changes will be saved.
Raises:
ValueError: When the model instance has never been saved before. This method can
only update an existing database record.
"""
self.save(update_fields=['timestamp'])


class Task(Model):
"""
Expand Down
11 changes: 9 additions & 2 deletions pulpcore/pulpcore/app/serializers/task.py
Expand Up @@ -115,10 +115,17 @@ class WorkerSerializer(ModelSerializer):
)

online = serializers.BooleanField(
help_text='Whether the worker is online or not. Defaults to True.',
help_text=_('True if the worker is considered online, otherwise False'),
read_only=True
)

gracefully_stopped = serializers.BooleanField(
help_text=_('True when the worker was shut down properly, False when the worker is \
online, or if it crashed (determined by timestamp).'),
read_only=True
)

class Meta:
model = models.Worker
fields = ModelSerializer.Meta.fields + ('name', 'last_heartbeat', 'online')
fields = ModelSerializer.Meta.fields + ('name', 'last_heartbeat',
'online', 'gracefully_stopped')
92 changes: 35 additions & 57 deletions pulpcore/pulpcore/tasking/celery_app.py
Expand Up @@ -11,7 +11,7 @@
from gettext import gettext as _

from celery import bootsteps
from celery.signals import celeryd_after_setup, worker_shutdown
from celery.signals import celeryd_after_setup
from django.db.utils import IntegrityError
from django.utils import timezone

Expand All @@ -33,15 +33,15 @@ class PulpWorkerStep(bootsteps.StartStopStep):
Adds pulp recurrent logic to celery workers.
This class is a celery "Blueprint". It extends the functionality of the celery
worker by establishing a timer on consumer startup which calls the '_on_tick()'
consumer by establishing a timer on consumer startup which calls the '_on_tick()'
method periodically. This allows each worker to write its own worker record to the
database.
http://docs.celeryproject.org/en/master/userguide/extending.html
https://groups.google.com/d/msg/celery-users/3fs0ocREYqw/C7U1lCAp56sJ
:param worker: The worker instance (unused)
:type worker: celery.apps.worker.Worker
Args:
consumer (celery.worker.consumer.Consumer): The consumer instance (unused)
"""

def __init__(self, consumer, **kwargs):
Expand All @@ -52,8 +52,8 @@ def __init__(self, consumer, **kwargs):
consumer instance as the first argument and all keyword arguments from the original
consumer.__init__ call.
:param worker: The consumer instance (unused)
:type worker: celery.worker.consumer.Consumer
Args:
consumer (celery.worker.consumer.Consumer): The consumer instance (unused)
"""
self.timer_ref = None

Expand All @@ -65,16 +65,18 @@ def start(self, consumer):
reset (which triggers an internal restart). The timer is reset when the connection is lost,
so we have to install the timer again for every call to self.start.
:param worker: The consumer instance
:type worker: celery.worker.consumer.Consumer
Args:
consumer (celery.worker.consumer.Consumer): The consumer instance (unused)
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_online

self.timer_ref = consumer.timer.call_repeatedly(
TASKING_CONSTANTS.PULP_PROCESS_HEARTBEAT_INTERVAL,
self._on_tick,
(consumer, ),
priority=10,
)
mark_worker_online(consumer.hostname)
self._on_tick(consumer)

def stop(self, consumer):
Expand All @@ -84,34 +86,38 @@ def stop(self, consumer):
This method is called every time the worker is restarted (i.e. connection is lost)
and also at shutdown.
:param worker: The consumer instance (unused)
:type worker: celery.worker.consumer.Consumer
Args:
consumer (celery.worker.consumer.Consumer): The consumer instance (unused)
"""
if self.timer_ref:
self.timer_ref.cancel()
self.timer_ref = None

def shutdown(self, consumer):
"""
Called when a worker is shutdown.
So far, this just marks the worker as offline.
Args:
consumer (celery.worker.consumer.Consumer): The consumer instance (unused)
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_offline

# Mark the worker as offline
mark_worker_offline(consumer.hostname, normal_shutdown=True)

def _on_tick(self, consumer):
"""
This method regularly checks for offline workers and records worker heartbeats
:param worker: The consumer instance
:type worker: celery.worker.consumer.Consumer
Args:
consumer (celery.worker.consumer.Consumer): The consumer instance (unused)
"""
from pulpcore.tasking.services.worker_watcher import (check_celery_processes,
handle_worker_heartbeat)
from pulpcore.app.models.task import Worker

name = consumer.hostname

check_celery_processes()
handle_worker_heartbeat(name)

# If the worker is a resource manager, update the associated ResourceManagerLock timestamp
if name.startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME):
worker_lock = Worker.objects.get(name=name)
worker_lock.timestamp = timezone.now()
worker_lock.save()
handle_worker_heartbeat(consumer.hostname)


celery.steps['consumer'].add(PulpWorkerStep)
Expand Down Expand Up @@ -149,14 +155,10 @@ def initialize_worker(sender, instance, **kwargs):
[0] http://celery.readthedocs.org/en/latest/userguide/signals.html#celeryd-after-setup
:param sender: The hostname of the worker
:type sender: basestring
:param instance: The Worker instance to be initialized (unused)
:type instance: celery.apps.worker.Worker
:param kwargs: Other params (unused)
:type kwargs: dict
Args:
sender (str): The hostname of the worker
instance (celery.apps.worker.Worker) The Worker instance to be initialized (unused)
kwargs (dict): (unused)
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_offline

Expand All @@ -171,26 +173,6 @@ def initialize_worker(sender, instance, **kwargs):
get_resource_manager_lock(sender)


@worker_shutdown.connect
def shutdown_worker(signal, sender, **kwargs):
"""
Called when a worker is shutdown.
So far, this just marks the worker as offline.
:param signal: The signal being sent to the workerTaskLock
:param type: int
:param instance: The hostname of the worker
:type instance: celery.apps.worker.Worker
:param kwargs: Other params (unused)
:type kwargs: dict
"""
from pulpcore.tasking.services.worker_watcher import mark_worker_offline

# Mark the worker as offline
mark_worker_offline(sender.hostname, normal_shutdown=True)


def get_resource_manager_lock(name):
"""Block until the the resource manager lock is acquired.
Tries to acquire the resource manager lock.
Expand All @@ -202,20 +184,16 @@ def get_resource_manager_lock(name):
API. This worker record will be cleaned up through the regular worker
shutdown routine.
:param name: The hostname of the worker
:type name: basestring
Args:
name (str): The hostname of the worker
"""
from pulpcore.app.models.task import TaskLock, Worker

assert name.startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME)

lock = TaskLock(name=name, lock=TaskLock.RESOURCE_MANAGER)
worker, created = Worker.objects.get_or_create(name=name)

if not created:
worker.online = True
# Because the method save_heartbeat() can't save new models we have to save it now
worker.save()
worker.save_heartbeat()

# Whether this is the first lock availability check for this instance
_first_check = True
Expand Down
4 changes: 3 additions & 1 deletion pulpcore/pulpcore/tasking/constants.py
@@ -1,7 +1,9 @@
from types import SimpleNamespace

TASKING_CONSTANTS = SimpleNamespace(
# The name of the resource manager entry in the workers table
# The prefix provided to normal worker entries in the workers table
WORKER_PREFIX='reserved_resource_worker',
# The name of resource manager entries in the workers table
RESOURCE_MANAGER_WORKER_NAME='resource_manager',
# The amount of time (in seconds) between process wakeups to "heartbeat" and perform
# their tasks.
Expand Down

0 comments on commit d4466ce

Please sign in to comment.