Skip to content

Commit

Permalink
Added worker_cleanup to the tasking system
Browse files Browse the repository at this point in the history
All workers that are considered offline will be removed from the database
periodically.

backports #8931

fixes #9462

(cherry picked from commit 5bbf959)
  • Loading branch information
dralley committed Sep 29, 2021
1 parent 69b2e1d commit 7d0cbdd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGES/9462.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added a periodic cleanup to the pulpcore-worker class to keep the `Worker` table clean.
(backported from #8931)
16 changes: 16 additions & 0 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ def online_workers(self):

return self.filter(last_heartbeat__gte=age_threshold, gracefully_stopped=False)

def offline_workers(self):
    """
    Return the workers that are currently regarded as 'offline'.

    A worker counts as 'offline' when its latest heartbeat timestamp is at least
    ``settings.WORKER_TTL`` seconds in the past.

    Returns:
        :class:`django.db.models.query.QuerySet`: Worker objects whose ``last_heartbeat``
            lies at or before the cutoff.
    """
    cutoff = timezone.now() - timedelta(seconds=settings.WORKER_TTL)
    return self.filter(last_heartbeat__lte=cutoff)

def missing_workers(self):
"""
Returns a queryset of workers meeting the criteria to be considered 'missing'
Expand Down
28 changes: 24 additions & 4 deletions pulpcore/tasking/pulpcore_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import random
import select
import signal
import socket
Expand All @@ -27,7 +28,7 @@
)
from django_guid.middleware import GuidMiddleware # noqa: E402: module level not at top of file

from pulpcore.app.models import Task # noqa: E402: module level not at top of file
from pulpcore.app.models import Worker, Task # noqa: E402: module level not at top of file

from pulpcore.constants import ( # noqa: E402: module level not at top of file
TASK_STATES,
Expand All @@ -41,6 +42,10 @@


# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Seed the shared random generator; no explicit seed is passed, so Python
# chooses the seed source itself.
random.seed()

# Starting value of the worker's task_grace_timeout counter, assigned when a task
# is supervised; beat() decrements that counter while a shutdown is requested.
TASK_GRACE_INTERVAL = 3
# Value the worker_cleanup_countdown is reset to after each cleanup run. The
# countdown is decremented by one in beat(), so this is a count of ticks, not seconds.
WORKER_CLEANUP_INTERVAL = 100


class NewPulpWorker:
Expand All @@ -50,6 +55,10 @@ def __init__(self):
self.heartbeat_period = settings.WORKER_TTL / 3
self.cursor = connection.cursor()
self.worker = handle_worker_heartbeat(self.name)
self.task_grace_timeout = 0
self.worker_cleanup_countdown = random.randint(
WORKER_CLEANUP_INTERVAL / 10, WORKER_CLEANUP_INTERVAL
)

# Add a file descriptor to trigger select on signals
self.sentinel, sentinel_w = os.pipe()
Expand All @@ -70,11 +79,22 @@ def shutdown(self):
self.worker.delete()
_logger.info(f"Worker {self.name} was shut down.")

def worker_cleanup(self):
    """
    Delete every worker record reported by ``Worker.objects.offline_workers()``.

    Each worker is logged at INFO level right before it is deleted. Nothing happens
    when the queryset is empty.
    """
    for stale in Worker.objects.offline_workers():
        _logger.info(f"Clean offline worker {stale.name}.")
        stale.delete()

# Periodic housekeeping for this worker: refresh the heartbeat, count down the
# shutdown grace counter and trigger the offline-worker cleanup.
# NOTE(review): this text comes from a flattened diff view, so statement nesting is
# not visible and removed/added lines sit side by side; confirm against the real file.
def beat(self):
# Call handle_worker_heartbeat again once the stored heartbeat is older than
# heartbeat_period seconds.
if self.worker.last_heartbeat < timezone.now() - timedelta(seconds=self.heartbeat_period):
self.worker = handle_worker_heartbeat(self.name)
if self.shutdown_requested:
# Presumably the pre-change line that the task_grace_timeout decrement below
# replaces; verify which of the two is live.
self.task_grace -= 1
self.task_grace_timeout -= 1
self.worker_cleanup_countdown -= 1
# Once the countdown has run out, re-arm it and run the cleanup.
if self.worker_cleanup_countdown <= 0:
self.worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL
self.worker_cleanup()

def notify_workers(self):
self.cursor.execute("NOTIFY pulp_worker_wakeup")
Expand Down Expand Up @@ -166,7 +186,7 @@ def supervise_task(self, task):
This function must only be called while holding the lock for that task."""

self.task_grace = 3
self.task_grace_timeout = TASK_GRACE_INTERVAL
self.cursor.execute("LISTEN pulp_worker_cancel")
task.worker = self.worker
task.save(update_fields=["worker"])
Expand Down Expand Up @@ -199,7 +219,7 @@ def supervise_task(self, task):
if self.sentinel in r:
os.read(self.sentinel, 256)
if self.shutdown_requested:
if self.task_grace > 0:
if self.task_grace_timeout > 0:
_logger.info(
f"Worker shutdown requested, waiting for task {task.pk} to finish."
)
Expand Down

0 comments on commit 7d0cbdd

Please sign in to comment.