diff --git a/CHANGES/5787.feature b/CHANGES/5787.feature new file mode 100644 index 0000000000..f7dc92dc34 --- /dev/null +++ b/CHANGES/5787.feature @@ -0,0 +1,4 @@ +Workers no longer require names, and auto-name as {pid}@{fqdn}. This allows easy finding of +processes from the Status API. Custom names still work by specifying the ``-n`` option when starting +a worker. Any worker name starting with ``resource-manager`` is a resource-manager, otherwise it's +assumed to be a task worker. diff --git a/containers/images/pulp/container-assets/pulp-worker b/containers/images/pulp/container-assets/pulp-worker index 960cf29f27..b62cab42f2 100755 --- a/containers/images/pulp/container-assets/pulp-worker +++ b/containers/images/pulp/container-assets/pulp-worker @@ -5,4 +5,4 @@ # TODO: Set ${PULP_WORKER_NUMBER} to the Pod Number # In the meantime, the hostname provides uniqueness. -exec rq worker --url "redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}" -n "reserved-resource-worker-${PULP_WORKER_NUMBER}@${HOSTNAME}" -w "pulpcore.tasking.worker.PulpWorker" -c "pulpcore.rqconfig" +exec rq worker --url "redis://${REDIS_SERVICE_HOST}:${REDIS_SERVICE_PORT}" -w "pulpcore.tasking.worker.PulpWorker" -c "pulpcore.rqconfig" diff --git a/docs/components.rst b/docs/components.rst index 05e93e7aaa..6d78fc6d56 100644 --- a/docs/components.rst +++ b/docs/components.rst @@ -63,11 +63,13 @@ Pulp's tasking system has two components: a resource manager and workers, all of Worker Pulp workers perform most tasks "run" by the tasking system including long-running tasks like synchronize and short-running tasks like a Distribution update. Each worker handles one task at a - time, and additional workers provide more concurrency. + time, and additional workers provide more concurrency. Workers auto-name and are auto-discovered, + so they can be started and stopped without notifying Pulp. Resource Manager A different type of Pulp worker that plays a coordinating role for the tasking system. 
You must - run exactly one of these for Pulp to operate correctly. + run exactly one of these for Pulp to operate correctly. The ``resource-manager`` is identified by + configuring its name with the ``-n 'resource-manager'`` option. .. note:: diff --git a/docs/installation/instructions.rst b/docs/installation/instructions.rst index a762a47017..d7e4010e0f 100644 --- a/docs/installation/instructions.rst +++ b/docs/installation/instructions.rst @@ -63,7 +63,8 @@ PyPI Installation $ pip install -e ./pulpcore[postgres] -5. Follow the :ref:`configuration instructions ` to set the ``SECRET_KEY``. +5. Follow the :ref:`configuration instructions ` to set ``SECRET_KEY`` and + ``CONTENT_ORIGIN``. 6. Create the ``MEDIA_ROOT`` & ``WORKING_DIRECTORY`` with the prescribed permissions from the :ref:`configuration instructions. ` @@ -76,8 +77,8 @@ PyPI Installation the commands yourself inside of a shell. This is fine for development but not recommended in production:: $ /path/to/python/bin/rq worker -n 'resource-manager@%h' -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig' - $ /path/to/python/bin/rq worker -n 'reserved-resource-worker-1@%h' -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig' - $ /path/to/python/bin/rq worker -n 'reserved-resource-worker-2@%h' -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig' + $ /path/to/python/bin/rq worker -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig' + $ /path/to/python/bin/rq worker -w 'pulpcore.tasking.worker.PulpWorker' -c 'pulpcore.rqconfig' 8. Run Django Migrations:: diff --git a/pulpcore/app/models/task.py b/pulpcore/app/models/task.py index 9149b7d691..bd495ca117 100644 --- a/pulpcore/app/models/task.py +++ b/pulpcore/app/models/task.py @@ -120,7 +120,9 @@ def get_unreserved_worker(self): Raises: Worker.DoesNotExist: If all Workers have at least one ReservedResource entry.
""" - workers_qs = self.online_workers().filter(name__startswith=TASKING_CONSTANTS.WORKER_PREFIX) + workers_qs = self.online_workers().exclude( + name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME + ) workers_qs_with_counts = workers_qs.annotate(models.Count('reservations')) try: return workers_qs_with_counts.filter(reservations__count=0).order_by('?')[0] diff --git a/pulpcore/tasking/constants.py b/pulpcore/tasking/constants.py index 961e00483f..cc64bbdf17 100644 --- a/pulpcore/tasking/constants.py +++ b/pulpcore/tasking/constants.py @@ -1,8 +1,6 @@ from types import SimpleNamespace TASKING_CONSTANTS = SimpleNamespace( - # The prefix provided to normal worker entries in the workers table - WORKER_PREFIX='reserved-resource-worker', # The name of resource manager entries in the workers table RESOURCE_MANAGER_WORKER_NAME='resource-manager', # The amount of time (in seconds) after which a worker process is considered missing. diff --git a/pulpcore/tasking/services/worker_watcher.py b/pulpcore/tasking/services/worker_watcher.py index af626ca402..ef298916c4 100644 --- a/pulpcore/tasking/services/worker_watcher.py +++ b/pulpcore/tasking/services/worker_watcher.py @@ -74,8 +74,8 @@ def check_worker_processes(): mark_worker_offline(worker.name) - worker_count = Worker.objects.online_workers().filter( - name__startswith=TASKING_CONSTANTS.WORKER_PREFIX).count() + worker_count = Worker.objects.online_workers().exclude( + name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).count() resource_manager_count = Worker.objects.online_workers().filter( name__startswith=TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME).count() @@ -86,8 +86,8 @@ def check_worker_processes(): _logger.error(msg) if worker_count == 0: - msg = _("There are 0 worker processes running. Pulp will not operate " - "correctly without at least one worker process running.") + msg = _("There are 0 task worker processes running. 
Pulp will not operate " + "correctly without at least one task worker process running.") _logger.error(msg) output_dict = {'workers': worker_count, 'resource-manager': resource_manager_count} diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index d667d7f78e..47a39a5f8e 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -49,12 +49,15 @@ class PulpWorker(Worker): def __init__(self, queues, **kwargs): - kwargs['name'] = kwargs['name'].replace('%h', socket.getfqdn()) + if kwargs['name']: + kwargs['name'] = kwargs['name'].replace('%h', socket.getfqdn()) + else: + kwargs['name'] = "{pid}@{hostname}".format(pid=os.getpid(), hostname=socket.getfqdn()) - if kwargs['name'].startswith(TASKING_CONSTANTS.WORKER_PREFIX): - queues = [Queue(kwargs['name'], connection=kwargs['connection'])] if kwargs['name'].startswith(TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME): queues = [Queue('resource-manager', connection=kwargs['connection'])] + else: + queues = [Queue(kwargs['name'], connection=kwargs['connection'])] kwargs['default_worker_ttl'] = TASKING_CONSTANTS.WORKER_TTL kwargs['job_monitoring_interval'] = TASKING_CONSTANTS.JOB_MONITORING_INTERVAL diff --git a/pulpcore/tests/functional/api/test_workers.py b/pulpcore/tests/functional/api/test_workers.py index a27c2bd4d5..8244d362db 100644 --- a/pulpcore/tests/functional/api/test_workers.py +++ b/pulpcore/tests/functional/api/test_workers.py @@ -1,11 +1,10 @@ # coding=utf-8 """Tests related to the workers.""" -import time import unittest from datetime import datetime, timedelta from random import choice -from pulp_smash import api, cli, config +from pulp_smash import api, config from pulp_smash.pulp3.constants import WORKER_PATH from requests.exceptions import HTTPError @@ -101,48 +100,3 @@ def test_05_http_method(self): """ with self.assertRaises(HTTPError): self.client.delete(self.worker['pulp_href']) - - -class OfflineWorkerTestCase(unittest.TestCase): - """Test actions over offline 
workers. - - This test targets the following issues: - - * `Pulp #2659 `_ - * `Pulp Smash #877 `_ - """ - - @classmethod - def setUpClass(cls): - """Create an API Client and a ServiceManager.""" - cls.cfg = config.get_config() - cls.client = api.Client(cls.cfg, api.json_handler) - cls.svc_mgr = cli.ServiceManager(cls.cfg, cls.cfg.get_hosts('api')[0]) - cls.worker = {} - if not cls.svc_mgr.is_active(['pulpcore-worker@*']): - raise unittest.SkipTest( - 'These tests require pulp workers running on systemd' - ) - - def test_01_start_new_worker(self): - """Start a new worker to be used in next assertions.""" - self.svc_mgr.start(['pulpcore-worker@99']) - time.sleep(2) - workers = self.client.get( - WORKER_PATH, params={'online': True} - )['results'] - for worker in workers: - if 'worker-99' in worker['name']: - self.worker.update(worker) - break - self.assertNotEqual({}, self.worker) - self.assertIn('resource-worker-99', self.worker['name']) - - @skip_if(bool, 'worker', False) - def test_02_read_all_workers(self): - """Worker API shows all workers including offline.""" - workers = self.client.get(WORKER_PATH)['results'] - self.assertIn( - self.worker['pulp_href'], - [worker['pulp_href'] for worker in workers] - )