DM-31541: Change pipetaskInit job to not read full QuantumGraph #44

Merged
merged 10 commits on Sep 9, 2021
2 changes: 2 additions & 0 deletions doc/changes/DM-31541.bugfix.rst
@@ -0,0 +1,2 @@
* Fix issue with accessing non-existent attributes when creating the final job.
* Fix issue preventing ``bps report`` from getting the run name correctly.
1 change: 1 addition & 0 deletions doc/changes/DM-31541.feature.rst
@@ -0,0 +1 @@
Create a list of node ids for the ``pipetask --init-only`` job.
1 change: 1 addition & 0 deletions doc/changes/DM-31541.misc.rst
@@ -0,0 +1 @@
Complain about a missing memory limit only if memory autoscaling is enabled.
22 changes: 22 additions & 0 deletions doc/lsst.ctrl.bps/execution_butler.yaml
@@ -0,0 +1,22 @@
##################################################################################################################
# The following are default values to be moved to
# etc/bps_defaults.yaml when using execution butler becomes
# the default.
executionButler:
whenCreate: "SUBMIT"
#OPT executionButlerDir: "/my/exec/butler/dir" # User-provided or defaults to executionButlerTemplate
createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile} --skip-existing-in {outCollection}"
whenMerge: "ALWAYS"
implementation: JOB # Added for future flexibility, e.g., if a workflow is preferred instead of a shell script.
concurrency_limit: db_limit

mergePreCmdOpts: "{defaultPreCmdOpts}"
command1: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"
command2: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain {butlerConfig} {output} --flatten --mode=extend {inCollection}"
command3: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain {butlerConfig} {output} --flatten --mode=prepend {outCollection}"

pipetask:
pipetaskInit:
# Notes: Declaring and chaining now happen within execution butler steps.
# This command no longer needs -o and must have --extend-run.
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {initPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --init-only --extend-run {extraInitOptions}"
30 changes: 3 additions & 27 deletions doc/lsst.ctrl.bps/pipelines_check_execution_butler.yaml
@@ -1,30 +1,6 @@
includeConfigs:
- ${CTRL_BPS_DIR}/doc/lsst.ctrl.bps/pipelines_check.yaml
- ${CTRL_BPS_DIR}/doc/lsst.ctrl.bps/execution_butler.yaml

##################################################################################################################
# The following are default values to be moved to
# etc/bps_defaults when using execution butler becomes
# the default.
executionButler:
whenCreate: "SUBMIT"
#USER executionButlerDir: "/my/exec/butler/dir" # if user provided, otherwise uses executionButlerTemplate
createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} --input {inCollection} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile}"
whenMerge: "ALWAYS"
implementation: JOB # JOB, WORKFLOW
concurrency_limit: db_limit

# What commands to run as part of the merge step:
command1: "${DAF_BUTLER_DIR}/bin/butler --log-level=VERBOSE transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"
command2: "${DAF_BUTLER_DIR}/bin/butler collection-chain {butlerConfig} {output} {outCollection} --mode=prepend"

# For --replace-run behavior need to run two collection-chain commands
# in addition to the transfer-datasets:
#command2: "${DAF_BUTLER_DIR}/bin/butler collection-chain {butlerConfig} {output} --mode=pop 1"
#command3: "${DAF_BUTLER_DIR}/bin/butler collection-chain {butlerConfig} {output} --mode=prepend {outCollection}"

pipetask:
pipetaskInit:
# Notes: Declaring and chaining now happen within execution butler
# steps. So, this command no longer needs -o and must have
# --extend-run.
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --register-dataset-types --qgraph {qgraphFile} --extend-run {extraInitOptions}"
payload:
output: u/{operator}/pcheck_ebutler
17 changes: 17 additions & 0 deletions doc/lsst.ctrl.bps/quickstart.rst
@@ -419,6 +419,10 @@ Supported settings
**createQuantumGraph**
The command line specification for generating QuantumGraphs.

**executeMachinesPattern**, optional
A regular expression used to look up available computational
resources. By default it is set to ``.*worker.*``.

**operator**
Name of the Operator who made a submission. Displayed in ``bps report``
output. Defaults to the Operator's username.
@@ -457,6 +461,19 @@ Supported settings

At the moment, this feature is only supported by the HTCondor plugin.

**memoryLimit**, optional
The memory threshold, in MB, that controls memory scaling.

Jobs whose memory requirements exceed this threshold will be removed from
the job queue even if the maximum number of retries (defined by
``numberOfRetries``) has not been reached yet.

If not set, BPS will try to determine it automatically by querying
available computational resources (e.g., execute machines in an HTCondor
pool) that match the pattern defined by ``executeMachinesPattern``.

It has no effect if ``memoryMultiplier`` is not set.

**numberOfRetries**, optional
The maximum number of retries allowed for a job (must be non-negative).
The default value is ``None``, meaning that the job will be run only once.
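
To make the interaction between ``memoryLimit``, ``memoryMultiplier``, and ``numberOfRetries`` documented above concrete, here is a schematic Python sketch of the retry/scaling policy. It is an editor's illustration of the documented behaviour only, not the actual expression BPS hands to HTCondor.

# Editor's sketch of the documented policy: each retry scales the memory
# request by memoryMultiplier, and a job whose next request would exceed
# memoryLimit is removed from the queue even if retries remain.
def next_memory_request(base_mb, multiplier, limit_mb, retries_used, max_retries):
    """Return the memory (MB) to request on the next attempt, or None to give up."""
    if retries_used >= max_retries:
        return None                      # ordinary retry budget exhausted
    scaled = base_mb * multiplier ** (retries_used + 1)
    if scaled > limit_mb:
        return None                      # would exceed memoryLimit
    return scaled

# With base_mb=2048, multiplier=2, limit_mb=16384 the successive requests go
# 4096 -> 8192 -> 16384, after which the job is given up on.
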
16 changes: 13 additions & 3 deletions python/lsst/ctrl/bps/etc/bps_defaults.yaml
@@ -12,6 +12,16 @@
# Any extra site-specific settings needed for WMS
#USER computeSite: ncsapool

# Options that go between executable and command (e.g., between pipetask and run)
defaultPreCmdOpts: "--long-log --log-level=VERBOSE"
# for createQuantumGraph
qgraphPreCmdOpts: "{defaultPreCmdOpts}"
# for pipetaskInit
initPreCmdOpts: "{defaultPreCmdOpts}"
# for running Quanta
runPreCmdOpts: "{defaultPreCmdOpts}"


# Values defining input dataset as well as collection names of output
payload:
#USER payloadName: pipelines_check # Used in bps report, and default output collection
@@ -26,7 +36,7 @@ pipetask:
pipetaskInit:
# Notes: cannot have --extend-run, should have both -o and --output-run
# because this is where the chaining is currently done
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --init-only --register-dataset-types --qgraph {qgraphFile} {extraInitOptions}"
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {initPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --init-only --register-dataset-types {extraInitOptions}"
#OPT myTask:
#OPT requestCpus:
#OPT requestMemory:
@@ -39,8 +49,8 @@ pipetask:
# Default commands and usage requests for creating QuantumGraph, running Quantum.
# Defaults to using full workflow QuantumGraph instead of per-job QuantumGraphs.
whenSaveJobQgraph: "NEVER"
createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} {extraQgraphOptions}'
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --skip-init-writes --extend-run --clobber-outputs --skip-existing {extraRunQuantumOptions}"
createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask {qgraphPreCmdOpts} qgraph -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} -p {pipelineYaml} -q {qgraphFile} -d "{dataQuery}" {extraQgraphOptions}'
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {runPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --skip-init-writes --extend-run {extraRunQuantumOptions}"
requestMemory: 2048
requestCpus: 1

25 changes: 20 additions & 5 deletions python/lsst/ctrl/bps/transform.py
@@ -44,6 +44,7 @@
_create_execution_butler
)


_LOG = logging.getLogger(__name__)


@@ -105,7 +106,7 @@ def update_job(config, job):
job.profile[key] = val


def add_workflow_init_nodes(config, generic_workflow):
def add_workflow_init_nodes(config, qgraph, generic_workflow):
"""Add nodes to workflow graph that perform initialization steps.

Assumes that all of the initialization should be executed prior to any
@@ -115,26 +116,30 @@ def add_workflow_init_nodes(config, generic_workflow):
----------
config : `lsst.ctrl.bps.BpsConfig`
BPS configuration.
qgraph : `lsst.pipe.base.graph.QuantumGraph`
The quantum graph the generic workflow represents.
generic_workflow : `lsst.ctrl.bps.GenericWorkflow`
Generic workflow to which the initialization steps should be added.
"""
# Create a workflow graph that will have task and file nodes necessary for
# initializing the pipeline execution
init_workflow = create_init_workflow(config, generic_workflow.get_file("runQgraphFile"))
init_workflow = create_init_workflow(config, qgraph, generic_workflow.get_file("runQgraphFile"))
_LOG.debug("init_workflow nodes = %s", init_workflow.nodes())
generic_workflow.add_workflow_source(init_workflow)
old_run_summary = generic_workflow.run_attrs.get("bps_run_summary", "")
init_summary = init_workflow.run_attrs.get("bps_run_summary", "")
generic_workflow.run_attrs["bps_run_summary"] = ';'.join(x for x in [init_summary, old_run_summary] if x)


def create_init_workflow(config, qgraph_gwfile):
def create_init_workflow(config, qgraph, qgraph_gwfile):
"""Create workflow for running initialization job(s).

Parameters
----------
config : `lsst.ctrl.bps.BpsConfig`
BPS configuration.
qgraph : `lsst.pipe.base.graph.QuantumGraph`
The quantum graph the generic workflow represents.
qgraph_gwfile : `lsst.ctrl.bps.GenericWorkflowFile`
File object for the full run QuantumGraph file.

@@ -166,6 +171,16 @@ def create_init_workflow(config, qgraph_gwfile):
# Handle aggregate values.
_handle_job_values_aggregate(job_values, gwjob)

# Pick a node id for each task (not quantum!) to avoid reading the entire
# quantum graph during the initialization stage.
node_ids = []
for task in qgraph.iterTaskGraph():
task_def = qgraph.findTaskDefByLabel(task.label)
node = next(iter(qgraph.getNodesForTask(task_def)))
node_ids.append(node.nodeId)
gwjob.cmdvals["qgraphId"] = qgraph.graphID
gwjob.cmdvals["qgraphNodeId"] = ",".join(sorted([f"{node_id.number}" for node_id in node_ids]))

# Save summary of Quanta in job.
gwjob.tags["quanta_summary"] = "pipetaskInit:1"

@@ -596,7 +611,7 @@ def create_generic_workflow(config, clustered_quanta_graph, name, prefix):

# Add initial workflow.
if config.get("runInit", "{default: False}"):
add_workflow_init_nodes(config, generic_workflow)
add_workflow_init_nodes(config, qgraph, generic_workflow)

generic_workflow.run_attrs.update({"bps_isjob": "True",
"bps_project": config["project"],
@@ -656,7 +671,7 @@ def add_final_job(config, generic_workflow, prefix):

job_values = _get_job_values(config, search_opt, None)
for field in dataclasses.fields(GenericWorkflowJob):
if not getattr(gwjob, field.name) and job_values[field.name]:
if not getattr(gwjob, field.name) and job_values.get(field.name, None):
setattr(gwjob, field.name, job_values[field.name])

update_job(config, gwjob)
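
The new code in create_init_workflow above picks one QuantumGraph node per task label so that the pipetaskInit job can be told exactly which nodes to load instead of reading the whole graph. The Python sketch below, an editor's illustration only, shows how those values end up on the command line via gwjob.cmdvals; the graph id and node numbers are made up.

# Editor's illustration of the resulting pipetaskInit command fragment.
node_numbers = [2, 5, 9]                     # one node id per task label
cmdvals = {
    "qgraphId": "1631182200.1234-5678",      # hypothetical QuantumGraph.graphID
    "qgraphNodeId": ",".join(sorted(str(n) for n in node_numbers)),
}
fragment = "--qgraph {qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId}"
print(fragment.format(qgraphFile="run.qgraph", **cmdvals))
# --qgraph run.qgraph --qgraph-id 1631182200.1234-5678 --qgraph-node-id 2,5,9
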
59 changes: 33 additions & 26 deletions python/lsst/ctrl/bps/wms/htcondor/htcondor_service.py
@@ -304,24 +304,26 @@ def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_class
htc_workflow.dag.add_attribs({"bps_wms_service": service_class,
"bps_wms_workflow": f"{cls.__module__}.{cls.__name__}"})

# Determine pool specific settings for future reference.
search_opts = {"default": DEFAULT_HTC_EXEC_PATT}
_, site = config.search("computeSite")
if site:
search_opts["curvals"] = {"curr_site": site}
_, patt = config.search("executeMachinesPattern", opt=search_opts)

# Determine the hard limit for the memory requirement.
#
# Note:
# To reduce the number of data that need to be dealt with we are
# ignoring dynamic slots (if any) as, by definition, they cannot have
# more memory than the partitionable slot they are the part of.
constraint = f'SlotType != "Dynamic" && regexp("{patt}", Machine)'
pool_info = condor_status(constraint=constraint)
if not pool_info:
raise RuntimeError(f"No nodes in the HTCondor pool matches pattern '{patt}'")
config["bps_mem_limit"] = max(int(info["TotalSlotMemory"]) for info in pool_info.values())
found, limit = config.search('memoryLimit')
if not found:
search_opts = {"default": DEFAULT_HTC_EXEC_PATT}
_, site = config.search("computeSite")
if site:
search_opts["curvals"] = {"curr_site": site}
_, patt = config.search("executeMachinesPattern", opt=search_opts)

# To reduce the amount of data, ignore dynamic slots (if any) as,
# by definition, they cannot have more memory than
# the partitionable slot they are part of.
constraint = f'SlotType != "Dynamic" && regexp("{patt}", Machine)'
pool_info = condor_status(constraint=constraint)
try:
limit = max(int(info["TotalSlotMemory"]) for info in pool_info.values())
except ValueError:
_LOG.debug("No execute machine in the pool matches %s", patt)
if limit:
config[".bps_defined.memory_limit"] = limit

# Create all DAG jobs
for job_name in generic_workflow:
@@ -480,7 +482,13 @@ def _translate_job_cmds(config, generic_workflow, gwjob):
jobcmds["request_memory"] = f"{gwjob.request_memory}"

if gwjob.memory_multiplier:
_, memory_limit = config.search("bps_mem_limit")
# Do not use try-except! At the moment, BpsConfig returns an empty
# string if it does not contain the key.
memory_limit = config[".bps_defined.memory_limit"]
if not memory_limit:
raise RuntimeError("Memory autoscaling enabled, but automatic detection of the memory limit "
"failed; setting it explicitly with 'memoryLimit' or changing worker node "
"search pattern 'executeMachinesPattern' might help.")
jobcmds["request_memory"] = _create_request_memory_expr(gwjob.request_memory, gwjob.memory_multiplier)

# Periodically release jobs which are being held due to exceeding
@@ -925,10 +933,12 @@ def _add_run_info(wms_path, job):
"""
path = Path(wms_path) / "jobs"
try:
jobdir = next(path.glob("*"), Path(wms_path))
subfile = next(path.glob("**/*.sub"))
except (StopIteration, PermissionError):
job["bps_run"] = "Unavailable"
else:
_LOG.debug("_add_run_info: subfile = %s", subfile)
try:
subfile = next(jobdir.glob("*.sub"))
_LOG.debug("_add_run_info: subfile = %s", subfile)
with open(subfile, "r") as fh:
for line in fh:
if line.startswith("+bps_"):
@@ -938,11 +948,8 @@
job[m.group(1)] = m.group(2).replace('"', "")
else:
_LOG.debug("Could not parse attribute: %s", line)
except StopIteration:
job["bps_run"] = "Missing"

except PermissionError:
job["bps_run"] = "PermissionError"
except PermissionError:
job["bps_run"] = "PermissionError"
_LOG.debug("After adding job = %s", job)


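
_translate_job_cmds above relies on _create_request_memory_expr (not shown in this diff) to build an HTCondor request_memory expression when memory_multiplier is set. As a rough, hedged sketch of what such an expression can look like — not necessarily what ctrl_bps actually generates — one possibility in Python is:

# Editor's sketch: scale the request on every restart using the standard
# HTCondor job ClassAd attribute NumJobStarts.  The real expression built by
# _create_request_memory_expr() may differ.
def request_memory_expr(base_mb, multiplier):
    return (f"ifThenElse(NumJobStarts == 0, {base_mb}, "
            f"{base_mb} * pow({multiplier}, NumJobStarts))")

print(request_memory_expr(2048, 2))
# ifThenElse(NumJobStarts == 0, 2048, 2048 * pow(2, NumJobStarts))
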