
Port sync task for Pulp 3.0
closes #2399
asmacdo committed May 3, 2017
1 parent 32db142 commit 8401606
Showing 6 changed files with 47 additions and 178 deletions.
3 changes: 1 addition & 2 deletions platform/pulp/app/serializers/repository.py
@@ -130,8 +130,7 @@ class Meta:
fields = MasterModelSerializer.Meta.fields + (
'name', 'last_updated', 'feed_url', 'validate', 'ssl_ca_certificate',
'ssl_client_certificate', 'ssl_client_key', 'ssl_validation', 'proxy_url',
- 'basic_auth_user', 'basic_auth_password', 'max_download_bandwidth',
- 'max_concurrent_downloads', 'download_policy', 'last_sync', 'repository',
+ 'basic_auth_user', 'basic_auth_password', 'download_policy', 'last_sync', 'repository',
)


28 changes: 28 additions & 0 deletions platform/pulp/app/tasks/importer.py
@@ -1,6 +1,10 @@
from gettext import gettext as _
import os

from celery import shared_task

from pulp.app import models
from pulp.tasking.services import storage
from pulp.tasking.tasks import UserFacingTask


@@ -15,3 +19,27 @@ def delete(repo_name, importer_name):
:type importer_name: str
"""
models.Importer.objects.filter(name=importer_name, repository__name=repo_name).delete()


@shared_task(base=UserFacingTask)
def sync(repo_name, importer_name):
"""
Call sync on the importer defined by a plugin.
Check that the importer has a feed_url, which is necessary to sync. A working directory
is prepared, the plugin's sync is called, and then the working directory is removed.
Args:
repo_name (basestring): unique name to specify the repository.
importer_name (basestring): name to specify the Importer.
Raises:
ValueError: When feed_url is empty.
"""
importer = models.Importer.objects.get(name=importer_name, repository__name=repo_name).cast()
if not importer.feed_url:
raise ValueError(_("An importer must have a 'feed_url' attribute to sync."))

with storage.working_dir_context() as working_dir:
importer.working_dir = working_dir
importer.sync()
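
As a rough usage sketch (not part of this commit), a caller could dispatch the new task through Celery's standard apply_async API; the repository and importer names below are hypothetical:

from pulp.app.tasks import importer

# Queue a sync for the importer named 'zoo-importer' on the repo 'zoo'
# (both names are made up for illustration).
result = importer.sync.apply_async(
    kwargs={'repo_name': 'zoo', 'importer_name': 'zoo-importer'})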
39 changes: 5 additions & 34 deletions platform/pulp/plugin/models/importer.py
@@ -9,7 +9,6 @@ class Importer(PlatformImporter):
* Plugin specific sync functionality
* Add persistent data attributes for a plugin importer subclass
* Additional validation of plugin importer subclass configuration
The sync implementation is provided by :meth:`Importer.sync`; see that method for more details.
Failing to implement this method will prevent sync functionality for this plugin type.
@@ -19,18 +18,13 @@ class Importer(PlatformImporter):
additional persistent importer data by subclassing this object and adding Django fields. We
defer to the Django docs on extending this model definition with additional fields.
Validation is done the Django way, so custom validation can also be added the same way as for
any Django model. We defer to the Django docs on adding custom validation to the subclassed
importer. If any of the model validation methods are overridden, be sure to call super() so the
platform can still perform its validation.
Instantiation and calling of the subclassed plugin Importer is described in detail in the
:meth:`Importer.sync` method.
Validation of the importer is done at the API level by a plugin defined subclass of
:class:`pulp.plugin.serializers.repository.ImporterSerializer`.
"""

def sync(self):
"""
- Perform a sync
+ Perform a sync.
It is expected that plugins wanting to support sync will provide an implementation on the
subclassed Importer.
@@ -39,31 +33,8 @@ def sync(self):
platform :class:`pulp.app.models.Importer` base attributes and any custom attributes
defined by the subclass.
The model attributes were loaded from the database and then had the user-specified override
config applied on top. Additionally, the importer is read-only and prevents the saving of
changes to the Importer instance.
Instantiation and calling of the sync method by the platform is roughly done with the
following:
1. The plugin provides an implementation called WidgetImporter which subclasses Importer
2. The user makes a call to sync widget_importer (say id=10) with some override config
3. The platform loads the saved WidgetImporter
>>> wi = WidgetImporter.objects.get(id=10)
4. The platform puts the WidgetImporter into read-only mode
5. The override config values are written over the in memory WidgetImporter
6. Call the full_clean() method on the Django model for validation
>>> wi.full_clean()
7. Call into the sync method
>>> wi.sync()
Instantiation and calling of the sync method by the platform is defined by
:meth:`pulp.app.tasks.importer.sync`.
Subclasses are designed to override this default implementation and should not call super().
"""
14 changes: 13 additions & 1 deletion platform/pulp/tasking/services/storage.py
@@ -2,7 +2,7 @@
import os
import shutil
import stat
- from contextlib import suppress
+ from contextlib import contextmanager, suppress

from celery import task
from django.conf import settings as pulp_settings
@@ -137,3 +137,15 @@ def _rmtree_fix_permissions(directory_path):
shutil.rmtree(directory_path)
else:
raise

@contextmanager
def working_dir_context():
"""
Prepares a working directory that is removed after the context has ended.
"""
try:
working_dir = get_working_directory()
os.chdir(working_dir)
yield working_dir
finally:
delete_working_directory()
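
A brief usage sketch (an assumed caller, not from this commit) showing how task code can use the new context manager; note that get_working_directory() presumably requires running inside a pulp task, so this only works in that context:

import os

from pulp.tasking.services import storage


def do_work():
    # Hypothetical task body. Inside the block the working directory exists
    # and is the current directory; it is removed when the block exits,
    # even if an exception is raised.
    with storage.working_dir_context() as working_dir:
        with open(os.path.join(working_dir, 'scratch.txt'), 'w') as f:
            f.write('temporary data')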
4 changes: 0 additions & 4 deletions platform/pulp/tasking/tasks.py
@@ -294,8 +294,6 @@ def on_success(self, retval, task_id, args, kwargs):

self._handle_cProfile(task_id)

- storage.delete_working_directory()

def on_failure(self, exc, task_id, args, kwargs, einfo):
"""
Update the :class:`pulp.app.models.Task` object, log, and save the results.
@@ -325,8 +323,6 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):

self._handle_cProfile(task_id)

- storage.delete_working_directory()

def _get_parent_arg(self):
"""Return a dictionary with the 'parent' set if running inside of a UserFacingTask"""
parent_arg = {}
137 changes: 0 additions & 137 deletions server/pulp/server/controllers/repository.py
@@ -591,143 +591,6 @@ def update_last_unit_removed(repo_id):
repo_obj.save()


@celery.task(base=PulpTask, name='pulp.server.tasks.repository.sync_with_auto_publish')
def queue_sync_with_auto_publish(repo_id, overrides=None, scheduled_call_id=None):
"""
Sync a repository and upon successful completion, publish any distributors that are configured
for auto publish.
:param repo_id: id of the repository to create a sync call request list for
:type repo_id: str
:param overrides: dictionary of configuration overrides for this sync
:type overrides: dict or None
:param scheduled_call_id: id of scheduled call that dispatched this task
:type scheduled_call_id: str
"""
kwargs = {'repo_id': repo_id, 'sync_config_override': overrides,
'scheduled_call_id': scheduled_call_id}
tags = [resource_tag(RESOURCE_REPOSITORY_TYPE, repo_id), action_tag('sync')]
sync.apply_async_with_reservation(RESOURCE_REPOSITORY_TYPE, repo_id, tags=tags, kwargs=kwargs)


@celery.task(base=UserFacingTask, name='pulp.server.managers.repo.sync.sync')
def sync(repo_id, sync_config_override=None, scheduled_call_id=None):
"""
Performs a synchronize operation on the given repository and triggers publishes for
distributors with auto-publish enabled.
The given repo must have an importer configured. This method is intentionally limited to
synchronizing a single repo. Performing multiple repository syncs concurrently will require a
more global view of the server and must be handled outside the scope of this class.
:param repo_id: identifies the repo to sync
:type repo_id: str
:param sync_config_override: optional config containing values to use for this sync only
:type sync_config_override: dict
:param scheduled_call_id: id of scheduled call that dispatched this task
:type scheduled_call_id: str
:return: Dictionary containing sync results.
:rtype: dict
:raise pulp_exceptions.MissingResource: if specified repo does not exist, or it does not have
an importer and associated plugin
:raise pulp_exceptions.PulpExecutionException: if the task fails.
"""

repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
transfer_repo = repo_obj.to_transfer_repo()

repo_importer = model.Importer.objects.get_or_404(repo_id=repo_id)
try:
importer, imp_config = plugin_api.get_importer_by_id(repo_importer.importer_type_id)
except plugin_exceptions.PluginNotFound:
raise pulp_exceptions.MissingResource(repository=repo_id)

importer_config = importer_controller.clean_config_dict(copy.deepcopy(repo_importer.config))
call_config = PluginCallConfiguration(imp_config, importer_config, sync_config_override)
transfer_repo.working_dir = get_working_directory()
conduit = RepoSyncConduit(repo_id, repo_importer.importer_type_id, repo_importer.id)
sync_result_collection = RepoSyncResult.get_collection()

# Fire events around the call
fire_manager = manager_factory.event_fire_manager()
fire_manager.fire_repo_sync_started(repo_id)

before_sync_unit_count = model.RepositoryContentUnit.objects(repo_id=repo_id).count()

# Perform the sync
sync_start_timestamp = _now_timestamp()
sync_result = None

try:
# Replace the Importer's sync_repo() method with our register_sigterm_handler decorator,
# which will set up cancel_sync_repo() as the target for the signal handler
sync_repo = register_sigterm_handler(importer.sync_repo, importer.cancel_sync_repo)
sync_report = sync_repo(transfer_repo, conduit, call_config)

except Exception, e:
sync_end_timestamp = _now_timestamp()
sync_result = RepoSyncResult.error_result(
repo_obj.repo_id, repo_importer['id'], repo_importer['importer_type_id'],
sync_start_timestamp, sync_end_timestamp, e, sys.exc_info()[2])
raise

else:
# Need to be safe here in case the plugin is incorrect in its return
if isinstance(sync_report, SyncReport):
summary = sync_report.summary
details = sync_report.details

if sync_report.canceled_flag:
# need to leave this in case cancel_sync_repo() was not called from parent
result_code = RepoSyncResult.RESULT_CANCELED
elif sync_report.success_flag:
result_code = RepoSyncResult.RESULT_SUCCESS
else:
result_code = RepoSyncResult.RESULT_FAILED

else:
msg = _('Plugin type [%s] on repo [%s] did not return a valid sync report')
_logger.warn(msg % (repo_importer['importer_type_id'], repo_obj.repo_id))
summary = details = msg
result_code = RepoSyncResult.RESULT_ERROR # RESULT_UNKNOWN?

sync_result, sync_end_timestamp = _reposync_result(repo_obj, repo_importer,
sync_start_timestamp, summary, details,
result_code, before_sync_unit_count)
finally:
if sync_result is None:
msg = _('Sync was cancelled')
summary = details = msg
result_code = RepoSyncResult.RESULT_CANCELED
sync_result, sync_end_timestamp = _reposync_result(repo_obj, repo_importer,
sync_start_timestamp, summary,
details, result_code,
before_sync_unit_count)
# Update the override config if it has changed
if check_override_config_change(repo_id, call_config):
model.Importer.objects(repo_id=repo_id).\
update(set__last_override_config=call_config.override_config)
# Do an update instead of a save in case the importer has changed the scratchpad
model.Importer.objects(repo_id=repo_obj.repo_id).update(set__last_sync=sync_end_timestamp)
# Add a sync history entry for this run
sync_result_collection.save(sync_result)
# Ensure counts are updated
rebuild_content_unit_counts(repo_obj)
if sync_result['added_count'] > 0:
update_last_unit_added(repo_obj.repo_id)
if sync_result['removed_count'] > 0:
update_last_unit_removed(repo_obj.repo_id)

fire_manager.fire_repo_sync_finished(sync_result)
if sync_result.result == RepoSyncResult.RESULT_FAILED:
raise pulp_exceptions.PulpExecutionException(_('Importer indicated a failed response'))

_queue_auto_publish_tasks(repo_obj.repo_id, scheduled_call_id=scheduled_call_id)
return sync_result


def check_unit_removed_since_last_sync(conduit, repo_id):
"""
Checks whether a content unit has been removed since the last_sync timestamp.
