gitlab ci: reorganize when we check for specs on mirrors #38626

Merged
171 changes: 85 additions & 86 deletions lib/spack/spack/ci.py
@@ -139,49 +139,33 @@ def _add_dependency(spec_label, dep_label, deps):
deps[spec_label].add(dep_label)


def _get_spec_dependencies(
specs, deps, spec_labels, check_index_only=False, mirrors_to_check=None
):
spec_deps_obj = _compute_spec_deps(
specs, check_index_only=check_index_only, mirrors_to_check=mirrors_to_check
)
def _get_spec_dependencies(specs, deps, spec_labels):
spec_deps_obj = _compute_spec_deps(specs)

if spec_deps_obj:
dependencies = spec_deps_obj["dependencies"]
specs = spec_deps_obj["specs"]

for entry in specs:
spec_labels[entry["label"]] = {
"spec": entry["spec"],
"needs_rebuild": entry["needs_rebuild"],
}
spec_labels[entry["label"]] = entry["spec"]

for entry in dependencies:
_add_dependency(entry["spec"], entry["depends"], deps)


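The data-shape change this makes is easiest to see side by side; a sketch with a hypothetical label (labels follow the pkg-name/hash-prefix convention described below):

    # Before: rebuild state was computed during staging and stored inline:
    #   spec_labels["zlib/abc1234"] = {"spec": <concrete Spec>, "needs_rebuild": True}
    # After: the label maps straight to the concrete spec; whether it needs a
    # rebuild is decided later, in generate_gitlab_ci_yaml, and tracked in a
    # separate RebuildDecision record keyed by the same label:
    #   spec_labels["zlib/abc1234"] = <concrete Spec>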
def stage_spec_jobs(specs, check_index_only=False, mirrors_to_check=None):
def stage_spec_jobs(specs):
"""Take a set of release specs and generate a list of "stages", where the
jobs in any stage are dependent only on jobs in previous stages. This
allows us to maximize build parallelism within the gitlab-ci framework.
Arguments:
specs (Iterable): Specs to build
check_index_only (bool): Regardless of whether DAG pruning is enabled,
all configured mirrors are searched to see if binaries for specs
are up to date on those mirrors. This flag limits that search to
the binary cache indices on those mirrors to speed the process up,
even though there is no guarantee the index is up to date.
mirrors_to_check: Optional mapping giving mirrors to check instead of
any configured mirrors.
Returns: A tuple of information objects describing the specs, dependencies
and stages:
spec_labels: A dictionary mapping the spec labels which are made of
(pkg-name/hash-prefix), to objects containing "spec" and "needs_rebuild"
keys. The root spec is the spec of which this spec is a dependency
and the spec is the formatted spec string for this spec.
spec_labels: A dictionary mapping the spec labels (which are formatted
as pkg-name/hash-prefix) to concrete specs.
deps: A dictionary where the keys should also have appeared as keys in
the spec_labels dictionary, and the values are the set of
@@ -210,13 +194,7 @@ def _remove_satisfied_deps(deps, satisfied_list):
deps = {}
spec_labels = {}

_get_spec_dependencies(
specs,
deps,
spec_labels,
check_index_only=check_index_only,
mirrors_to_check=mirrors_to_check,
)
_get_spec_dependencies(specs, deps, spec_labels)

# Save the original deps, as we need to return them at the end of the
# function. In the while loop below, the "dependencies" variable is
@@ -242,24 +220,36 @@ def _remove_satisfied_deps(deps, satisfied_list):
return spec_labels, deps, stages


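A minimal usage sketch of the simplified entry point, assuming concrete_specs is a list of already-concretized specs (as built from the environment in generate_gitlab_ci_yaml below):

    spec_labels, deps, stages = stage_spec_jobs(concrete_specs)  # no mirror I/O anymore
    for stage_index, stage in enumerate(stages):
        # jobs in a stage depend only on jobs in earlier stages
        for label in sorted(stage):            # label: "pkg-name/hash-prefix"
            release_spec = spec_labels[label]  # a concrete Spec, not a dict
            print(stage_index, label, release_spec.dag_hash()[:7])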
def _print_staging_summary(spec_labels, dependencies, stages):
def _print_staging_summary(spec_labels, stages, mirrors_to_check, rebuild_decisions):
if not stages:
return

tty.msg(" Staging summary ([x] means a job needs rebuilding):")
mirrors = spack.mirror.MirrorCollection(mirrors=mirrors_to_check)
tty.msg("Checked the following mirrors for binaries:")
for m in mirrors.values():
tty.msg(" {0}".format(m.fetch_url))

tty.msg("Staging summary ([x] means a job needs rebuilding):")
for stage_index, stage in enumerate(stages):
tty.msg(" stage {0} ({1} jobs):".format(stage_index, len(stage)))
tty.msg(" stage {0} ({1} jobs):".format(stage_index, len(stage)))

for job in sorted(stage):
s = spec_labels[job]["spec"]
s = spec_labels[job]
rebuild = rebuild_decisions[job].rebuild
reason = rebuild_decisions[job].reason
reason_msg = " ({0})".format(reason) if reason else ""
tty.msg(
" [{1}] {0} -> {2}".format(
job, "x" if spec_labels[job]["needs_rebuild"] else " ", _get_spec_string(s)
" [{1}] {0} -> {2}{3}".format(
job, "x" if rebuild else " ", _get_spec_string(s), reason_msg
)
)
if rebuild_decisions[job].mirrors:
tty.msg(" found on the following mirrors:")
for murl in rebuild_decisions[job].mirrors:
tty.msg(" {0}".format(murl))


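Given the format strings above, the new summary would print roughly like this (tty.msg prefixes each line with "==>"; the URL, package names, hashes, and spec strings are hypothetical):

    ==> Checked the following mirrors for binaries:
    ==>   https://mirror.example.com/buildcache
    ==> Staging summary ([x] means a job needs rebuilding):
    ==>   stage 0 (1 jobs):
    ==>     [ ] zlib/abc1234 -> zlib@1.2.13 (Pruned, up-to-date)
    ==>       found on the following mirrors:
    ==>         https://mirror.example.com/buildcache
    ==>   stage 1 (1 jobs):
    ==>     [x] libpng/def5678 -> libpng@1.6.39 (Scheduled, not found anywhere)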
def _compute_spec_deps(spec_list, check_index_only=False, mirrors_to_check=None):
def _compute_spec_deps(spec_list):
"""
Computes all the dependencies for the spec(s) and generates a JSON
object which provides both a list of unique spec names as well as a
@@ -323,12 +313,8 @@ def append_dep(s, d):
tty.msg("Will not stage external pkg: {0}".format(s))
continue

up_to_date_mirrors = bindist.get_mirrors_for_spec(
spec=s, mirrors_to_check=mirrors_to_check, index_only=check_index_only
)

skey = _spec_deps_key(s)
spec_labels[skey] = {"spec": s, "needs_rebuild": not up_to_date_mirrors}
spec_labels[skey] = s

for d in s.dependencies(deptype=all):
dkey = _spec_deps_key(d)
@@ -338,14 +324,8 @@ def append_dep(s, d):

append_dep(skey, dkey)

for spec_label, spec_holder in spec_labels.items():
specs.append(
{
"label": spec_label,
"spec": spec_holder["spec"],
"needs_rebuild": spec_holder["needs_rebuild"],
}
)
for spec_label, concrete_spec in spec_labels.items():
specs.append({"label": spec_label, "spec": concrete_spec})

deps_json_obj = {"specs": specs, "dependencies": dependencies}

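For reference, the returned object now has this shape; in a dependencies entry, "spec" is the dependent's label and "depends" is the label of the spec it depends on, matching how _get_spec_dependencies consumes it above (labels hypothetical):

    # deps_json_obj = {
    #     "specs": [
    #         {"label": "ncurses/qwe0987", "spec": <concrete Spec>},
    #         {"label": "readline/zxc4321", "spec": <concrete Spec>},
    #     ],
    #     "dependencies": [
    #         # readline depends on ncurses
    #         {"spec": "readline/zxc4321", "depends": "ncurses/qwe0987"},
    #     ],
    # }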
@@ -357,14 +337,14 @@ def _spec_matches(spec, match_string):


def _format_job_needs(
dep_jobs, osname, build_group, prune_dag, stage_spec_dict, enable_artifacts_buildcache
dep_jobs, osname, build_group, prune_dag, rebuild_decisions, enable_artifacts_buildcache
):
needs_list = []
for dep_job in dep_jobs:
dep_spec_key = _spec_deps_key(dep_job)
dep_spec_info = stage_spec_dict[dep_spec_key]
rebuild = rebuild_decisions[dep_spec_key].rebuild

if not prune_dag or dep_spec_info["needs_rebuild"]:
if not prune_dag or rebuild:
needs_list.append(
{
"job": get_job_name(dep_job, dep_job.architecture, build_group),
@@ -470,8 +450,7 @@ def get_spec_filter_list(env, affected_pkgs, dependent_traverse_depth=None):
def _build_jobs(spec_labels, stages):
for stage_jobs in stages:
for spec_label in stage_jobs:
spec_record = spec_labels[spec_label]
release_spec = spec_record["spec"]
release_spec = spec_labels[spec_label]
release_spec_dag_hash = release_spec.dag_hash()
yield release_spec, release_spec_dag_hash

@@ -492,6 +471,13 @@ def _unpack_script(script_section, op=_noop):
return script


class RebuildDecision(object):
def __init__(self):
self.rebuild = True
self.mirrors = []
self.reason = ""


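A summary of the final states the generation loop below can leave in one of these records, assuming a normal (non-copy-only) pipeline:

    # untouched by the change (pruning on) -> rebuild=False, "Pruned, untouched by change."
    # found in mirrors, prune_dag=True     -> rebuild=False, "Pruned, up-to-date"
    # found in mirrors, prune_dag=False    -> rebuild=True,  "Scheduled, DAG pruning disabled"
    #                                         (mirror URLs kept for the staging summary)
    # not found on any mirror              -> rebuild=True,  "Scheduled, not found anywhere"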
class SpackCI:
"""Spack CI object used to generate intermediate representation
used by the CI generator(s).
@@ -944,23 +930,13 @@ def generate_gitlab_ci_yaml(
except bindist.FetchCacheError as e:
tty.warn(e)

try:
concrete_env_specs = [
spec_labels, dependencies, stages = stage_spec_jobs(
[
concrete
for abstract, concrete in env.concretized_specs()
if abstract in env.spec_lists["specs"]
]
spec_labels, dependencies, stages = stage_spec_jobs(
concrete_env_specs,
check_index_only=check_index_only,
mirrors_to_check=mirrors_to_check,
)
finally:
# Clean up remote mirror override if enabled
if remote_mirror_override:
spack.mirror.remove("ci_pr_mirror", cfg.default_modify_scope())
if spack_pipeline_type == "spack_pull_request":
spack.mirror.remove("ci_shared_pr_mirror", cfg.default_modify_scope())
)

all_job_names = []
output_object = {}
@@ -986,26 +962,37 @@ def main_script_replacements(cmd):
spack_ci = SpackCI(ci_config, spec_labels, stages)
spack_ci_ir = spack_ci.generate_ir()

rebuild_decisions = {}

for stage_jobs in stages:
stage_name = "stage-{0}".format(stage_id)
stage_names.append(stage_name)
stage_id += 1

for spec_label in stage_jobs:
spec_record = spec_labels[spec_label]
release_spec = spec_record["spec"]
release_spec = spec_labels[spec_label]
release_spec_dag_hash = release_spec.dag_hash()

spec_record = RebuildDecision()
rebuild_decisions[spec_label] = spec_record

if prune_untouched_packages:
if release_spec not in affected_specs:
tty.debug(
"Pruning {0}/{1}, untouched by change.".format(
release_spec.name, release_spec.dag_hash()[:7]
)
)
spec_record["needs_rebuild"] = False
spec_record.rebuild = False
spec_record.reason = "Pruned, untouched by change."
continue

up_to_date_mirrors = bindist.get_mirrors_for_spec(
spec=release_spec, mirrors_to_check=mirrors_to_check, index_only=check_index_only
)

spec_record.rebuild = not up_to_date_mirrors
if up_to_date_mirrors:
spec_record.reason = "Pruned, found in mirrors"
spec_record.mirrors = [m["mirror_url"] for m in up_to_date_mirrors]
else:
spec_record.reason = "Scheduled, not found anywhere"

job_object = spack_ci_ir["jobs"][release_spec_dag_hash]["attributes"]

if not job_object:
@@ -1054,28 +1041,32 @@ def main_script_replacements(cmd):
# purposes, so we only get the direct dependencies.
dep_jobs = []
for dep_label in dependencies[spec_label]:
dep_jobs.append(spec_labels[dep_label]["spec"])
dep_jobs.append(spec_labels[dep_label])

job_object["needs"].extend(
_format_job_needs(
dep_jobs,
osname,
build_group,
prune_dag,
spec_labels,
rebuild_decisions,
enable_artifacts_buildcache,
)
)

rebuild_spec = spec_record["needs_rebuild"]
rebuild_spec = spec_record.rebuild

if prune_dag and not rebuild_spec and not copy_only_pipeline:
tty.debug(
"Pruning {0}/{1}, does not need rebuild.".format(
release_spec.name, release_spec.dag_hash()
)
)
continue
if not rebuild_spec and not copy_only_pipeline:
if prune_dag:
spec_record.reason = "Pruned, up-to-date"
continue
else:
# DAG pruning is disabled, force the spec to rebuild. The
# record still contains any mirrors on which the spec
# may have been found, so we can print them in the staging
# summary.
spec_record.rebuild = True
spec_record.reason = "Scheduled, DAG pruning disabled"

if broken_spec_urls is not None and release_spec_dag_hash in broken_spec_urls:
known_broken_specs_encountered.append(release_spec_dag_hash)
@@ -1115,6 +1106,8 @@ def main_script_replacements(cmd):
{"job": generate_job_name, "pipeline": "{0}".format(parent_pipeline_id)}
)

# Let downstream jobs know whether the spec needed rebuilding, regardless of
# whether DAG pruning was enabled or not.
job_vars["SPACK_SPEC_NEEDS_REBUILD"] = str(rebuild_spec)

if cdash_handler:
@@ -1165,7 +1158,13 @@ def main_script_replacements(cmd):
job_id += 1

if print_summary:
_print_staging_summary(spec_labels, dependencies, stages)
_print_staging_summary(spec_labels, stages, mirrors_to_check, rebuild_decisions)

# Clean up remote mirror override if enabled
if remote_mirror_override:
spack.mirror.remove("ci_pr_mirror", cfg.default_modify_scope())
if spack_pipeline_type == "spack_pull_request":
spack.mirror.remove("ci_shared_pr_mirror", cfg.default_modify_scope())

tty.debug("{0} build jobs generated in {1} stages".format(job_id, stage_id))

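Condensed, the reorganized flow is: stage first, decide rebuilds per job, summarize, then clean up. A sketch using the names from the diff (setup and job emission elided):

    spec_labels, dependencies, stages = stage_spec_jobs(concrete_env_specs)  # no mirror checks
    rebuild_decisions = {}
    for stage_jobs in stages:
        for spec_label in stage_jobs:
            decision = RebuildDecision()  # defaults to rebuild=True
            rebuild_decisions[spec_label] = decision
            matches = bindist.get_mirrors_for_spec(
                spec=spec_labels[spec_label],
                mirrors_to_check=mirrors_to_check,
                index_only=check_index_only,
            )
            decision.rebuild = not matches
            # ... pruning decisions and job emission ...
    if print_summary:
        _print_staging_summary(spec_labels, stages, mirrors_to_check, rebuild_decisions)
    # Mirror-override cleanup now runs after the summary, presumably so the
    # summary can still report the override mirrors that were checked.
    if remote_mirror_override:
        spack.mirror.remove("ci_pr_mirror", cfg.default_modify_scope())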