Skip to content

Commit

Permalink
Merge colliding structure content
Browse files Browse the repository at this point in the history
closes #599

Includes a DB migration that re-writes any colliding structure content,
thus altering existing repository versions. So re-written repo versions
will result in structurally identical publications, as compared to
before the change. The only difference is that post-migration
publications will lose any duplicate package paragraphs.

This change will be released as part of pulp_deb version 3.0.0!
  • Loading branch information
quba42 committed Jun 19, 2023
1 parent 6be8ba1 commit c2de7cc
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGES/599.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
This change includes a large DB migration to drop 'codename' and 'suite' from the uniqueness constraints of all structure content.
The migration will merge any resulting collisions and alter all records with a foreign key relation to the eliminated content so that they point at the merge result instead.
1 change: 1 addition & 0 deletions CHANGES/599.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The codename and suite fields are removed from the ReleaseComponent and ReleaseArchitecture models and all associated filters and viewsets.
6 changes: 3 additions & 3 deletions docs/_scripts/structured_repo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ TASK_HREF=$(http ${BASE_ADDR}/pulp/api/v3/distributions/deb/apt/ name=myrepo bas
wait_until_task_finished $BASE_ADDR$TASK_HREF

# create the necessary content (release, comp, architecture)
RELEASE_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/releases/ codename=mycodename suite=mysuite distribution=mydist | jq -r .pulp_href)
RELEASE_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/releases/ distribution=mydist codename=mycodename suite=mysuite | jq -r .pulp_href)
# Note that creating the release is optional, but without it your published repo will use default values for the suite and the codename in the published Release file.
ARCH_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/release_architectures/ architecture=ppc64 codename=mycodename suite=mysuite distribution=mydist | jq -r .pulp_href)
COMP_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/release_components/ component=mycomp codename=mycodename suite=mysuite distribution=mydist | jq -r .pulp_href)
ARCH_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/release_architectures/ distribution=mydist architecture=ppc64 | jq -r .pulp_href)
COMP_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/release_components/ distribution=mydist component=mycomp | jq -r .pulp_href)
PKG_COMP_HREF=$(http ${BASE_ADDR}/pulp/api/v3/content/deb/package_release_components/ package=$PACKAGE_HREF release_component=$COMP_HREF | jq -r .pulp_href)

# add our content to the repository
Expand Down
312 changes: 312 additions & 0 deletions pulp_deb/app/migrations/0025_merge_colliding_structure_content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
# Generated by Django 3.2.19 on 2023-05-09 12:35, extended manually;

import logging

from datetime import datetime

from django.db import migrations, models
from django.core.exceptions import ObjectDoesNotExist

BATCH_SIZE = 1000

log = logging.getLogger(__name__)


def merge_colliding_structure_content(apps, schema_editor):
    """
    Merge ReleaseArchitecture and ReleaseComponent rows (and their dependent
    PackageReleaseComponent rows) that become duplicates once 'codename' and
    'suite' are dropped from the uniqueness constraints.

    For each set of colliding rows one row is kept, and every RepositoryContent
    record pointing at a discarded duplicate is re-pointed at the kept row.
    Where the kept and discarded rows were both present in the same repository,
    their repository-version membership ranges are merged first.
    """
    ReleaseArchitecture = apps.get_model('deb', 'ReleaseArchitecture')
    ReleaseComponent = apps.get_model('deb', 'ReleaseComponent')
    PackageReleaseComponent = apps.get_model('deb', 'PackageReleaseComponent')
    RepositoryContent = apps.get_model('core', 'RepositoryContent')
    RepositoryVersion = apps.get_model('core', 'RepositoryVersion')

    print("\n")
    log.info("{}: Starting data migration!".format(datetime.now()))

    def _get_content_repo_version_set(repo_version_set, repo_content):
        # Return the set of repository version numbers in which repo_content is
        # present: [version_added, version_removed), or through the latest
        # version if it was never removed.
        version_added = repo_content.version_added.number
        if repo_content.version_removed:
            # BUGFIX: the upper bound must come from version_removed, not
            # version_added; the original returned an empty set for any
            # removed content, defeating the overlap checks below.
            version_removed = repo_content.version_removed.number
        else:
            version_removed = max(repo_version_set) + 1
        return set([n for n in repo_version_set if version_added <= n < version_removed])

    def _get_repo_content_to_update(duplicate_content_ids, content_to_keep):
        # Collect RepositoryContent rows that must be re-pointed at
        # content_to_keep, merging any row that overlaps an existing row for
        # content_to_keep within the same repository.
        # Note that len(duplicate_content_ids) is expected to be much smaller than BATCH_SIZE.
        # We don't care if the batch is up to len(duplicate_content_ids) larger than BATCH_SIZE.
        repo_content_to_update = []
        for duplicate_content in RepositoryContent.objects.filter(
            content_id__in=duplicate_content_ids
        ):
            repo_version_set = set(
                RepositoryVersion.objects.filter(
                    repository_id=duplicate_content.repository_id
                ).values_list('number', flat=True)
            )
            for keep_content in RepositoryContent.objects.filter(
                content_id=content_to_keep, repository_id=duplicate_content.repository_id
            ):
                if not keep_content.version_removed and not duplicate_content.version_removed:
                    # Neither repo_content was ever removed: keep the earlier
                    # version_added and drop the duplicate row outright.
                    first_added = min(
                        keep_content.version_added.number,
                        duplicate_content.version_added.number,
                    )
                    if keep_content.version_added.number != first_added:
                        keep_content.version_added = duplicate_content.version_added
                        keep_content.save()
                    message = '{}: Merging repo_content "{}" into "{}".'
                    log.info(
                        message.format(
                            datetime.now(), duplicate_content.pulp_id, keep_content.pulp_id
                        )
                    )
                    # The kept row now covers the duplicate's range, so the
                    # duplicate row itself is redundant.
                    duplicate_content.delete()
                    duplicate_content = keep_content
                elif keep_content.version_removed and duplicate_content.version_removed:
                    # Both repo_contents were removed at some point.
                    versions1 = _get_content_repo_version_set(repo_version_set, keep_content)
                    versions2 = _get_content_repo_version_set(repo_version_set, duplicate_content)
                    if versions1.intersection(versions2):
                        # The two repo_content overlap: widen the kept row to
                        # span the union of both ranges.
                        joint_version_range = versions1.union(versions2)
                        first_added = min(joint_version_range)
                        last_removed = max(joint_version_range)
                        if keep_content.version_added.number != first_added:
                            keep_content.version_added = duplicate_content.version_added
                        if keep_content.version_removed.number != last_removed:
                            keep_content.version_removed = duplicate_content.version_removed
                        message = '{}: Merging repo_content "{}" into "{}".'
                        log.info(
                            message.format(
                                datetime.now(), duplicate_content.pulp_id, keep_content.pulp_id
                            )
                        )
                        keep_content.save()
                        duplicate_content.delete()
                        duplicate_content = keep_content
                else:
                    # Exactly one repo_content has already been removed.
                    versions1 = _get_content_repo_version_set(repo_version_set, keep_content)
                    versions2 = _get_content_repo_version_set(repo_version_set, duplicate_content)
                    if versions1.intersection(versions2):
                        # The two repo_content overlap: keep the earlier
                        # version_added and clear version_removed, since the
                        # merged row is still present in the latest version.
                        first_added = min(versions1.union(versions2))
                        if keep_content.version_added.number != first_added:
                            keep_content.version_added = duplicate_content.version_added
                        if keep_content.version_removed:
                            keep_content.version_removed = None
                        message = '{}: Merging repo_content "{}" into "{}".'
                        log.info(
                            message.format(
                                datetime.now(), duplicate_content.pulp_id, keep_content.pulp_id
                            )
                        )
                        keep_content.save()
                        duplicate_content.delete()
                        duplicate_content = keep_content

            duplicate_content.content_id = content_to_keep
            repo_content_to_update.append(duplicate_content)

        return repo_content_to_update

    def _deduplicate_PRC(duplicate_component, component_to_keep):
        # Re-point PackageReleaseComponents of a duplicate ReleaseComponent at
        # the kept component, creating missing counterparts as needed, then
        # delete the now-orphaned PRCs.
        duplicate_prcs = PackageReleaseComponent.objects.filter(
            release_component=duplicate_component
        )
        repo_content_to_update = []
        for duplicate_prc in duplicate_prcs.iterator(chunk_size=BATCH_SIZE):
            try:
                prc_to_keep = PackageReleaseComponent.objects.get(
                    release_component=component_to_keep,
                    package=duplicate_prc.package,
                )
            except ObjectDoesNotExist:
                # No counterpart exists on the kept component yet; create one.
                component = ReleaseComponent.objects.get(pk=component_to_keep)
                prc_to_keep = PackageReleaseComponent.objects.create(
                    pulp_type='deb.package_release_component',
                    release_component=component,
                    package=duplicate_prc.package,
                )

            repo_content_to_update += _get_repo_content_to_update(
                [duplicate_prc.pk], prc_to_keep.pk
            )

            if len(repo_content_to_update) >= BATCH_SIZE:
                RepositoryContent.objects.bulk_update(repo_content_to_update, ["content_id"])
                # BUGFIX: reset the batch after flushing; the original kept
                # appending, re-updating the same rows on every later flush.
                repo_content_to_update = []

        # Handle remaining content <= BATCH_SIZE:
        if len(repo_content_to_update) > 0:
            RepositoryContent.objects.bulk_update(repo_content_to_update, ["content_id"])

        PackageReleaseComponent.objects.filter(pk__in=duplicate_prcs).delete()

    # Deduplicate ReleaseArchitecture:
    distributions = (
        ReleaseArchitecture.objects.all()
        .distinct('distribution')
        .values_list('distribution', flat=True)
    )

    for distribution in distributions:
        architectures = (
            ReleaseArchitecture.objects.filter(distribution=distribution)
            .distinct('architecture')
            .values_list('architecture', flat=True)
        )
        architecture_ids_to_delete = []
        repo_content_to_update = []
        for architecture in architectures:
            duplicate_architecture_ids = list(
                ReleaseArchitecture.objects.filter(
                    distribution=distribution, architecture=architecture
                ).values_list('pk', flat=True)
            )
            if len(duplicate_architecture_ids) > 1:
                architecture_to_keep = duplicate_architecture_ids.pop()
                message = (
                    '{}: Merging duplicates for architecture "{}" in distribution "{}" into '
                    'ReleaseArchitecture "{}"!'
                )
                log.info(
                    message.format(datetime.now(), architecture, distribution, architecture_to_keep)
                )
                architecture_ids_to_delete += duplicate_architecture_ids
                repo_content_to_update += _get_repo_content_to_update(
                    duplicate_architecture_ids, architecture_to_keep
                )

            if len(architecture_ids_to_delete) >= BATCH_SIZE:
                # We assume len(repo_content_to_update)==len(architecture_ids_to_delete)!
                RepositoryContent.objects.bulk_update(repo_content_to_update, ["content_id"])
                repo_content_to_update = []

                ReleaseArchitecture.objects.filter(pk__in=architecture_ids_to_delete).delete()
                architecture_ids_to_delete = []

        # Handle remaining content <= BATCH_SIZE:
        if len(repo_content_to_update) > 0:
            RepositoryContent.objects.bulk_update(repo_content_to_update, ["content_id"])

        if len(architecture_ids_to_delete) > 0:
            ReleaseArchitecture.objects.filter(pk__in=architecture_ids_to_delete).delete()

    # Deduplicate ReleaseComponent:
    distributions = (
        ReleaseComponent.objects.all()
        .distinct('distribution')
        .values_list('distribution', flat=True)
    )
    for distribution in distributions:
        components = (
            ReleaseComponent.objects.filter(distribution=distribution)
            .distinct('component')
            .values_list('component', flat=True)
        )
        component_ids_to_delete = []
        repo_content_to_update = []
        for component in components:
            duplicate_component_ids = list(
                ReleaseComponent.objects.filter(
                    distribution=distribution, component=component
                ).values_list('pk', flat=True)
            )
            if len(duplicate_component_ids) > 1:
                component_to_keep = duplicate_component_ids.pop()
                message = (
                    '{}: Merging duplicates for component "{}" in distribution "{}" into '
                    'ReleaseComponent "{}"!'
                )
                log.info(message.format(datetime.now(), component, distribution, component_to_keep))
                component_ids_to_delete += duplicate_component_ids
                repo_content_to_update += _get_repo_content_to_update(
                    duplicate_component_ids, component_to_keep
                )

                # Deduplicate PackageReleaseComponents
                for duplicate_component in duplicate_component_ids:
                    message = (
                        '{}: Handling PackageReleaseComponents for duplicate ReleaseComponent "{}"!'
                    )
                    log.info(message.format(datetime.now(), duplicate_component))
                    _deduplicate_PRC(duplicate_component, component_to_keep)

            if len(component_ids_to_delete) >= BATCH_SIZE:
                # We assume len(repo_content_to_update)==len(component_ids_to_delete)!
                RepositoryContent.objects.bulk_update(repo_content_to_update, ["content_id"])
                repo_content_to_update = []

                ReleaseComponent.objects.filter(pk__in=component_ids_to_delete).delete()
                component_ids_to_delete = []

        # Handle remaining content <= BATCH_SIZE:
        if len(repo_content_to_update) > 0:
            RepositoryContent.objects.bulk_update(repo_content_to_update, ["content_id"])

        if len(component_ids_to_delete) > 0:
            ReleaseComponent.objects.filter(pk__in=component_ids_to_delete).delete()

    log.info("{}: Data migration completed!\n".format(datetime.now()))


class Migration(migrations.Migration):
    # Merges colliding structure content and then drops 'codename' and 'suite'
    # from the uniqueness constraints (and the models) of ReleaseArchitecture
    # and ReleaseComponent. NOTE: operation order is load-bearing — the data
    # merge must complete before the tightened constraints are applied.
    dependencies = [
        ('deb', '0024_add_release_fields'),
    ]

    operations = [
        # Step 1: merge duplicate rows so the new, tighter unique constraints
        # below cannot be violated. Elidable so squashed migrations on a fresh
        # DB may skip it; the reverse is a no-op (merges are not undone).
        migrations.RunPython(
            merge_colliding_structure_content, reverse_code=migrations.RunPython.noop, elidable=True
        ),
        # Step 2: force any deferred constraint checks to fire now, before the
        # schema is altered (PostgreSQL-specific).
        migrations.RunSQL(
            sql="SET CONSTRAINTS ALL IMMEDIATE;",
            reverse_sql="",
        ),
        # Step 3: tighten the uniqueness constraints to exclude codename/suite.
        migrations.AlterUniqueTogether(
            name='releasearchitecture',
            unique_together={('distribution', 'architecture')},
        ),
        migrations.AlterUniqueTogether(
            name='releasecomponent',
            unique_together={('distribution', 'component')},
        ),
        # Give a default value to fields for the sake of back migrating
        migrations.AlterField(
            model_name='releasearchitecture',
            name='codename',
            field=models.TextField(default=''),
        ),
        migrations.AlterField(
            model_name='releasearchitecture',
            name='suite',
            field=models.TextField(default=''),
        ),
        migrations.AlterField(
            model_name='releasecomponent',
            name='codename',
            field=models.TextField(default=''),
        ),
        migrations.AlterField(
            model_name='releasecomponent',
            name='suite',
            field=models.TextField(default=''),
        ),
        # Before dropping the fields for good!
        migrations.RemoveField(
            model_name='releasearchitecture',
            name='codename',
        ),
        migrations.RemoveField(
            model_name='releasearchitecture',
            name='suite',
        ),
        migrations.RemoveField(
            model_name='releasecomponent',
            name='codename',
        ),
        migrations.RemoveField(
            model_name='releasecomponent',
            name='suite',
        ),
    ]
18 changes: 3 additions & 15 deletions pulp_deb/app/models/content/structure_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,12 @@ class ReleaseArchitecture(Content):

TYPE = "release_architecture"

architecture = models.TextField()
distribution = models.TextField()

# IMPORTANT: The following fields are only part of this model in order to avoid historical
# uniqueness constraint collisions. The plan is to drop these fields from this model ASAP! This
# will require a complex DB migration to sort out any collisions.
codename = models.TextField()
suite = models.TextField()
architecture = models.TextField()

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
unique_together = (("architecture", "distribution", "codename", "suite"),)
unique_together = (("distribution", "architecture"),)


class ReleaseComponent(Content):
Expand All @@ -63,12 +57,6 @@ class ReleaseComponent(Content):
distribution = models.TextField()
component = models.TextField()

# IMPORTANT: The following fields are only part of this model in order to avoid historical
# uniqueness constraint collisions. The plan is to drop these fields from this model ASAP! This
# will require a complex DB migration to sort out any collisions.
codename = models.TextField()
suite = models.TextField()

@property
def plain_component(self):
"""
Expand All @@ -87,7 +75,7 @@ def plain_component(self):

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
unique_together = (("distribution", "component", "codename", "suite"),)
unique_together = (("distribution", "component"),)


class PackageReleaseComponent(Content):
Expand Down

0 comments on commit c2de7cc

Please sign in to comment.