diff --git a/pulp_docker/app/tasks/dedupe_save.py b/pulp_docker/app/tasks/dedupe_save.py deleted file mode 100644 index c42f06ef..00000000 --- a/pulp_docker/app/tasks/dedupe_save.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -Temporary module. - -This module contains temporary replacements of several pulpcore stages that did not properly -duplicates within the stream. The entire module should be deleted after #4060 is finished. - -https://pulp.plan.io/issues/4060 - -""" -from django.db import IntegrityError -from pulpcore.plugin.stages import Stage -from pulpcore.plugin.models import ContentArtifact, RemoteArtifact - -import logging -log = logging.getLogger(__name__) - - -class SerialContentSave(Stage): - """ - Save Content one at a time, combining duplicates. - """ - - async def run(self): - """ - The coroutine for this stage. - - Returns: - The coroutine for this stage. - - """ - async for dc in self.items(): - # Do not save Content that contains Artifacts which have not been downloaded - if not self.settled(dc): - await self.put(dc) - # already saved - elif not dc.content._state.adding: - await self.put(dc) - else: - self.save_and_dedupe_content(dc) - await self.put(dc) - - def save_and_dedupe_content(self, dc): - """ - Combine duplicate Content, save unique Content. - - Args: - dc (class:`~pulpcore.plugin.stages.DeclarativeContent`): Object containing Content to - be saved. - """ - model_type = type(dc.content) - unit_key = dc.content.natural_key_dict() - try: - dc.content.save() - except IntegrityError: - existing_content = model_type.objects.get(**unit_key) - dc.content = existing_content - assert not dc.content._state.adding - - self.create_content_artifacts(dc) - - def create_content_artifacts(self, dc): - """ - Create ContentArtifacts to associate saved Content to saved Artifacts. - - Args: - dc (class:`~pulpcore.plugin.stages.DeclarativeContent`): Object containing Content and - Artifacts to relate. - """ - for da in dc.d_artifacts: - content_artifact = ContentArtifact( - content=dc.content, - artifact=da.artifact, - relative_path=da.relative_path - ) - try: - content_artifact.save() - except IntegrityError: - content_artifact = ContentArtifact.objects.get( - content=dc.content, - artifact=da.artifact, - relative_path=da.relative_path - ) - - remote_artifact_data = { - 'url': da.url, - 'size': da.artifact.size, - 'md5': da.artifact.md5, - 'sha1': da.artifact.sha1, - 'sha224': da.artifact.sha224, - 'sha256': da.artifact.sha256, - 'sha384': da.artifact.sha384, - 'sha512': da.artifact.sha512, - 'remote': da.remote, - } - new_remote_artifact = RemoteArtifact( - content_artifact=content_artifact, **remote_artifact_data - ) - try: - new_remote_artifact.save() - except IntegrityError: - pass - - def settled(self, dc): - """ - Indicates that all Artifacts in this dc are saved. - - Args: - dc (class:`~pulpcore.plugin.stages.DeclarativeContent`): Object containing Artifacts - that may be saved. - - Returns: - bool: True when all Artifacts have been saved, False otherwise. - - """ - settled_dc = True - for da in dc.d_artifacts: - if da.artifact._state.adding: - settled_dc = False - return settled_dc diff --git a/pulp_docker/app/tasks/sync_stages.py b/pulp_docker/app/tasks/sync_stages.py index b010bc04..0282a0a8 100644 --- a/pulp_docker/app/tasks/sync_stages.py +++ b/pulp_docker/app/tasks/sync_stages.py @@ -1,3 +1,4 @@ +import asyncio import json import logging @@ -5,7 +6,7 @@ from urllib.parse import urljoin from django.db import IntegrityError -from pulpcore.plugin.models import Artifact +from pulpcore.plugin.models import Artifact, ProgressBar from pulpcore.plugin.stages import DeclarativeArtifact, DeclarativeContent, Stage from pulp_docker.app.models import (ImageManifest, MEDIA_TYPE, ManifestBlob, ManifestTag, @@ -21,19 +22,12 @@ } -class TempTag: +class DockerFirstStage(Stage): """ - This is a pseudo Tag that will either become a ManifestTag or a ManifestListTag. - """ - - def __init__(self, name): - """Make a temp tag.""" - self.name = name + The first stage of a pulp_docker sync pipeline. + In this stage all the content is discovered, including the nested one. -class TagListStage(Stage): - """ - The first stage of a pulp_docker sync pipeline. """ def __init__(self, remote): @@ -43,25 +37,128 @@ def __init__(self, remote): async def run(self): """ - Build and emit `DeclarativeContent` for each Tag. + DockerFirstStage. """ - log.debug("Fetching tags list for upstream repository: {repo}".format( - repo=self.remote.upstream_name - )) - relative_url = '/v2/{name}/tags/list'.format(name=self.remote.namespaced_upstream_name) - tag_list_url = urljoin(self.remote.url, relative_url) - list_downloader = self.remote.get_downloader(tag_list_url) - await list_downloader.run() - - with open(list_downloader.path) as tags_raw: - tags_dict = json.loads(tags_raw.read()) - tag_list = tags_dict['tags'] - - for tag_name in tag_list: - tag_dc = self.create_pending_tag(tag_name) - await self.put(tag_dc) - - def create_pending_tag(self, tag_name): + future_manifests = [] + tag_list = [] + to_download = [] + man_dcs = {} + total_blobs = [] + + with ProgressBar(message='Downloading tag list', total=1) as pb: + relative_url = '/v2/{name}/tags/list'.format(name=self.remote.namespaced_upstream_name) + tag_list_url = urljoin(self.remote.url, relative_url) + list_downloader = self.remote.get_downloader(tag_list_url) + await list_downloader.run() + + with open(list_downloader.path) as tags_raw: + tags_dict = json.loads(tags_raw.read()) + tag_list = tags_dict['tags'] + + pb.increment() + + with ProgressBar(message='Creating Download requests for Tags', total=len(tag_list)) as pb: + for tag_name in tag_list: + relative_url = '/v2/{name}/manifests/{tag}'.format( + name=self.remote.namespaced_upstream_name, + tag=tag_name, + ) + url = urljoin(self.remote.url, relative_url) + downloader = self.remote.get_downloader(url=url) + to_download.append(downloader.run(extra_data={'headers': V2_ACCEPT_HEADERS})) + pb.increment() + + pb_parsed_tags = ProgressBar(message='Parsing SchemaV2 Tags', state='running') + pb_parsed_ml_tags = ProgressBar(message='Parsing Manifest List Tags', state='running') + pb_parsed_m_tags = ProgressBar(message='Parsing Manifests Tags', state='running') + global pb_parsed_blobs + pb_parsed_blobs = ProgressBar(message='Parsing Blobs', state='running') + pb_parsed_man = ProgressBar(message='Parsing Manifests', state='running') + + for download_tag in asyncio.as_completed(to_download): + tag = await download_tag + with open(tag.path) as content_file: + raw = content_file.read() + content_data = json.loads(raw) + mediatype = content_data.get('mediaType') + if mediatype: + tag.artifact_attributes['file'] = tag.path + saved_artifact = Artifact(**tag.artifact_attributes) + try: + saved_artifact.save() + except IntegrityError: + del tag.artifact_attributes['file'] + saved_artifact = Artifact.objects.get(**tag.artifact_attributes) + tag_dc = self.create_tag(mediatype, saved_artifact, tag.url) + if type(tag_dc.content) is ManifestListTag: + list_dc = self.create_tagged_manifest_list( + tag_dc, content_data) + await self.put(list_dc) + pb_parsed_ml_tags.increment() + tag_dc.extra_data['list_relation'] = list_dc + for manifest_data in content_data.get('manifests'): + man_dc = self.create_manifest(list_dc, manifest_data) + future_manifests.append(man_dc.get_or_create_future()) + man_dcs[man_dc.content.digest] = man_dc + await self.put(man_dc) + pb_parsed_man.increment() + elif type(tag_dc.content) is ManifestTag: + man_dc = self.create_tagged_manifest(tag_dc, content_data) + await self.put(man_dc) + pb_parsed_m_tags.increment() + tag_dc.extra_data['man_relation'] = man_dc + self.handle_blobs(man_dc, content_data, total_blobs) + await self.put(tag_dc) + pb_parsed_tags.increment() + else: + # in case it is a schema1 continue to the next tag + continue + + pb_parsed_tags.state = 'completed' + pb_parsed_tags.total = pb_parsed_tags.done + pb_parsed_tags.save() + pb_parsed_ml_tags.state = 'completed' + pb_parsed_ml_tags.total = pb_parsed_ml_tags.done + pb_parsed_ml_tags.save() + pb_parsed_m_tags.state = 'completed' + pb_parsed_m_tags.total = pb_parsed_m_tags.done + pb_parsed_m_tags.save() + pb_parsed_man.state = 'completed' + pb_parsed_man.total = pb_parsed_man.done + pb_parsed_man.save() + + for manifest_future in asyncio.as_completed(future_manifests): + man = await manifest_future + with man._artifacts.get().file.open() as content_file: + raw = content_file.read() + content_data = json.loads(raw) + man_dc = man_dcs[man.digest] + self.handle_blobs(man_dc, content_data, total_blobs) + for blob in total_blobs: + await self.put(blob) + + pb_parsed_blobs.state = 'completed' + pb_parsed_blobs.total = pb_parsed_blobs.done + pb_parsed_blobs.save() + + def handle_blobs(self, man, content_data, total_blobs): + """ + Handle blobs. + """ + for layer in content_data.get("layers"): + if not self._include_layer(layer): + continue + blob_dc = self.create_blob(man, layer) + blob_dc.extra_data['blob_relation'] = man + total_blobs.append(blob_dc) + pb_parsed_blobs.increment() + layer = content_data.get('config') + blob_dc = self.create_blob(man, layer) + blob_dc.extra_data['config_relation'] = man + pb_parsed_blobs.increment() + total_blobs.append(blob_dc) + + def create_tag(self, mediatype, saved_artifact, url): """ Create `DeclarativeContent` for each tag. @@ -74,15 +171,18 @@ def create_pending_tag(self, tag_name): pulpcore.plugin.stages.DeclarativeContent: A Tag DeclarativeContent object """ + tag_name = url.split('/')[-1] relative_url = '/v2/{name}/manifests/{tag}'.format( name=self.remote.namespaced_upstream_name, tag=tag_name, ) url = urljoin(self.remote.url, relative_url) - tag = TempTag(name=tag_name) - manifest_artifact = Artifact() + if mediatype == MEDIA_TYPE.MANIFEST_LIST: + tag = ManifestListTag(name=tag_name) + elif mediatype == MEDIA_TYPE.MANIFEST_V2: + tag = ManifestTag(name=tag_name) da = DeclarativeArtifact( - artifact=manifest_artifact, + artifact=saved_artifact, url=url, relative_path=tag_name, remote=self.remote, @@ -91,76 +191,14 @@ def create_pending_tag(self, tag_name): tag_dc = DeclarativeContent(content=tag, d_artifacts=[da]) return tag_dc - -class ProcessContentStage(Stage): - """ - Process all Manifests, Manifest Lists, and Tags. - - For each processed type, create content from nested fields. This stage does not process - ManifestBlobs, which do not contain nested content. - """ - - def __init__(self, remote): - """ - Inform the stage about the remote to use. - """ - super().__init__() - self.remote = remote - - async def run(self): - """ - Create new Content for all unsaved content units with downloaded artifacts. - """ - async for dc in self.items(): - if dc.extra_data.get('processed'): - await self.put(dc) - continue - if type(dc.content) is ManifestBlob: - await self.put(dc) - continue - - # All docker content contains a single artifact. - assert len(dc.d_artifacts) == 1 - with dc.d_artifacts[0].artifact.file.open() as content_file: - raw = content_file.read() - content_data = json.loads(raw) - - if type(dc.content) is TempTag: - if content_data.get('mediaType') == MEDIA_TYPE.MANIFEST_LIST: - await self.create_and_process_tagged_manifest_list(dc, content_data) - await self.put(dc) - elif content_data.get('mediaType') == MEDIA_TYPE.MANIFEST_V2: - await self.create_and_process_tagged_manifest(dc, content_data) - await self.put(dc) - else: - assert content_data.get('schemaVersion') == 1 - elif type(dc.content) is ImageManifest: - for layer in content_data.get("layers"): - if not self._include_layer(layer): - continue - blob_dc = await self.create_pending_blob(layer) - blob_dc.extra_data['relation'] = dc - await self.put(blob_dc) - layer = content_data.get('config') - if layer and self._include_layer(layer): - config_blob_dc = await self.create_pending_blob(layer) - config_blob_dc.extra_data['config_relation'] = dc - await self.put(config_blob_dc) - dc.extra_data['processed'] = True - await self.put(dc) - else: - msg = "Unexpected type cannot be processed{tp}".format(tp=type(dc.content)) - raise Exception(msg) - - async def create_and_process_tagged_manifest_list(self, tag_dc, manifest_list_data): + def create_tagged_manifest_list(self, tag_dc, manifest_list_data): """ - Create a ManifestList and nested ImageManifests from the Tag artifact. + Create a ManifestList. Args: tag_dc (pulpcore.plugin.stages.DeclarativeContent): dc for a Tag manifest_list_data (dict): Data about a ManifestList """ - tag_dc.content = ManifestListTag(name=tag_dc.content.name) digest = "sha256:{digest}".format(digest=tag_dc.d_artifacts[0].artifact.sha256) relative_url = '/v2/{name}/manifests/{digest}'.format( name=self.remote.namespaced_upstream_name, @@ -180,22 +218,17 @@ async def create_and_process_tagged_manifest_list(self, tag_dc, manifest_list_da extra_data={'headers': V2_ACCEPT_HEADERS} ) list_dc = DeclarativeContent(content=manifest_list, d_artifacts=[da]) - for manifest in manifest_list_data.get('manifests'): - await self.create_pending_manifest(list_dc, manifest) - list_dc.extra_data['relation'] = tag_dc - list_dc.extra_data['processed'] = True - tag_dc.extra_data['processed'] = True - await self.put(list_dc) - - async def create_and_process_tagged_manifest(self, tag_dc, manifest_data): + + return list_dc + + def create_tagged_manifest(self, tag_dc, manifest_data): """ - Create a Manifest and nested ManifestBlobs from the Tag artifact. + Create an Image Manifest. Args: tag_dc (pulpcore.plugin.stages.DeclarativeContent): dc for a Tag manifest_data (dict): Data about a single new ImageManifest. """ - tag_dc.content = ManifestTag(name=tag_dc.content.name) digest = "sha256:{digest}".format(digest=tag_dc.d_artifacts[0].artifact.sha256) manifest = ImageManifest( digest=digest, @@ -215,25 +248,11 @@ async def create_and_process_tagged_manifest(self, tag_dc, manifest_data): extra_data={'headers': V2_ACCEPT_HEADERS} ) man_dc = DeclarativeContent(content=manifest, d_artifacts=[da]) - for layer in manifest_data.get('layers'): - if not self._include_layer(layer): - continue - blob_dc = await self.create_pending_blob(layer) - blob_dc.extra_data['relation'] = man_dc - await self.put(blob_dc) - layer = manifest_data.get('config') - if layer and self._include_layer(layer): - config_blob_dc = await self.create_pending_blob(layer) - config_blob_dc.extra_data['config_relation'] = man_dc - await self.put(config_blob_dc) - man_dc.extra_data['relation'] = tag_dc - tag_dc.extra_data['processed'] = True - man_dc.extra_data['processed'] = True - await self.put(man_dc) - - async def create_pending_manifest(self, list_dc, manifest_data): + return man_dc + + def create_manifest(self, list_dc, manifest_data): """ - Create a pending manifest from manifest data in a ManifestList. + Create an Image Manifest from manifest data in a ManifestList. Args: list_dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ManifestList @@ -245,9 +264,8 @@ async def create_pending_manifest(self, list_dc, manifest_data): digest=digest ) manifest_url = urljoin(self.remote.url, relative_url) - manifest_artifact = Artifact(sha256=digest[len("sha256:"):]) da = DeclarativeArtifact( - artifact=manifest_artifact, + artifact=Artifact(), url=manifest_url, relative_path=digest, remote=self.remote, @@ -261,16 +279,18 @@ async def create_pending_manifest(self, list_dc, manifest_data): man_dc = DeclarativeContent( content=manifest, d_artifacts=[da], - extra_data={'relation': list_dc} + extra_data={'relation': list_dc}, + does_batch=False, ) - await self.put(man_dc) + return man_dc - async def create_pending_blob(self, blob_data): + def create_blob(self, man_dc, blob_data): """ - Create a pending blob from a layer in the ImageManifest. + Create blob. Args: - blob_data (dict): Data about a single new blob. + man_dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ImageManifest + blob_data (dict): Data about a blob """ digest = blob_data['digest'] @@ -295,6 +315,7 @@ async def create_pending_blob(self, blob_data): content=blob, d_artifacts=[da], ) + return blob_dc def _include_layer(self, layer): @@ -326,30 +347,39 @@ async def run(self): Relate each item in the input queue to objects specified on the DeclarativeContent. """ async for dc in self.items(): + if dc.extra_data.get('relation'): - if type(dc.content) is ManifestList: - self.relate_manifest_list(dc) - elif type(dc.content) is ManifestBlob: - self.relate_blob(dc) - elif type(dc.content) is ImageManifest: - self.relate_manifest(dc) - - configured_dc = dc.extra_data.get('config_relation') - if configured_dc: - configured_dc.content.config_blob = dc.content - configured_dc.content.save() + self.relate_manifest_to_list(dc) + elif dc.extra_data.get('blob_relation'): + self.relate_blob(dc) + elif dc.extra_data.get('config_relation'): + self.relate_config_blob(dc) + elif dc.extra_data.get('list_relation'): + self.relate_manifest_list(dc) + elif dc.extra_data.get('man_relation'): + self.relate_manifest(dc) await self.put(dc) + def relate_config_blob(self, dc): + """ + Relate a ManifestBlob to a Manifest as a config layer. + + Args: + dc (pulpcore.plugin.stages.DeclarativeContent): dc for a Blob + """ + configured_dc = dc.extra_data.get('config_relation') + configured_dc.content.config_blob = dc.content + configured_dc.content.save() + def relate_blob(self, dc): """ Relate a ManifestBlob to a Manifest. Args: - dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ManifestList + dc (pulpcore.plugin.stages.DeclarativeContent): dc for a Blob """ - related_dc = dc.extra_data.get('relation') - assert related_dc is not None + related_dc = dc.extra_data.get('blob_relation') thru = BlobManifestBlob(manifest=related_dc.content, manifest_blob=dc.content) try: thru.save() @@ -358,43 +388,50 @@ def relate_blob(self, dc): def relate_manifest(self, dc): """ - Relate an ImageManifest to a Tag or ManifestList. + Relate an ImageManifest to a Tag. Args: - dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ManifestList + dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ManifestTag + """ + related_dc = dc.extra_data.get('man_relation') + assert dc.content.manifest is None + dc.content.manifest = related_dc.content + try: + dc.content.save() + except IntegrityError: + existing_tag = ManifestTag.objects.get(name=dc.content.name, + manifest=related_dc.content) + dc.content = existing_tag + + def relate_manifest_to_list(self, dc): + """ + Relate an ImageManifest to a ManifestList. + + Args: + dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ImageManifest """ related_dc = dc.extra_data.get('relation') - assert related_dc is not None - if type(related_dc.content) is ManifestTag: - assert related_dc.content.manifest is None - related_dc.content.manifest = dc.content - try: - related_dc.content.save() - except IntegrityError: - existing_tag = ManifestTag.objects.get(name=related_dc.content.name, - manifest=dc.content) - related_dc.content = existing_tag - elif type(related_dc.content) is ManifestList: - thru = ManifestListManifest(manifest_list=related_dc.content, manifest=dc.content) - try: - thru.save() - except IntegrityError: - pass + thru = ManifestListManifest(manifest_list=related_dc.content, manifest=dc.content) + try: + thru.save() + except IntegrityError: + pass def relate_manifest_list(self, dc): """ Relate a ManifestList to a Tag. Args: - dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ManifestList + dc (pulpcore.plugin.stages.DeclarativeContent): dc for a ManifestListTag """ - related_dc = dc.extra_data.get('relation') - assert type(related_dc.content) is ManifestListTag - assert related_dc.content.manifest_list is None - related_dc.content.manifest_list = dc.content + related_dc = dc.extra_data.get('list_relation') + assert dc.content.manifest_list is None + dc.content.manifest_list = related_dc.content try: - related_dc.content.save() + dc.content.save() except IntegrityError: - existing_tag = ManifestListTag.objects.get(name=related_dc.content.name, - manifest_list=dc.content) - related_dc.content = existing_tag + + existing_tag = ManifestListTag.objects.get(name=dc.content.name, + manifest_list=related_dc.content) + dc.content = existing_tag + pass diff --git a/pulp_docker/app/tasks/synchronize.py b/pulp_docker/app/tasks/synchronize.py index 353f35b9..098fd9fa 100644 --- a/pulp_docker/app/tasks/synchronize.py +++ b/pulp_docker/app/tasks/synchronize.py @@ -5,13 +5,17 @@ from pulpcore.plugin.stages import ( ArtifactDownloader, ArtifactSaver, + ContentSaver, DeclarativeVersion, RemoteArtifactSaver, + RemoveDuplicates, + ResolveContentFutures, + QueryExistingArtifacts, + QueryExistingContents, ) -from .sync_stages import InterrelateContent, ProcessContentStage, TagListStage +from .sync_stages import InterrelateContent, DockerFirstStage from pulp_docker.app.models import DockerRemote, ManifestTag, ManifestListTag -from pulp_docker.app.tasks.dedupe_save import SerialContentSave log = logging.getLogger(__name__) @@ -37,7 +41,10 @@ def synchronize(remote_pk, repository_pk): raise ValueError(_('A remote must have a url specified to synchronize.')) remove_duplicate_tags = [{'model': ManifestTag, 'field_names': ['name']}, {'model': ManifestListTag, 'field_names': ['name']}] - dv = DockerDeclarativeVersion(repository, remote, remove_duplicates=remove_duplicate_tags) + log.info(_('Synchronizing: repository={r} remote={p}').format( + r=repository.name, p=remote.name)) + first_stage = DockerFirstStage(remote) + dv = DockerDeclarativeVersion(first_stage, repository, remove_duplicates=remove_duplicate_tags) dv.create() @@ -46,13 +53,6 @@ class DockerDeclarativeVersion(DeclarativeVersion): Subclassed Declarative version creates a custom pipeline for Docker sync. """ - def __init__(self, repository, remote, mirror=True, remove_duplicates=None): - """Initialize the class.""" - self.repository = repository - self.remote = remote - self.mirror = mirror - self.remove_duplicates = remove_duplicates or [] - def pipeline_stages(self, new_version): """ Build a list of stages feeding into the ContentUnitAssociation stage. @@ -67,40 +67,18 @@ def pipeline_stages(self, new_version): list: List of :class:`~pulpcore.plugin.stages.Stage` instances """ - return [ - TagListStage(self.remote), - - # In: Pending Tags (not downloaded yet) - ArtifactDownloader(), - ArtifactSaver(), - ProcessContentStage(self.remote), - SerialContentSave(), - RemoteArtifactSaver(), - # Out: Finished Tags, Finished ManifestLists, Finished ImageManifests, - # Pending ImageManifests, Pending ManifestBlobs - - - # In: Pending ImageManifests, Pending Blobs - # In: Finished content (no-op) + pipeline = [ + self.first_stage, + QueryExistingArtifacts(), ArtifactDownloader(), ArtifactSaver(), - ProcessContentStage(self.remote), - SerialContentSave(), + QueryExistingContents(), + ContentSaver(), RemoteArtifactSaver(), - # Out: No-op (Finished Tags, ManifestLists, ImageManifests) - # Out: Finished ImageManifests, Finished ManifestBlobs, Pending ManifestBlobs - - # In: Pending Blobs - # In: Finished content (no-op) - ArtifactDownloader(), - ArtifactSaver(), - SerialContentSave(), - RemoteArtifactSaver(), - # Out: Finished content, Tags, ManifestLists, ImageManifests, ManifestBlobs - - # In: Tags, ManifestLists, ImageManifests, ManifestBlobs (downloaded, processed, and - # saved) - # Requires that all content (and related content in dc.extra_data) is already saved. + ResolveContentFutures(), InterrelateContent(), - # Out: Content that has been related to other Content. ] + for dupe_query_dict in self.remove_duplicates: + pipeline.append(RemoveDuplicates(new_version, **dupe_query_dict)) + + return pipeline diff --git a/setup.py b/setup.py index d443f9cb..5f30b1c2 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import find_packages, setup requirements = [ - 'pulpcore-plugin==0.1.0b21', + 'pulpcore-plugin==0.1rc1', ]