Skip to content

Commit

Permalink
Introduce Worker.last_seen, worker's last communication with hub
Browse files Browse the repository at this point in the history
This commit introduces a new attribute, last_seen, onto the Worker
model used by hub. This field contains the timestamp for the worker's
last XML-RPC call to hub and is kept up-to-date automatically.
It is exposed via 'export', which means it appears in the responses
to the get_worker_info method.

The motivation is to improve our ability to monitor workers.
Prior to this, no attribute on the model could be used to reliably
determine whether a worker is alive, or even whether it truly exists.
For example, a worker can have enabled=True, ready=True even if it
uses a nonexistent hostname.

The time at which a worker last called the hub is a useful signal to
determine whether workers are alive, so let's start tracking it.

Note that the way this is implemented is quite different from how it
would be in other circumstances. In particular, if this were implemented
when the model was originally defined, it'd just be another column
in the DB. The problem with doing this now is that the last DB schema
change was ~5 years ago and I doubt that most kobo-using services have a
process in place for running migrations around upgrades. I know this is
true at least for Pub.

For that reason, the info is maintained using 0-byte state files. This
is expected to be compatible with all environments without requiring
migrations.
  • Loading branch information
rohanpm committed Jul 7, 2021
1 parent 14189d9 commit 34a6d85
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 1 deletion.
3 changes: 3 additions & 0 deletions kobo/admin/templates/hub/settings.py.template
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ TASK_DIR = os.path.join(FILES_PATH, 'tasks')
# Root directory for uploaded files
UPLOAD_DIR = os.path.join(FILES_PATH, 'upload')

# Used for additional per-worker state
WORKER_DIR = os.path.join(FILES_PATH, 'worker')

# Absolute path to the directory that holds media.
# Example: "/home/media/media.lawrence.com/"
MEDIA_ROOT = os.path.join(PROJECT_DIR, "media/")
Expand Down
9 changes: 8 additions & 1 deletion kobo/hub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
raise ImproperlyConfigured("'%s' is missing in project settings. It must be set to run kobo.hub app." % var)


for var in ["TASK_DIR", "UPLOAD_DIR"]:
if not hasattr(settings, "WORKER_DIR"):
    # WORKER_DIR was only introduced in 2021, so existing config files may not
    # define it. Default it to a 'worker' directory alongside TASK_DIR to stay
    # backwards compatible.
    settings.WORKER_DIR = os.path.join(os.path.dirname(settings.TASK_DIR), 'worker')


for var in ["TASK_DIR", "UPLOAD_DIR", "WORKER_DIR"]:
dir_path = getattr(settings, var)
if not os.path.isdir(dir_path):
try:
Expand Down
2 changes: 2 additions & 0 deletions kobo/hub/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def _new_func(request, *args, **kwargs):
if getattr(request, 'worker', None) is None:
raise SuspiciousOperation("User doesn't match any worker: %s" % request.user.username)

request.worker.update_last_seen()

return func(request, *args, **kwargs)

_new_func.__name__ = func.__name__
Expand Down
45 changes: 45 additions & 0 deletions kobo/hub/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

from __future__ import print_function
import os
import errno
import sys
import datetime
import base64
import gzip
import shutil
import logging
Expand Down Expand Up @@ -226,6 +228,7 @@ def export(self):
"ready": self.ready,
"task_count": self.task_count,
"current_load": self.current_load,
"last_seen": self.last_seen_iso8601,

# Add the hub version.
# This can be used for taskd compatibility checking everytime a worker_info is updated.
Expand All @@ -247,6 +250,48 @@ def assigned_tasks(self):
"""Return list of assigned tasks to this worker."""
return Task.objects.assigned().filter(worker=self)

@property
def last_seen(self):
    """Return the time of this worker's last communication with hub.

    The value is taken from the modification time of the worker's state
    file and is expressed as a naive datetime in UTC.

    :return: time of last communication, or None if the state file
        does not exist (i.e. the time is unknown)
    :rtype: datetime.datetime
    :raises EnvironmentError: if the state file cannot be examined for a
        reason other than it being absent
    """
    try:
        mtime = os.stat(self._state_path).st_mtime
    except EnvironmentError as exc:
        # Only a missing file means "never seen"; anything else is a real error.
        if exc.errno != errno.ENOENT:
            raise
        return None

    return datetime.datetime.utcfromtimestamp(mtime)

@property
def last_seen_iso8601(self):
    """Return the worker's last communication time as an ISO8601 string.

    Microseconds are dropped and a 'Z' suffix is appended.
    Example: '2021-07-07T22:09:28Z'

    :return: formatted timestamp, or None if the time is unknown
    :rtype: str
    """
    timestamp = self.last_seen
    if not timestamp:
        return None
    return timestamp.replace(microsecond=0).isoformat() + 'Z'

def update_last_seen(self):
    """Mark worker as having communicated with hub at the current time.

    The time is recorded as the modification time of the worker's 0-byte
    state file, which is created on first use.

    :raises EnvironmentError: if the state file cannot be created or updated
    """
    state_path = self._state_path

    # Create the state file if missing (and keep it empty).
    with open(state_path, 'w'):
        pass

    # Set the timestamp explicitly rather than relying on the side effect of
    # re-truncating an already-empty file to refresh its mtime.
    os.utime(state_path, None)

@property
def _state_path(self):
    # Path to this worker's state file under settings.WORKER_DIR.
    # Worker names may contain arbitrary characters, so the file name is the
    # URL-safe base64 encoding of the UTF-8 encoded name.
    encoded_name = base64.urlsafe_b64encode(self.name.encode('utf-8'))
    return os.path.join(settings.WORKER_DIR, encoded_name.decode())

def update_worker(self, enabled, ready, task_count):
"""Update worker attributes. Return worker_info.
Expand Down
24 changes: 24 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ def test_export(self):
self.assertEquals(data['task_count'], 0)
self.assertEquals(data['current_load'], 0)
self.assertEquals(data['version'], '1.0.0')
self.assertIsNone(data['last_seen'])

def test_export_last_seen(self):
    """export() reports last_seen as an ISO8601 string derived from the state file's mtime."""
    worker = Worker.objects.create(worker_key='worker', name='Worker')

    # Force a known value for timestamp.
    with patch('os.stat') as mock_stat:
        mock_stat.return_value.st_mtime = 1625695768
        data = worker.export()

    # assertEqual rather than the deprecated assertEquals alias, which is
    # no longer available on Python 3.12+.
    self.assertEqual(data['last_seen'], '2021-07-07T22:09:28Z')

def test_export_with_arch_and_channel(self):
worker = Worker.objects.create(worker_key='worker', name='Worker')
Expand Down Expand Up @@ -227,6 +238,19 @@ def test_assigned_tasks(self):
tasks = worker.assigned_tasks()
self.assertEquals(len(tasks), 1)

def test_last_seen(self):
    """last_seen is None until update_last_seen() is called, then close to now."""
    import os

    worker = Worker.objects.create(worker_key='worker', name='Worker')

    def remove_state_file():
        # The state file lives on disk rather than in the test database, so
        # it is not rolled back with the test; remove it so it cannot leak
        # into later tests or later runs.
        state_path = worker._state_path
        if os.path.exists(state_path):
            os.remove(state_path)

    self.addCleanup(remove_state_file)

    # Initially, there should not be any last_seen.
    self.assertIsNone(worker.last_seen)

    # If I update it...
    worker.update_last_seen()

    # Now there should be a recent value (some fuzz allowed)
    last_seen_delta = datetime.utcnow() - worker.last_seen
    self.assertLess(abs(last_seen_delta), timedelta(seconds=5))

@pytest.mark.xfail(reason='Check issue #68 for more info (https://git.io/fxSZ2).')
def test_update_worker(self):
worker = Worker.objects.create(worker_key='worker', name='Worker')
Expand Down

0 comments on commit 34a6d85

Please sign in to comment.