Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-29614: Have bps report show info from multiple submit nodes #61

Merged
merged 23 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6c10243
Make bps report show info from multiple nodes
mxk62 Sep 13, 2021
b8d4ab3
Use FileNotFound when DAGMan files are missing
mxk62 Sep 16, 2021
a7d0c9d
Introduce --global option for bps report
mxk62 Oct 5, 2021
6db11a0
Persist DAGMan job global id
mxk62 Oct 6, 2021
5ec27ad
Make sure version comparison works correctly
mxk62 Oct 6, 2021
e4eebb5
Fix type descriptions in docstrings here and there
mxk62 Oct 6, 2021
c1b55f8
Try to keep naming convention consistent
mxk62 Oct 6, 2021
2380f2d
Fix arguments in cancel()
mxk62 Oct 19, 2021
8c73c43
Introduce --global option for bps cancel
mxk62 Oct 19, 2021
06b5a5b
Make bps cancel --global work with HTCondor plugin
mxk62 Oct 19, 2021
07493ba
Update the signature of the report() in PanDA
mxk62 Oct 19, 2021
f8f4f74
Include global job id in the detailed report
mxk62 Oct 19, 2021
8eb53a9
Change how report() displays error messages
mxk62 Oct 19, 2021
05f8556
Describe the new feature in the changelog
mxk62 Oct 20, 2021
40d2270
Remove argument name when initializing Schedds
mxk62 Oct 20, 2021
dc00a84
Fix issue with displaying status of deleted jobs
mxk62 Oct 20, 2021
476095f
Change error message displayed by cancel()
mxk62 Oct 21, 2021
6ddca14
Reorder printing messages in report()
mxk62 Oct 22, 2021
4e247d2
Refactor HTCondorService.report() a bit
mxk62 Oct 22, 2021
103f01b
Turn the code saving DAGMan job info to a function
mxk62 Oct 22, 2021
e32d449
Improve comments/docstrings using reviewer's input
mxk62 Oct 22, 2021
0a1d0e1
Make sure Pegasus plugin remain operational
mxk62 Oct 25, 2021
cfd1fee
Rewrite _report_from_id to make it easier to read
mxk62 Oct 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-29614.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce a new option, --global, to bps cancel and bps report which allows the user to interact (cancel or get the report on) with jobs in any job queue of a workflow management system using distributed job queues, e.g., HTCondor.
22 changes: 15 additions & 7 deletions python/lsst/ctrl/bps/cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
_LOG = logging.getLogger(__name__)


def cancel(wms_service, wms_id=None, user=None, require_bps=True, pass_thru=None):
def cancel(wms_service, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
"""Cancel submitted workflows.

Parameters
Expand All @@ -45,23 +45,31 @@ def cancel(wms_service, wms_id=None, user=None, require_bps=True, pass_thru=None
Whether to require given run_id/user to be a bps submitted job.
pass_thru : `str`, optional
Information to pass through to WMS.
is_global : `bool`, optional
If set, all available job queues will be checked for jobs to cancel.
Defaults to False which means that only a local job queue will be
checked.

Only applicable in the context of a WMS using distributed job queues
(e.g., HTCondor).
"""
_LOG.debug("Cancel params: wms_id=%s, user=%s, require_bps=%s, pass_thru=%s",
wms_id, user, require_bps, pass_thru)
_LOG.debug("Cancel params: wms_id=%s, user=%s, require_bps=%s, pass_thru=%s, is_global=%s",
wms_id, user, require_bps, pass_thru, is_global)

if isinstance(wms_service, str):
wms_service_class = doImport(wms_service)
service = wms_service_class({})
else:
service = wms_service

jobs = service.list_submitted_jobs(wms_id, user, require_bps, pass_thru)
jobs = service.list_submitted_jobs(wms_id, user, require_bps, pass_thru, is_global)
if len(jobs) == 0:
print("0 jobs found matching arguments.")
print("No job matches the search criteria. "
"Hints: Double check id, and/or use --global to search all job queues.")
else:
for job_id in sorted(jobs):
results = service.cancel(job_id, pass_thru)
if results[0]:
print(f"Successfully canceled: {job_id}")
print(f"Successfully canceled job with id '{job_id}'")
else:
print(f"Couldn't cancel job with id = {job_id} ({results[1]})")
print(f"Couldn't cancel job with id '{job_id}' ({results[1]})")
6 changes: 5 additions & 1 deletion python/lsst/ctrl/bps/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ def submit(*args, **kwargs):
default=0.0,
help="Search WMS history X days for completed info.")
@click.option("--pass-thru",
help="Pass the given string to the WMS service class")
help="Pass the given string to the WMS service class.")
@click.option("--global/--no-global", "is_global", default=False,
help="Query all available job queues for job information.")
def report(*args, **kwargs):
"""Display execution status for submitted workflows.
"""
Expand All @@ -117,6 +119,8 @@ def report(*args, **kwargs):
help="Only cancel jobs submitted via bps.")
@click.option("--pass-thru", "pass_thru", default=str(),
help="Pass the given string to the WMS service.")
@click.option("--global/--no-global", "is_global", default=False,
help="Cancel jobs matching the search criteria from all job queues.")
def cancel(*args, **kwargs):
"""Cancel submitted workflow(s).
"""
Expand Down
22 changes: 18 additions & 4 deletions python/lsst/ctrl/bps/drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def submit_driver(config_file, **kwargs):
print(f"Run Id: {wms_workflow.run_id}")


def report_driver(wms_service, run_id, user, hist_days, pass_thru):
def report_driver(wms_service, run_id, user, hist_days, pass_thru, is_global=False):
"""Print out summary of jobs submitted for execution.

Parameters
Expand All @@ -276,11 +276,18 @@ def report_driver(wms_service, run_id, user, hist_days, pass_thru):
Number of days
pass_thru : `str`
A string to pass directly to the WMS service class.
is_global : `bool`, optional
If set, all available job queues will be queried for job information.
Defaults to False which means that only a local job queue will be
queried for information.

Only applicable in the context of a WMS using distributed job queues
(e.g., HTCondor).
"""
report(wms_service, run_id, user, hist_days, pass_thru)
report(wms_service, run_id, user, hist_days, pass_thru, is_global=is_global)


def cancel_driver(wms_service, run_id, user, require_bps, pass_thru):
def cancel_driver(wms_service, run_id, user, require_bps, pass_thru, is_global=False):
"""Cancel submitted workflows.

Parameters
Expand All @@ -295,5 +302,12 @@ def cancel_driver(wms_service, run_id, user, require_bps, pass_thru):
Whether to require given run_id/user to be a bps submitted job.
pass_thru : `str`
Information to pass through to WMS.
is_global : `bool`, optional
If set, all available job queues will be checked for jobs to cancel.
Defaults to False which means that only a local job queue will be
checked.

Only applicable in the context of a WMS using distributed job queues
(e.g., HTCondor).
"""
cancel(wms_service, run_id, user, require_bps, pass_thru)
cancel(wms_service, run_id, user, require_bps, pass_thru, is_global=is_global)
49 changes: 35 additions & 14 deletions python/lsst/ctrl/bps/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
_LOG = logging.getLogger(__name__)


def report(wms_service, run_id, user, hist_days, pass_thru):
def report(wms_service, run_id, user, hist_days, pass_thru, is_global=False):
"""Print out summary of jobs submitted for execution.

Parameters
Expand All @@ -52,6 +52,13 @@ def report(wms_service, run_id, user, hist_days, pass_thru):
Number of days
pass_thru : `str`
A string to pass directly to the WMS service class.
is_global : `bool`, optional
If set, all available job queues will be queried for job information.
Defaults to False which means that only a local job queue will be
queried for information.

Only applicable in the context of a WMS using distributed job queues
(e.g., HTCondor).
"""
wms_service_class = doImport(wms_service)
wms_service = wms_service_class({})
Expand All @@ -61,22 +68,21 @@ def report(wms_service, run_id, user, hist_days, pass_thru):
if run_id:
hist_days = max(hist_days, 2)

runs, message = wms_service.report(run_id, user, hist_days, pass_thru)
runs, message = wms_service.report(run_id, user, hist_days, pass_thru, is_global=is_global)

if run_id:
if not runs:
print(f"No information found for id='{run_id}'.")
print(f"Double check id and retry with a larger --hist value"
f"(currently: {hist_days})")
for run in runs:
print_single_run_summary(run)
print_single_run_summary(run, is_global=is_global)
if not runs and not message:
print(f"No records found for job id '{run_id}'. "
f"Hints: Double check id, retry with a larger --hist value (currently: {hist_days}), "
f"and/or use --global to search all job queues.")
else:
summary = init_summary()
for run in sorted(runs, key=lambda j: j.wms_id):
summary = add_single_run_summary(summary, run)
for run in sorted(runs, key=lambda j: j.wms_id if not is_global else j.global_wms_id):
summary = add_single_run_summary(summary, run, is_global=is_global)
for line in summary.pformat_all():
print(line)
print("\n\n")
if message:
print(message)
print("\n\n")
Expand Down Expand Up @@ -104,7 +110,7 @@ def init_summary():
return Table(dtype=columns)


def add_single_run_summary(summary, run_report):
def add_single_run_summary(summary, run_report, is_global=False):
"""Add a single run info to the summary.

Parameters
Expand All @@ -113,6 +119,13 @@ def add_single_run_summary(summary, run_report):
The table representing the run summary.
run_report : `lsst.ctrl.bps.WmsRunReport`
Information for single run.
is_global : `bool`, optional
If set, all available job queues will be queried for job information.
Defaults to False which means that only a local job queue will be
queried for information.

Only applicable in the context of a WMS using distributed job queues
(e.g., HTCondor).
"""
# Flag any running workflow that might need human attention
run_flag = " "
Expand All @@ -136,7 +149,7 @@ def add_single_run_summary(summary, run_report):
run_flag,
run_report.state.name,
percent_succeeded,
run_report.wms_id,
run_report.global_wms_id if is_global else run_report.wms_id,
run_report.operator,
run_report.project,
run_report.campaign,
Expand Down Expand Up @@ -187,23 +200,31 @@ def group_jobs_by_label(jobs):
return by_label


def print_single_run_summary(run_report):
def print_single_run_summary(run_report, is_global=False):
"""Print runtime info for single run including job summary per task abbrev.

Parameters
----------
run_report : `lsst.ctrl.bps.WmsRunReport`
Summary runtime info for a run + runtime info for jobs.
is_global : `bool`, optional
If set, all available job queues will be queried for job information.
Defaults to False which means that only a local job queue will be
queried for information.

Only applicable in the context of a WMS using distributed job queues
(e.g., HTCondor).
"""
# Print normal run summary.
summary = init_summary()
summary = add_single_run_summary(summary, run_report)
summary = add_single_run_summary(summary, run_report, is_global=is_global)
for line in summary.pformat_all():
print(line)
print("\n\n")

# Print more run information.
print(f"Path: {run_report.path}")
print(f"Global job id: {run_report.global_wms_id}")
mxk62 marked this conversation as resolved.
Show resolved Hide resolved
print("\n\n")

by_label = group_jobs_by_label(run_report.jobs)
Expand Down