Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-42127: Add htcondor support for bps report --return-exit-codes #29

Merged
merged 8 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black
rev: 23.12.0
rev: 24.1.1
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -24,7 +24,7 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.1.8
rev: v0.1.15
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-42127.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added plugin support for reporting error exit codes with ``bps report``.
76 changes: 61 additions & 15 deletions python/lsst/ctrl/bps/htcondor/htcondor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,15 @@
_LOG.debug("job_ids = %s", job_ids)
return job_ids

def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_global=False):
def report(
self,
wms_workflow_id=None,
user=None,
hist=0,
pass_thru=None,
is_global=False,
return_exit_codes=False,
):
"""Return run information based upon given constraints.

Parameters
Expand All @@ -382,6 +390,13 @@
If set, all job queues (and their histories) will be queried for
job information. Defaults to False which means that only the local
job queue will be queried.
return_exit_codes : `bool`, optional
If set, return exit codes related to jobs with a
non-success status. Defaults to False, which means that only
the summary state is returned.

Only applicable in the context of a WMS with associated
handlers to return exit codes from jobs.

Returns
-------
Expand Down Expand Up @@ -541,9 +556,9 @@
out_prefix,
)
if "post" not in final_htjob.dagcmds:
final_htjob.dagcmds[
"post"
] = f"{os.path.dirname(__file__)}/final_post.sh {final.name} $DAG_STATUS $RETURN"
final_htjob.dagcmds["post"] = (

Check warning on line 559 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L559

Added line #L559 was not covered by tests
f"{os.path.dirname(__file__)}/final_post.sh {final.name} $DAG_STATUS $RETURN"
)
htc_workflow.dag.add_final_job(final_htjob)
elif final and isinstance(final, GenericWorkflow):
raise NotImplementedError("HTCondor plugin does not support a workflow as the final job")
Expand Down Expand Up @@ -1192,6 +1207,7 @@
"""
_LOG.debug("_create_detailed_report: id = %s, job = %s", wms_workflow_id, jobs[wms_workflow_id])
dag_job = jobs[wms_workflow_id]
task_jobs = {job_id: job_ad for job_id, job_ad in jobs.items() if job_id != wms_workflow_id}
mxk62 marked this conversation as resolved.
Show resolved Hide resolved
report = WmsRunReport(
wms_id=f"{dag_job['ClusterId']}.{dag_job['ProcId']}",
global_wms_id=dag_job.get("GlobalJobId", "MISS"),
Expand All @@ -1207,20 +1223,20 @@
jobs=[],
total_number_jobs=dag_job["total_jobs"],
job_state_counts=dag_job["state_counts"],
exit_code_summary=_get_exit_code_summary(task_jobs),
)

for job_id, job_info in jobs.items():
for job_id, job_info in task_jobs.items():
try:
if job_info["ClusterId"] != int(float(wms_workflow_id)):
job_report = WmsJobReport(
wms_id=job_id,
name=job_info.get("DAGNodeName", job_id),
label=job_info.get("bps_job_label", pegasus_name_to_label(job_info["DAGNodeName"])),
state=_htc_status_to_wms_state(job_info),
)
if job_report.label == "init":
job_report.label = "pipetaskInit"
report.jobs.append(job_report)
job_report = WmsJobReport(

Check warning on line 1231 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L1231

Added line #L1231 was not covered by tests
wms_id=job_id,
name=job_info.get("DAGNodeName", job_id),
label=job_info.get("bps_job_label", pegasus_name_to_label(job_info["DAGNodeName"])),
mxk62 marked this conversation as resolved.
Show resolved Hide resolved
state=_htc_status_to_wms_state(job_info),
)
if job_report.label == "init":
job_report.label = "pipetaskInit"
report.jobs.append(job_report)

Check warning on line 1239 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L1238-L1239

Added lines #L1238 - L1239 were not covered by tests
except KeyError as ex:
_LOG.error("Job missing key '%s': %s", str(ex), job_info)
raise
Expand Down Expand Up @@ -1392,6 +1408,36 @@
return summary


def _get_exit_code_summary(jobs):
"""Get the exit code summary for a run.

Parameters
----------
jobs : `dict` [`str`, `dict` [`str`, Any]]
Mapping HTCondor job id to job information.

Returns
-------
summary : `dict` [`str`, `list` [`int`]]
Jobs' exit codes per job label.
"""
summary = {}

Check warning on line 1424 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L1424

Added line #L1424 was not covered by tests
for job_id, job_ad in jobs.items():
job_label = job_ad["bps_job_label"]
summary.setdefault(job_label, [])
try:

Check warning on line 1428 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L1426-L1428

Added lines #L1426 - L1428 were not covered by tests
if job_ad["JobStatus"] in {JobStatus.COMPLETED, JobStatus.HELD}:
try:
exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["ExitCode"]
except KeyError:
_LOG.debug("Cannot determine exit code for job '%s'", job_id)

Check warning on line 1433 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L1430-L1433

Added lines #L1430 - L1433 were not covered by tests
mxk62 marked this conversation as resolved.
Show resolved Hide resolved
else:
summary[job_label].append(exit_code)
except KeyError:
_LOG.debug("Attribute 'JobStatus' not found in classad for job '%s'", job_id)
return summary

Check warning on line 1438 in python/lsst/ctrl/bps/htcondor/htcondor_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/htcondor/htcondor_service.py#L1435-L1438

Added lines #L1435 - L1438 were not covered by tests
mxk62 marked this conversation as resolved.
Show resolved Hide resolved


def _get_state_counts_from_jobs(wms_workflow_id, jobs):
"""Count number of jobs per WMS state.

Expand Down