Parallelizes applicability regeneration for updated repository
This patch provides a new Celery task for performing applicability regeneration
for a batch of applicability profiles. The ApplicabilityRegenerationManager
dispatches a series of tasks with the same group id. Each task is dispatched with
a list of up to 10 RepoProfileApplicabilities to reevaluate.
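
The batching described above can be sketched roughly as follows. This is a minimal illustration, not the patch's code: `paginate` here is a local stand-in written with `itertools.islice` (the patch imports its own `paginate` from `pulp.plugins.util.misc`), and `dispatch` is a hypothetical callback standing in for the Celery task's `apply_async`.

```python
from itertools import islice
from uuid import uuid4


def paginate(iterable, page_size):
    """Yield successive tuples of at most page_size items (local stand-in helper)."""
    iterator = iter(iterable)
    while True:
        page = tuple(islice(iterator, page_size))
        if not page:
            return
        yield page


def queue_batches(repo_id, applicability_ids, dispatch, batch_size=10):
    """Dispatch one call per batch, all sharing a single group id.

    dispatch is a hypothetical callable taking (repo_id, batch, group_id);
    in the patch this role is played by a Celery task's apply_async.
    """
    group_id = uuid4()
    for batch in paginate(applicability_ids, batch_size):
        dispatch(repo_id, batch, group_id)
    return group_id


# Usage: 25 ids are split into batches of 10, 10 and 5.
calls = []
group = queue_batches('repo-1', range(25), lambda *args: calls.append(args))
batch_sizes = [len(batch) for _, batch, _ in calls]
```

Because every call receives the same group id, the state of all batches can later be looked up together through that one identifier.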

The API endpoint for generating content applicability for updated repositories
changes as part of this patch. Instead of returning 202 with a call report, the
server returns 202 with a group call report.

This patch does not make any changes to the algorithm used to calculate content
applicability.

https://pulp.plan.io/issues/20
closes #20
dkliban committed Dec 16, 2015
1 parent 0da5917 commit 0ecc2df
Showing 10 changed files with 156 additions and 69 deletions.
35 changes: 31 additions & 4 deletions docs/dev-guide/conventions/sync-v-async.rst
@@ -15,10 +15,16 @@ Any REST API will return one of three responses:
A success response indicates no conflicts were detected and the REST call
executed. This is what is typically expected from a REST API call.

A postponed response indicates that some portion of the command has been
queued to execute asynchronously. In this case a :ref:`call_report` will be returned
with the results of the synchronously executed portion of the command, if there are any,
and a list of the tasks that have been spawned to complete the work in the future.
A postponed response indicates that some portion of the command has been queued to execute
asynchronously. In this case a :ref:`call_report` or a :ref:`group_call_report` will be returned
with the results of the synchronously executed portion of the command.

When a single task is queued, or a task with follow-up tasks (such as a publish after a sync or
unbinding consumers after a repo delete), a :ref:`call_report` is returned. The :ref:`call_report`
contains a list of spawned tasks.

When some number of related tasks are queued, and it is useful to track the state of all of them
together, a :ref:`group_call_report` is returned.

More information on retrieving and displaying task information can be found
:ref:`in the Task Management API documentation <task_management>`.
@@ -50,3 +56,24 @@ Example Call Report::
"spawned_tasks": [{"_href": "/pulp/api/v2/tasks/7744e2df-39b9-46f0-bb10-feffa2f7014b/",
"task_id": "7744e2df-39b9-46f0-bb10-feffa2f7014b" }]
}

.. _group_call_report:

Group Call Report
-----------------

A 202 ACCEPTED response returns a **Group Call Report** JSON object as the response body that has
the following fields:

* **_href** - Path to the root of the task group resource. However, this API endpoint currently
returns 404 in all cases. You can append `state_summary/` to the URL and perform a GET request
to retrieve a :ref:`task_group_summary`.
* **group_id** - UUID of the group that all the dispatched tasks belong to.

Example Group Call Report::

{
"_href": "/pulp/api/v2/task_groups/16412fcb-06fa-4caa-818b-b103e2a9bf44/",
"group_id": "16412fcb-06fa-4caa-818b-b103e2a9bf44"
}
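
As a rough sketch of how a client might use this report, the state summary path can be derived from `_href`. The helper name `state_summary_path` is hypothetical; the `state_summary/` suffix is taken from the URL pattern registered in `urls.py` in this commit.

```python
import json


def state_summary_path(group_call_report):
    """Return the path of the task group's state summary (hypothetical helper)."""
    href = group_call_report['_href']
    if not href.endswith('/'):
        href += '/'
    return href + 'state_summary/'


body = '''{
    "_href": "/pulp/api/v2/task_groups/16412fcb-06fa-4caa-818b-b103e2a9bf44/",
    "group_id": "16412fcb-06fa-4caa-818b-b103e2a9bf44"
}'''
report = json.loads(body)
summary_path = state_summary_path(report)
```

A GET request against the resulting path is what retrieves the summary; the task group root itself returns 404, as noted above.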

24 changes: 15 additions & 9 deletions docs/dev-guide/integration/rest-api/consumer/applicability.rst
@@ -59,11 +59,11 @@ to the given repositories, applicability data is generated for them as well.
Generated applicability data can be queried using
the `Query Content Applicability` API described below.

The API will return a :ref:`call_report`. Users can check whether the applicability
generation is completed using task id in the :ref:`call_report`. You can run
a single applicability generation task at a time. If an applicability generation
task is running, any new applicability generation tasks requested are queued
and postponed until the current task is completed.
The API will return a :ref:`group_call_report`. Users can check whether the applicability
generation is completed using the group id in the :ref:`group_call_report`. The `_href` in the
:ref:`group_call_report` points to the root of the `task_group` resource. However, this API
endpoint currently returns 404 in all cases. You can append `state_summary/` to the URL and
perform a GET request to retrieve a :ref:`task_group_summary`.

| :method:`post`
| :path:`/v2/repositories/actions/content/regenerate_applicability/`
@@ -77,7 +77,7 @@ and postponed until the current task is completed.
* :response_code:`202,if applicability regeneration is queued successfully`
* :response_code:`400,if one or more of the parameters is invalid`

| :return:a :ref:`call_report` representing the current state of the applicability regeneration
| :return: a :ref:`group_call_report` representing the current state of the applicability regeneration
:sample_request:`_` ::

@@ -87,8 +87,14 @@ and postponed until the current task is completed.
}
}

**Tags:**
The task created will have the following tag: ``"pulp:action:content_applicability_regeneration"``

:sample_response:`202` ::

{
"_href": "/pulp/api/v2/task_groups/16412fcb-06fa-4caa-818b-b103e2a9bf44/",
"group_id": "16412fcb-06fa-4caa-818b-b103e2a9bf44"
}


Generate Content Applicability for a single Consumer
----------------------------------------------------
@@ -114,7 +120,7 @@ are queued and postponed until the current task is completed.
* :response_code:`202,if applicability regeneration is queued successfully`
* :response_code:`404,if a consumer with given consumer_id does not exist`

| :return:a :ref:`call_report` representing the current state of the applicability regeneration
| :return: a :ref:`call_report` representing the current state of the applicability regeneration
**Tags:**
The task created will have the following tag: ``"pulp:action:consumer_content_applicability_regeneration"``
5 changes: 5 additions & 0 deletions docs/user-guide/release-notes/master.rst
@@ -15,6 +15,8 @@ New Features
store rather than the certificate authority trust store bundled with
``python-requests``.

* Content applicability for an updated repository is calculated in parallel.

Deprecation
-----------

@@ -71,6 +73,9 @@ Rest API Changes

* Tasks with complete states (except `canceled` state) can now be deleted.

* The API for regenerating content applicability for updated repositories no longer returns a
:ref:`call_report`. Instead a :ref:`group_call_report` is returned.

Binding API Changes
-------------------

74 changes: 47 additions & 27 deletions server/pulp/server/managers/consumer/applicability.py
@@ -4,6 +4,7 @@

from gettext import gettext as _
from logging import getLogger
from uuid import uuid4

from celery import task

@@ -17,6 +18,7 @@
from pulp.server.db.model.criteria import Criteria
from pulp.server.managers import factory as managers
from pulp.server.managers.consumer.query import ConsumerQueryManager
from pulp.plugins.util.misc import paginate


_logger = getLogger(__name__)
@@ -105,9 +107,10 @@ def regenerate_applicability_for_consumers(consumer_criteria):
manager.regenerate_applicability(profile_hash, content_type, profile_id, repo_id)

@staticmethod
def regenerate_applicability_for_repos(repo_criteria):
def queue_regenerate_applicability_for_repos(repo_criteria):
"""
Regenerate and save applicability data affected by given updated repositories.
Queue a group of tasks to generate and save applicability data affected by given updated
repositories.
:param repo_criteria: The repo selection criteria
:type repo_criteria: dict
@@ -118,29 +121,46 @@ def regenerate_applicability_for_repos(repo_criteria):
repo_criteria.fields = ['id']
repo_ids = [r.repo_id for r in model.Repository.objects.find_by_criteria(repo_criteria)]

task_group_id = uuid4()

for repo_id in repo_ids:
# Find all existing applicabilities for given repo_id. Setting batch size of 5 ensures
# the MongoDB cursor does not time out. See https://pulp.plan.io/issues/998#note-6 for
# more details.
existing_applicabilities = RepoProfileApplicability.get_collection().find(
{'repo_id': repo_id}).batch_size(5)
for existing_applicability in existing_applicabilities:
existing_applicability_ids = RepoProfileApplicability.get_collection().find(
{'repo_id': repo_id}, {'_id': 1})
for batch in paginate(existing_applicability_ids, 10):
batch_regenerate_applicability_task.apply_async((repo_id, batch),
**{'group_id': task_group_id})
return task_group_id

@staticmethod
def batch_regenerate_applicability(repo_id, existing_applicability_ids):
"""
Regenerate and save applicability data for a batch of existing applicabilities
:param repo_id: Repository id for which applicability is being calculated
:type repo_id: str
:param existing_applicability_ids: Tuple of Object Ids for applicability profiles
:type existing_applicability_ids: tuple of dicts in form of {"_id": ObjectID('mongo-id')}
"""
id_list = [id['_id'] for id in existing_applicability_ids]
existing_applicabilities = RepoProfileApplicability.get_collection().find(
{"_id": {"$in": id_list}})
for existing_applicability in existing_applicabilities:
# Convert cursor to RepoProfileApplicability object
existing_applicability = RepoProfileApplicability(**dict(existing_applicability))
profile_hash = existing_applicability['profile_hash']
unit_profile = UnitProfile.get_collection().find_one({'profile_hash': profile_hash},
fields=['id', 'content_type'])
if unit_profile is None:
# Unit profiles change whenever packages are installed or removed on consumers,
# and it is possible that existing_applicability references a UnitProfile
# that no longer exists. This is harmless, as Pulp has a monthly cleanup task
# that will identify these dangling references and remove them.
continue

# Regenerate applicability data for given unit_profile and repo id
ApplicabilityRegenerationManager.regenerate_applicability(
profile_hash, unit_profile['content_type'], unit_profile['id'], repo_id,
existing_applicability)
existing_applicability = RepoProfileApplicability(**dict(existing_applicability))
profile_hash = existing_applicability['profile_hash']
unit_profile = UnitProfile.get_collection().find_one({'profile_hash': profile_hash},
fields=['id', 'content_type'])
if unit_profile is None:
# Unit profiles change whenever packages are installed or removed on consumers,
# and it is possible that existing_applicability references a UnitProfile
# that no longer exists. This is harmless, as Pulp has a monthly cleanup task
# that will identify these dangling references and remove them.
continue

# Regenerate applicability data for given unit_profile and repo id
ApplicabilityRegenerationManager.regenerate_applicability(
profile_hash, unit_profile['content_type'], unit_profile['id'], repo_id,
existing_applicability)

@staticmethod
def regenerate_applicability(profile_hash, content_type, profile_id,
@@ -223,7 +243,7 @@ def _get_existing_repo_content_types(repo_id):
:return: A list of content type ids that have unit counts greater than 0
:rtype: list
"""
repo_obj = model.Repository.objects.first(repo_id)
repo_obj = model.Repository.objects.get(repo_id=repo_id)
if not repo_obj:
return []

@@ -273,9 +293,9 @@ def _profiler(type_id):
regenerate_applicability_for_consumers = task(
ApplicabilityRegenerationManager.regenerate_applicability_for_consumers, base=Task,
ignore_result=True)
regenerate_applicability_for_repos = task(
ApplicabilityRegenerationManager.regenerate_applicability_for_repos, base=Task,
ignore_result=True)
batch_regenerate_applicability_task = task(
ApplicabilityRegenerationManager.batch_regenerate_applicability, base=Task,
ignore_result=True)


class DoesNotExist(Exception):
7 changes: 4 additions & 3 deletions server/pulp/server/webservices/middleware/postponed.py
@@ -24,9 +24,10 @@ def _get_operation_postponed_body(exception):
if isinstance(exception.call_report, AsyncResult):
report = TaskResult.from_async_result(exception.call_report)
serialized_call_report = report.serialize()
for task in serialized_call_report['spawned_tasks']:
href_obj = dispatch.task_result_href(task)
task.update(href_obj)
if 'spawned_tasks' in serialized_call_report:
for task in serialized_call_report['spawned_tasks']:
href_obj = dispatch.task_result_href(task)
task.update(href_obj)

# Use the object's serializer if it is a Mongoengine Document.
result = serialized_call_report.get('result')
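
The guard added in this hunk matters because a group call report carries only `group_id` and `_href`, with no `spawned_tasks` key. A minimal sketch of the same idea, under stated assumptions: `add_task_hrefs` and `example_href` are hypothetical names, and `href_for` stands in for the dispatch helper used in the patch.

```python
def add_task_hrefs(serialized_call_report, href_for):
    """Attach an href to each spawned task, if the report lists any.

    href_for is a hypothetical callable returning a dict such as
    {'_href': ...} for a task dict; it stands in for the dispatch helper
    used in the patch.
    """
    for task in serialized_call_report.get('spawned_tasks', []):
        task.update(href_for(task))
    return serialized_call_report


def example_href(task):
    # Illustrative path layout only.
    return {'_href': '/pulp/api/v2/tasks/%s/' % task['task_id']}


call_report = {'spawned_tasks': [{'task_id': 'abc'}]}
group_report = {'group_id': '1234', '_href': '/pulp/api/v2/task_groups/1234/'}

with_hrefs = add_task_hrefs(call_report, example_href)
untouched = add_task_hrefs(dict(group_report), example_href)
```

Using `.get('spawned_tasks', [])` is equivalent in effect to the explicit `in` check: reports without the key pass through unchanged.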
2 changes: 2 additions & 0 deletions server/pulp/server/webservices/urls.py
@@ -244,6 +244,8 @@
url(r'^v2/tasks/$', tasks.TaskCollectionView.as_view(), name='task_collection'),
url(r'^v2/tasks/search/$', tasks.TaskSearchView.as_view(), name='task_search'),
url(r'^v2/tasks/(?P<task_id>[^/]+)/$', tasks.TaskResourceView.as_view(), name='task_resource'),
url(r'^v2/task_groups/(?P<group_id>[^/]+)/$',
task_groups.TaskGroupView.as_view(), name='task_group'),
url(r'^v2/task_groups/(?P<group_id>[^/]+)/state_summary/$',
task_groups.TaskGroupSummaryView.as_view(), name='task_group_summary'),
url(r'^v2/users/$', users.UsersView.as_view(), name='users'),
17 changes: 11 additions & 6 deletions server/pulp/server/webservices/views/repositories.py
@@ -12,7 +12,7 @@
from pulp.server.db import model
from pulp.server.db.model.criteria import Criteria, UnitAssociationCriteria
from pulp.server.managers import factory as manager_factory
from pulp.server.managers.consumer.applicability import regenerate_applicability_for_repos
from pulp.server.managers.consumer.applicability import ApplicabilityRegenerationManager
from pulp.server.managers.content.upload import import_uploaded_unit
from pulp.server.managers.repo.distributor import RepoDistributorManager
from pulp.server.managers.repo.unit_association import associate_from_repo, unassociate_by_criteria
@@ -927,6 +927,9 @@ def post(self, request):
child exceptions.
:raises pulp_exceptions.OperationPostponed: dispatch a task
"""
class GroupCallReport(dict):
def serialize(self):
return self

repo_criteria_body = request.body_as_json.get('repo_criteria', None)
if repo_criteria_body is None:
@@ -938,11 +941,13 @@ def post(self, request):
invalid_criteria.add_child_exception(e)
raise invalid_criteria

regeneration_tag = tags.action_tag('content_applicability_regeneration')
async_result = regenerate_applicability_for_repos.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_PROFILE_APPLICABILITY_TYPE, tags.RESOURCE_ANY_ID,
(repo_criteria.as_dict(),), tags=[regeneration_tag])
raise pulp_exceptions.OperationPostponed(async_result)
async_result = ApplicabilityRegenerationManager.queue_regenerate_applicability_for_repos(
repo_criteria.as_dict())
ret = GroupCallReport()
ret['group_id'] = str(async_result)
ret['_href'] = reverse('task_group', kwargs={'group_id': str(async_result)})

raise pulp_exceptions.OperationPostponed(ret)


class HistoryView(View):
22 changes: 22 additions & 0 deletions server/pulp/server/webservices/views/task_groups.py
@@ -11,6 +11,28 @@
from pulp.server.webservices.views.util import generate_json_response_with_pulp_encoder


class TaskGroupView(View):
"""
View for a task group.
"""

@auth_required(authorization.READ)
def get(self, request, group_id):
"""
Return a response containing a list of tasks for task group.
:param request: WSGI request object
:type request: django.core.handlers.wsgi.WSGIRequest
:param group_id: The ID of the task group you wish to summarize
:type group_id: basestring
:return: Response containing a list of the tasks in task group
:rtype : django.http.HttpResponse
:raises MissingResource: if group id is not found
"""
raise MissingResource(group_id)


class TaskGroupSummaryView(View):
"""
View for a task group summary.