Skip to content

Commit

Permalink
Workers now auto-name {pid}@{hostname}
Browse files Browse the repository at this point in the history
Requiring users to specify a name for task workers was difficult and not
productive. Having workers auto-name as {pid}@{hostname} is much easier
to operate.

This also updates the docs to match.

https://pulp.plan.io/issues/5787
closes #5787
  • Loading branch information
bmbouter committed Nov 26, 2019
1 parent 2ac6d55 commit 76ad575
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 63 deletions.
4 changes: 4 additions & 0 deletions CHANGES/5787.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Workers no longer require names, and auto-name as {pid}@{fqdn}. This allows easy finding of
processes from the Status API. Custom names still work by specifying the ``-n`` option when starting
a worker. Any worker name starting with ``resource-manager`` is a resource-manager, otherwise it's
assumed to be a task worker.
2 changes: 1 addition & 1 deletion containers/images/pulp/container-assets/pulp-worker
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

# TODO: Set ${PULP_WORKER_NUMBER} to the Pod Number
# In the meantime, the hostname provides uniqueness.
exec rq worker --url "redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}" -n "reserved-resource-worker-${PULP_WORKER_NUMBER}@${HOSTNAME}" -w "pulpcore.tasking.worker.PulpWorker" -c "pulpcore.rqconfig"
exec rq worker --url "redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}" -w "pulpcore.tasking.worker.PulpWorker" -c "pulpcore.rqconfig"
6 changes: 4 additions & 2 deletions docs/components.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ Pulp's tasking system has two components: a resource manager and workers, all of
Worker
Pulp workers perform most tasks "run" by the tasking system including long-running tasks like
synchronize and short-running tasks like a Distribution update. Each worker handles one task at a
time, and additional workers provide more concurrency.
time, and additional workers provide more concurrency. Workers auto-name and are auto-discovered,
so they can be started and stopped without notifying Pulp.

Resource Manager
A different type of Pulp worker that plays a coordinating role for the tasking system. You must
run exactly one of these for Pulp to operate correctly.
run exactly one of these for Pulp to operate correctly. The ``resource-manager`` is identified by
starting it with a name that begins with ``resource-manager``, e.g. ``-n 'resource-manager'``.

.. note::

Expand Down
7 changes: 4 additions & 3 deletions docs/installation/instructions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ PyPI Installation
$ pip install -e ./pulpcore[postgres]


5. Follow the :ref:`configuration instructions <configuration>` to set the ``SECRET_KEY``.
5. Follow the :ref:`configuration instructions <configuration>` to set ``SECRET_KEY`` and
``CONTENT_ORIGIN``.

6. Create the ``MEDIA_ROOT`` & ``WORKING_DIRECTORY`` with the prescribed permissions from the
:ref:`configuration instructions. <configuration>`
Expand All @@ -76,8 +77,8 @@ PyPI Installation
the commands yourself inside of a shell. This is fine for development but not recommended in production::

$ /path/to/python/bin/rq worker -n 'resource-manager@%h' -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig'
$ /path/to/python/bin/rq worker -n 'reserved-resource-worker-1@%h' -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig'
$ /path/to/python/bin/rq worker -n 'reserved-resource-worker-2@%h' -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig'
$ /path/to/python/bin/rq worker -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig'
$ /path/to/python/bin/rq worker -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig'

8. Run Django Migrations::

Expand Down
4 changes: 3 additions & 1 deletion pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ def get_unreserved_worker(self):
Raises:
Worker.DoesNotExist: If all Workers have at least one ReservedResource entry.
"""
workers_qs = self.online_workers().filter(name__startswith=TASKING_CONSTANTS.WORKER_PREFIX)
workers_qs = self.online_workers().exclude(
name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME
)
workers_qs_with_counts = workers_qs.annotate(models.Count('reservations'))
try:
return workers_qs_with_counts.filter(reservations__count=0).order_by('?')[0]
Expand Down
2 changes: 0 additions & 2 deletions pulpcore/tasking/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from types import SimpleNamespace

TASKING_CONSTANTS = SimpleNamespace(
# The prefix provided to normal worker entries in the workers table
WORKER_PREFIX='reserved-resource-worker',
# The name of resource manager entries in the workers table
RESOURCE_MANAGER_WORKER_NAME='resource-manager',
# The amount of time (in seconds) after which a worker process is considered missing.
Expand Down
8 changes: 4 additions & 4 deletions pulpcore/tasking/services/worker_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def check_worker_processes():

mark_worker_offline(worker.name)

worker_count = Worker.objects.online_workers().filter(
name__startswith=TASKING_CONSTANTS.WORKER_PREFIX).count()
worker_count = Worker.objects.online_workers().exclude(
name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).count()

resource_manager_count = Worker.objects.online_workers().filter(
name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).count()
Expand All @@ -86,8 +86,8 @@ def check_worker_processes():
_logger.error(msg)

if worker_count == 0:
msg = _("There are 0 worker processes running. Pulp will not operate "
"correctly without at least one worker process running.")
msg = _("There are 0 task worker processes running. Pulp will not operate "
"correctly without at least one task worker process running.")
_logger.error(msg)

output_dict = {'workers': worker_count, 'resource-manager': resource_manager_count}
Expand Down
9 changes: 6 additions & 3 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ class PulpWorker(Worker):

def __init__(self, queues, **kwargs):

kwargs['name'] = kwargs['name'].replace('%h', socket.getfqdn())
if kwargs['name']:
kwargs['name'] = kwargs['name'].replace('%h', socket.getfqdn())
else:
kwargs['name'] = "{pid}@{hostname}".format(pid=os.getpid(), hostname=socket.getfqdn())

if kwargs['name'].startswith(TASKING_CONSTANTS.WORKER_PREFIX):
queues = [Queue(kwargs['name'], connection=kwargs['connection'])]
if kwargs['name'].startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME):
queues = [Queue('resource-manager', connection=kwargs['connection'])]
else:
queues = [Queue(kwargs['name'], connection=kwargs['connection'])]

kwargs['default_worker_ttl'] = TASKING_CONSTANTS.WORKER_TTL
kwargs['job_monitoring_interval'] = TASKING_CONSTANTS.JOB_MONITORING_INTERVAL
Expand Down
48 changes: 1 addition & 47 deletions pulpcore/tests/functional/api/test_workers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# coding=utf-8
"""Tests related to the workers."""
import time
import unittest
from datetime import datetime, timedelta
from random import choice

from pulp_smash import api, cli, config
from pulp_smash import api, config
from pulp_smash.pulp3.constants import WORKER_PATH
from requests.exceptions import HTTPError

Expand Down Expand Up @@ -101,48 +100,3 @@ def test_05_http_method(self):
"""
with self.assertRaises(HTTPError):
self.client.delete(self.worker['pulp_href'])


class OfflineWorkerTestCase(unittest.TestCase):
    """Start an extra systemd-managed worker and look it up through the workers API.

    This test targets the following issues:

    * `Pulp #2659 <https://pulp.plan.io/issues/2659>`_
    * `Pulp Smash #877 <https://github.com/PulpQE/pulp-smash/issues/877>`_
    """

    @classmethod
    def setUpClass(cls):
        """Create the API client and service manager shared by all tests.

        Raises:
            unittest.SkipTest: If no ``pulpcore-worker@*`` unit is reported active.
        """
        cls.cfg = config.get_config()
        cls.client = api.Client(cls.cfg, api.json_handler)
        api_host = cls.cfg.get_hosts('api')[0]
        cls.svc_mgr = cli.ServiceManager(cls.cfg, api_host)
        # Class-level dict shared between the numbered tests: test_01 fills it
        # in, test_02 reads it back.
        cls.worker = {}
        workers_on_systemd = cls.svc_mgr.is_active(['pulpcore-worker@*'])
        if workers_on_systemd:
            return
        raise unittest.SkipTest(
            'These tests require pulp workers running on systemd'
        )

    def test_01_start_new_worker(self):
        """Start worker unit 99 and record its entry from the online listing."""
        self.svc_mgr.start(['pulpcore-worker@99'])
        # NOTE(review): fixed pause, presumably to let the new worker register
        # before the API is queried -- confirm.
        time.sleep(2)
        response = self.client.get(WORKER_PATH, params={'online': True})
        online_workers = response['results']
        # Only the first matching entry is recorded.
        started = next(
            (entry for entry in online_workers if 'worker-99' in entry['name']),
            None,
        )
        if started is not None:
            self.worker.update(started)
        self.assertNotEqual({}, self.worker)
        self.assertIn('resource-worker-99', self.worker['name'])

    # NOTE(review): ``skip_if`` is defined outside this block; presumably it
    # skips the test while ``self.worker`` is still empty -- confirm.
    @skip_if(bool, 'worker', False)
    def test_02_read_all_workers(self):
        """The unfiltered worker listing contains the worker recorded by test_01."""
        listed_workers = self.client.get(WORKER_PATH)['results']
        expected_href = self.worker['pulp_href']
        listed_hrefs = [entry['pulp_href'] for entry in listed_workers]
        self.assertIn(expected_href, listed_hrefs)

0 comments on commit 76ad575

Please sign in to comment.