From 80c78c5bb39f78f954b69799c65b23351d9a3a6e Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Wed, 27 Oct 2021 14:53:45 -0400 Subject: [PATCH] Fix sync optimization when sync_policy=mirror_complete backports: #9535 https://pulp.plan.io/issues/9535 fixes #9536 (cherry picked from commit ed923180558e125bb08c85f738db66254bbe7818) --- CHANGES/9536.bugfix | 2 + pulp_rpm/app/tasks/synchronizing.py | 187 +++++++++++++++------------- 2 files changed, 103 insertions(+), 86 deletions(-) create mode 100644 CHANGES/9536.bugfix diff --git a/CHANGES/9536.bugfix b/CHANGES/9536.bugfix new file mode 100644 index 000000000..e06b21f99 --- /dev/null +++ b/CHANGES/9536.bugfix @@ -0,0 +1,2 @@ +Fixed a bug that could result in incomplete repo metadata when "mirror_complete" sync policy is combined with the "optimize" option. +(backported from #9535) diff --git a/pulp_rpm/app/tasks/synchronizing.py b/pulp_rpm/app/tasks/synchronizing.py index ce672693f..6c3f187b0 100644 --- a/pulp_rpm/app/tasks/synchronizing.py +++ b/pulp_rpm/app/tasks/synchronizing.py @@ -424,13 +424,10 @@ def get_treeinfo_data(remote, remote_url): def get_sync_details(remote, url, sync_policy, version): with tempfile.TemporaryDirectory("."): - try: - result = get_repomd_file(remote, url) - repomd_path = result.path - repomd = cr.Repomd(repomd_path) - repomd_checksum = get_sha256(repomd_path) - except Exception: - repomd_checksum = None + result = get_repomd_file(remote, url) + repomd_path = result.path + repomd = cr.Repomd(repomd_path) + repomd_checksum = get_sha256(repomd_path) return { "url": remote.url, # use the original remote url so that mirrorlists are optimizable @@ -441,105 +438,123 @@ def get_sync_details(remote, url, sync_policy, version): "repomd_checksum": repomd_checksum, } + mirror = sync_policy.startswith("mirror") + mirror_metadata = sync_policy == SYNC_POLICIES.MIRROR_COMPLETE + + repo_sync_config = {} + # this is the "directory" of the repo within the target repo location - for the primary + # repo, they are the same + PRIMARY_REPO = "" + + def is_subrepo(directory): + return directory != PRIMARY_REPO + with tempfile.TemporaryDirectory("."): remote_url = fetch_remote_url(remote) sync_details = get_sync_details( remote, remote_url, sync_policy, repository.latest_version() ) - last_sync_details = repository.last_sync_details - if optimize and should_optimize_sync(sync_details, last_sync_details): + repo_sync_config[PRIMARY_REPO] = { + "should_skip": should_optimize_sync(sync_details, repository.last_sync_details), + "sync_details": sync_details, + "url": remote_url, + "repo": repository, + } + + treeinfo = get_treeinfo_data(remote, remote_url) + + if treeinfo: + treeinfo["repositories"] = {} + for repodata in set(treeinfo["download"]["repodatas"]): + if repodata == DIST_TREE_MAIN_REPO_PATH: + treeinfo["repositories"].update({repodata: None}) + continue + name = f"{repodata}-{treeinfo['hash']}" + sub_repo, created = RpmRepository.objects.get_or_create(name=name, user_hidden=True) + if created: + sub_repo.save() + directory = treeinfo["repo_map"][repodata] + treeinfo["repositories"].update({directory: str(sub_repo.pk)}) + path = f"{repodata}/" + new_url = urlpath_sanitize(remote_url, path) + + try: + subrepo_sync_details = get_sync_details( + remote, new_url, sync_policy, sub_repo.latest_version() + ) + except ClientResponseError as exc: + if is_subrepo(directory) and exc.status == 404: + log.warning("Unable to sync sub-repo '{}' from treeinfo.".format(directory)) + continue + raise exc + + repo_sync_config[directory] = { + "should_skip": should_optimize_sync( + subrepo_sync_details, sub_repo.last_sync_details + ), + "sync_details": subrepo_sync_details, + "url": new_url, + "repo": sub_repo, + } + + # If all repos are exactly the same, we should skip all further processing, even in + # metadata-mirror mode + if optimize and all([config["should_skip"] for config in repo_sync_config.values()]): with ProgressReport( message="Skipping Sync (no change from previous sync)", code="sync.was_skipped" ) as pb: - pb.done = 1 + pb.done = len(repo_sync_config) + pb.total = len(repo_sync_config) return - treeinfo = get_treeinfo_data(remote, remote_url) - - sub_repos = [] + skipped_syncs = 0 + repo_sync_results = {} - mirror = sync_policy.startswith("mirror") - mirror_metadata = sync_policy == SYNC_POLICIES.MIRROR_COMPLETE - - if treeinfo: - treeinfo["repositories"] = {} - for repodata in set(treeinfo["download"]["repodatas"]): - if repodata == DIST_TREE_MAIN_REPO_PATH: - treeinfo["repositories"].update({repodata: None}) + # If some repos need to be synced and others do not, we go through them all + for directory, repo_config in repo_sync_config.items(): + repo = repo_config["repo"] + # If metadata_mirroring is enabled we cannot skip any syncs, because the generated + # publication needs to contain exactly the same metadata at the same paths. + if not mirror_metadata and optimize and repo_config["should_skip"]: + skipped_syncs += 1 continue - name = f"{repodata}-{treeinfo['hash']}" - sub_repo, created = RpmRepository.objects.get_or_create(name=name, user_hidden=True) - if created: - sub_repo.save() - directory = treeinfo["repo_map"][repodata] - treeinfo["repositories"].update({directory: str(sub_repo.pk)}) - path = f"{repodata}/" - new_url = urlpath_sanitize(remote_url, path) - - try: - with tempfile.TemporaryDirectory("."): - get_repomd_file(remote, new_url) - except ClientResponseError as exc: - if exc.status == 404: - continue - raise exc - else: - subrepo_sync_details = get_sync_details( - remote, new_url, sync_policy, sub_repo.latest_version() - ) - last_sync_details = sub_repo.last_sync_details - if optimize and should_optimize_sync(subrepo_sync_details, last_sync_details): - continue - - stage = RpmFirstStage( - remote, - sub_repo, - deferred_download, - mirror_metadata, - skip_types=skip_types, - new_url=new_url, - namespace=directory, - ) - dv = RpmDeclarativeVersion(first_stage=stage, repository=sub_repo, mirror=mirror) - subrepo_version = dv.create() - - # save sync parameters to allow for sync optimization - if subrepo_version: - subrepo_sync_details["most_recent_version"] = subrepo_version.number - sub_repos.append((directory, subrepo_version)) + stage = RpmFirstStage( + remote, + repo, + deferred_download, + mirror_metadata, + skip_types=skip_types, + new_url=repo_config["url"], + treeinfo=(treeinfo if not is_subrepo(directory) else None), + namespace=directory, + ) - sub_repo.last_sync_details = subrepo_sync_details - sub_repo.save() + dv = RpmDeclarativeVersion(first_stage=stage, repository=repo, mirror=mirror) + repo_version = dv.create() or repo.latest_version() - first_stage = RpmFirstStage( - remote, - repository, - deferred_download, - mirror_metadata, - skip_types=skip_types, - treeinfo=treeinfo, - new_url=remote_url, - ) - dv = RpmDeclarativeVersion(first_stage=first_stage, repository=repository, mirror=mirror) - version = dv.create() + repo_config["sync_details"]["most_recent_version"] = repo_version.number + repo.last_sync_details = repo_config["sync_details"] + repo.save() - # save sync parameters to allow for sync optimization - if version: - sync_details["most_recent_version"] = version.number + repo_sync_results[directory] = repo_version - repository.last_sync_details = sync_details - repository.save() + if skipped_syncs: + with ProgressReport( + message="Skipping Sync (no change from previous sync)", code="sync.was_skipped" + ) as pb: + pb.done = skipped_syncs + pb.total = len(repo_sync_config) if mirror_metadata: - version_to_publish = version if version else repository.latest_version() - with RpmPublication.create(version_to_publish, pass_through=False) as publication: - add_metadata_to_publication(publication, version_to_publish) - for (name, subrepo_version) in sub_repos: - add_metadata_to_publication(publication, subrepo_version, prefix=name) + with RpmPublication.create( + repo_sync_results[PRIMARY_REPO], pass_through=False + ) as publication: + for (path, repo_version) in repo_sync_results.items(): + add_metadata_to_publication(publication, repo_version, prefix=path) - return version + return repo_sync_results[PRIMARY_REPO] class RpmDeclarativeVersion(DeclarativeVersion): @@ -591,7 +606,7 @@ def __init__( skip_types=None, new_url=None, treeinfo=None, - namespace=None, + namespace="", ): """ The first stage of a pulp_rpm sync pipeline.