Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

Commit

Permalink
Make functions async:
Browse files Browse the repository at this point in the history
- create_multi_manifest_list
- build_multi_istag
- sync_heterogeneous_payloads
- sync_payloads
- run
- _check_nightly_consistency
- check_nightlies_consistency
- generate_assembly_issues_report
- generate_assembly_report
- create_multi_release_image
- sync_heterogeneous_payloads
- mirror_payload_content
- create_multi_release_manifest_list
- util.find_manifest_list_sha
  • Loading branch information
locriandev committed Nov 25, 2022
1 parent cbb6a2f commit 5c79cc7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 49 deletions.
112 changes: 65 additions & 47 deletions doozerlib/cli/release_gen_payload.py
@@ -1,3 +1,4 @@
import asyncio
from datetime import datetime
import hashlib
import traceback
Expand Down Expand Up @@ -102,7 +103,7 @@ def release_gen_payload(runtime: Runtime, is_name: str, is_namespace: str, organ
read and propagate/expose this annotation in its display of the release image.
"""
runtime.initialize(mode="both", clone_distgits=False, clone_source=False, prevent_cloning=True)
GenPayloadCli(
pipeline = GenPayloadCli(
runtime,
is_name or assembly_imagestream_base_name(runtime),
is_namespace or default_imagestream_namespace_base_name(),
Expand All @@ -111,7 +112,9 @@ def release_gen_payload(runtime: Runtime, is_name: str, is_namespace: str, organ
exclude_arch,
skip_gc_tagging, emergency_ignore_issues,
apply, apply_multi_arch, moist_run
).run()
)

asyncio.get_event_loop().run_until_complete(pipeline.run())


def default_imagestream_base_name(version: str) -> str:
Expand Down Expand Up @@ -259,7 +262,7 @@ def __init__(
# do we proceed with this payload after weighing issues against permits?
self.payload_permitted = False

def run(self):
async def run(self):
"""Main entry point once instantiated with CLI inputs."""
self.validate_parameters()

Expand All @@ -268,14 +271,14 @@ def run(self):
assembly_inspector = AssemblyInspector(rt, rt.build_retrying_koji_client())

self.payload_entries_for_arch = self.generate_payload_entries(assembly_inspector)
assembly_report: Dict = self.generate_assembly_report(assembly_inspector)
assembly_report: Dict = await self.generate_assembly_report(assembly_inspector)

print(yaml.dump(assembly_report, default_flow_style=False, indent=2))
with self.output_path.joinpath("assembly-report.yaml").open(mode="w") as report_file:
yaml.dump(assembly_report, stream=report_file, default_flow_style=False, indent=2)

self.assess_assembly_viability()
self.sync_payloads() # even when not permitted, produce what we _would have_ synced
await self.sync_payloads() # even when not permitted, produce what we _would have_ synced

if self.payload_permitted:
exit(0)
Expand All @@ -302,7 +305,7 @@ def validate_parameters(self):
"Cannot create a multi nightly without including the full set of images. "
"Either include all images/arches or omit --apply-multi-arch")

def generate_assembly_report(self, assembly_inspector: AssemblyInspector) -> Dict:
async def generate_assembly_report(self, assembly_inspector: AssemblyInspector) -> Dict:
"""Generate a status report of the search for inconsistencies across all payloads generated."""
rt = self.runtime
report = dict(
Expand All @@ -313,13 +316,13 @@ def generate_assembly_report(self, assembly_inspector: AssemblyInspector) -> Dic
if ii is None
], # A list of metas where the assembly did not find a build
)
report["viable"], report["assembly_issues"] = self.generate_assembly_issues_report(assembly_inspector)
report["viable"], report["assembly_issues"] = await self.generate_assembly_issues_report(assembly_inspector)

self.payload_permitted = report["viable"]

return report

def generate_assembly_issues_report(self, assembly_inspector: AssemblyInspector) -> (bool, Dict[str, Dict]):
async def generate_assembly_issues_report(self, assembly_inspector: AssemblyInspector) -> (bool, Dict[str, Dict]):
"""Populate self.assembly_issues and payload entries with inconsistencies found."""
rt = self.runtime
self.logger.info("Checking assembly content for inconsistencies.")
Expand All @@ -344,7 +347,7 @@ def generate_assembly_issues_report(self, assembly_inspector: AssemblyInspector)
self.detect_extend_payload_entry_issues(assembly_inspector)

# If the assembly claims to have reference nightlies, assert that our payload matches them exactly.
self.assembly_issues.extend(PayloadGenerator.check_nightlies_consistency(assembly_inspector))
self.assembly_issues.extend(await PayloadGenerator.check_nightlies_consistency(assembly_inspector))

return self.summarize_issue_permits(assembly_inspector)

Expand Down Expand Up @@ -571,7 +574,7 @@ def assess_assembly_viability(self):
self.apply = False
self.apply_multi_arch = False

def sync_payloads(self):
async def sync_payloads(self):
"""
Use the payload entries that have been generated across the arches to mirror the images
out to quay and create the imagestream definitions that will update the release-controller.
Expand All @@ -587,20 +590,23 @@ def sync_payloads(self):
False: dict()
}

tasks = []
for arch, payload_entries in self.payload_entries_for_arch.items():
self.mirror_payload_content(arch, payload_entries)
tasks.append(self.mirror_payload_content(arch, payload_entries))
for private_mode in self.privacy_modes:
self.logger.info(f"Building payload files for architecture: {arch}; private: {private_mode}")
self.generate_specific_payload_imagestreams(arch, private_mode, payload_entries, multi_specs)
await asyncio.gather(*tasks)

if self.apply_multi_arch:
if self.runtime.group_config.multi_arch.enabled:
self.sync_heterogeneous_payloads(multi_specs)
await self.sync_heterogeneous_payloads(multi_specs)
else:
self.logger.info("--apply-multi-arch is enabled but the group config / assembly does "
"not have group.multi_arch.enabled==true")

def mirror_payload_content(self, arch: str, payload_entries: Dict[str, PayloadEntry]):
@exectools.limit_concurrency(500)
async def mirror_payload_content(self, arch: str, payload_entries: Dict[str, PayloadEntry]):
"""Ensure an arch's payload entries are synced out for the public to access."""
# Prevents writing the same destination twice (not supported by oc if in the same mirroring file):
mirror_src_for_dest: Dict[str, str] = dict()
Expand All @@ -626,7 +632,11 @@ def mirror_payload_content(self, arch: str, payload_entries: Dict[str, PayloadEn

if self.apply or self.apply_multi_arch:
self.logger.info(f"Mirroring images from {str(src_dest_path)}")
exectools.cmd_assert(f"oc image mirror --keep-manifest-list --filename={str(src_dest_path)}", retries=3, timeout=1800)
try:
await asyncio.wait_for(exectools.cmd_assert_async(
f"oc image mirror --keep-manifest-list --filename={str(src_dest_path)}", retries=3), timeout=1800)
except asyncio.TimeoutError:
pass

def generate_specific_payload_imagestreams(
self, arch: str, private_mode: bool,
Expand Down Expand Up @@ -759,7 +769,7 @@ def update_single_arch_istags(apiobj: oc.APIObject):
modify_and_replace_api_object(istream_apiobj, update_single_arch_istags, self.output_path, self.moist_run)
return pruning_tags, adding_tags

def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]]):
async def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]]):
"""
We now generate the artifacts to create heterogeneous release payloads (suitable for
clusters with multiple arches present). A heterogeneous or 'multi' release payload is a
Expand Down Expand Up @@ -806,10 +816,10 @@ def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str
multi_release_manifest_list_tag: str # The quay.io tag to preserve the multi payload
multi_release_istag, multi_release_manifest_list_tag = self.get_multi_release_names(private_mode)

multi_istags: List[Dict] = list(
self.build_multi_istag(tag_name, arch_to_payload_entry, imagestream_namespace)
for tag_name, arch_to_payload_entry in multi_specs[private_mode].items()
)
tasks = []
for tag_name, arch_to_payload_entry in multi_specs[private_mode].items():
tasks.append(self.build_multi_istag(tag_name, arch_to_payload_entry, imagestream_namespace))
multi_istags: List[Dict] = await asyncio.gather(*tasks)

# now multi_istags contains istags which all point to component manifest lists. We must
# run oc adm release new on this set of tags -- once for each arch - to create the arch
Expand All @@ -822,7 +832,7 @@ def sync_heterogeneous_payloads(self, multi_specs: Dict[bool, Dict[str, Dict[str
# We will then stitch those arch specific payload images together into a release payload
# manifest list.
multi_release_dest: str = f"quay.io/{'/'.join(self.release_repo)}:{multi_release_manifest_list_tag}"
final_multi_pullspec: str = self.create_multi_release_image(
final_multi_pullspec: str = await self.create_multi_release_image(
imagestream_name, multi_release_is, multi_release_dest, multi_release_istag,
multi_specs, private_mode)
self.logger.info(f"The final pull_spec for the multi release payload is: {final_multi_pullspec}")
Expand Down Expand Up @@ -857,7 +867,7 @@ def get_multi_release_names(self, private_mode: bool) -> Tuple[str, str]:
multi_release_istag = multi_release_manifest_list_tag
return multi_release_istag, multi_release_manifest_list_tag

def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, PayloadEntry], imagestream_namespace: str) -> Dict:
async def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, PayloadEntry], imagestream_namespace: str) -> Dict:
"""Build a single imagestream tag for a component in a multi-arch payload."""
# There are two flows:
# 1. The images for ALL arches were part of the same brew built manifest list. In this case,
Expand All @@ -876,7 +886,8 @@ def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, Payl
self.logger.info(f"Reusing brew manifest-list {output_digest_pullspec} for component {tag_name}")
else:
# Flow 2: Build a new manifest list and push it to quay.
output_digest_pullspec = self.create_multi_manifest_list(tag_name, arch_to_payload_entry, imagestream_namespace)
output_digest_pullspec = \
await self.create_multi_manifest_list(tag_name, arch_to_payload_entry, imagestream_namespace)

issues = list(issue # collect issues from each payload entry.
for payload_entry in entries
Expand All @@ -887,7 +898,7 @@ def build_multi_istag(self, tag_name: str, arch_to_payload_entry: Dict[str, Payl
issues=issues,
))

def create_multi_manifest_list(
async def create_multi_manifest_list(
self, tag_name: str,
arch_to_payload_entry: Dict[str, PayloadEntry],
imagestream_namespace: str) -> str:
Expand Down Expand Up @@ -924,12 +935,13 @@ def create_multi_manifest_list(
yaml.safe_dump(
dict(image=output_pullspec, manifests=manifests),
stream=ml, default_flow_style=False)
exectools.cmd_assert(f"manifest-tool push from-spec {str(component_manifest_path)}", retries=3)
await exectools.cmd_assert_async(f"manifest-tool push from-spec {str(component_manifest_path)}", retries=3)

# we are pushing a new manifest list, so return its sha256 based pullspec
return exchange_pullspec_tag_for_shasum(output_pullspec, find_manifest_list_sha(output_pullspec))
sha = await find_manifest_list_sha(output_pullspec)
return exchange_pullspec_tag_for_shasum(output_pullspec, sha)

def create_multi_release_image(
async def create_multi_release_image(
self, imagestream_name: str, multi_release_is: Dict, multi_release_dest: str,
multi_release_name: str, multi_specs: Dict[bool, Dict[str, Dict[str, PayloadEntry]]],
private_mode: bool) -> str:
Expand All @@ -946,24 +958,28 @@ def create_multi_release_image(
yaml.safe_dump(multi_release_is, mf)

arch_release_dests: Dict[str, str] = dict() # This will map arch names to a release payload pullspec we create for that arch (i.e. based on the arch's CVO image)
tasks = []
for arch, cvo_entry in multi_specs[private_mode]["cluster-version-operator"].items():
arch_release_dests[arch] = f"{multi_release_dest}-{arch}"
# Create the arch specific release payload containing tags pointing to manifest list
# component images.
exectools.cmd_assert([
"oc", "adm", "release", "new",
f"--name={multi_release_name}",
"--reference-mode=source",
"--keep-manifest-list",
f"--from-image-stream-file={str(multi_release_is_path)}",
f"--to-image-base={cvo_entry.dest_pullspec}",
f"--to-image={arch_release_dests[arch]}",
"--metadata", json.dumps({"release.openshift.io/architecture": "multi"})
])

return self.create_multi_release_manifest_list(arch_release_dests, imagestream_name, multi_release_dest)

def create_multi_release_manifest_list(
tasks.append(
exectools.cmd_assert_async([
"oc", "adm", "release", "new",
f"--name={multi_release_name}",
"--reference-mode=source",
"--keep-manifest-list",
f"--from-image-stream-file={str(multi_release_is_path)}",
f"--to-image-base={cvo_entry.dest_pullspec}",
f"--to-image={arch_release_dests[arch]}",
"--metadata", json.dumps({"release.openshift.io/architecture": "multi"})
])
)
await asyncio.gather(*tasks)

return await self.create_multi_release_manifest_list(arch_release_dests, imagestream_name, multi_release_dest)

async def create_multi_release_manifest_list(
self, arch_release_dests: Dict[str, str],
imagestream_name: str, multi_release_dest: str) -> str:
"""
Expand All @@ -988,10 +1004,10 @@ def create_multi_release_manifest_list(
yaml.safe_dump(ml_dict, stream=ml, default_flow_style=False)

# Construct the top level manifest list release payload
exectools.cmd_assert(f"manifest-tool push from-spec {str(release_payload_ml_path)}", retries=3)
await exectools.cmd_assert_async(f"manifest-tool push from-spec {str(release_payload_ml_path)}", retries=3)
# if we are actually pushing a manifest list, then we should derive a sha256 based pullspec
return exchange_pullspec_tag_for_shasum(
multi_release_dest, find_manifest_list_sha(multi_release_dest))
sha = await find_manifest_list_sha(multi_release_dest)
return exchange_pullspec_tag_for_shasum(multi_release_dest, sha)

def apply_multi_imagestream_update(self, final_multi_pullspec: str, imagestream_name: str, multi_release_istag: str):
"""
Expand Down Expand Up @@ -1360,7 +1376,7 @@ def get_group_payload_tag_mapping(assembly_inspector: AssemblyInspector, arch: s
return members

@staticmethod
def _check_nightly_consistency(assembly_inspector: AssemblyInspector, nightly: str, arch: str) -> List[AssemblyIssue]:
async def _check_nightly_consistency(assembly_inspector: AssemblyInspector, nightly: str, arch: str) -> List[AssemblyIssue]:
runtime = assembly_inspector.runtime

def terminal_issue(msg: str) -> List[AssemblyIssue]:
Expand All @@ -1380,7 +1396,7 @@ def terminal_issue(msg: str) -> List[AssemblyIssue]:
rc = -1
pullspec = f"registry.ci.openshift.org/ocp{rc_suffix}/release{rc_suffix}:{nightly}"
while retries > 0:
rc, release_json_str, err = exectools.cmd_gather(f"oc adm release info {pullspec} -o=json")
rc, release_json_str, err = await exectools.cmd_gather_async(f"oc adm release info {pullspec} -o=json")
if rc == 0:
break
runtime.logger.warn(f"Error accessing nightly release info for {pullspec}: {err}")
Expand Down Expand Up @@ -1427,7 +1443,7 @@ def terminal_issue(msg: str) -> List[AssemblyIssue]:
return issues

@staticmethod
def check_nightlies_consistency(assembly_inspector: AssemblyInspector) -> List[AssemblyIssue]:
async def check_nightlies_consistency(assembly_inspector: AssemblyInspector) -> List[AssemblyIssue]:
"""
If this assembly has reference-releases, check whether the current images selected by the
assembly are an exact match for the nightly contents.
Expand All @@ -1437,7 +1453,9 @@ def check_nightlies_consistency(assembly_inspector: AssemblyInspector) -> List[A
return []

issues: List[AssemblyIssue] = []
tasks = []
for arch, nightly in basis.reference_releases.primitive().items():
issues.extend(PayloadGenerator._check_nightly_consistency(assembly_inspector, nightly, arch))
tasks.append(PayloadGenerator._check_nightly_consistency(assembly_inspector, nightly, arch))
issues.extend(await asyncio.gather(*tasks))

return issues
4 changes: 2 additions & 2 deletions doozerlib/util.py
Expand Up @@ -781,9 +781,9 @@ def get_release_calc_previous(version, arch,
return sort_semver(list(upgrade_from))


def find_manifest_list_sha(pull_spec):
async def find_manifest_list_sha(pull_spec):
cmd = 'oc image info --filter-by-os=linux/amd64 -o json {}'.format(pull_spec)
out, err = exectools.cmd_assert(cmd, retries=3)
out, err = await exectools.cmd_assert_async(cmd, retries=3)
image_data = json.loads(out)
if 'listDigest' not in image_data:
raise ValueError('Specified image is not a manifest-list.')
Expand Down

0 comments on commit 5c79cc7

Please sign in to comment.