Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.
/ pulp Public archive

Commit

Permalink
Progress for story 23
Browse files Browse the repository at this point in the history
* Recursive serialization and deserialization
* Force all Pulp Celery tasks to inherit from PulpTask
  • Loading branch information
Shubham Bhawsinka authored and ipanova committed Dec 15, 2015
1 parent 3cb6dea commit 2ed8b7f
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 15 deletions.
1 change: 1 addition & 0 deletions server/pulp/server/async/celery_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
celery.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)
celery.conf.update(CELERYBEAT_SCHEDULER='pulp.server.async.scheduler.Scheduler')
celery.conf.update(CELERY_WORKER_DIRECT=True)
celery.conf.update(CELERY_TASK_SERIALIZER='json')


def configure_login_method():
Expand Down
108 changes: 100 additions & 8 deletions server/pulp/server/async/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import traceback
import uuid

from bson.json_util import dumps as bson_dumps
from bson.json_util import loads as bson_loads
from bson import ObjectId
from celery import task, Task as CeleryTask, current_task
from celery.app import control, defaults
from celery.result import AsyncResult
Expand All @@ -28,7 +31,98 @@
_logger = logging.getLogger(__name__)


@task(acks_late=True)
class PulpTask(CeleryTask):
"""
This is a custom Pulp subclass of the Celery Task object. It allows us to check if the
parameters are valid or not and call Celery's method if they are valid.
Otherwise raise an exception.
"""

@classmethod
def _json_validator(cls, value):
"""
Checks if the value is an allowed type or not. It also transforms the
ObjectId's to str type and vice versa.
Accepted types are ObjectId, bool, int, long, float or basestring.
This function is recursively called for dict, list or tuples.
Any ObjectId types present are serialized to a str.
The same str is converted back to an ObjectId while de-serializing.
This function is called from apply_async and __call__ methods of this class.
A TypeError is raised if value is not a Python primitive or ObjectId.
:param value: the args or kwargs to be checked
:type value: Object
:returns: ars or kwargs value with updated id field
:rtype: Object
:raise TypeError: if value is not a Python primitive or ObjectId
"""

if value is None or isinstance(value, (bool, int, long, float, basestring)):
return value

# Encoding ObjectId to str
if isinstance(value, ObjectId):
return bson_dumps(value)

# Recursive checks inside dict
if isinstance(value, dict):
if len(value) == 0:
return value
# Decoding '$oid' back to ObjectId
if '$oid' in value.keys():
return bson_loads(value)

return {cls._json_validator(k): cls._json_validator(v) for k, v in value.iteritems()}

# Recursive checks inside a list
if isinstance(value, list):
if len(value) == 0:
return value
for i, val in enumerate(value):
value[i] = cls._json_validator(val)
return value

# Recursive checks inside a tuple
if isinstance(value, tuple):
if len(value) == 0:
return value
return tuple([cls._json_validator(val) for val in value])

raise TypeError("%s is of type %s. The Pulp tasking system only allows Python primitives "
"and PyMongo ObjectId types as arguments." % (repr(value), type(value)))

def apply_async(self, *args, **kwargs):
"""
A wrapper around the Celery apply_async method.
It calls the _json_checker with args and kwargs to check if they are valid
or not. If they are valid, the methods calls the Celery apply_async methods,
if not, a TypeError is expected to be raised.
:return: An AsyncResult instance as returned by Celery's apply_async
:rtype: celery.result.AsyncResult
"""

args = self._json_validator(args)
kwargs = self._json_validator(kwargs)
return super(PulpTask, self).apply_async(*args, **kwargs)

def __call__(self, *args, **kwargs):
"""
This overrides Celery's __call__() method. We use this method
for task state tracking of Pulp tasks.
"""

args = self._json_validator(args)
kwargs = self._json_validator(kwargs)
return super(PulpTask, self).__call__(*args, **kwargs)


@task(base=PulpTask, acks_late=True)
def _queue_reserved_task(name, task_id, resource_id, inner_args, inner_kwargs):
"""
A task that encapsulates another task to be dispatched later. This task being encapsulated is
Expand Down Expand Up @@ -183,7 +277,7 @@ def _delete_worker(name, normal_shutdown=False):
cancel(task_status['task_id'])


@task
@task(base=PulpTask)
def _release_resource(task_id):
"""
Do not queue this task yourself. It will be used automatically when your task is dispatched by
Expand Down Expand Up @@ -321,23 +415,22 @@ def apply_async_with_reservation(self, resource_type, resource_id, *args, **kwar
# this change is propagated to all db nodes, using an 'upsert' here and setting
# the task state to 'waiting' only on an insert.
task_status.save_with_set_on_insert(fields_to_set_on_insert=['state', 'start_time'])

_queue_reserved_task.apply_async(args=[task_name, inner_task_id, resource_id, args, kwargs],
queue=RESOURCE_MANAGER_QUEUE)
return AsyncResult(inner_task_id)


class Task(CeleryTask, ReservedTaskMixin):
class Task(PulpTask, ReservedTaskMixin):
"""
This is a custom Pulp subclass of the Celery Task object. It allows us to inject some custom
This is a custom Pulp subclass of the PulpTask class. It allows us to inject some custom
behavior into each Pulp task, including management of resource locking.
"""
# this tells celery to not automatically log tracebacks for these exceptions
throws = (PulpCodedException,)

def apply_async(self, *args, **kwargs):
"""
A wrapper around the Celery apply_async method. It allows us to accept a few more
A wrapper around the PulpTask apply_async method. It allows us to accept a few more
parameters than Celery does for our own purposes, listed below. It also allows us
to create and update task status which can be used to track status of this task
during it's lifetime.
Expand All @@ -357,7 +450,6 @@ def apply_async(self, *args, **kwargs):
defaults.NAMESPACES['CELERY']['DEFAULT_ROUTING_KEY'].default)
tag_list = kwargs.pop('tags', [])
group_id = kwargs.pop('group_id', None)

async_result = super(Task, self).apply_async(*args, **kwargs)
async_result.tags = tag_list

Expand All @@ -374,7 +466,7 @@ def apply_async(self, *args, **kwargs):

def __call__(self, *args, **kwargs):
"""
This overrides CeleryTask's __call__() method. We use this method
This overrides PulpTask's __call__() method. We use this method
for task state tracking of Pulp tasks.
"""
# Check task status and skip running the task if task state is 'canceled'.
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/controllers/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pulp.plugins.conduits.mixins import (ContentSourcesConduitException, StatusMixin,
PublishReportMixin)
from pulp.plugins.util.publish_step import Step
from pulp.server.async.tasks import Task
from pulp.server.async.tasks import PulpTask, Task
from pulp.server.content.sources.container import ContentContainer
from pulp.server.exceptions import PulpCodedTaskException

Expand Down
36 changes: 34 additions & 2 deletions server/pulp/server/controllers/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import celery

from pulp.common.error_codes import PLP0002, PLP0003
from pulp.server.async.tasks import Task, TaskResult
from pulp.server.controllers import consumer
from pulp.common.error_codes import PLP0002, PLP0003
from pulp.server.async.tasks import PulpTask, Task, TaskResult
from pulp.server.exceptions import PulpCodedException
from pulp.server.managers import factory as managers

Expand Down Expand Up @@ -107,3 +107,35 @@ def update(repo_id, distributor_id, config, delta):
bind_error = PulpCodedException(PLP0002, repo_id=repo_id, distributor_id=distributor_id)
bind_error.child_exceptions = bind_errors
return TaskResult(distributor, bind_error, additional_tasks)


@celery.task(base=PulpTask)
def publish(repo_id, distributor_id, overrides=None):
"""
Create an itinerary for repo publish.
:param repo_id: id of the repo to publish
:type repo_id: str
:param distributor_id: id of the distributor to use for the repo publish
:type distributor_id: str
:param overrides: dictionary of options to pass to the publish manager
:type overrides: dict or None
:return: list of call requests
:rtype: list
"""
return managers.repo_publish_manager().queue_publish(repo_id, distributor_id, overrides)


@celery.task(base=PulpTask)
def sync_with_auto_publish(repo_id, overrides=None):
"""
Sync a repository and upon successful completion, publish
any distributors that are configured for auto publish.
:param repo_id: id of the repository to create a sync call request list for
:type repo_id: str
:param overrides: dictionary of configuration overrides for this sync
:type overrides: dict or None
:return: A task result containing the details of the task executed and any spawned tasks
:rtype: TaskResult
"""
return managers.repo_sync_manager().queue_sync_with_auto_publish(repo_id, overrides)
4 changes: 2 additions & 2 deletions server/pulp/server/db/reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from pulp.common.tags import action_tag
from pulp.server import config as pulp_config
from pulp.server.async.tasks import Task
from pulp.server.db import model
from pulp.server.async.tasks import PulpTask, Task
from pulp.server.db.model import celery_result, consumer, repo_group, repository


Expand All @@ -27,7 +27,7 @@
_logger = logging.getLogger(__name__)


@task
@task(base=PulpTask)
def queue_reap_expired_documents():
"""
Create an itinerary for reaper task
Expand Down
4 changes: 2 additions & 2 deletions server/pulp/server/maintenance/monthly.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from celery import task

from pulp.common.tags import action_tag
from pulp.server.async.tasks import Task
from pulp.server.async.tasks import PulpTask, Task
from pulp.server.managers.consumer.applicability import RepoProfileApplicabilityManager


@task
@task(base=PulpTask)
def queue_monthly_maintenance():
"""
Create an itinerary for monthly task
Expand Down
1 change: 1 addition & 0 deletions server/test/unit/server/async/test_celery_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def test_celery_conf_updated(self):
"""
self.assertEqual(celery_instance.celery.conf['CELERYBEAT_SCHEDULE'],
celery_instance.CELERYBEAT_SCHEDULE)
self.assertEqual(celery_instance.celery.conf['CELERY_TASK_SERIALIZER'], 'json')


def fake_get(new_config, section, key):
Expand Down

0 comments on commit 2ed8b7f

Please sign in to comment.