Skip to content

Commit

Permalink
Merge ContentUnassociation stage into the ContentAssociation stage
Browse files Browse the repository at this point in the history
ContentUnassociation doesn't really need to be a separate stage, and
making it one breaks the expectations of the "pipeline" by changing the
semantics of what goes into and out of the queues.

closes: #8635
https://pulp.plan.io/issues/8635
  • Loading branch information
dralley committed Apr 30, 2021
1 parent 094f84a commit 0ae327a
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 103 deletions.
1 change: 1 addition & 0 deletions CHANGES/8635.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactored the sync pipeline to be simpler - merged ContentAssociation and ContentUnassociation stages.
12 changes: 1 addition & 11 deletions docs/plugins/api-reference/stages.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ Plugin writers can use the Stages API to create a high-performance, download-and
to make writing sync code easier. There are several parts to the API:

1. :ref:`declarative-version` is a generic pipeline useful for most synchronization use cases.
2. The builtin Stages including :ref:`artifact-stages`, :ref:`content-stages`, and
:ref:`content-association-stages`.
2. The builtin Stages including :ref:`artifact-stages` and :ref:`content-stages`.
3. The :ref:`stages-api`, which allows you to build custom stages and pipelines.


Expand Down Expand Up @@ -67,13 +66,4 @@ Content Related Stages

.. autoclass:: pulpcore.plugin.stages.ResolveContentFutures


.. _content-association-stages:

Content Association and Unassociation Stages
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: pulpcore.plugin.stages.ContentAssociation

.. autoclass:: pulpcore.plugin.stages.ContentUnassociation

7 changes: 4 additions & 3 deletions pulpcore/plugin/stages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
QueryExistingArtifacts,
RemoteArtifactSaver,
)
from .association_stages import ( # noqa
from .content_stages import ( # noqa
ContentAssociation,
ContentUnassociation,
ContentSaver,
QueryExistingContents,
ResolveContentFutures,
)
from .content_stages import ContentSaver, QueryExistingContents, ResolveContentFutures # noqa
from .declarative_version import DeclarativeVersion # noqa
from .models import DeclarativeArtifact, DeclarativeContent # noqa
from .profiler import ProfilingQueue, create_profile_db_and_connection # noqa
83 changes: 0 additions & 83 deletions pulpcore/plugin/stages/association_stages.py

This file was deleted.

62 changes: 61 additions & 1 deletion pulpcore/plugin/stages/content_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.db import IntegrityError, transaction
from django.db.models import Q

from pulpcore.plugin.models import ContentArtifact
from pulpcore.plugin.models import Content, ContentArtifact, ProgressReport

from .api import Stage

Expand Down Expand Up @@ -187,3 +187,63 @@ async def run(self):
async for d_content in self.items():
d_content.resolve()
await self.put(d_content)


class ContentAssociation(Stage):
"""
A Stages API stage that associates content units with `new_version`.
This stage stores all content unit primary keys in memory before running. This is done to
compute the units already associated but not received from `self._in_q`. These units are passed
via `self._out_q` to the next stage as a :class:`django.db.models.query.QuerySet`.
This stage creates a ProgressReport named 'Associating Content' that counts the number of units
associated. Since it's a stream the total count isn't known until it's finished.
If `mirror` was enabled, then content units may also be un-assocated (removed) from
`new_version`. A ProgressReport named 'Un-Associating Content' is created that counts the number
of units un-associated.
Args:
new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this
stage associates content with.
mirror (bool): Whether or not to "mirror" the stream of DeclarativeContent - whether content
not in the stream should be removed from the repository.
args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
"""

def __init__(self, new_version, mirror, *args, **kwargs):
super().__init__(*args, **kwargs)
self.new_version = new_version
self.allow_delete = mirror

async def run(self):
"""
The coroutine for this stage.
Returns:
The coroutine for this stage.
"""
with ProgressReport(message="Associating Content", code="associating.content") as pb:
to_delete = set(self.new_version.content.values_list("pk", flat=True))
async for batch in self.batches():
to_add = set()
for d_content in batch:
try:
to_delete.remove(d_content.content.pk)
except KeyError:
to_add.add(d_content.content.pk)
self.put(d_content)

if to_add:
self.new_version.add_content(Content.objects.filter(pk__in=to_add))
pb.increase_by(len(to_add))

if self.allow_delete:
with ProgressReport(
message="Un-Associating Content", code="unassociating.content"
) as pb:
if to_delete:
self.new_version.remove_content(Content.objects.filter(pk__in=to_delete))
pb.increase_by(len(to_delete))
12 changes: 7 additions & 5 deletions pulpcore/plugin/stages/declarative_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
QueryExistingArtifacts,
RemoteArtifactSaver,
)
from .association_stages import ContentAssociation, ContentUnassociation
from .content_stages import ContentSaver, QueryExistingContents, ResolveContentFutures
from .content_stages import (
ContentAssociation,
ContentSaver,
QueryExistingContents,
ResolveContentFutures,
)


class DeclarativeVersion:
Expand Down Expand Up @@ -141,9 +145,7 @@ def create(self):
with self.repository.new_version() as new_version:
loop = asyncio.get_event_loop()
stages = self.pipeline_stages(new_version)
stages.append(ContentAssociation(new_version))
if self.mirror:
stages.append(ContentUnassociation(new_version))
stages.append(ContentAssociation(new_version, self.mirror))
stages.append(EndStage())
pipeline = create_pipeline(stages)
loop.run_until_complete(pipeline)
Expand Down

0 comments on commit 0ae327a

Please sign in to comment.