Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Removes database backed celerybeat schedule
Browse files Browse the repository at this point in the history
This change also converts worker watcher to use Django. This patch renames the scheduler to pulp_celerybeat.

closes: #2158
https://pulp.plan.io/issues/2158
closes: #2156
https://pulp.plan.io/issues/2156
closes: #1978
https://pulp.plan.io/issues/1978

blah
  • Loading branch information
dkliban committed Oct 24, 2016
1 parent 7f42f40 commit 365f9fd
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 190 deletions.
2 changes: 1 addition & 1 deletion app/pulp/app/models/__init__.py
Expand Up @@ -9,7 +9,7 @@
RepositoryContent) # NOQA

from .catalog import DownloadCatalog # NOQA
from .task import ReservedResource, Worker, Task, TaskTag, TaskLock, ScheduledCalls # NOQA
from .task import ReservedResource, Worker, Task, TaskTag, TaskLock # NOQA

# Moved here to avoid a circular import with Task
from .progress import ProgressBar, ProgressSpinner # NOQA
56 changes: 0 additions & 56 deletions app/pulp/app/models/task.py
Expand Up @@ -226,59 +226,3 @@ class TaskTag(Model):
name = models.TextField()

task = models.ForeignKey("Task", related_name="tags", related_query_name="tag")


class ScheduledCalls(Model):
    """
    Scheduled Call Request

    Stores one scheduled call: which task to run, the schedule it runs on,
    bookkeeping about past runs, and the arguments to dispatch it with.

    Fields:

    :cvar task: The task that should be run on a schedule
    :type task: models.TextField
    :cvar enabled: Indicates if schedule should be actively run by the scheduler
    :type enabled: models.BooleanField
    :cvar resource: Indicates a unique resource that should be used to find this schedule
    :type resource: models.TextField
    :cvar iso_schedule: ISO8601 string representing the schedule
    :type iso_schedule: models.TextField
    :cvar schedule: Pickled instance of celery.schedules.schedule that should be run.
    :type schedule: models.TextField
    :cvar first_run: The first time this schedule was run
    :type first_run: models.DateTimeField
    :cvar last_run: Last time this schedule was run
    :type last_run: models.DateTimeField
    :cvar total_run_count: Number of times this schedule has run
    :type total_run_count: models.IntegerField
    :cvar last_updated: The last time this schedule was saved to the database
    :type last_updated: models.DateTimeField
    :cvar args: Arguments that should be passed to the apply_async function
    :type args: JSONField
    :cvar kwargs: Keyword arguments that should be passed to the apply_async function
    :type kwargs: JSONField
    """
    task = models.TextField()
    # Schedules are enabled unless explicitly turned off.
    enabled = models.BooleanField(default=True)
    # Nullable: a schedule does not have to be tied to a resource.
    resource = models.TextField(null=True)

    iso_schedule = models.TextField()
    # Nullable: the pickled schedule may be absent.
    schedule = models.TextField(null=True)

    first_run = models.DateTimeField()
    # Nullable: presumably empty until the schedule has run once -- TODO confirm.
    last_run = models.DateTimeField(null=True)
    total_run_count = models.IntegerField()

    last_updated = models.DateTimeField()

    args = JSONField()
    kwargs = JSONField()
18 changes: 16 additions & 2 deletions tasking/pulp/tasking/constants.py
Expand Up @@ -4,6 +4,20 @@
# The interval in seconds for which the Celery Process monitor thread sleeps between
# checking for missing Celery processes.
CELERY_CHECK_INTERVAL=60,
# Resource manager worker name
RESOURCE_MANAGER_WORKER_NAME='resource_manager'
# The interval in seconds during which the Celery Processes being monitored need to report a
# heartbeat to be considered active.
HEARTBEAT_MAX_AGE=200,
# The interval in seconds with which a secondary celerybeat tries to acquire the lock
# that is needed to be the primary celerybeat.
CELERYBEAT_LOCK_RETRY_TIME=90,
# The name of the resource manager entry in the workers table
RESOURCE_MANAGER_WORKER_NAME='resource_manager',
# The name of the celerybeat entry in the workers table
CELERYBEAT_WORKER_NAME = 'celerybeat',
# The maximum amount of time in seconds celerybeat sleeps before checking for more scheduled
# work to dispatch.
CELERYBEAT_MAX_SLEEP_INTERVAL=90,
# The amount of time in seconds that a celerybeat lock is valid for. If the lock is older than
# the value specified here, the process holding the lock is considered dead.
CELERYBEAT_LOCK_MAX_AGE=200
)
@@ -1,4 +1,3 @@
import itertools
import logging
import platform
import threading
Expand All @@ -7,20 +6,15 @@
from gettext import gettext as _

from celery import beat
from django.utils import timezone
from django.db import IntegrityError

import mongoengine

from pulp.common import constants
from pulp.common.dateutils import ensure_tz
from pulp.server.async import worker_watcher
from pulp.server.db import connection as db_connection
from pulp.server.db.connection import UnsafeRetry
from pulp.server.db.model import CeleryBeatLock, Worker
from pulp.server.db.model.dispatch import ScheduledCall, ScheduleEntry
from pulp.server.managers.schedule import utils
from pulp.tasking import delete_worker
from pulp.app.models.task import TaskLock, Worker
from pulp.tasking import worker_watcher
from pulp.tasking.celery_instance import celery as app
from pulp.tasking.constants import TASKING_CONSTANTS
from pulp.tasking.constants import TASKING_CONSTANTS as constants
from pulp.tasking import delete_worker


# The import below is not used in this module, but it needs to be kept here. This module is the
# first and only Pulp module to be imported by celerybeat, and by importing pulp.server.logs, it
Expand Down Expand Up @@ -115,30 +109,29 @@ def check_celery_processes(self):
correctly.
"""
msg = _('Checking if pulp_workers, pulp_celerybeat, or pulp_resource_manager processes '
'are missing for more than %d seconds') % constants.CELERY_TIMEOUT_SECONDS
'are missing for more than %d seconds') % constants.HEARTBEAT_MAX_AGE
_logger.debug(msg)
now = ensure_tz(datetime.utcnow())
oldest_heartbeat_time = now - timedelta(seconds=constants.CELERY_TIMEOUT_SECONDS)
worker_list = Worker.objects.all()
worker_count = 0
resource_manager_count = 0
scheduler_count = 0

for worker in worker_list:
if worker.last_heartbeat < oldest_heartbeat_time:
msg = _("Worker '%s' has gone missing, removing from list of workers") % worker.name
_logger.error(msg)

if worker.name.startswith(constants.SCHEDULER_WORKER_NAME):
worker.delete()
else:
delete_worker(worker.name)
elif worker.name.startswith(constants.SCHEDULER_WORKER_NAME):
scheduler_count = scheduler_count + 1
elif worker.name.startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME):
resource_manager_count = resource_manager_count + 1
now = timezone.now()
oldest_heartbeat_time = now - timedelta(seconds=constants.HEARTBEAT_MAX_AGE)

for worker in Worker.objects.filter(last_heartbeat__lt=oldest_heartbeat_time):
msg = _("Worker '%s' has gone missing, removing from list of workers") % worker.name
_logger.error(msg)

if worker.name.startswith(constants.CELERYBEAT_WORKER_NAME):
worker.delete()
else:
worker_count = worker_count + 1
delete_worker(worker.name)

worker_count = Worker.objects.exclude(
name__startswith=constants.RESOURCE_MANAGER_WORKER_NAME).exclude(
name__startswith=constants.CELERYBEAT_WORKER_NAME).count()

scheduler_count = Worker.objects.filter(
name__startswith=constants.CELERYBEAT_WORKER_NAME).count()

resource_manager_count = Worker.objects.filter(
name__startswith=constants.RESOURCE_MANAGER_WORKER_NAME).count()

if resource_manager_count == 0:
msg = _("There are 0 pulp_resource_manager processes running. Pulp will not operate "
Expand All @@ -149,6 +142,7 @@ def check_celery_processes(self):
msg = _("There are 0 pulp_celerybeat processes running. Pulp will not operate "
"correctly without at least one pulp_celerybeat process running.")
_logger.error(msg)

output_dict = {'workers': worker_count, 'celerybeat': scheduler_count,
'resource_manager': resource_manager_count}
msg = _("%(workers)d pulp_worker processes, %(celerybeat)d "
Expand All @@ -173,14 +167,9 @@ class Scheduler(beat.Scheduler):
second, is a WorkerTimeoutMonitor thread that watches for cases where all workers disappear
at once.
"""
Entry = ScheduleEntry

# the superclass reads this attribute, which is the maximum number of seconds
# that will ever elapse before the scheduler looks for new or changed schedules.
max_interval = constants.CELERY_MAX_INTERVAL

# allows mongo initialization to occur exactly once during the first call to setup_schedule()
_mongo_initialized = False
max_interval = constants.CELERYBEAT_MAX_SLEEP_INTERVAL

def __init__(self, *args, **kwargs):
"""
Expand All @@ -194,7 +183,7 @@ def __init__(self, *args, **kwargs):
self._loaded_from_db_count = 0

# Setting the celerybeat name
self.celerybeat_name = constants.SCHEDULER_WORKER_NAME + "@" + platform.node()
self.celerybeat_name = constants.CELERYBEAT_WORKER_NAME + "@" + platform.node()

# Force the use of the Pulp celery_instance when this custom Scheduler is used.
kwargs['app'] = app
Expand Down Expand Up @@ -260,110 +249,32 @@ def tick(self):
old_timestamp = datetime.utcnow() - timedelta(seconds=constants.CELERYBEAT_LOCK_MAX_AGE)

# Updating the current lock if lock is on this instance of celerybeat
result = CeleryBeatLock.objects(celerybeat_name=self.celerybeat_name).\
update(set__timestamp=datetime.utcnow())

# If current instance has lock and updated lock_timestamp, call super
if result == 1:
try:
celerybeat_lock = TaskLock.objects.get(name=self.celerybeat_name)
celerybeat_lock.timestamp = timezone.now()
celerybeat_lock.save()
# If current instance has lock and updated lock_timestamp, call super
_logger.debug(_('Lock updated by %(celerybeat_name)s')
% {'celerybeat_name': self.celerybeat_name})
ret = self.call_tick(self, self.celerybeat_name)
else:
# check for old enough time_stamp and remove if such lock is present
CeleryBeatLock.objects(timestamp__lte=old_timestamp).delete()
except TaskLock.DoesNotExist:
TaskLock.objects.get(name=self.celerybeat_name, timestamp__lte=old_timestamp).delete()
try:
lock_timestamp = datetime.utcnow()

# Insert new lock entry
new_lock = CeleryBeatLock(celerybeat_name=self.celerybeat_name,
timestamp=lock_timestamp)
new_lock.save()
TaskLock.objects.create(name=self.celerybeat_name, lock=TaskLock.CELERY_BEAT)
_logger.info(_("New lock acquired by %(celerybeat_name)s") %
{'celerybeat_name': self.celerybeat_name})
# After acquiring new lock call super to dispatch tasks
ret = self.call_tick(self, self.celerybeat_name)

except mongoengine.NotUniqueError:
except IntegrityError:
# Setting a default wait time for celerybeat instances with no lock
ret = constants.CELERY_TICK_DEFAULT_WAIT_TIME
ret = constants.CELERYBEAT_LOCK_RETRY_TIME
_logger.info(_("Duplicate or new celerybeat Instance, "
"ticking again in %(ret)s seconds.")
% {'ret': ret})
return ret

def setup_schedule(self):
    """
    Rebuild the "_schedule" dictionary from the app config and the database.

    Entries from the app's CELERYBEAT_SCHEDULE setting are added first as
    celery.beat.ScheduleEntry instances keyed by their name. Every enabled
    scheduled call from the database that still has remaining runs is then
    added, keyed by the call's id. As a side effect this records how many
    calls were loaded from the database (self._loaded_from_db_count) and the
    newest "last_updated" value seen (self._most_recent_timestamp, 0 when no
    database schedule was loaded).
    """
    # The Mongo connection is initialized only the first time this runs.
    if not Scheduler._mongo_initialized:
        _logger.debug('Initializing Mongo client connection to read celerybeat schedule')
        db_connection.initialize()
        Scheduler._mongo_initialized = True

    _logger.debug(_('loading schedules from app'))
    self._schedule = {}
    app_schedule = self.app.conf.CELERYBEAT_SCHEDULE
    for entry_name, entry_options in app_schedule.iteritems():
        entry_kwargs = dict(entry_options, name=entry_name)
        self._schedule[entry_name] = beat.ScheduleEntry(**entry_kwargs)

    # Seeded with 0 so that max() below still works when nothing is loaded.
    seen_timestamps = [0]

    _logger.debug(_('loading schedules from DB'))
    skipped_count = 0
    self._loaded_from_db_count = 0
    enabled_calls = itertools.imap(ScheduledCall.from_db, utils.get_enabled())
    for scheduled_call in enabled_calls:
        # Exhausted schedules are left out of the active schedule.
        if scheduled_call.remaining_runs == 0:
            _logger.debug(
                _('ignoring schedule with 0 remaining runs: %(id)s') % {'id': scheduled_call.id})
            skipped_count += 1
            continue

        self._schedule[scheduled_call.id] = scheduled_call.as_schedule_entry()
        seen_timestamps.append(scheduled_call.last_updated)
        self._loaded_from_db_count += 1

    _logger.debug('loaded %(count)d schedules' % {'count': self._loaded_from_db_count})

    self._most_recent_timestamp = max(seen_timestamps)

@property
@UnsafeRetry.retry_decorator()
def schedule_changed(self):
    """
    Looks at the update timestamps in the database to determine if there
    are new or modified schedules.

    Two checks are made: whether the number of enabled schedules differs
    from the number loaded by the last setup_schedule() call, and whether
    any enabled schedule has been updated since the most recent timestamp
    recorded at that time.

    Indexing should make this very fast.

    NOTE(review): the UnsafeRetry decorator presumably retries the database
    queries on connection failure -- confirm against its definition.

    :return: True iff the set of enabled scheduled calls has changed
             in the database.
    :rtype: bool
    """
    # Log through the module logger (not the root ``logging`` module) so these
    # messages follow the same logger configuration as the rest of this file.
    if utils.get_enabled().count() != self._loaded_from_db_count:
        _logger.debug(_('number of enabled schedules has changed'))
        return True

    if utils.get_updated_since(self._most_recent_timestamp).count() > 0:
        _logger.debug(_('one or more enabled schedules has been updated'))
        return True

    return False

@property
def schedule(self):
    """
    Return the schedules currently in active use by the scheduler.

    When no schedule has been loaded yet, the result of get_schedule() is
    returned. Otherwise the schedule is reloaded via setup_schedule() if
    schedule_changed reports a change, and the cached dictionary is returned.

    :return: dictionary where keys are schedule ids and values are
             instances of celery.beat.ScheduleEntry.
    :rtype: dict
    """
    if self._schedule is not None:
        # Refresh the cached entries only when a change is reported.
        if self.schedule_changed:
            self.setup_schedule()
        return self._schedule

    # Nothing cached yet: defer to get_schedule().
    return self.get_schedule()

def add(self, **kwargs):
"""
This class does not support adding entries in-place. You must add new
Expand All @@ -372,5 +283,5 @@ def add(self, **kwargs):
raise NotImplementedError

def close(self):
_delete_worker(self.celerybeat_name, normal_shutdown=True)
delete_worker(self.celerybeat_name, normal_shutdown=True)
super(Scheduler, self).close()

0 comments on commit 365f9fd

Please sign in to comment.