Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Problem: ArtifactSaver stage doesn't handle IntegrityErrors
Browse files Browse the repository at this point in the history
Solution: Use a QueryAndSaveArtifacts stage instead of ArtifactSaver

This patch introduces a new stage that first looks for existing artifacts before trying to save
any new ones.

re: #4060
https://pulp.plan.io/issues/4060
  • Loading branch information
dkliban committed Oct 9, 2018
1 parent 8de52f6 commit ac19147
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions pulp_docker/app/tasks/synchronizing.py
@@ -1,6 +1,8 @@
from gettext import gettext as _
import logging

from django.db.models import Q

from pulpcore.plugin.models import Artifact, ProgressBar, Repository # noqa
from pulpcore.plugin.stages import (
DeclarativeArtifact,
Expand Down Expand Up @@ -82,3 +84,83 @@ def read_my_metadata_file_somehow(self, path):
path: Path to the metadata file
"""
pass


class QueryAndSaveArtifacts(Stage):
    """
    A stage that reuses already-saved Artifacts and bulk saves the remaining ones.

    This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `in_q` and
    inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
    :class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
    :class:`~pulpcore.plugin.models.Artifact`.

    For every batch, the stage searches Pulp for saved
    :class:`~pulpcore.plugin.models.Artifact` objects with the same digest value(s) as the
    artifacts in the batch. Any existing :class:`~pulpcore.plugin.models.Artifact` found replaces
    its counterpart in the :class:`~pulpcore.plugin.stages.DeclarativeArtifact` object. Each
    remaining unsaved :class:`~pulpcore.plugin.models.Artifact` is saved with a single
    ``bulk_create`` call. Unsaved artifacts that occur more than once in a batch (the same
    instance, or distinct instances with identical digests) are saved only once and shared by
    all of their :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects.

    Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `out_q` after all of its
    :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects have been handled.

    Batching is delegated to ``self.batches(in_q)``; each batch costs one lookup query and at most
    one bulk insert.
    """

    async def __call__(self, in_q, out_q):
        """
        The coroutine for this stage.

        Args:
            in_q (:class:`asyncio.Queue`): The queue to receive
                :class:`~pulpcore.plugin.stages.DeclarativeContent` objects from.
            out_q (:class:`asyncio.Queue`): The queue to put
                :class:`~pulpcore.plugin.stages.DeclarativeContent` into.

        Returns:
            The coroutine for this stage.
        """
        async for batch in self.batches(in_q):
            existing_artifacts = Artifact.objects.filter(self._existing_artifacts_query(batch))
            self._replace_with_saved_artifacts(batch, existing_artifacts)

            artifacts_to_save = self._collect_unsaved_artifacts(batch)
            if artifacts_to_save:
                Artifact.objects.bulk_create(artifacts_to_save)

            for declarative_content in batch:
                await out_q.put(declarative_content)
        # A trailing None tells the next stage that no more content will arrive.
        await out_q.put(None)

    @staticmethod
    def _digests(artifact):
        """Return a tuple of ``(field name, value)`` pairs for the digests set on `artifact`."""
        named_values = ((name, getattr(artifact, name)) for name in artifact.DIGEST_FIELDS)
        return tuple((name, value) for name, value in named_values if value)

    @classmethod
    def _existing_artifacts_query(cls, batch):
        """Build a Q matching saved Artifacts that carry all digests of any artifact in `batch`."""
        # Seed with a condition no saved row should satisfy, so that a batch without any
        # digests selects nothing instead of every Artifact.
        all_artifacts_q = Q(pk=None)
        for content in batch:
            for declarative_artifact in content.d_artifacts:
                digests = cls._digests(declarative_artifact.artifact)
                if digests:
                    # All known digests of one artifact must match the same saved row.
                    all_artifacts_q |= Q(**dict(digests))
        return all_artifacts_q

    @staticmethod
    def _replace_with_saved_artifacts(batch, saved_artifacts):
        """Swap in a saved Artifact wherever it shares a digest value with a batch artifact."""
        for saved in saved_artifacts:
            for content in batch:
                for declarative_artifact in content.d_artifacts:
                    candidate = declarative_artifact.artifact
                    for digest_name in saved.DIGEST_FIELDS:
                        digest_value = getattr(candidate, digest_name)
                        if digest_value and digest_value == getattr(saved, digest_name):
                            declarative_artifact.artifact = saved
                            break

    @classmethod
    def _collect_unsaved_artifacts(cls, batch):
        """Return each distinct unsaved Artifact of `batch` once, re-pointing duplicates to it."""
        pending = {}
        for content in batch:
            for declarative_artifact in content.d_artifacts:
                artifact = declarative_artifact.artifact
                if artifact.pk is not None:
                    continue
                # Artifacts with the same digest set are the same content; inserting both
                # would violate the digest uniqueness constraints. Artifacts without any
                # digest are only de-duplicated by identity.
                key = cls._digests(artifact) or id(artifact)
                first_seen = pending.setdefault(key, artifact)
                if first_seen is artifact:
                    # NOTE(review): presumably `file` holds a path-like object that must be
                    # stored as a plain string before saving -- confirm against Artifact.
                    artifact.file = str(artifact.file)
                else:
                    declarative_artifact.artifact = first_seen
        return list(pending.values())

0 comments on commit ac19147

Please sign in to comment.