Skip to content

Commit

Permalink
Workers can run and execute tasks.
Browse files Browse the repository at this point in the history
Thanks to ttereshc aka @goosemania who contributed some of this code.

There is a known issue that resource-reserving tasks still cannot run, but
there is a fix in the works that will follow this PR.

Fixes bugs in pulp.tasking
Rearranges pulp.tasking to put related code together
Adds two tasks (importer and publisher delete) in pulp.app.tasks
Fixes import order problems of django app vs. celery app

https://pulp.plan.io/issues/2440

re #2440
re #2400
re #2408
  • Loading branch information
mhrivnak committed Nov 29, 2016
1 parent c914453 commit 1ef8c35
Show file tree
Hide file tree
Showing 42 changed files with 245 additions and 250 deletions.
27 changes: 27 additions & 0 deletions app/pulp/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@ class ReservedResource(Model):
worker = models.ForeignKey("Worker", on_delete=models.CASCADE, related_name="reservations")


class WorkerManager(models.Manager):

def get_unreserved_worker(self):
"""
Randomly selects an unreserved :class:`~pulp.app.models.Worker`
Return the Worker instance that has no :class:`~pulp.app.models.ReservedResource`
associated with it. If all workers have ReservedResource relationships, a
:class:`pulp.app.models.Worker.DoesNotExist` exception is raised.
This method also provides randomization for worker selection.
:raises Worker.DoesNotExist: If all workers have ReservedResource entries associated with
them.
:returns: A randomly-selected Worker instance that has no ReservedResource
entries associated with it.
:rtype: pulp.app.models.Worker
"""
free_workers_qs = self.annotate(models.Count('reservations')).filter(reservations__count=0)
if free_workers_qs.count() == 0:
raise self.model.DoesNotExist()
return free_workers_qs.order_by('?').first()


class Worker(Model):
"""
Represents a worker
Expand All @@ -44,6 +69,8 @@ class Worker(Model):
name (models.TextField): The name of the worker, in the format "worker_type@hostname"
last_heartbeat (models.DateTimeField): A timestamp of this worker's last heartbeat
"""
objects = WorkerManager()

name = models.TextField(db_index=True, unique=True)
last_heartbeat = models.DateTimeField(auto_now=True)

Expand Down
1 change: 1 addition & 0 deletions app/pulp/app/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pulp.app.tasks import importer, publisher # noqa
17 changes: 17 additions & 0 deletions app/pulp/app/tasks/importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from celery import shared_task

from pulp.app import models
from pulp.tasking.tasks import UserFacingTask


@shared_task(base=UserFacingTask)
def delete(repo_name, importer_name):
"""
Delete an :class:`~pulp.app.models.Importer`
:param repo_name: the name of a repository
:type repo_name: str
:param importer_name: the name of an importer
:type importer_name: str
"""
models.Importer.objects.filter(name=importer_name, repository__name=repo_name).delete()
17 changes: 17 additions & 0 deletions app/pulp/app/tasks/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from celery import shared_task

from pulp.app import models
from pulp.tasking.tasks import UserFacingTask


@shared_task(base=UserFacingTask)
def delete(repo_name, publisher_name):
"""
Delete a :class:`~pulp.app.models.Publisher`
:param repo_name: the name of a repository
:type repo_name: str
:param publisher_name: the name of a publisher
:type publisher_name: str
"""
models.Publisher.objects.filter(name=publisher_name, repository__name=repo_name).delete()
2 changes: 1 addition & 1 deletion app/pulp/app/viewsets/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pulp.app.serializers import TaskSerializer, WorkerSerializer
from pulp.app.viewsets import NamedModelViewSet
from pulp.app.viewsets.custom_filters import CharInFilter
from pulp.tasking.base import cancel as cancel_task
from pulp.tasking.util import cancel as cancel_task

from rest_framework.decorators import detail_route
from rest_framework.response import Response
Expand Down
45 changes: 24 additions & 21 deletions docs/contributing/platform_api/tasking.rst
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@

pulp.tasking
===============
============

.. automodule:: pulp.tasking

pulp.tasking.base
-----------------

.. automodule:: pulp.tasking.base

pulp.tasking.celery_app
-----------------------

Expand All @@ -24,27 +19,35 @@ pulp.tasking.constants

.. automodule:: pulp.tasking.constants

pulp.tasking.manage_workers
---------------------------
pulp.tasking.services.manage_workers
------------------------------------

.. automodule:: pulp.tasking.manage_workers
.. automodule:: pulp.tasking.services.manage_workers

pulp.tasking.registry
---------------------
pulp.tasking.services.scheduler
-------------------------------

.. automodule:: pulp.tasking.registry
.. automodule:: pulp.tasking.services.scheduler

pulp.tasking.scheduler
----------------------
pulp.tasking.services.storage
-----------------------------

.. automodule:: pulp.tasking.services.storage

.. automodule:: pulp.tasking.scheduler
pulp.tasking.services.worker_watcher
------------------------------------

pulp.tasking.storage
--------------------
.. automodule:: pulp.tasking.services.worker_watcher

pulp.tasking.tasks
------------------

.. automodule:: pulp.tasking.tasks


pulp.tasking.util
-----------------

.. automodule:: pulp.tasking.storage
.. automodule:: pulp.tasking.util

pulp.tasking.worker_watcher
---------------------------

.. automodule:: pulp.tasking.worker_watcher
2 changes: 1 addition & 1 deletion plugin/pulp/plugin/tasking.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pulp.app import models
from pulp.exceptions import exception_to_dict
from pulp.tasking import get_current_task_id
from pulp.tasking.util import get_current_task_id


class Task(object):
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/plugins/conduits/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pulp.server import exceptions as pulp_exceptions
import pulp.plugins.conduits._common as common_utils
import pulp.server.managers.factory as manager_factory
from pulp.tasking import get_current_task_id
from pulp.tasking.util import get_current_task_id


_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/controllers/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pulp.plugins.util.publish_step import Step
from pulp.server.content.sources.container import ContentContainer
from pulp.server.exceptions import PulpCodedTaskException
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask

_logger = getLogger(__name__)

Expand Down
43 changes: 1 addition & 42 deletions server/pulp/server/controllers/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pulp.server import exceptions
from pulp.server.db import model
from pulp.server.managers import factory as managers
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -110,47 +110,6 @@ def queue_delete(distributor):
return async_result


@celery.task(base=UserFacingTask, name='pulp.server.tasks.repository.distributor_delete')
def delete(repo_id, dist_id):
"""
Removes a distributor from a repository and unbinds any bound consumers.
:param distributor: distributor to be deleted
:type distributor: pulp.server.db.model.Distributor
"""

distributor = model.Distributor.objects.get_or_404(repo_id=repo_id, distributor_id=dist_id)
managers.repo_publish_schedule_manager().delete_by_distributor_id(repo_id, dist_id)

# Call the distributor's cleanup method
dist_instance, plugin_config = plugin_api.get_distributor_by_id(distributor.distributor_type_id)

call_config = PluginCallConfiguration(plugin_config, distributor.config)
repo = model.Repository.objects.get_repo_or_missing_resource(repo_id)
dist_instance.distributor_removed(repo.to_transfer_repo(), call_config)
distributor.delete()

unbind_errors = []
additional_tasks = []
options = {}

bind_manager = managers.consumer_bind_manager()
for bind in bind_manager.find_by_distributor(repo_id, dist_id):
try:
report = bind_manager.unbind(bind['consumer_id'], bind['repo_id'],
bind['distributor_id'], options)
if report:
additional_tasks.extend(report.spawned_tasks)
except Exception, e:
unbind_errors.append(e)

if unbind_errors:
bind_error = exceptions.PulpCodedException(PLP0003, repo_id=repo_id,
distributor_id=dist_id)
bind_error.child_exceptions = unbind_errors
raise bind_error


def queue_update(distributor, config, delta):
"""
Dispatch a task to update a distributor.
Expand Down
25 changes: 1 addition & 24 deletions server/pulp/server/controllers/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pulp.server import exceptions
from pulp.server.db import model
from pulp.server.managers import factory as manager_factory
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -170,29 +170,6 @@ def validate_importer_config(repo_obj, importer_type_id, config):
raise exceptions.PulpDataException(message)


@celery.task(base=UserFacingTask, name='pulp.server.managers.repo.importer.remove_importer')
def remove_importer(repo_id):
"""
Removes an importer from a repository.
:param repo_id: identifies the repo
:type repo_id: str
"""
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
repo_importer = model.Importer.objects.get_or_404(repo_id=repo_id)

# remove schedules
sync_manager = manager_factory.repo_sync_schedule_manager()
sync_manager.delete_by_importer_id(repo_id, repo_importer.importer_type_id)

# Call the importer's cleanup method
importer_instance, plugin_config = plugin_api.get_importer_by_id(repo_importer.importer_type_id)

call_config = PluginCallConfiguration(plugin_config, repo_importer.config)
transfer_repo = repo_obj.to_transfer_repo()
importer_instance.importer_removed(transfer_repo, call_config)
repo_importer.delete()


def queue_remove_importer(repo_id, importer_type_id):
"""
Expand Down
3 changes: 2 additions & 1 deletion server/pulp/server/controllers/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
from pulp.server.lazy import URL, Key
from pulp.server.managers import factory as manager_factory
from pulp.server.util import InvalidChecksumType
from pulp.tasking import PulpTask, UserFacingTask, get_current_task_id
from pulp.tasking.tasks import PulpTask, UserFacingTask
from pulp.tasking.util import get_current_task_id
from pulp.tasking.storage import get_working_directory

_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/db/reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pulp.server import config as pulp_config
from pulp.server.db import model
from pulp.server.db.model import celery_result, consumer, repo_group, repository
from pulp.tasking import PulpTask, UserFacingTask
from pulp.tasking.tasks import PulpTask, UserFacingTask


# Add collections to reap here. The keys in this datastructure are the Model classes that represent
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/maintenance/monthly.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pulp.common.tags import action_tag
from pulp.server.managers.consumer.applicability import RepoProfileApplicabilityManager

from pulp.tasking import PulpTask, UserFacingTask
from pulp.tasking.tasks import PulpTask, UserFacingTask


@task(base=PulpTask)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/auth/permission/cud.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
DuplicateResource, InvalidValue, MissingResource, PulpDataException,
PulpExecutionException)
from pulp.server.managers import factory
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


class PermissionManager(object):
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/auth/role/cud.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pulp.server.exceptions import (DuplicateResource, InvalidValue, MissingResource,
PulpDataException)
from pulp.server.managers import factory
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_ROLE_NAME_REGEX = re.compile(r'^[\-_A-Za-z0-9]+$') # letters, numbers, underscore, hyphen
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/applicability.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pulp.server.managers import factory as managers
from pulp.server.managers.consumer.query import ConsumerQueryManager
from pulp.plugins.util.misc import paginate
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/bind.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pulp.server.db.model.consumer import Bind
from pulp.server.exceptions import MissingResource, InvalidValue
from pulp.server.managers import factory
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/cud.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
MissingResource, PulpExecutionException, MissingValue
from pulp.server.managers import factory
from pulp.server.managers.schedule import utils as schedule_utils
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_CONSUMER_ID_REGEX = re.compile(r'^[.\-_A-Za-z0-9]+$') # letters, numbers, underscore, hyphen
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/group/cud.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pulp.server.exceptions import PulpCodedException, PulpException
from pulp.server.managers import factory as manager_factory
from pulp.server.controllers.consumer import bind as bind_task, unbind as unbind_task
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/consumer/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pulp.server.db.model.consumer import UnitProfile
from pulp.server.exceptions import MissingResource, MissingValue
from pulp.server.managers import factory
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


class ProfileManager(object):
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/content/orphan.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pulp.server.db.model.repository import RepoContentUnit
from pulp.server.db import model
from pulp.server.exceptions import MissingResource
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/content/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pulp.server.exceptions import (PulpDataException, MissingResource, PulpExecutionException,
PulpException, PulpCodedException)
from pulp.server.controllers import repository as repo_controller
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/repo/group/cud.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pulp.server.db import model
from pulp.server.db.model.repo_group import RepoGroup
from pulp.server.managers.repo.group.distributor import RepoGroupDistributorManager
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down
Loading

0 comments on commit 1ef8c35

Please sign in to comment.