Skip to content

Commit

Permalink
Fix sync optimization when sync_policy=mirror_complete
Browse files Browse the repository at this point in the history
  • Loading branch information
dralley committed Oct 27, 2021
1 parent 35d761e commit ed92318
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGES/9535.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug that could result in incomplete repo metadata when the "mirror_complete" sync policy is combined with the "optimize" option.
187 changes: 101 additions & 86 deletions pulp_rpm/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,13 +432,10 @@ def get_treeinfo_data(remote, remote_url):

def get_sync_details(remote, url, sync_policy, version):
with tempfile.TemporaryDirectory("."):
try:
result = get_repomd_file(remote, url)
repomd_path = result.path
repomd = cr.Repomd(repomd_path)
repomd_checksum = get_sha256(repomd_path)
except Exception:
repomd_checksum = None
result = get_repomd_file(remote, url)
repomd_path = result.path
repomd = cr.Repomd(repomd_path)
repomd_checksum = get_sha256(repomd_path)

return {
"url": remote.url, # use the original remote url so that mirrorlists are optimizable
Expand All @@ -449,105 +446,123 @@ def get_sync_details(remote, url, sync_policy, version):
"repomd_checksum": repomd_checksum,
}

mirror = sync_policy.startswith("mirror")
mirror_metadata = sync_policy == SYNC_POLICIES.MIRROR_COMPLETE

repo_sync_config = {}
# this is the "directory" of the repo within the target repo location - for the primary
# repo, they are the same
PRIMARY_REPO = ""

def is_subrepo(directory):
return directory != PRIMARY_REPO

with tempfile.TemporaryDirectory("."):
remote_url = fetch_remote_url(remote, url)
sync_details = get_sync_details(
remote, remote_url, sync_policy, repository.latest_version()
)
last_sync_details = repository.last_sync_details

if optimize and should_optimize_sync(sync_details, last_sync_details):
repo_sync_config[PRIMARY_REPO] = {
"should_skip": should_optimize_sync(sync_details, repository.last_sync_details),
"sync_details": sync_details,
"url": remote_url,
"repo": repository,
}

treeinfo = get_treeinfo_data(remote, remote_url)

if treeinfo:
treeinfo["repositories"] = {}
for repodata in set(treeinfo["download"]["repodatas"]):
if repodata == DIST_TREE_MAIN_REPO_PATH:
treeinfo["repositories"].update({repodata: None})
continue
name = f"{repodata}-{treeinfo['hash']}"
sub_repo, created = RpmRepository.objects.get_or_create(name=name, user_hidden=True)
if created:
sub_repo.save()
directory = treeinfo["repo_map"][repodata]
treeinfo["repositories"].update({directory: str(sub_repo.pk)})
path = f"{repodata}/"
new_url = urlpath_sanitize(remote_url, path)

try:
subrepo_sync_details = get_sync_details(
remote, new_url, sync_policy, sub_repo.latest_version()
)
except ClientResponseError as exc:
if is_subrepo(directory) and exc.status == 404:
log.warning("Unable to sync sub-repo '{}' from treeinfo.".format(directory))
continue
raise exc

repo_sync_config[directory] = {
"should_skip": should_optimize_sync(
subrepo_sync_details, sub_repo.last_sync_details
),
"sync_details": subrepo_sync_details,
"url": new_url,
"repo": sub_repo,
}

# If all repos are exactly the same, we should skip all further processing, even in
# metadata-mirror mode
if optimize and all([config["should_skip"] for config in repo_sync_config.values()]):
with ProgressReport(
message="Skipping Sync (no change from previous sync)", code="sync.was_skipped"
) as pb:
pb.done = 1
pb.done = len(repo_sync_config)
pb.total = len(repo_sync_config)
return

treeinfo = get_treeinfo_data(remote, remote_url)

sub_repos = []
skipped_syncs = 0
repo_sync_results = {}

mirror = sync_policy.startswith("mirror")
mirror_metadata = sync_policy == SYNC_POLICIES.MIRROR_COMPLETE

if treeinfo:
treeinfo["repositories"] = {}
for repodata in set(treeinfo["download"]["repodatas"]):
if repodata == DIST_TREE_MAIN_REPO_PATH:
treeinfo["repositories"].update({repodata: None})
# If some repos need to be synced and others do not, we go through them all
for directory, repo_config in repo_sync_config.items():
repo = repo_config["repo"]
# If metadata_mirroring is enabled we cannot skip any syncs, because the generated
# publication needs to contain exactly the same metadata at the same paths.
if not mirror_metadata and optimize and repo_config["should_skip"]:
skipped_syncs += 1
continue
name = f"{repodata}-{treeinfo['hash']}"
sub_repo, created = RpmRepository.objects.get_or_create(name=name, user_hidden=True)
if created:
sub_repo.save()
directory = treeinfo["repo_map"][repodata]
treeinfo["repositories"].update({directory: str(sub_repo.pk)})
path = f"{repodata}/"
new_url = urlpath_sanitize(remote_url, path)

try:
with tempfile.TemporaryDirectory("."):
get_repomd_file(remote, new_url)
except ClientResponseError as exc:
if exc.status == 404:
continue
raise exc
else:
subrepo_sync_details = get_sync_details(
remote, new_url, sync_policy, sub_repo.latest_version()
)
last_sync_details = sub_repo.last_sync_details

if optimize and should_optimize_sync(subrepo_sync_details, last_sync_details):
continue

stage = RpmFirstStage(
remote,
sub_repo,
deferred_download,
mirror_metadata,
skip_types=skip_types,
new_url=new_url,
namespace=directory,
)
dv = RpmDeclarativeVersion(first_stage=stage, repository=sub_repo, mirror=mirror)
subrepo_version = dv.create()

# save sync parameters to allow for sync optimization
if subrepo_version:
subrepo_sync_details["most_recent_version"] = subrepo_version.number
sub_repos.append((directory, subrepo_version))
stage = RpmFirstStage(
remote,
repo,
deferred_download,
mirror_metadata,
skip_types=skip_types,
new_url=repo_config["url"],
treeinfo=(treeinfo if not is_subrepo(directory) else None),
namespace=directory,
)

sub_repo.last_sync_details = subrepo_sync_details
sub_repo.save()
dv = RpmDeclarativeVersion(first_stage=stage, repository=repo, mirror=mirror)
repo_version = dv.create() or repo.latest_version()

first_stage = RpmFirstStage(
remote,
repository,
deferred_download,
mirror_metadata,
skip_types=skip_types,
treeinfo=treeinfo,
new_url=remote_url,
)
dv = RpmDeclarativeVersion(first_stage=first_stage, repository=repository, mirror=mirror)
version = dv.create()
repo_config["sync_details"]["most_recent_version"] = repo_version.number
repo.last_sync_details = repo_config["sync_details"]
repo.save()

# save sync parameters to allow for sync optimization
if version:
sync_details["most_recent_version"] = version.number
repo_sync_results[directory] = repo_version

repository.last_sync_details = sync_details
repository.save()
if skipped_syncs:
with ProgressReport(
message="Skipping Sync (no change from previous sync)", code="sync.was_skipped"
) as pb:
pb.done = skipped_syncs
pb.total = len(repo_sync_config)

if mirror_metadata:
version_to_publish = version if version else repository.latest_version()
with RpmPublication.create(version_to_publish, pass_through=False) as publication:
add_metadata_to_publication(publication, version_to_publish)
for (name, subrepo_version) in sub_repos:
add_metadata_to_publication(publication, subrepo_version, prefix=name)
with RpmPublication.create(
repo_sync_results[PRIMARY_REPO], pass_through=False
) as publication:
for (path, repo_version) in repo_sync_results.items():
add_metadata_to_publication(publication, repo_version, prefix=path)

return version
return repo_sync_results[PRIMARY_REPO]


class RpmDeclarativeVersion(DeclarativeVersion):
Expand Down Expand Up @@ -614,7 +629,7 @@ def __init__(
skip_types=None,
new_url=None,
treeinfo=None,
namespace=None,
namespace="",
):
"""
The first stage of a pulp_rpm sync pipeline.
Expand Down

0 comments on commit ed92318

Please sign in to comment.