diff --git a/docs/dev-guide/conventions/sync-v-async.rst b/docs/dev-guide/conventions/sync-v-async.rst index 13ca6e4052..f1d46d38b3 100644 --- a/docs/dev-guide/conventions/sync-v-async.rst +++ b/docs/dev-guide/conventions/sync-v-async.rst @@ -15,10 +15,16 @@ Any REST API will return one of three responses: A success response indicates no conflicts were detected and the REST call executed. This is what is typically expected from a REST API call. -A postponed response indicates that some portion of the command has been -queued to execute asynchronously. In this case a :ref:`call_report` will be returned -with the results of the synchronously executed portion of the command, if there are any, -and a list of the tasks that have been spawned to complete the work in the future. +A postponed response indicates that some portion of the command has been queued to execute +asynchronously. In this case a :ref:`call_report` or a :ref:`group_call_report` will be returned +with the results of the synchronously executed portion of the command. + +When a single task is queued or a task with follow up tasks, such as a publish after a sync or unbind +consumers after repo delete, a :ref:`call_report` is returned. The :ref:`call_report` will contain +a list of spawned tasks. + +When some number of related tasks are queued, and it is useful to track the state of all of them +together, a :ref:`group_call_report` is returned. More information on retrieving and displaying task information can be found :ref:`in the Task Management API documentation `. @@ -50,3 +56,24 @@ Example Call Report:: "spawned_tasks": [{"_href": "/pulp/api/v2/tasks/7744e2df-39b9-46f0-bb10-feffa2f7014b/", "task_id": "7744e2df-39b9-46f0-bb10-feffa2f7014b" }] } + +.. _group_call_report: + +Group Call Report +----------------- + +A 202 ACCEPTED response returns a **Group Call Report** JSON object as the response body that has +the following fields: + +* **_href** - Path to the root of task group resource. However, this API endpoint currently + returns 404 in all cases. You can append `state-summary/` to the URL and perform a GET request + to retrieve a :ref:`task_group_summary`. +* **group_id** - UUID of the group that all the dispatched tasks belong to. + +Example Group Call Report:: + + { + "_href": "/pulp/api/v2/task_groups/16412fcb-06fa-4caa-818b-b103e2a9bf44/", + "group_id": "16412fcb-06fa-4caa-818b-b103e2a9bf44" + } + diff --git a/docs/dev-guide/integration/rest-api/consumer/applicability.rst b/docs/dev-guide/integration/rest-api/consumer/applicability.rst index d1eaaa2f11..b95dd596be 100644 --- a/docs/dev-guide/integration/rest-api/consumer/applicability.rst +++ b/docs/dev-guide/integration/rest-api/consumer/applicability.rst @@ -59,11 +59,11 @@ to the given repositories, applicability data is generated for them as well. Generated applicability data can be queried using the `Query Content Applicability` API described below. -The API will return a :ref:`call_report`. Users can check whether the applicability -generation is completed using task id in the :ref:`call_report`. You can run -a single applicability generation task at a time. If an applicability generation -task is running, any new applicability generation tasks requested are queued -and postponed until the current task is completed. +The API will return a :ref:`group_call_report`. Users can check whether the applicability +generation is completed using group id in the :ref:`group_call_report`. The `_href` in the +:ref:`group_call_report` will point to the root of `task_group` resource. However, this API +endpoint currently returns 404 in all cases. You can append '/state-summary/' to the URL and +perform a GET request to retrieve a :ref:`task_group_summary`. | :method:`post` | :path:`/v2/repositories/actions/content/regenerate_applicability/` @@ -77,7 +77,7 @@ and postponed until the current task is completed. * :response_code:`202,if applicability regeneration is queued successfully` * :response_code:`400,if one or more of the parameters is invalid` -| :return:a :ref:`call_report` representing the current state of the applicability regeneration +| :return: a :ref:`group_call_report` representing the current state of the applicability regeneration :sample_request:`_` :: @@ -87,8 +87,14 @@ and postponed until the current task is completed. } } -**Tags:** -The task created will have the following tag: ``"pulp:action:content_applicability_regeneration"`` + +:sample_response:`202` :: + + { + "_href": "/pulp/api/v2/task_groups/16412fcb-06fa-4caa-818b-b103e2a9bf44/", + "group_id": "16412fcb-06fa-4caa-818b-b103e2a9bf44" + } + Generate Content Applicability for a single Consumer ---------------------------------------------------- @@ -114,7 +120,7 @@ are queued and postponed until the current task is completed. * :response_code:`202,if applicability regeneration is queued successfully` * :response_code:`404,if a consumer with given consumer_id does not exist` -| :return:a :ref:`call_report` representing the current state of the applicability regeneration +| :return: a :ref:`call_report` representing the current state of the applicability regeneration **Tags:** The task created will have the following tag: ``"pulp:action:consumer_content_applicability_regeneration"`` diff --git a/docs/user-guide/release-notes/master.rst b/docs/user-guide/release-notes/master.rst index 8fbc567d33..f3ccaed0af 100644 --- a/docs/user-guide/release-notes/master.rst +++ b/docs/user-guide/release-notes/master.rst @@ -15,6 +15,8 @@ New Features store rather than the certificate authority trust store bundled with ``python-requests``. +* Content applicability for an updated repository is calculated in parallel. + Deprecation ----------- @@ -71,6 +73,9 @@ Rest API Changes * Tasks with complete states (except `canceled` state) can now be deleted. +* The API for regenerating content applicability for updated repositories no longer returns a + :ref:`call_report`. Instead a :ref:`group_call_report` is returned. + Binding API Changes ------------------- diff --git a/server/pulp/server/managers/consumer/applicability.py b/server/pulp/server/managers/consumer/applicability.py index 4257dd341c..869733b1a6 100644 --- a/server/pulp/server/managers/consumer/applicability.py +++ b/server/pulp/server/managers/consumer/applicability.py @@ -4,6 +4,7 @@ from gettext import gettext as _ from logging import getLogger +from uuid import uuid4 from celery import task @@ -17,6 +18,7 @@ from pulp.server.db.model.criteria import Criteria from pulp.server.managers import factory as managers from pulp.server.managers.consumer.query import ConsumerQueryManager +from pulp.plugins.util.misc import paginate _logger = getLogger(__name__) @@ -105,9 +107,10 @@ def regenerate_applicability_for_consumers(consumer_criteria): manager.regenerate_applicability(profile_hash, content_type, profile_id, repo_id) @staticmethod - def regenerate_applicability_for_repos(repo_criteria): + def queue_regenerate_applicability_for_repos(repo_criteria): """ - Regenerate and save applicability data affected by given updated repositories. + Queue a group of tasks to generate and save applicability data affected by given updated + repositories. :param repo_criteria: The repo selection criteria :type repo_criteria: dict @@ -118,29 +121,46 @@ def regenerate_applicability_for_repos(repo_criteria): repo_criteria.fields = ['id'] repo_ids = [r.repo_id for r in model.Repository.objects.find_by_criteria(repo_criteria)] + task_group_id = uuid4() + for repo_id in repo_ids: - # Find all existing applicabilities for given repo_id. Setting batch size of 5 ensures - # the MongoDB cursor does not time out. See https://pulp.plan.io/issues/998#note-6 for - # more details. - existing_applicabilities = RepoProfileApplicability.get_collection().find( - {'repo_id': repo_id}).batch_size(5) - for existing_applicability in existing_applicabilities: + existing_applicability_ids = RepoProfileApplicability.get_collection().find( + {'repo_id': repo_id}, {'_id': 1}) + for batch in paginate(existing_applicability_ids, 10): + batch_regenerate_applicability_task.apply_async((repo_id, batch), + **{'group_id': task_group_id}) + return task_group_id + + @staticmethod + def batch_regenerate_applicability(repo_id, existing_applicability_ids): + """ + Regenerate and save applicability data for a batch of existing applicabilities + + :param repo_id: Repository id for which applicability is being calculated + :type repo_id: str + :param existing_applicability_ids: Tuple of Object Ids for applicability profiles + :type existing_applicability_ids: tuple of dicts in form of {"_id": ObjectID('mongo-id')} + """ + id_list = [id['_id'] for id in existing_applicability_ids] + existing_applicabilities = RepoProfileApplicability.get_collection().find( + {"_id": {"$in": id_list}}) + for existing_applicability in existing_applicabilities: # Convert cursor to RepoProfileApplicability object - existing_applicability = RepoProfileApplicability(**dict(existing_applicability)) - profile_hash = existing_applicability['profile_hash'] - unit_profile = UnitProfile.get_collection().find_one({'profile_hash': profile_hash}, - fields=['id', 'content_type']) - if unit_profile is None: - # Unit profiles change whenever packages are installed or removed on consumers, - # and it is possible that existing_applicability references a UnitProfile - # that no longer exists. This is harmless, as Pulp has a monthly cleanup task - # that will identify these dangling references and remove them. - continue - - # Regenerate applicability data for given unit_profile and repo id - ApplicabilityRegenerationManager.regenerate_applicability( - profile_hash, unit_profile['content_type'], unit_profile['id'], repo_id, - existing_applicability) + existing_applicability = RepoProfileApplicability(**dict(existing_applicability)) + profile_hash = existing_applicability['profile_hash'] + unit_profile = UnitProfile.get_collection().find_one({'profile_hash': profile_hash}, + fields=['id', 'content_type']) + if unit_profile is None: + # Unit profiles change whenever packages are installed or removed on consumers, + # and it is possible that existing_applicability references a UnitProfile + # that no longer exists. This is harmless, as Pulp has a monthly cleanup task + # that will identify these dangling references and remove them. + continue + + # Regenerate applicability data for given unit_profile and repo id + ApplicabilityRegenerationManager.regenerate_applicability( + profile_hash, unit_profile['content_type'], unit_profile['id'], repo_id, + existing_applicability) @staticmethod def regenerate_applicability(profile_hash, content_type, profile_id, @@ -223,7 +243,7 @@ def _get_existing_repo_content_types(repo_id): :return: A list of content type ids that have unit counts greater than 0 :rtype: list """ - repo_obj = model.Repository.objects.first(repo_id) + repo_obj = model.Repository.objects.get(repo_id=repo_id) if not repo_obj: return [] @@ -273,9 +293,9 @@ def _profiler(type_id): regenerate_applicability_for_consumers = task( ApplicabilityRegenerationManager.regenerate_applicability_for_consumers, base=Task, ignore_result=True) -regenerate_applicability_for_repos = task( - ApplicabilityRegenerationManager.regenerate_applicability_for_repos, base=Task, - ignore_result=True) +batch_regenerate_applicability_task = task( + ApplicabilityRegenerationManager.batch_regenerate_applicability, base=Task, + ignore_results=True) class DoesNotExist(Exception): diff --git a/server/pulp/server/webservices/middleware/postponed.py b/server/pulp/server/webservices/middleware/postponed.py index 7ea47023f9..01df28dd3e 100644 --- a/server/pulp/server/webservices/middleware/postponed.py +++ b/server/pulp/server/webservices/middleware/postponed.py @@ -24,9 +24,10 @@ def _get_operation_postponed_body(exception): if isinstance(exception.call_report, AsyncResult): report = TaskResult.from_async_result(exception.call_report) serialized_call_report = report.serialize() - for task in serialized_call_report['spawned_tasks']: - href_obj = dispatch.task_result_href(task) - task.update(href_obj) + if 'spawned_tasks' in serialized_call_report: + for task in serialized_call_report['spawned_tasks']: + href_obj = dispatch.task_result_href(task) + task.update(href_obj) # Use the object's serializer if it is a Mongoengine Document. result = serialized_call_report.get('result') diff --git a/server/pulp/server/webservices/urls.py b/server/pulp/server/webservices/urls.py index ab046eadef..64cc54dd4c 100644 --- a/server/pulp/server/webservices/urls.py +++ b/server/pulp/server/webservices/urls.py @@ -244,6 +244,8 @@ url(r'^v2/tasks/$', tasks.TaskCollectionView.as_view(), name='task_collection'), url(r'^v2/tasks/search/$', tasks.TaskSearchView.as_view(), name='task_search'), url(r'^v2/tasks/(?P[^/]+)/$', tasks.TaskResourceView.as_view(), name='task_resource'), + url(r'^v2/task_groups/(?P[^/]+)/$', + task_groups.TaskGroupView.as_view(), name='task_group'), url(r'^v2/task_groups/(?P[^/]+)/state_summary/$', task_groups.TaskGroupSummaryView.as_view(), name='task_group_summary'), url(r'^v2/users/$', users.UsersView.as_view(), name='users'), diff --git a/server/pulp/server/webservices/views/repositories.py b/server/pulp/server/webservices/views/repositories.py index d81affa1a8..f5a8607532 100644 --- a/server/pulp/server/webservices/views/repositories.py +++ b/server/pulp/server/webservices/views/repositories.py @@ -12,7 +12,7 @@ from pulp.server.db import model from pulp.server.db.model.criteria import Criteria, UnitAssociationCriteria from pulp.server.managers import factory as manager_factory -from pulp.server.managers.consumer.applicability import regenerate_applicability_for_repos +from pulp.server.managers.consumer.applicability import ApplicabilityRegenerationManager from pulp.server.managers.content.upload import import_uploaded_unit from pulp.server.managers.repo.distributor import RepoDistributorManager from pulp.server.managers.repo.unit_association import associate_from_repo, unassociate_by_criteria @@ -927,6 +927,9 @@ def post(self, request): child exceptions. :raises pulp_exceptions.OperationPostponed: dispatch a task """ + class GroupCallReport(dict): + def serialize(self): + return self repo_criteria_body = request.body_as_json.get('repo_criteria', None) if repo_criteria_body is None: @@ -938,11 +941,13 @@ def post(self, request): invalid_criteria.add_child_exception(e) raise invalid_criteria - regeneration_tag = tags.action_tag('content_applicability_regeneration') - async_result = regenerate_applicability_for_repos.apply_async_with_reservation( - tags.RESOURCE_REPOSITORY_PROFILE_APPLICABILITY_TYPE, tags.RESOURCE_ANY_ID, - (repo_criteria.as_dict(),), tags=[regeneration_tag]) - raise pulp_exceptions.OperationPostponed(async_result) + async_result = ApplicabilityRegenerationManager.queue_regenerate_applicability_for_repos( + repo_criteria.as_dict()) + ret = GroupCallReport() + ret['group_id'] = str(async_result) + ret['_href'] = reverse('task_group', kwargs={'group_id': str(async_result)}) + + raise pulp_exceptions.OperationPostponed(ret) class HistoryView(View): diff --git a/server/pulp/server/webservices/views/task_groups.py b/server/pulp/server/webservices/views/task_groups.py index c81026e0ab..ec9aa89b5d 100644 --- a/server/pulp/server/webservices/views/task_groups.py +++ b/server/pulp/server/webservices/views/task_groups.py @@ -11,6 +11,28 @@ from pulp.server.webservices.views.util import generate_json_response_with_pulp_encoder +class TaskGroupView(View): + """ + View for a task group. + """ + + @auth_required(authorization.READ) + def get(self, request, group_id): + """ + Return a response containing a list of tasks for task group. + + :param request: WSGI request object + :type request: django.core.handlers.wsgi.WSGIRequest + :param group_id: The ID of the task group you wish to summarize + :type group_id: basestring + + :return: Response containing a list of the tasks in task group + :rtype : django.http.HttpResponse + :raises MissingResource: if group id is not found + """ + raise MissingResource(group_id) + + class TaskGroupSummaryView(View): """ View for a task group summary. diff --git a/server/test/unit/server/managers/consumer/test_applicability.py b/server/test/unit/server/managers/consumer/test_applicability.py index b6f2921440..bc4321755f 100644 --- a/server/test/unit/server/managers/consumer/test_applicability.py +++ b/server/test/unit/server/managers/consumer/test_applicability.py @@ -189,7 +189,7 @@ def test_regenerate_applicability_for_repos_with_different_consumer_profiles( self.populate_bindings() manager = factory.applicability_regeneration_manager() manager.regenerate_applicability_for_consumers(self.CONSUMER_CRITERIA) - manager.regenerate_applicability_for_repos(self.REPO_CRITERIA) + manager.queue_regenerate_applicability_for_repos(self.REPO_CRITERIA) # Verify applicability_list = list(RepoProfileApplicability.get_collection().find()) self.assertEqual(len(applicability_list), 4) @@ -204,7 +204,7 @@ def test_regenerate_applicability_for_repos_with_same_consumer_profiles(self, mo self.populate_bindings() manager = factory.applicability_regeneration_manager() manager.regenerate_applicability_for_consumers(self.CONSUMER_CRITERIA) - manager.regenerate_applicability_for_repos(self.REPO_CRITERIA) + manager.queue_regenerate_applicability_for_repos(self.REPO_CRITERIA) # Verify applicability_list = list(RepoProfileApplicability.get_collection().find()) self.assertEqual(len(applicability_list), 2) @@ -219,7 +219,7 @@ def test_regenerate_applicability_for_empty_repo_criteria(self, mock_repo_qs): self.populate_bindings() manager = factory.applicability_regeneration_manager() manager.regenerate_applicability_for_consumers(self.CONSUMER_CRITERIA) - manager.regenerate_applicability_for_repos(Criteria()) + manager.queue_regenerate_applicability_for_repos(Criteria()) # Verify applicability_list = list(RepoProfileApplicability.get_collection().find()) self.assertEqual(len(applicability_list), 2) @@ -232,7 +232,7 @@ def test_regenerate_applicability_for_empty_repo_criteria(self, mock_repo_qs): def test_regenerate_applicability_for_repo_criteria_no_bindings(self, mock_repo_qs): self.populate_repos() manager = factory.applicability_regeneration_manager() - manager.regenerate_applicability_for_repos(self.REPO_CRITERIA) + manager.queue_regenerate_applicability_for_repos(self.REPO_CRITERIA) # Verify applicability_list = list(RepoProfileApplicability.get_collection().find()) self.assertEqual(applicability_list, []) @@ -245,7 +245,7 @@ def test_regenerate_applicability_for_repos_profiler_notfound(self, mock_repo_qs profiler.calculate_applicable_units = mock.Mock(side_effect=NotImplementedError()) # Test manager = factory.applicability_regeneration_manager() - manager.regenerate_applicability_for_repos(self.REPO_CRITERIA) + manager.queue_regenerate_applicability_for_repos(self.REPO_CRITERIA) # Verify applicability_list = list(RepoProfileApplicability.get_collection().find()) self.assertEqual(len(applicability_list), 0) @@ -272,7 +272,7 @@ def test_regenerate_applicability_for_repos_consumer_profile_updated(self, mock_ profile_manager = factory.consumer_profile_manager() profile_manager.update(self.CONSUMER_IDS[0], 'rpm', {'name': 'zsh', 'version': '1.0'}) # Request applicability regeneration for the repo and assert that no exception is raised - applicability_manager.regenerate_applicability_for_repos(self.REPO_CRITERIA) + applicability_manager.queue_regenerate_applicability_for_repos(self.REPO_CRITERIA) applicability_list = list(RepoProfileApplicability.get_collection().find()) self.assertEqual(len(applicability_list), 1) @@ -282,21 +282,21 @@ def test_regenerate_applicability_for_repos_consumer_profile_updated(self, mock_ @mock.patch('pulp.server.managers.consumer.applicability.model.Repository.objects') @mock.patch('pulp.server.db.model.consumer.RepoProfileApplicability.get_collection') - def test_regenerate_applicability_for_repos_batch_size(self, mock_get_collection, mock_repo_qs): + @mock.patch('pulp.server.db.model.consumer.UnitProfile.get_collection') + def test_batch_regenerate_applicability(self, mock_unit_profile_get_collection, + mock_repo_profile_app_get_collection, mock_repo_qs): factory.initialize() applicability_manager = ApplicabilityRegenerationManager() - repo_criteria = {'filters': None, 'sort': None, 'limit': None, - 'skip': None, 'fields': None} mock_repo = mock.MagicMock() mock_repo.repo_id = 'fake-repo' mock_repo_qs.find_by_criteria.return_value = [mock_repo] - - applicability_manager.regenerate_applicability_for_repos(repo_criteria) + existing_ids = ({'_id': 'mock-object-id'}, {'_id': 'mock-object-id-2'}) + applicability_manager.batch_regenerate_applicability('mock_repo', existing_ids) # validate that batch size of 5 is used - - mock_get_collection.return_value.find.return_value.batch_size.assert_called_with(5) + expected_params = {'_id': {'$in': ['mock-object-id', 'mock-object-id-2']}} + mock_repo_profile_app_get_collection.return_value.find.assert_called_with(expected_params) @mock.patch('pulp.server.managers.consumer.applicability.model.Repository.objects') def test_get_existing_repo_content_types_no_repo(self, mock_repo_qs): @@ -324,7 +324,7 @@ def test_get_existing_repo_content_types_repo_units(self, mock_repo_qs): """ Test that if a repository exists but has no units, return an empty list. """ - mock_repo = mock_repo_qs.first.return_value + mock_repo = mock_repo_qs.get.return_value mock_repo.content_unit_counts = {'mock_type_1': 4, 'mock_type_2': 8} content_types = self.old_get_existing('repo') self.assertListEqual(content_types, ['mock_type_2', 'mock_type_1']) diff --git a/server/test/unit/server/webservices/views/test_repositories.py b/server/test/unit/server/webservices/views/test_repositories.py index dab2b46925..6c8bb4d6be 100644 --- a/server/test/unit/server/webservices/views/test_repositories.py +++ b/server/test/unit/server/webservices/views/test_repositories.py @@ -1469,7 +1469,8 @@ class TestContentApplicabilityRegenerationView(unittest.TestCase): @mock.patch('pulp.server.webservices.views.decorators._verify_auth', new=assert_auth_CREATE()) - @mock.patch('pulp.server.webservices.views.repositories.regenerate_applicability_for_repos') + @mock.patch(('pulp.server.managers.consumer.applicability.ApplicabilityRegenerationManager.' + 'queue_regenerate_applicability_for_repos')) @mock.patch('pulp.server.webservices.views.repositories.tags') @mock.patch('pulp.server.webservices.views.repositories.Criteria.from_client_input') def test_post_with_expected_content(self, mock_crit, mock_tags, mock_regen): @@ -1488,10 +1489,7 @@ def test_post_with_expected_content(self, mock_crit, mock_tags, mock_regen): raise AssertionError('OperationPostponed should be raised for a regenerate task') self.assertEqual(response.http_status_code, 202) - mock_regen.apply_async_with_reservation.assert_called_once_with( - mock_tags.RESOURCE_REPOSITORY_PROFILE_APPLICABILITY_TYPE, mock_tags.RESOURCE_ANY_ID, - (mock_crit.return_value.as_dict(),), tags=[mock_tags.action_tag()] - ) + mock_regen.assert_called_once_with(mock_crit.return_value.as_dict()) @mock.patch('pulp.server.webservices.views.decorators._verify_auth', new=assert_auth_CREATE()) @@ -1517,7 +1515,8 @@ def test_post_with_invalid_repo_criteria(self, mock_crit): @mock.patch('pulp.server.webservices.views.decorators._verify_auth', new=assert_auth_CREATE()) - @mock.patch('pulp.server.webservices.views.repositories.regenerate_applicability_for_repos') + @mock.patch(('pulp.server.managers.consumer.applicability.ApplicabilityRegenerationManager.' + 'queue_regenerate_applicability_for_repos')) def test_post_without_repo_criteria(self, mock_crit): """ Test regenerate content applicability with missing repo_criteria.