DM-27575: fix requestMemory and matching glidein nodes. #13

Merged 2 commits on Nov 18, 2020
29 changes: 29 additions & 0 deletions doc/lsst.ctrl.bps/pipelines_check.yaml
@@ -0,0 +1,29 @@
pipelineYaml: "${OBS_SUBARU_DIR}/pipelines/DRP.yaml:processCcd"
templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"
project: dev
campaign: quick
submitPath: ${PWD}/submit/{outCollection}
computeSite: ncsapool
requestMemory: 2048
requestCpus: 1

# Make sure these values correspond to ones in the bin/run_demo.sh's
# pipetask command line.
payload:
  runInit: true
  payloadName: pcheck
  butlerConfig: ${PIPELINES_CHECK_DIR}/DATA_REPO/butler.yaml
  inCollection: HSC/calib,HSC/raw/all,refcats
  outCollection: "shared/pipecheck/{timestamp}"
  dataQuery: exposure=903342 AND detector=10

pipetask:
  pipetaskInit:
    runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --skip-existing --register-dataset-types --qgraph {qgraphFile} --clobber-partial-outputs --no-versions"
  assembleCoadd:
    requestMemory: 8192

wmsServiceClass: lsst.ctrl.bps.wms.htcondor.htcondor_service.HTCondorService
clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering
createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {qgraphFile} --clobber-partial-outputs --no-versions"
83 changes: 49 additions & 34 deletions doc/lsst.ctrl.bps/quickstart.rst
@@ -122,38 +122,11 @@ The remaining information tells BPS which workflow management system is being
used, how to convert Datasets and Pipetasks into compute jobs, and what
resources those compute jobs need.

.. code-block:: YAML

   operator: jdoe
   pipelineYaml: "${OBS_SUBARU_DIR}/pipelines/DRP.yaml:processCcd"
   templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"
   project: dev
   campaign: quick
   submitPath: ${PWD}/submit/{outCollection}
   computeSite: ncsapool
   requestMemory: 2GB
   requestCpus: 1

   # Make sure these values correspond to ones in the bin/run_demo.sh's
   # pipetask command line.
   payload:
     runInit: true
     payloadName: pcheck
     butlerConfig: ${PIPELINES_CHECK_DIR}/DATA_REPO/butler.yaml
     inCollection: HSC/calib,HSC/raw/all,refcats
     outCollection: "shared/pipecheck/{timestamp}"
     dataQuery: exposure=903342 AND detector=10

   pipetask:
     pipetaskInit:
       runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --skip-existing --register-dataset-types --qgraph {qgraphFile} --no-versions"
     assembleCoadd:
       requestMemory: 8GB

   wmsServiceClass: lsst.ctrl.bps.wms.htcondor.htcondor_service.HTCondorService
   clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering
   createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
   runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {qgraphFile} --no-versions"
.. literalinclude:: pipelines_check.yaml
   :language: YAML
   :caption: ${CTRL_BPS_DIR}/doc/lsst.ctrl.bps/pipelines_check.yaml


.. _bps-submit:

@@ -279,6 +252,48 @@ order from most specific to general is: ``payload``, ``pipetask``, and ``site``.
subsections are pipetask labels where one can override or set runtime settings
for particular pipetasks (currently no Quantum-specific settings); see the
precedence sketch after the site examples below.

**site**
settings for specific sites can be set here. Subsections are site names
which are matched to ``computeSite``. The following are examples for
specifying values needed to match jobs to glideins.

.. code-block:: YAML
   :caption: HTCondor plugin example

   site:
     acsws02:
       profile:
         condor:
           requirements: '(GLIDEIN_NAME == &quot;test_gname&quot;)'
           +GLIDEIN_NAME: "test_gname"

.. code-block:: YAML
   :caption: Pegasus plugin example

   site:
     acsws02:
       arch: x86_64
       os: LINUX
       directory:
         shared-scratch:
           path: /work/shared-scratch/${USER}
           file-server:
             operation: all
             url: file:///work/shared-scratch/${USER}
       profile:
         pegasus:
           style: condor
           auxillary.local: true
         condor:
           universe: vanilla
           getenv: true
           requirements: '(ALLOCATED_NODE_SET == "${NODESET}")'
           +JOB_NODE_SET: '"${NODESET}"'
         dagman:
           retry: 0
         env:
           PEGASUS_HOME: /usr/local/pegasus/current
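To make the search order concrete, the sketch below (values drawn from the
``pipelines_check.yaml`` example above) shows how a per-pipetask setting
overrides the global one: ``assembleCoadd`` jobs request 8192 MB while every
other pipetask falls back to the global 2048 MB.

.. code-block:: YAML
   :caption: Precedence sketch (illustrative)

   requestMemory: 2048          # global default, in MB
   pipetask:
     assembleCoadd:
       requestMemory: 8192      # overrides the global value for this label only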

Contributor comment:
I'm not sure if it's a requirement, but by convention titles and section headings are set off from surrounding text by
a single blank line above and below.

Supported settings
^^^^^^^^^^^^^^^^^^

@@ -318,8 +333,8 @@ Supported settings
output.

**requestMemory**, optional
Amount of memory single Quantum execution of a particular pipetask will
need (e.g., 2GB).
Amount of memory, in MB, a single Quantum execution of a particular pipetask
will need (e.g., 2048).

**requestCpus**, optional
Number of cpus that a single Quantum execution of a particular pipetask
@@ -353,7 +368,7 @@ Supported settings
pipetask:
  pipetask_init:
    runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --skip-existing --register-dataset-types --qgraph {qgraph_file} --no-versions"
    requestMemory: 2GB
    requestMemory: 2048

**templateDataId**
Template to use when creating job names (and HTCondor plugin then uses for
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/pre_transform.py
@@ -45,7 +45,7 @@ def pre_transform(config, out_prefix=None):
    Parameters
    ----------
    config : `.bps_config.BpsConfig`
        Configuration values for BPS. In particular, looking for qgraph_file.
        Configuration values for BPS. In particular, looking for qgraphFile.
    out_prefix : `str` or None
        Output path for the QuantumGraph and stdout/stderr from generating
        the QuantumGraph.
@@ -59,7 +59,7 @@
        result of running code that generates it.
    """
    # Check to see if user provided pre-generated QuantumGraph.
    found, input_qgraph_filename = config.search("qgraph_file")
    found, input_qgraph_filename = config.search("qgraphFile")
    if found:
        if out_prefix is not None:
            # Save a copy of the QuantumGraph file in out_prefix.
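As a hedged aside, the renamed option can be exercised from a submit config
like the hypothetical excerpt below (the ``qgraphFile`` key comes from this
diff; the path is invented). Supplying it makes ``pre_transform`` copy the
given QuantumGraph rather than generating a new one.

.. code-block:: YAML

   # Hypothetical path: reuse an existing QuantumGraph instead of
   # running `pipetask qgraph`.
   qgraphFile: /path/to/existing_quantum_graph.pickle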
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/prepare.py
@@ -53,8 +53,8 @@ def prepare(config, generic_workflow, out_prefix):

    # Save QuantumGraphs.
    # (putting after call to prepare so don't write a bunch of files if prepare fails)
    found, when_to_save_job_qgraph = config.search('when_save_job_qgraph',
                                                   {'default': WhenToSaveQuantumGraphs.TRANSFORM})
    found, when_to_save_job_qgraph = config.search("whenSaveJobQgraph",
                                                   {"default": WhenToSaveQuantumGraphs.TRANSFORM})
    if found and when_to_save_job_qgraph == WhenToSaveQuantumGraphs.PREPARE:
        for job_name in generic_workflow.nodes():
            job = generic_workflow.get_job(job_name)
28 changes: 17 additions & 11 deletions python/lsst/ctrl/bps/transform.py
@@ -54,14 +54,14 @@ def transform(config, clustered_quantum_graph, prefix):
    if 'name' in clustered_quantum_graph.graph and clustered_quantum_graph.graph['name'] is not None:
        name = clustered_quantum_graph.graph['name']
    else:
        _, name = config.search('uniqProcName', opt={'required': True})
        _, name = config.search("uniqProcName", opt={"required": True})

    generic_workflow = create_generic_workflow(config, clustered_quantum_graph, name, prefix)
    generic_workflow_config = create_generic_workflow_config(config, prefix)

    # Save QuantumGraphs.
    found, when_to_save_job_qgraph = config.search('when_save_job_qgraph',
                                                   {'default': WhenToSaveQuantumGraphs.TRANSFORM})
    found, when_to_save_job_qgraph = config.search("whenSaveJobQgraph",
                                                   {"default": WhenToSaveQuantumGraphs.TRANSFORM})
    if found and when_to_save_job_qgraph == WhenToSaveQuantumGraphs.TRANSFORM:
        for job_name in generic_workflow.nodes():
            job = generic_workflow.get_job(job_name)
@@ -184,6 +184,12 @@ def create_init_workflow(config):
    job.cmdline = create_command(config, "pipetaskInit", config[".bps_defined.run_qgraph_file"])
    job.label = "init"
    job.compute_site = config["computeSite"]
    search_opt = {"curvals": {"curr_pipetask": "pipetaskInit"}, "required": False, "default": 0}
    job.request_cpus = int(config.search("requestCpus", opt=search_opt)[1])
    job.request_memory = int(config.search("requestMemory", opt=search_opt)[1])
    job.request_disk = int(config.search("requestDisk", opt=search_opt)[1])
    job.request_walltime = int(config.search("requestWalltime", opt=search_opt)[1])
    update_job(config, job)
    init_workflow.add_job(job)

    _LOG.debug("creating init task input(s)")
@@ -315,14 +321,14 @@ def create_job_values_aggregate(config, generic_workflow):
            label_counts[qnode.taskDef.label] += 1

            search_opt = {"curvals": {"curr_pipetask": qnode.taskDef.label}, "required": False, "default": 0}
            _, request_cpus = config.search("request_cpus", opt=search_opt)
            job.request_cpus = max(job.request_cpus, request_cpus)
            _, request_memory = config.search("request_memory", opt=search_opt)
            job.request_memory = max(job.request_memory, request_memory)
            _, request_disk = config.search("request_walltime", opt=search_opt)
            job.request_disk += request_disk
            _, request_walltime = config.search("request_walltime", opt=search_opt)
            job.request_walltime += request_walltime
            _, request_cpus = config.search("requestCpus", opt=search_opt)
            job.request_cpus = max(job.request_cpus, int(request_cpus))
            _, request_memory = config.search("requestMemory", opt=search_opt)
            job.request_memory = max(job.request_memory, int(request_memory))
            _, request_disk = config.search("requestDisk", opt=search_opt)
            job.request_disk += int(request_disk)
            _, request_walltime = config.search("requestWalltime", opt=search_opt)
            job.request_walltime += int(request_walltime)

        job.quanta_summary = ';'.join([f"{k}:{v}" for k, v in label_counts.items()])

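Editorial sketch (invented names and numbers) of the aggregation rule above:
CPU and memory requests combine with ``max()``, presumably because quanta in a
clustered job run one after another and share the peak, while disk and
walltime accumulate across quanta.

.. code-block:: Python

   # Illustrative only: mirrors the max/sum pattern in create_job_values_aggregate.
   quantum_requests = [
       {"cpus": 1, "memory": 2048, "disk": 1024, "walltime": 600},
       {"cpus": 1, "memory": 8192, "disk": 2048, "walltime": 1200},
   ]
   job_cpus = max(r["cpus"] for r in quantum_requests)          # 1
   job_memory = max(r["memory"] for r in quantum_requests)      # 8192 (MB, peak)
   job_disk = sum(r["disk"] for r in quantum_requests)          # 3072 (MB, total)
   job_walltime = sum(r["walltime"] for r in quantum_requests)  # 1800 (s, total)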
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/wms/htcondor/htcondor_service.py
@@ -28,7 +28,7 @@
from lsst.ctrl.bps.wms_service import BaseWmsWorkflow, BaseWmsService
from lsst.ctrl.bps.generic_workflow import GenericWorkflow
from .lssthtc import (htc_submit_dag, read_node_status, read_dag_node_log, condor_q, condor_history,
                      HTCDag, HTCJob, htc_jobs_to_wms_report)
                      HTCDag, HTCJob, htc_jobs_to_wms_report, htc_escape)

_LOG = logging.getLogger()

@@ -277,7 +277,7 @@ def translate_job_cmds(generic_workflow_job):
    # Add extra "pass-thru" job commands
    if generic_workflow_job.profile:
        for key, val in generic_workflow_job.profile.items():
            jobcmds[key] = val
            jobcmds[key] = htc_escape(val)

    return jobcmds

2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/wms/htcondor/lssthtc.py
@@ -193,7 +193,7 @@ def htc_escape(value):
    new_value : `str`
        Given string with characters escaped.
    """
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("'", "''")
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("'", "''").replace("&quot;", '"')


def htc_write_attribs(stream, attrs):
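A brief illustration of the new ``&quot;`` handling (assuming the
reconstruction above; the input is the requirements value from the glidein
example in the docs): the HTML-entity-style ``&quot;`` round-trips to a
literal double quote, sidestepping the YAML/ClassAd quoting clash.

.. code-block:: Python

   # The input has no backslashes or raw quotes, so only the &quot; rule fires.
   escaped = htc_escape('(GLIDEIN_NAME == &quot;test_gname&quot;)')
   assert escaped == '(GLIDEIN_NAME == "test_gname")'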
11 changes: 10 additions & 1 deletion python/lsst/ctrl/bps/wms/pegasus/pegasus_service.py
@@ -227,6 +227,15 @@ def create_job(self, generic_workflow, gwf_job, peg_files):
        else:
            _LOG.warning("Job %s does not have any arguments", gwf_job.name)

        if gwf_job.request_memory:  # MB
            job.addProfile(Profile(Namespace.CONDOR, "request_memory", gwf_job.request_memory))
        if gwf_job.request_cpus:  # cores
            job.addProfile(Profile(Namespace.CONDOR, "request_cpus", gwf_job.request_cpus))
        if gwf_job.request_disk:  # MB
            job.addProfile(Profile(Namespace.CONDOR, "request_disk", gwf_job.request_disk))
        if gwf_job.priority:
            job.addProfile(Profile(Namespace.CONDOR, "priority", gwf_job.priority))

        # Add extra job attributes
        for key, value in gwf_job.profile.items():
            job.addProfile(Profile(Namespace.CONDOR, key, value))
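For context, a minimal standalone sketch of the Pegasus DAX3 calls exercised
above (job name and values invented; ``request_memory`` and ``request_disk``
are in MB per the comments in this diff):

.. code-block:: Python

   from Pegasus.DAX3 import Job, Namespace, Profile

   # Attach HTCondor resource requests to a DAX job as Condor-namespace profiles.
   job = Job("pipetask")
   job.addProfile(Profile(Namespace.CONDOR, "request_memory", 2048))
   job.addProfile(Profile(Namespace.CONDOR, "request_cpus", 1))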
@@ -422,7 +431,7 @@ def _write_properties_file(self, out_prefix, filenames):
            print(f"pegasus.catalog.replica.file={filenames['replica']}", file=outfh)

            print("# This tells Pegasus where to find the Transformation Catalog.", file=outfh)
            print("pegasus.catalog.transformation=Text")
            print("pegasus.catalog.transformation=Text", file=outfh)
            print(f"pegasus.catalog.transformation.file={filenames['transformation']}", file=outfh)

            print("# Run Pegasus in shared file system mode.", file=outfh)