Lots of docs fixes
Brian Bouterse committed Jul 31, 2018
1 parent 55553cf commit 05bb5f8
Showing 7 changed files with 212 additions and 134 deletions.
12 changes: 8 additions & 4 deletions docs/plugins/plugin-api/stages.rst
@@ -3,11 +3,13 @@
pulpcore.plugin.stages
======================

Plugin writers can use the Stages API to create a high-performance download-and-saving pipeline
to make writing sync code easier. There are two parts to the API:
Plugin writers can use the Stages API to create a high-performance, download-and-saving pipeline
to make writing sync code easier. There are several parts to the API:

1. DeclarativeVersion is a generic pipeline useful for most synchronization use cases.
2. The Stages themselves
1. :ref:`declarative-version` is a generic pipeline useful for most synchronization use cases.
2. The builtin Stages including :ref:`artifact-stages`, :ref:`content-stages`, and
:ref:`content-association-stages`.
3. The :ref:`stages-api`, which allows you to build custom stages and pipelines.


.. _declarative-version:
@@ -20,8 +22,10 @@ DeclarativeVersion
.. autoclass:: pulpcore.plugin.stages.FirstStage

.. autoclass:: pulpcore.plugin.stages.DeclarativeArtifact
:no-members:

.. autoclass:: pulpcore.plugin.stages.DeclarativeContent
:no-members:


.. _stages-api:
20 changes: 9 additions & 11 deletions plugin/pulpcore/plugin/stages/api.py
@@ -1,13 +1,13 @@
import asyncio


async def create_pipeline(stages, in_q=None, maxsize=100):
async def create_pipeline(stages, maxsize=100):
"""
Creates a Stages API linear pipeline from the list `stages` and returns it as a single coroutine.
Each stage is a coroutine and reads from an input `asyncio.Queue` and writes to an output
`asyncio.Queue`. When the stage is ready to shutdown it writes a `None` to the output Queue.
Here is an example of the simplest stage that only passes data.
Each stage is a coroutine and reads from an input :class:`asyncio.Queue` and writes to an output
:class:`asyncio.Queue`. When the stage is ready to shut down it writes a `None` to the output
queue. Here is an example of the simplest stage that only passes data.
>>> async def my_stage(in_q, out_q):
>>> while True:
@@ -19,15 +19,13 @@ async def create_pipeline(stages, in_q=None, maxsize=100):
Args:
stages (list of coroutines): A list of Stages API compatible coroutines.
in_q (asyncio.Queue): The queue the first stage should read from. This is how to put work
into the pipeline. Optional especially for cases where the first stage generates items
for `out_q` without needing inputs from `in_q`.
maxsize (int): The maximum number of items a queue between two stages should hold. Optional
and defaults to 100.
Returns:
A single coroutine that can be used to run, wait on, or cancel the entire pipeline.
"""
in_q = None
futures = []
for stage in stages:
out_q = asyncio.Queue(maxsize=maxsize)
@@ -38,11 +36,11 @@ async def create_pipeline(stages, in_q=None, maxsize=100):

async def end_stage(in_q, out_q):
"""
A Stages API stage that drains `in_q` and do nothing with the items. This is expected at the end
of all pipelines.
A Stages API stage that drains `in_q` and does nothing with the items. This is required at the
end of all pipelines.
Without this stage, the maxsize of the `out_q` from the last stage could fill up and block the
entire pipeline.
Without this stage, the `maxsize` of the last stage's `out_q` could fill up and block the entire
pipeline.
"""
while True:
content = await in_q.get()
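The queue-chaining idea described in the `create_pipeline` docstring can be sketched with plain `asyncio`. This is a minimal illustration, not the pulpcore implementation: the loop body elided from the diff is assumed to schedule each stage with its own `in_q`/`out_q` pair, and `number_source`, `pass_through`, `make_recording_end_stage`, `run_pipeline_sketch`, and `demo_pipeline` are hypothetical names introduced here.

```python
import asyncio


async def number_source(in_q, out_q):
    """Hypothetical first stage: ignores in_q and emits three items."""
    for item in (1, 2, 3):
        await out_q.put(item)
    await out_q.put(None)  # tell the next stage to shut down


async def pass_through(in_q, out_q):
    """Simplest possible stage: forward every item until None arrives."""
    while True:
        item = await in_q.get()
        if item is None:
            break
        await out_q.put(item)
    await out_q.put(None)


def make_recording_end_stage(seen):
    """Build a final stage that drains in_q, recording items in `seen`."""
    async def recording_end_stage(in_q, out_q):
        while True:
            item = await in_q.get()
            if item is None:
                break
            seen.append(item)
    return recording_end_stage


async def run_pipeline_sketch(stages, maxsize=100):
    """Chain stages with bounded queues, then wait for all of them."""
    in_q = None
    futures = []
    for stage in stages:
        out_q = asyncio.Queue(maxsize=maxsize)
        # Assumption: each stage is scheduled with its own in_q/out_q pair.
        futures.append(asyncio.ensure_future(stage(in_q, out_q)))
        in_q = out_q  # this stage's output feeds the next stage
    await asyncio.gather(*futures)


def demo_pipeline():
    """Run the three-stage sketch with a tiny queue size and return what arrived."""
    seen = []
    stages = [number_source, pass_through, make_recording_end_stage(seen)]
    asyncio.run(run_pipeline_sketch(stages, maxsize=1))
    return seen
```

With `maxsize=1` every `put` blocks until the downstream stage calls `get`, which is why a draining stage at the end matters: without a consumer on the last queue, the final `put` calls would never complete once that queue is full.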
88 changes: 55 additions & 33 deletions plugin/pulpcore/plugin/stages/artifact_stages.py
@@ -9,25 +9,31 @@

async def query_existing_artifacts(in_q, out_q):
"""
Stages API stage replacing DeclarativeArtifact.artifact with already-saved Artifacts.
Stages API stage that replaces :attr:`DeclarativeArtifact.artifact` objects with already-saved
:class:`~pulpcore.plugin.models.Artifact` objects.
This stage expects `~pulpcore.plugin.stages.DeclarativeContent` units from `in_q` and inspects
their associated `~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
`DeclarativeArtifact` object stores one Artifact.
This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `in_q` and
inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
:class:`~pulpcore.plugin.models.Artifact`.
This stage inspects any "unsaved" Artifact objects metadata and searches for existing saved
Artifacts inside Pulp with the same digest value(s). Any existing Artifact objects found replace
their "unsaved" counterpart in the `~pulpcore.plugin.stages.DeclarativeArtifact` object.
This stage inspects any unsaved :class:`~pulpcore.plugin.models.Artifact` objects and searches
using their metadata for existing saved :class:`~pulpcore.plugin.models.Artifact` objects inside
Pulp with the same digest value(s). Any existing :class:`~pulpcore.plugin.models.Artifact`
objects found will replace their unsaved counterpart in the
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` object.
Each `~pulpcore.plugin.stages.DeclarativeContent` is sent to `out_q` after all of its
`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.
Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `out_q` after all of its
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.
This stage drains all available items from `in_q` and batches everything into one large call to
the db for efficiency.
Args:
in_q: `~pulpcore.plugin.stages.DeclarativeContent`
out_q: `~pulpcore.plugin.stages.DeclarativeContent`
in_q (:class:`asyncio.Queue`): The queue to receive
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects from.
out_q (:class:`asyncio.Queue`): The queue to put
:class:`~pulpcore.plugin.stages.DeclarativeContent` into.
Returns:
The query_existing_artifacts stage as a coroutine to be included in a pipeline.
@@ -82,23 +88,22 @@ async def query_existing_artifacts(in_q, out_q):

class artifact_downloader:
"""
An object containing a Stages API stage to download missing Artifacts, but don't call save()
An object containing a Stages API stage that downloads :class:`~pulpcore.plugin.models.Artifact`
files, but doesn't save the :class:`~pulpcore.plugin.models.Artifact` in the db.
The actual stage is the `stage` attribute which can be used as follows:
The actual stage is the :meth:`~pulpcore.plugin.stages.artifact_downloader.stage`
which can be used as follows:
>>> artifact_downloader(max_concurrent_downloads=42).stage # This is the real stage
in_q data type: A `~pulpcore.plugin.stages.DeclarativeContent` with potentially files missing
`~pulpcore.plugin.stages.DeclarativeArtifact.artifact` objects.
out_q data type: A `~pulpcore.plugin.stages.DeclarativeContent` with all files downloaded for
all `~pulpcore.plugin.stages.DeclarativeArtifact.artifact` objects.
This stage downloads the file for any :class:`~pulpcore.plugin.models.Artifact` objects missing
files and creates a new :class:`~pulpcore.plugin.models.Artifact` object from the downloaded
file and its digest data. The new :class:`~pulpcore.plugin.models.Artifact` is not saved but
added to the :class:`~pulpcore.plugin.stages.DeclarativeArtifact` object, replacing the likely
incomplete :class:`~pulpcore.plugin.models.Artifact`.
This stage downloads the file for any Artifact objects missing files and creates a new Artifact
from the downloaded file and its digest data. The new Artifact is *not* saved but added to the
`~pulpcore.plugin.stages.DeclarativeArtifact` object, replacing the likely incomplete Artifact.
Each `~pulpcore.plugin.stages.DeclarativeContent` is sent to `out_q` after all of its
`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.
Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `out_q` after all of its
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.
This stage creates a ProgressBar named 'Downloading Artifacts' that counts the number of
downloads completed. Since it's a stream the total count isn't known until it's finished.
@@ -117,7 +122,18 @@ def __init__(self, max_concurrent_downloads=100):
self.max_concurrent_downloads = max_concurrent_downloads

async def stage(self, in_q, out_q):
"""Download undownloaded Artifacts, but don't save them"""
"""
Download undownloaded Artifacts, but don't save them in the db.
Args:
in_q (:class:`asyncio.Queue`): The queue to receive
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects from that may have
undownloaded files.
out_q (:class:`asyncio.Queue`): The queue to put
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects into, all of which have
files downloaded.
"""
pending = set()
incoming_content = []
outstanding_downloads = 0
@@ -193,19 +209,25 @@ async def return_content_for_downloader(c):

async def artifact_saver(in_q, out_q):
"""
Stages API stage that saves an Artifact for any "unsaved" DeclarativeArtifact.artifact objects.
Stages API stage that saves any unsaved :attr:`DeclarativeArtifact.artifact` objects.
This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `in_q` and
inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
:class:`~pulpcore.plugin.models.Artifact`.
This stage expects `~pulpcore.plugin.stages.DeclarativeContent` units from `in_q` and inspects
their associated `~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
`DeclarativeArtifact` object stores one Artifact.
Any unsaved :class:`~pulpcore.plugin.models.Artifact` objects are saved. Each
:class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `out_q` after all of its
:class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.
Any "unsaved" Artifact objects are saved. Each `~pulpcore.plugin.stages.DeclarativeContent` is
sent to `out_q` after all of its `~pulpcore.plugin.stages.DeclarativeArtifact` objects have been
handled.
This stage drains all available items from `in_q` and batches everything into one large call to
the db for efficiency.
Args:
in_q: `~pulpcore.plugin.stages.DeclarativeContent`
out_q: `~pulpcore.plugin.stages.DeclarativeContent`
in_q (:class:`asyncio.Queue`): The queue to receive
:class:`~pulpcore.plugin.stages.DeclarativeContent` objects from.
out_q (:class:`asyncio.Queue`): The queue to put
:class:`~pulpcore.plugin.stages.DeclarativeContent` into.
Returns:
The artifact_saver stage as a coroutine to be included in a pipeline.
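Both `query_existing_artifacts` and `artifact_saver` are documented as draining all available items from `in_q` and batching them into one database call. The stage bodies are not visible in this diff, so the following is only a sketch of one way to get that behavior with `asyncio.Queue.get_nowait()`; `batching_stage`, `handle_batch`, and `demo_batching` are hypothetical names, and `handle_batch` stands in for the bulk query or save.

```python
import asyncio


async def batching_stage(in_q, out_q, handle_batch):
    """Wait for one item, then drain whatever else is already queued."""
    shutdown = False
    while not shutdown:
        batch = []
        item = await in_q.get()  # block until at least one item is available
        while True:
            if item is None:  # upstream stage has shut down
                shutdown = True
                break
            batch.append(item)
            try:
                item = in_q.get_nowait()  # take more only if already queued
            except asyncio.QueueEmpty:
                break
        if batch:
            handle_batch(batch)  # stand-in for one bulk database call
            for processed in batch:
                await out_q.put(processed)
    await out_q.put(None)  # propagate shutdown downstream


def demo_batching():
    """Feed three queued items plus None and report batches and forwarded items."""
    async def main():
        in_q, out_q = asyncio.Queue(), asyncio.Queue()
        for item in ("a", "b", "c", None):
            in_q.put_nowait(item)
        batches = []
        await batching_stage(in_q, out_q, batches.append)
        forwarded = []
        while not out_q.empty():
            forwarded.append(out_q.get_nowait())
        return batches, forwarded
    return asyncio.run(main())
```

Because the three items are already queued when the stage starts, they are handled as a single batch; under a slow producer the same code would fall back to smaller batches rather than waiting.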
60 changes: 39 additions & 21 deletions plugin/pulpcore/plugin/stages/association_stages.py
@@ -10,27 +10,21 @@ class content_unit_association:
"""
An object containing a Stages API stage that associates content units with `new_version`.
The actual stage is the `stage` attribute which can be used as follows:
The actual stage is the :meth:`~pulpcore.plugin.stages.content_unit_association.stage`
which can be used as follows:
>>> content_unit_association(my_new_version).stage # This is the real stage
This stage stores all content unit types and unit keys in memory before running. This is done to
compute the units already associated but not received from `in_q`. These units are passed via
`out_q` to the next stage as a `~django.db.models.query.QuerySet`.
in_q data type: A `~pulpcore.plugin.stages.DeclarativeContent` with saved `content` that needs
to be associated.
out_q data type: A `django.db.models.query.QuerySet` of `~pulpcore.plugin.models.Content` or
subclass that are already associated but not included in the stream of items from `in_q`.
One `django.db.models.query.QuerySet` is put for each `~pulpcore.plugin.models.Content`
type.
`out_q` to the next stage as a :class:`django.db.models.query.QuerySet`.
This stage creates a ProgressBar named 'Associating Content' that counts the number of units
associated. Since it's a stream the total count isn't known until it's finished.
Args:
new_version (pulpcore.plugin.models.RepositoryVersion): The repo version this stage
associates content with.
new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this
stage associates content with.
Returns:
An object containing the content_unit_association stage to be included in a pipeline.
@@ -44,7 +38,19 @@ def __init__(self, new_version):
self.unit_keys_by_type[type(unit)].add(unit.natural_key())

async def stage(self, in_q, out_q):
"""For each Content Unit associate it with the repository version"""
"""
For each Content Unit, associate it with the repository version.
Args:
in_q (:class:`asyncio.Queue`): Each item is a
:class:`~pulpcore.plugin.stages.DeclarativeContent` with saved `content` that needs
to be associated.
out_q (:class:`asyncio.Queue`): Each item is a :class:`django.db.models.query.QuerySet`
of :class:`~pulpcore.plugin.models.Content` subclass that are already associated but
not included in the stream of items from `in_q`. One
:class:`django.db.models.query.QuerySet` is put for each
:class:`~pulpcore.plugin.models.Content` type.
"""
with ProgressBar(message='Associating Content') as pb:
declarative_content_list = []
shutdown = False
@@ -100,32 +106,44 @@ class content_unit_unassociation:
"""
An object containing a Stages API stage that unassociates content units from `new_version`.
The actual stage is the `stage` attribute which can be used as follows:
The actual stage is the :meth:`~pulpcore.plugin.stages.content_unit_unassociation.stage`
which can be used as follows:
>>> content_unit_unassociation(my_new_version).stage # This is the real stage
in_q data type: `django.db.models.query.QuerySet` of `~pulpcore.plugin.models.Content` or
subclass to be unassociated from `new_version`.
out_q data type: `django.db.models.query.QuerySet` of `~pulpcore.plugin.models.Content` or
subclass that were unassociated from `new_version`.
This stage creates a ProgressBar named 'Un-Associating Content' that counts the number of units
un-associated. Since it's a stream the total count isn't known until it's finished.
Args:
new_version (pulpcore.plugin.models.RepositoryVersion): The repo version this stage
unassociates content from
new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this
stage unassociates content from.
Returns:
An object containing the configured content_unit_unassociation stage to be included in a
pipeline.
pipeline.
"""

def __init__(self, new_version):
self.new_version = new_version

async def stage(self, in_q, out_q):
"""For each Content Unit from in_q, unassociate it with the repository version"""
"""
For each Content Unit from `in_q`, unassociate it from the repository version.
Args:
in_q (:class:`asyncio.Queue`): Each item is a
:class:`django.db.models.query.QuerySet` of
:class:`~pulpcore.plugin.models.Content` subclass objects to be unassociated from
`new_version`. One
:class:`django.db.models.query.QuerySet` is put for each
:class:`~pulpcore.plugin.models.Content` type.
out_q (:class:`asyncio.Queue`): Each item is a
:class:`django.db.models.query.QuerySet` of
:class:`~pulpcore.plugin.models.Content` subclass that were unassociated. One
:class:`django.db.models.query.QuerySet` is put for each
:class:`~pulpcore.plugin.models.Content` type.
"""
with ProgressBar(message='Un-Associating Content') as pb:
while True:
queryset_to_unassociate = await in_q.get()
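Several docstrings in this commit describe "an object containing a Stages API stage", where configuration goes to `__init__` and the bound `stage` method is what a pipeline receives. A minimal sketch of that pattern follows, using a hypothetical `tagging_stage` class and `demo_configurable_stage` helper that are not part of pulpcore.

```python
import asyncio


class tagging_stage:
    """Hypothetical configurable stage; pass `tagging_stage(tag).stage` to a pipeline."""

    def __init__(self, tag):
        self.tag = tag  # configuration captured once, before the pipeline runs

    async def stage(self, in_q, out_q):
        """Pair every incoming item with the configured tag."""
        while True:
            item = await in_q.get()
            if item is None:
                break
            await out_q.put((self.tag, item))
        await out_q.put(None)  # propagate shutdown downstream


def demo_configurable_stage():
    """Run the bound method as a stage over two pre-queued items."""
    async def main():
        in_q, out_q = asyncio.Queue(), asyncio.Queue()
        for item in ("unit-1", "unit-2", None):
            in_q.put_nowait(item)
        # The bound method has the same (in_q, out_q) signature as a plain stage.
        await tagging_stage("v1").stage(in_q, out_q)
        results = []
        while not out_q.empty():
            results.append(out_q.get_nowait())
        return results
    return asyncio.run(main())
```

The bound method keeps the two-argument stage signature, so a pipeline builder does not need to know whether a stage is a plain coroutine function or a configured object.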
