Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-31887: Execution Butler bug fixes plus making it the default behavior. #51

Merged
merged 5 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions config/no_execution_butler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# THIS FILE MAY BE DEPRECATED IN FUTURE VERSIONS
#
# This turns off the default bps use of execution butler
# Execution butler is used to limit database connections.
# Turning off execution butler should only be done after
# reporting problems with it turned on.

# Turn off Execution Butler
executionButler:
whenCreate: "NEVER"
whenMerge: "NEVER"

# The pipetaskInit command is different when not using execution butler
pipetask:
pipetaskInit:
# Notes: cannot have --extend-run, should have both -o and --output-run
# because this is where the chaining is currently done
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {initPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --init-only --register-dataset-types {extraInitOptions}"
3 changes: 3 additions & 0 deletions doc/changes/DM-31887.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* Fix variable substitution in merge job commands.
* Fix bug where final job doesn't appear in report.
* Fix bug in HTCondor plugin for reporting final job status when using --id <path>.
1 change: 1 addition & 0 deletions doc/changes/DM-31887.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make using an execution butler the default.
22 changes: 0 additions & 22 deletions doc/lsst.ctrl.bps/execution_butler.yaml

This file was deleted.

6 changes: 0 additions & 6 deletions doc/lsst.ctrl.bps/pipelines_check_execution_butler.yaml

This file was deleted.

6 changes: 6 additions & 0 deletions python/lsst/ctrl/bps/bps_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ def search(self, key, opt=None):

_LOG.debug("curvals = %s", curvals)

# There's a problem with the searchobj being a BpsConfig
# and its handling of __getitem__. Until that part of
# BpsConfig is rewritten, force the searchobj to a Config.
if "searchobj" in opt:
opt["searchobj"] = Config(opt["searchobj"])

if key in curvals:
_LOG.debug("found %s in curvals", key)
found = True
Expand Down
21 changes: 15 additions & 6 deletions python/lsst/ctrl/bps/etc/bps_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ payload:

pipetask:
pipetaskInit:
# Notes: cannot have --extend-run, should have both -o and --output-run
# because this is where the chaining is currently done
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {initPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --init-only --register-dataset-types {extraInitOptions}"
# Notes: Declaring and chaining now happen within execution butler steps.
# This command no longer needs -o and must have --extend-run.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A "leftover" # after joining the lines (between have and --extend-run)?

runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {initPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --init-only --extend-run {extraInitOptions}"
#OPT myTask:
#OPT requestCpus:
#OPT requestMemory:
Expand Down Expand Up @@ -74,7 +74,16 @@ saveClusteredQgraph: False
useLazyCommands: True
bpsUseShared: False

# Execution Butler not yet the default
executionButler:
whenCreate: "NEVER"
whenMerge: "NEVER"
whenCreate: "SUBMIT"
#OPT executionButlerDir: "/my/exec/butler/dir" # User-provided or defaults to executionButlerTemplate
createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile}"
whenMerge: "ALWAYS"
implementation: JOB # Added for future flexibility, e.g., if prefer workflow instead of shell script.
concurrencyLimit: db_limit

mergePreCmdOpts: "{defaultPreCmdOpts}"
command1: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"
command2: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain {butlerConfig} {output} --flatten --mode=extend {inCollection}"
command3: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain {butlerConfig} {output} --flatten --mode=prepend {outCollection}"

10 changes: 10 additions & 0 deletions python/lsst/ctrl/bps/generic_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,16 @@ def job_counts(self):
for job_name in self:
gwjob = self.get_job(job_name)
jcounts[gwjob.label] += 1

# Final is separate
final = self.get_final()
if isinstance(final, GenericWorkflow):
for job_name in final:
gwjob = final.get_job(job_name)
jcounts[gwjob.label] += 1
else:
jcounts[final.label] += 1

return jcounts

def __iter__(self):
Expand Down
3 changes: 2 additions & 1 deletion python/lsst/ctrl/bps/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,8 @@ def _create_final_command(config, prefix):
arguments : `str`
Command line needed to call the final script.
"""
search_opt = {'replaceVars': False, 'replaceEnvVars': False, 'expandEnvVars': False}
search_opt = {'replaceVars': False, 'replaceEnvVars': False, 'expandEnvVars': False,
'searchobj': config['executionButler']}

script_file = os.path.join(prefix, "final_job.bash")
with open(script_file, "w") as fh:
Expand Down
8 changes: 6 additions & 2 deletions python/lsst/ctrl/bps/wms/htcondor/htcondor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ def _get_run_summary(job):
Number of jobs per PipelineTask label in approximate pipeline order.
Format: <label>:<count>[;<label>:<count>]+
"""
summary = job.get("bps_run_summary", None)
summary = job.get("bps_job_summary", None)
if not summary:
summary, _ = summary_from_dag(job["Iwd"])
if not summary:
Expand Down Expand Up @@ -1186,7 +1186,11 @@ def _htc_node_status_to_wms_state(job):
elif status == NodeStatus.DONE:
wms_state = WmsStates.SUCCEEDED
elif status == NodeStatus.ERROR:
wms_state = WmsStates.FAILED
# Use job exit instead of post script exit
if "DAGMAN error 0" in job["StatusDetails"]:
wms_state = WmsStates.SUCCEEDED
else:
wms_state = WmsStates.FAILED

return wms_state

Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/bps/wms/htcondor/lssthtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ def add_final_job(self, job):
job : `HTCJob`
HTCJob to add to the HTCDag as a FINAL job.
"""
# Add dag level attributes to the final job
job.add_job_attrs(self.graph["attr"])

self.graph['final_job'] = job

def del_job(self, job_name):
Expand Down Expand Up @@ -859,6 +862,13 @@ def summary_from_dag(dir_name):
counts[label] += 1
else:
_LOG.warning("Parse DAG: unmatched job line: %s", line)
elif line.startswith("FINAL"):
m = re.match(r"FINAL ([^\s]+) jobs/([^/]+)/", line)
if m:
label = m.group(2)
job_name_to_pipetask[m.group(1)] = label
counts[label] += 1

except (OSError, PermissionError, StopIteration):
pass

Expand Down