Skip to content

Commit

Permalink
Final round of fixes
Browse files Browse the repository at this point in the history
* switch storage backend to DefaultStorage()
* super() is now called first in anytime it's used
* FirstStage is removed
* BaseStage -> Stage
* the relative path now handles duplicate rel_paths correctly
* documenting args and kwargs

I did some final hand testing, and I also ensure the docs build and look
good.

https://pulp.plan.io/issues/3844
re #3844
  • Loading branch information
Brian Bouterse committed Aug 3, 2018
1 parent 856d090 commit 579c700
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 65 deletions.
5 changes: 1 addition & 4 deletions docs/plugins/plugin-api/stages.rst
Expand Up @@ -19,9 +19,6 @@ DeclarativeVersion

.. autoclass:: pulpcore.plugin.stages.DeclarativeVersion

.. autoclass:: pulpcore.plugin.stages.FirstStage
:special-members: __call__

.. autoclass:: pulpcore.plugin.stages.DeclarativeArtifact
:no-members:

Expand All @@ -36,7 +33,7 @@ Stages API

.. autofunction:: pulpcore.plugin.stages.create_pipeline

.. autoclass:: pulpcore.plugin.stages.BaseStage
.. autoclass:: pulpcore.plugin.stages.Stage
:special-members: __call__

.. autoclass:: pulpcore.plugin.stages.EndStage
Expand Down
4 changes: 2 additions & 2 deletions plugin/pulpcore/plugin/stages/__init__.py
@@ -1,6 +1,6 @@
from .api import BaseStage, create_pipeline, EndStage # noqa
from .api import create_pipeline, EndStage, Stage # noqa
from .artifact_stages import ArtifactDownloader, ArtifactSaver, QueryExistingArtifacts # noqa
from .association_stages import ContentUnitAssociation, ContentUnitUnassociation # noqa
from .content_unit_stages import ContentUnitSaver, QueryExistingContentUnits # noqa
from .declarative_version import DeclarativeVersion, FirstStage # noqa
from .declarative_version import DeclarativeVersion # noqa
from .models import DeclarativeArtifact, DeclarativeContent # noqa
4 changes: 2 additions & 2 deletions plugin/pulpcore/plugin/stages/api.py
Expand Up @@ -2,7 +2,7 @@
from gettext import gettext as _


class BaseStage:
class Stage:
"""
The base class for all Stages API stages.
Expand Down Expand Up @@ -57,7 +57,7 @@ async def create_pipeline(stages, maxsize=100):
await asyncio.gather(*futures)


class EndStage(BaseStage):
class EndStage(Stage):
"""
A Stages API stage that drains `in_q` and does nothing with the items. This is required at the
end of all pipelines.
Expand Down
17 changes: 10 additions & 7 deletions plugin/pulpcore/plugin/stages/artifact_stages.py
@@ -1,15 +1,15 @@
import asyncio

from django.core.files import File
from django.core.files.storage import default_storage
from django.core.files.storage import DefaultStorage
from django.db.models import Q

from pulpcore.plugin.models import Artifact, ProgressBar

from .api import BaseStage
from .api import Stage


class QueryExistingArtifacts(BaseStage):
class QueryExistingArtifacts(Stage):
"""
A Stages API stage that replaces :attr:`DeclarativeContent.content` objects with already-saved
:class:`~pulpcore.plugin.models.Artifact` objects.
Expand Down Expand Up @@ -94,7 +94,7 @@ async def __call__(self, in_q, out_q):
await out_q.put(None)


class ArtifactDownloader(BaseStage):
class ArtifactDownloader(Stage):
"""
A Stages API stage to download :class:`~pulpcore.plugin.models.Artifact` files, but don't save
the :class:`~pulpcore.plugin.models.Artifact` in the db.
Expand All @@ -116,11 +116,13 @@ class ArtifactDownloader(BaseStage):
Args:
max_concurrent_downloads (int): The maximum number of concurrent downloads this stage will
run. Default is 100.
args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
"""

def __init__(self, max_concurrent_downloads=100, *args, **kwargs):
self.max_concurrent_downloads = max_concurrent_downloads
super().__init__(*args, **kwargs)
self.max_concurrent_downloads = max_concurrent_downloads

async def __call__(self, in_q, out_q):
"""
Expand Down Expand Up @@ -212,7 +214,7 @@ async def return_content_for_downloader(c):
await out_q.put(None)


class ArtifactSaver(BaseStage):
class ArtifactSaver(Stage):
"""
A Stages API stage that saves any unsaved :attr:`DeclarativeArtifact.artifact` objects.
Expand Down Expand Up @@ -242,6 +244,7 @@ async def __call__(self, in_q, out_q):
Returns:
The coroutine for this stage.
"""
storage_backend = DefaultStorage()
shutdown = False
batch = []
while not shutdown:
Expand All @@ -267,7 +270,7 @@ async def __call__(self, in_q, out_q):
dst_path = declarative_artifact.artifact.storage_path(None)
with open(src_path, mode='rb') as input_file:
django_file_obj = File(input_file)
default_storage.save(dst_path, django_file_obj)
storage_backend.save(dst_path, django_file_obj)
declarative_artifact.artifact.file = dst_path
artifacts_to_save.append(declarative_artifact.artifact)

Expand Down
14 changes: 9 additions & 5 deletions plugin/pulpcore/plugin/stages/association_stages.py
Expand Up @@ -5,10 +5,10 @@

from pulpcore.plugin.models import ProgressBar

from .api import BaseStage
from .api import Stage


class ContentUnitAssociation(BaseStage):
class ContentUnitAssociation(Stage):
"""
A Stages API stage that associates content units with `new_version`.
Expand All @@ -22,15 +22,17 @@ class ContentUnitAssociation(BaseStage):
Args:
new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this
stage associates content with.
args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
"""

def __init__(self, new_version, *args, **kwargs):
super().__init__(*args, **kwargs)
self.new_version = new_version
self.unit_keys_by_type = defaultdict(set)
for unit in self.new_version.content.all():
unit = unit.cast()
self.unit_keys_by_type[type(unit)].add(unit.natural_key())
super().__init__(*args, **kwargs)

async def __call__(self, in_q, out_q):
"""
Expand Down Expand Up @@ -100,7 +102,7 @@ async def __call__(self, in_q, out_q):
await out_q.put(None)


class ContentUnitUnassociation(BaseStage):
class ContentUnitUnassociation(Stage):
"""
A Stages API stage that unassociates content units from `new_version`.
Expand All @@ -110,11 +112,13 @@ class ContentUnitUnassociation(BaseStage):
Args:
new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this
stage unassociates content from.
args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
"""

def __init__(self, new_version, *args, **kwargs):
self.new_version = new_version
super().__init__(*args, **kwargs)
self.new_version = new_version

async def __call__(self, in_q, out_q):
"""
Expand Down
12 changes: 6 additions & 6 deletions plugin/pulpcore/plugin/stages/content_unit_stages.py
Expand Up @@ -6,10 +6,10 @@

from pulpcore.plugin.models import ContentArtifact, RemoteArtifact

from .api import BaseStage
from .api import Stage


class QueryExistingContentUnits(BaseStage):
class QueryExistingContentUnits(Stage):
"""
A Stages API stage that saves :attr:`DeclarativeContent.content` objects and saves its related
:class:`~pulpcore.plugin.models.ContentArtifact` and
Expand Down Expand Up @@ -90,7 +90,7 @@ async def __call__(self, in_q, out_q):
await out_q.put(None)


class ContentUnitSaver(BaseStage):
class ContentUnitSaver(Stage):
"""
A Stages API stage that saves :attr:`DeclarativeContent.content` objects and saves its related
:class:`~pulpcore.plugin.models.ContentArtifact` and
Expand Down Expand Up @@ -167,11 +167,11 @@ async def __call__(self, in_q, out_q):
'sha512': declarative_artifact.artifact.sha512,
'remote': declarative_artifact.remote,
}
rel_path = content_artifact.relative_path
remote_artifact_map[rel_path] = remote_artifact_data
content_pk = content_artifact.content.pk
remote_artifact_map[content_pk] = remote_artifact_data

for content_artifact in ContentArtifact.objects.bulk_create(content_artifact_bulk):
remote_artifact_data = remote_artifact_map.pop(content_artifact.relative_path)
remote_artifact_data = remote_artifact_map.pop(content_artifact.content.pk)
new_remote_artifact = RemoteArtifact(
content_artifact=content_artifact, **remote_artifact_data
)
Expand Down
46 changes: 7 additions & 39 deletions plugin/pulpcore/plugin/stages/declarative_version.py
Expand Up @@ -10,38 +10,6 @@
from .content_unit_stages import ContentUnitSaver, QueryExistingContentUnits


class FirstStage:
"""
Plugin writers subclass this to create their first stage for a
:class:`~pulpcore.plugin.stages.DeclarativeVersion` pipeline.
To use this class, the plugin writer needs to:
1. Subclass it and implement the
:meth:`~pulpcore.plugin.stages.FirstStage.__call__` method.
2. Pass the instantiated subclass to :class:`~pulpcore.plugin.stages.DeclarativeVersion`.
"""

async def __call__(self, in_q, out_q):
"""
A Stages API compatible coroutine for :class:`~pulpcore.plugin.stages.DeclarativeVersion` to
use as the first stage.
This must be implemented on the subclass.
Args:
in_q (:class:`asyncio.Queue`): Unused because the first stage doesn't read from an input
queue.
out_q (:class:`asyncio.Queue`): The queue to put
:class:`~pulpcore.plugin.stages.DeclarativeContent` into.
Returns:
A Stages API compatible coroutine for
:class:`~pulpcore.plugin.stages.DeclarativeVersion` to use as the first stage.
"""
raise NotImplementedError('A plugin writer needs to implement this')


class DeclarativeVersion:

def __init__(self, first_stage, repository, sync_mode='mirror'):
Expand All @@ -66,13 +34,13 @@ def __init__(self, first_stage, repository, sync_mode='mirror'):
8. Unassociate any content units not declared in the stream (only when sync_mode='mirror')
To do this, the plugin writer should subclass the
:class:`~pulpcore.plugin.stages.FirstStage` class and define its
:meth:`__call__()` interface which return a coroutine. This coroutine should
:class:`~pulpcore.plugin.stages.Stage` class and define its
:meth:`__call__()` interface which returns a coroutine. This coroutine should
download metadata, create the corresponding
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects, and put them into the
:class:`asyncio.Queue` to send them down the pipeline. For example:
>>> class MyFirstStage(FirstStage):
>>> class MyFirstStage(Stage):
>>>
>>> def __init__(remote):
>>> self.remote = remote
Expand All @@ -91,19 +59,19 @@ def __init__(self, first_stage, repository, sync_mode='mirror'):
To use your first stage with the pipeline you have to instantiate the subclass and pass it
to :class:`~pulpcore.plugin.stages.DeclarativeVersion`.
1. Create the instance of the :class:`~pulpcore.plugin.stages.FirstStage` object subclass
1. Create the instance of the subclassed :class:`~pulpcore.plugin.stages.Stage` object.
2. Create the :class:`~pulpcore.plugin.stages.DeclarativeVersion` instance, passing the
:class:`~pulpcore.plugin.stages.FirstStage` subclass instance to it
:class:`~pulpcore.plugin.stages.Stage` subclass instance to it
3. Call the :meth:`~pulpcore.plugin.stages.DeclarativeVersion.create` method on your
:class:`~pulpcore.plugin.stages.DeclarativeVersion` instance
Here is an example:
>>> first_stage = FileFirstStage(remote)
>>> first_stage = MyFirstStage(remote)
>>> DeclarativeVersion(first_stage, repository).create()
Args:
first_stage (:class:`~pulpcore.plugin.stages.FirstStage`): The first stage to receive
first_stage (:class:`~pulpcore.plugin.stages.Stage`): The first stage to receive
:class:`~pulpcore.plugin.stages.DeclarativeContent` from.
repository (:class:`~pulpcore.plugin.models.Repository`): The repository receiving the
new version.
Expand Down

0 comments on commit 579c700

Please sign in to comment.