From 579c7004a655d19330af800577dc239d9f9aa6e6 Mon Sep 17 00:00:00 2001 From: Brian Bouterse Date: Fri, 3 Aug 2018 14:04:47 -0400 Subject: [PATCH] Final round of fixes * switch storage backend to DefaultStorage() * super() is now called first anytime it's used * FirstStage is removed * BaseStage -> Stage * the relative path now handles duplicate rel_paths correctly * documenting args and kwargs I did some final hand testing, and I also ensured the docs build and look good. https://pulp.plan.io/issues/3844 re #3844 --- docs/plugins/plugin-api/stages.rst | 5 +- plugin/pulpcore/plugin/stages/__init__.py | 4 +- plugin/pulpcore/plugin/stages/api.py | 4 +- .../pulpcore/plugin/stages/artifact_stages.py | 17 ++++--- .../plugin/stages/association_stages.py | 14 ++++-- .../plugin/stages/content_unit_stages.py | 12 ++--- .../plugin/stages/declarative_version.py | 46 +++---------------- 7 files changed, 37 insertions(+), 65 deletions(-) diff --git a/docs/plugins/plugin-api/stages.rst b/docs/plugins/plugin-api/stages.rst index 93164bce64..5f87473b14 100644 --- a/docs/plugins/plugin-api/stages.rst +++ b/docs/plugins/plugin-api/stages.rst @@ -19,9 +19,6 @@ DeclarativeVersion .. autoclass:: pulpcore.plugin.stages.DeclarativeVersion -.. autoclass:: pulpcore.plugin.stages.FirstStage - :special-members: __call__ - .. autoclass:: pulpcore.plugin.stages.DeclarativeArtifact :no-members: @@ -36,7 +33,7 @@ Stages API .. autofunction:: pulpcore.plugin.stages.create_pipeline -.. autoclass:: pulpcore.plugin.stages.BaseStage +.. autoclass:: pulpcore.plugin.stages.Stage :special-members: __call__ .. 
autoclass:: pulpcore.plugin.stages.EndStage diff --git a/plugin/pulpcore/plugin/stages/__init__.py b/plugin/pulpcore/plugin/stages/__init__.py index 7496c1f38b..231df2fc75 100644 --- a/plugin/pulpcore/plugin/stages/__init__.py +++ b/plugin/pulpcore/plugin/stages/__init__.py @@ -1,6 +1,6 @@ -from .api import BaseStage, create_pipeline, EndStage # noqa +from .api import create_pipeline, EndStage, Stage # noqa from .artifact_stages import ArtifactDownloader, ArtifactSaver, QueryExistingArtifacts # noqa from .association_stages import ContentUnitAssociation, ContentUnitUnassociation # noqa from .content_unit_stages import ContentUnitSaver, QueryExistingContentUnits # noqa -from .declarative_version import DeclarativeVersion, FirstStage # noqa +from .declarative_version import DeclarativeVersion # noqa from .models import DeclarativeArtifact, DeclarativeContent # noqa diff --git a/plugin/pulpcore/plugin/stages/api.py b/plugin/pulpcore/plugin/stages/api.py index ef00f0815d..eab6bd7c40 100644 --- a/plugin/pulpcore/plugin/stages/api.py +++ b/plugin/pulpcore/plugin/stages/api.py @@ -2,7 +2,7 @@ from gettext import gettext as _ -class BaseStage: +class Stage: """ The base class for all Stages API stages. @@ -57,7 +57,7 @@ async def create_pipeline(stages, maxsize=100): await asyncio.gather(*futures) -class EndStage(BaseStage): +class EndStage(Stage): """ A Stages API stage that drains `in_q` and does nothing with the items. This is required at the end of all pipelines. 
diff --git a/plugin/pulpcore/plugin/stages/artifact_stages.py b/plugin/pulpcore/plugin/stages/artifact_stages.py index c25eaf5df0..3d798d9a5f 100644 --- a/plugin/pulpcore/plugin/stages/artifact_stages.py +++ b/plugin/pulpcore/plugin/stages/artifact_stages.py @@ -1,15 +1,15 @@ import asyncio from django.core.files import File -from django.core.files.storage import default_storage +from django.core.files.storage import DefaultStorage from django.db.models import Q from pulpcore.plugin.models import Artifact, ProgressBar -from .api import BaseStage +from .api import Stage -class QueryExistingArtifacts(BaseStage): +class QueryExistingArtifacts(Stage): """ A Stages API stage that replaces :attr:`DeclarativeContent.content` objects with already-saved :class:`~pulpcore.plugin.models.Artifact` objects. @@ -94,7 +94,7 @@ async def __call__(self, in_q, out_q): await out_q.put(None) -class ArtifactDownloader(BaseStage): +class ArtifactDownloader(Stage): """ A Stages API stage to download :class:`~pulpcore.plugin.models.Artifact` files, but don't save the :class:`~pulpcore.plugin.models.Artifact` in the db. @@ -116,11 +116,13 @@ class ArtifactDownloader(BaseStage): Args: max_concurrent_downloads (int): The maximum number of concurrent downloads this stage will run. Default is 100. + args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`. + kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`. """ def __init__(self, max_concurrent_downloads=100, *args, **kwargs): - self.max_concurrent_downloads = max_concurrent_downloads super().__init__(*args, **kwargs) + self.max_concurrent_downloads = max_concurrent_downloads async def __call__(self, in_q, out_q): """ @@ -212,7 +214,7 @@ async def return_content_for_downloader(c): await out_q.put(None) -class ArtifactSaver(BaseStage): +class ArtifactSaver(Stage): """ A Stages API stage that saves any unsaved :attr:`DeclarativeArtifact.artifact` objects. 
@@ -242,6 +244,7 @@ async def __call__(self, in_q, out_q): Returns: The coroutine for this stage. """ + storage_backend = DefaultStorage() shutdown = False batch = [] while not shutdown: @@ -267,7 +270,7 @@ async def __call__(self, in_q, out_q): dst_path = declarative_artifact.artifact.storage_path(None) with open(src_path, mode='rb') as input_file: django_file_obj = File(input_file) - default_storage.save(dst_path, django_file_obj) + storage_backend.save(dst_path, django_file_obj) declarative_artifact.artifact.file = dst_path artifacts_to_save.append(declarative_artifact.artifact) diff --git a/plugin/pulpcore/plugin/stages/association_stages.py b/plugin/pulpcore/plugin/stages/association_stages.py index 01f3ac65da..8d9af89fdc 100644 --- a/plugin/pulpcore/plugin/stages/association_stages.py +++ b/plugin/pulpcore/plugin/stages/association_stages.py @@ -5,10 +5,10 @@ from pulpcore.plugin.models import ProgressBar -from .api import BaseStage +from .api import Stage -class ContentUnitAssociation(BaseStage): +class ContentUnitAssociation(Stage): """ A Stages API stage that associates content units with `new_version`. @@ -22,15 +22,17 @@ class ContentUnitAssociation(BaseStage): Args: new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this stage associates content with. + args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`. + kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`. 
""" def __init__(self, new_version, *args, **kwargs): + super().__init__(*args, **kwargs) self.new_version = new_version self.unit_keys_by_type = defaultdict(set) for unit in self.new_version.content.all(): unit = unit.cast() self.unit_keys_by_type[type(unit)].add(unit.natural_key()) - super().__init__(*args, **kwargs) async def __call__(self, in_q, out_q): """ @@ -100,7 +102,7 @@ async def __call__(self, in_q, out_q): await out_q.put(None) -class ContentUnitUnassociation(BaseStage): +class ContentUnitUnassociation(Stage): """ A Stages API stage that unassociates content units from `new_version`. @@ -110,11 +112,13 @@ class ContentUnitUnassociation(BaseStage): Args: new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this stage unassociates content from. + args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`. + kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`. """ def __init__(self, new_version, *args, **kwargs): - self.new_version = new_version super().__init__(*args, **kwargs) + self.new_version = new_version async def __call__(self, in_q, out_q): """ diff --git a/plugin/pulpcore/plugin/stages/content_unit_stages.py b/plugin/pulpcore/plugin/stages/content_unit_stages.py index bc853f98fe..b8f75d3453 100644 --- a/plugin/pulpcore/plugin/stages/content_unit_stages.py +++ b/plugin/pulpcore/plugin/stages/content_unit_stages.py @@ -6,10 +6,10 @@ from pulpcore.plugin.models import ContentArtifact, RemoteArtifact -from .api import BaseStage +from .api import Stage -class QueryExistingContentUnits(BaseStage): +class QueryExistingContentUnits(Stage): """ A Stages API stage that saves :attr:`DeclarativeContent.content` objects and saves its related :class:`~pulpcore.plugin.models.ContentArtifact` and @@ -90,7 +90,7 @@ async def __call__(self, in_q, out_q): await out_q.put(None) -class ContentUnitSaver(BaseStage): +class ContentUnitSaver(Stage): """ A Stages API stage 
that saves :attr:`DeclarativeContent.content` objects and saves its related :class:`~pulpcore.plugin.models.ContentArtifact` and @@ -167,11 +167,11 @@ async def __call__(self, in_q, out_q): 'sha512': declarative_artifact.artifact.sha512, 'remote': declarative_artifact.remote, } - rel_path = content_artifact.relative_path - remote_artifact_map[rel_path] = remote_artifact_data + content_pk = content_artifact.content.pk + remote_artifact_map[content_pk] = remote_artifact_data for content_artifact in ContentArtifact.objects.bulk_create(content_artifact_bulk): - remote_artifact_data = remote_artifact_map.pop(content_artifact.relative_path) + remote_artifact_data = remote_artifact_map.pop(content_artifact.content.pk) new_remote_artifact = RemoteArtifact( content_artifact=content_artifact, **remote_artifact_data ) diff --git a/plugin/pulpcore/plugin/stages/declarative_version.py b/plugin/pulpcore/plugin/stages/declarative_version.py index a500ed1ee0..df45ce01e4 100644 --- a/plugin/pulpcore/plugin/stages/declarative_version.py +++ b/plugin/pulpcore/plugin/stages/declarative_version.py @@ -10,38 +10,6 @@ from .content_unit_stages import ContentUnitSaver, QueryExistingContentUnits -class FirstStage: - """ - Plugin writers subclass this to create their first stage for a - :class:`~pulpcore.plugin.stages.DeclarativeVersion` pipeline. - - To use this class, the plugin writer needs to: - - 1. Subclass it and implement the - :meth:`~pulpcore.plugin.stages.FirstStage.__call__` method. - 2. Pass the instantiated subclass to :class:`~pulpcore.plugin.stages.DeclarativeVersion`. - """ - - async def __call__(self, in_q, out_q): - """ - A Stages API compatible coroutine for :class:`~pulpcore.plugin.stages.DeclarativeVersion` to - use as the first stage. - - This must be implemented on the subclass. - - Args: - in_q (:class:`asyncio.Queue`): Unused because the first stage doesn't read from an input - queue. 
- out_q (:class:`asyncio.Queue`): The queue to put - :class:`~pulpcore.plugin.stages.DeclarativeContent` into. - - Returns: - A Stages API compatible coroutine for - :class:`~pulpcore.plugin.stages.DeclarativeVersion` to use as the first stage. - """ - raise NotImplementedError('A plugin writer needs to implement this') - - class DeclarativeVersion: def __init__(self, first_stage, repository, sync_mode='mirror'): @@ -66,13 +34,13 @@ def __init__(self, first_stage, repository, sync_mode='mirror'): 8. Unassociate any content units not declared in the stream (only when sync_mode='mirror') To do this, the plugin writer should subclass the - :class:`~pulpcore.plugin.stages.FirstStage` class and define its - :meth:`__call__()` interface which return a coroutine. This coroutine should + :class:`~pulpcore.plugin.stages.Stage` class and define its + :meth:`__call__()` interface which returns a coroutine. This coroutine should download metadata, create the corresponding :class:`~pulpcore.plugin.stages.DeclarativeContent` objects, and put them into the :class:`asyncio.Queue` to send them down the pipeline. For example: - >>> class MyFirstStage(FirstStage): + >>> class MyFirstStage(Stage): >>> >>> def __init__(remote): >>> self.remote = remote @@ -91,19 +59,19 @@ def __init__(self, first_stage, repository, sync_mode='mirror'): To use your first stage with the pipeline you have to instantiate the subclass and pass it to :class:`~pulpcore.plugin.stages.DeclarativeVersion`. - 1. Create the instance of the :class:`~pulpcore.plugin.stages.FirstStage` object subclass + 1. Create the instance of the subclassed :class:`~pulpcore.plugin.stages.Stage` object. 2. Create the :class:`~pulpcore.plugin.stages.DeclarativeVersion` instance, passing the - :class:`~pulpcore.plugin.stages.FirstStage` subclass instance to it + :class:`~pulpcore.plugin.stages.Stage` subclass instance to it 3. 
Call the :meth:`~pulpcore.plugin.stages.DeclarativeVersion.create` method on your :class:`~pulpcore.plugin.stages.DeclarativeVersion` instance Here is an example: - >>> first_stage = FileFirstStage(remote) + >>> first_stage = MyFirstStage(remote) >>> DeclarativeVersion(first_stage, repository).create() Args: - first_stage (:class:`~pulpcore.plugin.stages.FirstStage`): The first stage to receive + first_stage (:class:`~pulpcore.plugin.stages.Stage`): The first stage to receive :class:`~pulpcore.plugin.stages.DeclarativeContent` from. repository (:class:`~pulpcore.plugin.models.Repository`): The repository receiving the new version.