From 5c79cc72daf9678112bb22d5aee4085725fe9a53 Mon Sep 17 00:00:00 2001 From: "D. Paolella" Date: Wed, 23 Nov 2022 13:41:04 +0100 Subject: [PATCH] Make functions async: - create_multi_manifest_list - build_multi_istag - sync_heterogeneous_payloads - sync_payloads - run - _check_nightly_consistency - check_nightlies_consistency - generate_assembly_issues_report - generate_assembly_report - create_multi_release_image - mirror_payload_content - create_multi_release_manifest_list - util.find_manifest_list_sha --- doozerlib/cli/release_gen_payload.py | 112 ++++++++++++++++----------- doozerlib/util.py | 4 +- 2 files changed, 67 insertions(+), 49 deletions(-) diff --git a/doozerlib/cli/release_gen_payload.py b/doozerlib/cli/release_gen_payload.py index 562eb182f..978d7017e 100644 --- a/doozerlib/cli/release_gen_payload.py +++ b/doozerlib/cli/release_gen_payload.py @@ -1,3 +1,4 @@ +import asyncio from datetime import datetime import hashlib import traceback @@ -102,7 +103,7 @@ def release_gen_payload(runtime: Runtime, is_name: str, is_namespace: str, organ read and propagate/expose this annotation in its display of the release image. """ runtime.initialize(mode="both", clone_distgits=False, clone_source=False, prevent_cloning=True) - GenPayloadCli( + pipeline = GenPayloadCli( runtime, is_name or assembly_imagestream_base_name(runtime), is_namespace or default_imagestream_namespace_base_name(), @@ -111,7 +112,9 @@ def release_gen_payload(runtime: Runtime, is_name: str, is_namespace: str, organ exclude_arch, skip_gc_tagging, emergency_ignore_issues, apply, apply_multi_arch, moist_run - ).run() + ) + + asyncio.get_event_loop().run_until_complete(pipeline.run()) def default_imagestream_base_name(version: str) -> str: @@ -259,7 +262,7 @@ def __init__( # do we proceed with this payload after weighing issues against permits?
self.payload_permitted = False - def run(self): + async def run(self): """Main entry point once instantiated with CLI inputs.""" self.validate_parameters() @@ -268,14 +271,14 @@ def run(self): assembly_inspector = AssemblyInspector(rt, rt.build_retrying_koji_client()) self.payload_entries_for_arch = self.generate_payload_entries(assembly_inspector) - assembly_report: Dict = self.generate_assembly_report(assembly_inspector) + assembly_report: Dict = await self.generate_assembly_report(assembly_inspector) print(yaml.dump(assembly_report, default_flow_style=False, indent=2)) with self.output_path.joinpath("assembly-report.yaml").open(mode="w") as report_file: yaml.dump(assembly_report, stream=report_file, default_flow_style=False, indent=2) self.assess_assembly_viability() - self.sync_payloads() # even when not permitted, produce what we _would have_ synced + await self.sync_payloads() # even when not permitted, produce what we _would have_ synced if self.payload_permitted: exit(0) @@ -302,7 +305,7 @@ def validate_parameters(self): "Cannot create a multi nightly without including the full set of images. 
" "Either include all images/arches or omit --apply-multi-arch") - def generate_assembly_report(self, assembly_inspector: AssemblyInspector) -> Dict: + async def generate_assembly_report(self, assembly_inspector: AssemblyInspector) -> Dict: """Generate a status report of the search for inconsistencies across all payloads generated.""" rt = self.runtime report = dict( @@ -313,13 +316,13 @@ def generate_assembly_report(self, assembly_inspector: AssemblyInspector) -> Dic if ii is None ], # A list of metas where the assembly did not find a build ) - report["viable"], report["assembly_issues"] = self.generate_assembly_issues_report(assembly_inspector) + report["viable"], report["assembly_issues"] = await self.generate_assembly_issues_report(assembly_inspector) self.payload_permitted = report["viable"] return report - def generate_assembly_issues_report(self, assembly_inspector: AssemblyInspector) -> (bool, Dict[str, Dict]): + async def generate_assembly_issues_report(self, assembly_inspector: AssemblyInspector) -> (bool, Dict[str, Dict]): """Populate self.assembly_issues and payload entries with inconsistencies found.""" rt = self.runtime self.logger.info("Checking assembly content for inconsistencies.") @@ -344,7 +347,7 @@ def generate_assembly_issues_report(self, assembly_inspector: AssemblyInspector) self.detect_extend_payload_entry_issues(assembly_inspector) # If the assembly claims to have reference nightlies, assert that our payload matches them exactly. 
- self.assembly_issues.extend(PayloadGenerator.check_nightlies_consistency(assembly_inspector)) + self.assembly_issues.extend(await PayloadGenerator.check_nightlies_consistency(assembly_inspector)) return self.summarize_issue_permits(assembly_inspector) @@ -571,7 +574,7 @@ def assess_assembly_viability(self): self.apply = False self.apply_multi_arch = False - def sync_payloads(self): + async def sync_payloads(self): """ Use the payload entries that have been generated across the arches to mirror the images out to quay and create the imagestream definitions that will update the release-controller. @@ -587,20 +590,23 @@ def sync_payloads(self): False: dict() } + tasks = [] for arch, payload_entries in self.payload_entries_for_arch.items(): - self.mirror_payload_content(arch, payload_entries) + tasks.append(self.mirror_payload_content(arch, payload_entries)) for private_mode in self.privacy_modes: self.logger.info(f"Building payload files for architecture: {arch}; private: {private_mode}") self.generate_specific_payload_imagestreams(arch, private_mode, payload_entries, multi_specs) + await asyncio.gather(*tasks) if self.apply_multi_arch: if self.runtime.group_config.multi_arch.enabled: - self.sync_heterogeneous_payloads(multi_specs) + await self.sync_heterogeneous_payloads(multi_specs) else: self.logger.info("--apply-multi-arch is enabled but the group config / assembly does " "not have group.multi_arch.enabled==true") - def mirror_payload_content(self, arch: str, payload_entries: Dict[str, PayloadEntry]): + @exectools.limit_concurrency(500) + async def mirror_payload_content(self, arch: str, payload_entries: Dict[str, PayloadEntry]): """Ensure an arch's payload entries are synced out for the public to access.""" # Prevents writing the same destination twice (not supported by oc if in the same mirroring file): mirror_src_for_dest: Dict[str, str] = dict() @@ -626,7 +632,11 @@ def mirror_payload_content(self, arch: str, payload_entries: Dict[str, PayloadEn if self.apply 
or self.apply_multi_arch: self.logger.info(f"Mirroring images from {str(src_dest_path)}") - exectools.cmd_assert(f"oc image mirror --keep-manifest-list --filename={str(src_dest_path)}", retries=3, timeout=1800) + try: + await asyncio.wait_for(exectools.cmd_assert_async( + f"oc image mirror --keep-manifest-list --filename={str(src_dest_path)}", retries=3), timeout=1800) + except asyncio.TimeoutError: + pass def generate_specific_payload_imagestreams( self, arch: str, private_mode: bool, @@ -759,7 +769,7 @@ def update_single_arch_istags(apiobj: oc.APIObject): modify_and_replace_api_object(istream_apiobj, update_single_arch_istags, self.output_path, self.moist_run) return pruning_tags, adding_tags - def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]]): + async def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]]): """ We now generate the artifacts to create heterogeneous release payloads (suitable for clusters with multiple arches present). A heterogeneous or 'multi' release payload is a @@ -806,10 +816,10 @@ def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str multi_release_manifest_list_tag: str # The quay.io tag to preserve the multi payload multi_release_istag, multi_release_manifest_list_tag = self.get_multi_release_names(private_mode) - multi_istags: List[Dict] = list( - self.build_multi_istag(tag_name, arch_to_payload_entry, imagestream_namespace) - for tag_name, arch_to_payload_entry in multi_specs[private_mode].items() - ) + tasks = [] + for tag_name, arch_to_payload_entry in multi_specs[private_mode].items(): + tasks.append(self.build_multi_istag(tag_name, arch_to_payload_entry, imagestream_namespace)) + multi_istags: List[Dict] = await asyncio.gather(*tasks) # now multi_istags contains istags which all point to component manifest lists. 
We must # run oc adm release new on this set of tags -- once for each arch - to create the arch @@ -822,7 +832,7 @@ def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str # We will then stitch those arch specific payload images together into a release payload # manifest list. multi_release_dest: str = f"quay.io/{'/'.join(self.release_repo)}:{multi_release_manifest_list_tag}" - final_multi_pullspec: str = self.create_multi_release_image( + final_multi_pullspec: str = await self.create_multi_release_image( imagestream_name, multi_release_is, multi_release_dest, multi_release_istag, multi_specs, private_mode) self.logger.info(f"The final pull_spec for the multi release payload is: {final_multi_pullspec}") @@ -857,7 +867,7 @@ def get_multi_release_names(self, private_mode: bool) -> Tuple[str, str]: multi_release_istag = multi_release_manifest_list_tag return multi_release_istag, multi_release_manifest_list_tag - def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, PayloadEntry], imagestream_namespace: str) -> Dict: + async def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, PayloadEntry], imagestream_namespace: str) -> Dict: """Build a single imagestream tag for a component in a multi-arch payload.""" # There are two flows: # 1. The images for ALL arches were part of the same brew built manifest list. In this case, @@ -876,7 +886,8 @@ def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, Payl self.logger.info(f"Reusing brew manifest-list {output_digest_pullspec} for component {tag_name}") else: # Flow 2: Build a new manifest list and push it to quay. - output_digest_pullspec = self.create_multi_manifest_list(tag_name, arch_to_payload_entry, imagestream_namespace) + output_digest_pullspec = \ + await self.create_multi_manifest_list(tag_name, arch_to_payload_entry, imagestream_namespace) issues = list(issue # collect issues from each payload entry. 
for payload_entry in entries @@ -887,7 +898,7 @@ def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, Payl issues=issues, )) - def create_multi_manifest_list( + async def create_multi_manifest_list( self, tag_name: str, arch_to_payload_entry: Dict[str, PayloadEntry], imagestream_namespace: str) -> str: @@ -924,12 +935,13 @@ def create_multi_manifest_list( yaml.safe_dump( dict(image=output_pullspec, manifests=manifests), stream=ml, default_flow_style=False) - exectools.cmd_assert(f"manifest-tool push from-spec {str(component_manifest_path)}", retries=3) + await exectools.cmd_assert_async(f"manifest-tool push from-spec {str(component_manifest_path)}", retries=3) # we are pushing a new manifest list, so return its sha256 based pullspec - return exchange_pullspec_tag_for_shasum(output_pullspec, find_manifest_list_sha(output_pullspec)) + sha = await find_manifest_list_sha(output_pullspec) + return exchange_pullspec_tag_for_shasum(output_pullspec, sha) - def create_multi_release_image( + async def create_multi_release_image( self, imagestream_name: str, multi_release_is: Dict, multi_release_dest: str, multi_release_name: str, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]], private_mode: bool) -> str: @@ -946,24 +958,28 @@ def create_multi_release_image( yaml.safe_dump(multi_release_is, mf) arch_release_dests: Dict[str, str] = dict() # This will map arch names to a release payload pullspec we create for that arch (i.e. based on the arch's CVO image) + tasks = [] for arch, cvo_entry in multi_specs[private_mode]["cluster-version-operator"].items(): arch_release_dests[arch] = f"{multi_release_dest}-{arch}" # Create the arch specific release payload containing tags pointing to manifest list # component images. 
- exectools.cmd_assert([ - "oc", "adm", "release", "new", - f"--name={multi_release_name}", - "--reference-mode=source", - "--keep-manifest-list", - f"--from-image-stream-file={str(multi_release_is_path)}", - f"--to-image-base={cvo_entry.dest_pullspec}", - f"--to-image={arch_release_dests[arch]}", - "--metadata", json.dumps({"release.openshift.io/architecture": "multi"}) - ]) - - return self.create_multi_release_manifest_list(arch_release_dests, imagestream_name, multi_release_dest) - - def create_multi_release_manifest_list( + tasks.append( + exectools.cmd_assert_async([ + "oc", "adm", "release", "new", + f"--name={multi_release_name}", + "--reference-mode=source", + "--keep-manifest-list", + f"--from-image-stream-file={str(multi_release_is_path)}", + f"--to-image-base={cvo_entry.dest_pullspec}", + f"--to-image={arch_release_dests[arch]}", + "--metadata", json.dumps({"release.openshift.io/architecture": "multi"}) + ]) + ) + await asyncio.gather(*tasks) + + return await self.create_multi_release_manifest_list(arch_release_dests, imagestream_name, multi_release_dest) + + async def create_multi_release_manifest_list( self, arch_release_dests: Dict[str, str], imagestream_name: str, multi_release_dest: str) -> str: """ @@ -988,10 +1004,10 @@ def create_multi_release_manifest_list( yaml.safe_dump(ml_dict, stream=ml, default_flow_style=False) # Construct the top level manifest list release payload - exectools.cmd_assert(f"manifest-tool push from-spec {str(release_payload_ml_path)}", retries=3) + await exectools.cmd_assert_async(f"manifest-tool push from-spec {str(release_payload_ml_path)}", retries=3) # if we are actually pushing a manifest list, then we should derive a sha256 based pullspec - return exchange_pullspec_tag_for_shasum( - multi_release_dest, find_manifest_list_sha(multi_release_dest)) + sha = await find_manifest_list_sha(multi_release_dest) + return exchange_pullspec_tag_for_shasum(multi_release_dest, sha) def apply_multi_imagestream_update(self, 
final_multi_pullspec: str, imagestream_name: str, multi_release_istag: str): """ @@ -1360,7 +1376,7 @@ def get_group_payload_tag_mapping(assembly_inspector: AssemblyInspector, arch: s return members @staticmethod - def _check_nightly_consistency(assembly_inspector: AssemblyInspector, nightly: str, arch: str) -> List[AssemblyIssue]: + async def _check_nightly_consistency(assembly_inspector: AssemblyInspector, nightly: str, arch: str) -> List[AssemblyIssue]: runtime = assembly_inspector.runtime def terminal_issue(msg: str) -> List[AssemblyIssue]: @@ -1380,7 +1396,7 @@ def terminal_issue(msg: str) -> List[AssemblyIssue]: rc = -1 pullspec = f"registry.ci.openshift.org/ocp{rc_suffix}/release{rc_suffix}:{nightly}" while retries > 0: - rc, release_json_str, err = exectools.cmd_gather(f"oc adm release info {pullspec} -o=json") + rc, release_json_str, err = await exectools.cmd_gather_async(f"oc adm release info {pullspec} -o=json") if rc == 0: break runtime.logger.warn(f"Error accessing nightly release info for {pullspec}: {err}") @@ -1427,7 +1443,7 @@ def terminal_issue(msg: str) -> List[AssemblyIssue]: return issues @staticmethod - def check_nightlies_consistency(assembly_inspector: AssemblyInspector) -> List[AssemblyIssue]: + async def check_nightlies_consistency(assembly_inspector: AssemblyInspector) -> List[AssemblyIssue]: """ If this assembly has reference-releases, check whether the current images selected by the assembly are an exact match for the nightly contents. 
@@ -1437,7 +1453,9 @@ def check_nightlies_consistency(assembly_inspector: AssemblyInspector) -> List[A return [] issues: List[AssemblyIssue] = [] + tasks = [] for arch, nightly in basis.reference_releases.primitive().items(): - issues.extend(PayloadGenerator._check_nightly_consistency(assembly_inspector, nightly, arch)) + tasks.append(PayloadGenerator._check_nightly_consistency(assembly_inspector, nightly, arch)) + issues.extend(await asyncio.gather(*tasks)) return issues diff --git a/doozerlib/util.py b/doozerlib/util.py index 653477fcb..9cbd08c9b 100644 --- a/doozerlib/util.py +++ b/doozerlib/util.py @@ -781,9 +781,9 @@ def get_release_calc_previous(version, arch, return sort_semver(list(upgrade_from)) -def find_manifest_list_sha(pull_spec): +async def find_manifest_list_sha(pull_spec): cmd = 'oc image info --filter-by-os=linux/amd64 -o json {}'.format(pull_spec) - out, err = exectools.cmd_assert(cmd, retries=3) + out, err = await exectools.cmd_assert_async(cmd, retries=3) image_data = json.loads(out) if 'listDigest' not in image_data: raise ValueError('Specified image is not a manifest-list.')