Commit fc5c4ad: Merge branch 'tickets/DM-27575' into master

MichelleGower committed Nov 18, 2020
2 parents bc1d661 + 7849343
Showing 8 changed files with 112 additions and 53 deletions.
29 changes: 29 additions & 0 deletions doc/lsst.ctrl.bps/pipelines_check.yaml
@@ -0,0 +1,29 @@
+pipelineYaml: "${OBS_SUBARU_DIR}/pipelines/DRP.yaml:processCcd"
+templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"
+project: dev
+campaign: quick
+submitPath: ${PWD}/submit/{outCollection}
+computeSite: ncsapool
+requestMemory: 2048
+requestCpus: 1
+
+# Make sure these values correspond to ones in the bin/run_demo.sh's
+# pipetask command line.
+payload:
+  runInit: true
+  payloadName: pcheck
+  butlerConfig: ${PIPELINES_CHECK_DIR}/DATA_REPO/butler.yaml
+  inCollection: HSC/calib,HSC/raw/all,refcats
+  outCollection: "shared/pipecheck/{timestamp}"
+  dataQuery: exposure=903342 AND detector=10
+
+pipetask:
+  pipetaskInit:
+    runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --skip-existing --register-dataset-types --qgraph {qgraphFile} --clobber-partial-outputs --no-versions"
+  assembleCoadd:
+    requestMemory: 8192
+
+wmsServiceClass: lsst.ctrl.bps.wms.htcondor.htcondor_service.HTCondorService
+clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering
+createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
+runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {qgraphFile} --clobber-partial-outputs --no-versions"
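
For context, the curly-brace fields in createQuantumGraph and runQuantumCommand above are filled in from config values at submit time. A rough, self-contained sketch of that substitution (Python's str.format_map stands in for BPS's own variable expansion, which this does not reproduce exactly; the qgraphFile value is a made-up placeholder):

    # Sketch only: expand a command template like the ones above.
    template = ('pipetask qgraph -d "{dataQuery}" -b {butlerConfig} '
                "-i {inCollection} -p {pipelineYaml} -q {qgraphFile}")
    values = {
        "dataQuery": "exposure=903342 AND detector=10",
        "butlerConfig": "${PIPELINES_CHECK_DIR}/DATA_REPO/butler.yaml",
        "inCollection": "HSC/calib,HSC/raw/all,refcats",
        "pipelineYaml": "${OBS_SUBARU_DIR}/pipelines/DRP.yaml:processCcd",
        "qgraphFile": "run.qgraph",  # hypothetical file name
    }
    print(template.format_map(values))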
83 changes: 49 additions & 34 deletions doc/lsst.ctrl.bps/quickstart.rst
@@ -122,38 +122,11 @@ The remaining information tells BPS which workflow management system is being
used, how to convert Datasets and Pipetasks into compute jobs and what
resources those compute jobs need.

-.. code-block:: YAML
-
-  operator: jdoe
-  pipelineYaml: "${OBS_SUBARU_DIR}/pipelines/DRP.yaml:processCcd"
-  templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"
-  project: dev
-  campaign: quick
-  submitPath: ${PWD}/submit/{outCollection}
-  computeSite: ncsapool
-  requestMemory: 2GB
-  requestCpus: 1
-  # Make sure these values correspond to ones in the bin/run_demo.sh's
-  # pipetask command line.
-  payload:
-    runInit: true
-    payloadName: pcheck
-    butlerConfig: ${PIPELINES_CHECK_DIR}/DATA_REPO/butler.yaml
-    inCollection: HSC/calib,HSC/raw/all,refcats
-    outCollection: "shared/pipecheck/{timestamp}"
-    dataQuery: exposure=903342 AND detector=10
-  pipetask:
-    pipetaskInit:
-      runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --skip-existing --register-dataset-types --qgraph {qgraphFile} --no-versions"
-    assembleCoadd:
-      requestMemory: 8GB
-  wmsServiceClass: lsst.ctrl.bps.wms.htcondor.htcondor_service.HTCondorService
-  clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering
-  createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
-  runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {qgraphFile} --no-versions"
+.. literalinclude:: pipelines_check.yaml
+   :language: YAML
+   :caption: ${CTRL_BPS_DIR}/doc/lsst.ctrl.bps/pipelines_check.yaml


.. _bps-submit:

@@ -279,6 +252,48 @@ order from most specific to general is: ``payload``, ``pipetask``, and ``site``.
    Subsections are pipetask labels where one can override or set runtime
    settings for particular pipetasks (currently no Quantum-specific settings).

+**site**
+    Settings for specific sites can be set here. Subsections are site names
+    which are matched to ``computeSite``. The following are examples for
+    specifying values needed to match jobs to glideins.
+
+    .. code-block:: YAML
+       :caption: HTCondor plugin example
+
+       site:
+         acsws02:
+           profile:
+             condor:
+               requirements: "(GLIDEIN_NAME == &quot;test_gname&quot;)"
+               +GLIDEIN_NAME: "test_gname"
+
+    .. code-block:: YAML
+       :caption: Pegasus plugin example
+
+       site:
+         acsws02:
+           arch: x86_64
+           os: LINUX
+           directory:
+             shared-scratch:
+               path: /work/shared-scratch/${USER}
+               file-server:
+                 operation: all
+                 url: file:///work/shared-scratch/${USER}
+           profile:
+             pegasus:
+               style: condor
+               auxillary.local: true
+             condor:
+               universe: vanilla
+               getenv: true
+               requirements: '(ALLOCATED_NODE_SET == "${NODESET}")'
+               +JOB_NODE_SET: '"${NODESET}"'
+             dagman:
+               retry: 0
+             env:
+               PEGASUS_HOME: /usr/local/pegasus/current
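
To make the most-specific-wins search order concrete, here is a plain-dict sketch; the helper below is hypothetical, and the real lookup (BpsConfig.search) is richer than this:

    # Hypothetical helper mirroring the documented payload -> pipetask -> site
    # precedence; not the real BpsConfig.search.
    def search(config, key, pipetask_label=None, site_name=None, default=None):
        sections = (
            config.get("payload", {}),
            config.get("pipetask", {}).get(pipetask_label, {}),
            config.get("site", {}).get(site_name, {}),
            config,  # top-level defaults
        )
        for section in sections:
            if key in section:
                return section[key]
        return default

    cfg = {"requestMemory": 2048,
           "pipetask": {"assembleCoadd": {"requestMemory": 8192}}}
    print(search(cfg, "requestMemory", pipetask_label="assembleCoadd"))  # 8192
    print(search(cfg, "requestMemory", pipetask_label="isr"))            # 2048
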
Supported settings
^^^^^^^^^^^^^^^^^^

@@ -318,8 +333,8 @@ Supported settings
    output.

**requestMemory**, optional
-    Amount of memory single Quantum execution of a particular pipetask will
-    need (e.g., 2GB).
+    Amount of memory, in MB, a single Quantum execution of a particular pipetask
+    will need (e.g., 2048).

**requestCpus**, optional
    Number of cpus that a single Quantum execution of a particular pipetask
@@ -353,7 +368,7 @@ Supported settings
    pipetask:
      pipetask_init:
        runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --skip-existing --register-dataset-types --qgraph {qgraph_file} --no-versions"
-        requestMemory: 2GB
+        requestMemory: 2048

**templateDataId**
    Template to use when creating job names (and HTCondor plugin then uses for
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/pre_transform.py
@@ -45,7 +45,7 @@ def pre_transform(config, out_prefix=None):
    Parameters
    ----------
    config : `.bps_config.BpsConfig`
-        Configuration values for BPS. In particular, looking for qgraph_file.
+        Configuration values for BPS. In particular, looking for qgraphFile.
    out_prefix : `str` or None
        Output path for the QuantumGraph and stdout/stderr from generating
        the QuantumGraph.
@@ -59,7 +59,7 @@ def pre_transform(config, out_prefix=None):
        result of running code that generates it.
    """
    # Check to see if user provided pre-generated QuantumGraph.
-    found, input_qgraph_filename = config.search("qgraph_file")
+    found, input_qgraph_filename = config.search("qgraphFile")
    if found:
        if out_prefix is not None:
            # Save a copy of the QuantumGraph file in out_prefix.
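
With this rename, submit configs must spell the key qgraphFile. A condensed, self-contained sketch of the reuse-or-generate decision the function makes (a plain dict stands in for BpsConfig, whose search returns a (found, value) pair):

    # Sketch of pre_transform's choice between reusing and generating.
    def choose_qgraph(config):
        if "qgraphFile" in config:  # config.search("qgraphFile") in the real code
            return f"reuse {config['qgraphFile']} (copied into the submit directory)"
        return "generate a new QuantumGraph via the createQuantumGraph command"

    print(choose_qgraph({"qgraphFile": "existing.qgraph"}))
    print(choose_qgraph({}))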
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/prepare.py
@@ -53,8 +53,8 @@ def prepare(config, generic_workflow, out_prefix):

    # Save QuantumGraphs.
    # (putting after call to prepare so don't write a bunch of files if prepare fails)
-    found, when_to_save_job_qgraph = config.search('when_save_job_qgraph',
-                                                   {'default': WhenToSaveQuantumGraphs.TRANSFORM})
+    found, when_to_save_job_qgraph = config.search("whenSaveJobQgraph",
+                                                   {"default": WhenToSaveQuantumGraphs.TRANSFORM})
    if found and when_to_save_job_qgraph == WhenToSaveQuantumGraphs.PREPARE:
        for job_name in generic_workflow.nodes():
            job = generic_workflow.get_job(job_name)
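
transform.py below gates on the same key with TRANSFORM instead of PREPARE, so exactly one stage writes the per-job QuantumGraph files. A self-contained sketch of that gating (the enum here is a stand-in; only the TRANSFORM and PREPARE members are evidenced by this diff):

    from enum import Enum, auto

    class WhenToSaveQuantumGraphs(Enum):  # stand-in for the BPS enum
        TRANSFORM = auto()
        PREPARE = auto()

    def maybe_save_job_qgraphs(stage, setting, job_names):
        # A stage writes per-job QuantumGraphs only if the config names it.
        if setting == stage:
            for name in job_names:
                print(f"{stage.name}: saving QuantumGraph for job {name}")

    setting = WhenToSaveQuantumGraphs.PREPARE  # i.e., whenSaveJobQgraph
    maybe_save_job_qgraphs(WhenToSaveQuantumGraphs.TRANSFORM, setting, ["j1"])  # no-op
    maybe_save_job_qgraphs(WhenToSaveQuantumGraphs.PREPARE, setting, ["j1"])    # writes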
28 changes: 17 additions & 11 deletions python/lsst/ctrl/bps/transform.py
@@ -54,14 +54,14 @@ def transform(config, clustered_quantum_graph, prefix):
    if 'name' in clustered_quantum_graph.graph and clustered_quantum_graph.graph['name'] is not None:
        name = clustered_quantum_graph.graph['name']
    else:
-        _, name = config.search('uniqProcName', opt={'required': True})
+        _, name = config.search("uniqProcName", opt={"required": True})

    generic_workflow = create_generic_workflow(config, clustered_quantum_graph, name, prefix)
    generic_workflow_config = create_generic_workflow_config(config, prefix)

    # Save QuantumGraphs.
-    found, when_to_save_job_qgraph = config.search('when_save_job_qgraph',
-                                                   {'default': WhenToSaveQuantumGraphs.TRANSFORM})
+    found, when_to_save_job_qgraph = config.search("whenSaveJobQgraph",
+                                                   {"default": WhenToSaveQuantumGraphs.TRANSFORM})
    if found and when_to_save_job_qgraph == WhenToSaveQuantumGraphs.TRANSFORM:
        for job_name in generic_workflow.nodes():
            job = generic_workflow.get_job(job_name)
@@ -184,6 +184,12 @@ def create_init_workflow(config):
    job.cmdline = create_command(config, "pipetaskInit", config[".bps_defined.run_qgraph_file"])
    job.label = "init"
    job.compute_site = config["computeSite"]
+    search_opt = {"curvals": {"curr_pipetask": "pipetaskInit"}, "required": False, "default": 0}
+    job.request_cpus = int(config.search("requestCpus", opt=search_opt)[1])
+    job.request_memory = int(config.search("requestMemory", opt=search_opt)[1])
+    job.request_disk = int(config.search("requestDisk", opt=search_opt)[1])
+    job.request_walltime = int(config.search("requestWalltime", opt=search_opt)[1])
+    update_job(config, job)
    init_workflow.add_job(job)

    _LOG.debug("creating init task input(s)")
@@ -315,14 +321,14 @@ def create_job_values_aggregate(config, generic_workflow):
            label_counts[qnode.taskDef.label] += 1

            search_opt = {"curvals": {"curr_pipetask": qnode.taskDef.label}, "required": False, "default": 0}
-            _, request_cpus = config.search("request_cpus", opt=search_opt)
-            job.request_cpus = max(job.request_cpus, request_cpus)
-            _, request_memory = config.search("request_memory", opt=search_opt)
-            job.request_memory = max(job.request_memory, request_memory)
-            _, request_disk = config.search("request_walltime", opt=search_opt)
-            job.request_disk += request_disk
-            _, request_walltime = config.search("request_walltime", opt=search_opt)
-            job.request_walltime += request_walltime
+            _, request_cpus = config.search("requestCpus", opt=search_opt)
+            job.request_cpus = max(job.request_cpus, int(request_cpus))
+            _, request_memory = config.search("requestMemory", opt=search_opt)
+            job.request_memory = max(job.request_memory, int(request_memory))
+            _, request_disk = config.search("requestDisk", opt=search_opt)
+            job.request_disk += int(request_disk)
+            _, request_walltime = config.search("requestWalltime", opt=search_opt)
+            job.request_walltime += int(request_walltime)

        job.quanta_summary = ';'.join([f"{k}:{v}" for k, v in label_counts.items()])
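
The aggregation semantics above are easy to miss: CPU and memory requests are peak values (max over the quanta clustered into a job) while disk and walltime accumulate, and the new int() casts guard against non-integer values coming back from the config. A small self-contained illustration with made-up numbers:

    # Two quanta clustered into one job (illustrative values only).
    quanta = [
        {"cpus": 1, "memory": 2048, "disk": 512, "walltime": 600},
        {"cpus": 1, "memory": 8192, "disk": 512, "walltime": 900},
    ]
    job = {"cpus": 0, "memory": 0, "disk": 0, "walltime": 0}
    for q in quanta:
        job["cpus"] = max(job["cpus"], q["cpus"])        # peak, not sum
        job["memory"] = max(job["memory"], q["memory"])  # peak, not sum
        job["disk"] += q["disk"]                         # accumulates
        job["walltime"] += q["walltime"]                 # accumulates (serial quanta)
    print(job)  # {'cpus': 1, 'memory': 8192, 'disk': 1024, 'walltime': 1500}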
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/wms/htcondor/htcondor_service.py
@@ -28,7 +28,7 @@
from lsst.ctrl.bps.wms_service import BaseWmsWorkflow, BaseWmsService
from lsst.ctrl.bps.generic_workflow import GenericWorkflow
from .lssthtc import (htc_submit_dag, read_node_status, read_dag_node_log, condor_q, condor_history,
-                      HTCDag, HTCJob, htc_jobs_to_wms_report)
+                      HTCDag, HTCJob, htc_jobs_to_wms_report, htc_escape)

_LOG = logging.getLogger()
@@ -277,7 +277,7 @@ def translate_job_cmds(generic_workflow_job):
    # Add extra "pass-thru" job commands
    if generic_workflow_job.profile:
        for key, val in generic_workflow_job.profile.items():
-            jobcmds[key] = val
+            jobcmds[key] = htc_escape(val)

    return jobcmds
2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/wms/htcondor/lssthtc.py
@@ -193,7 +193,7 @@ def htc_escape(value):
    new_value : `str`
        Given string with characters escaped.
    """
-    return value.replace("\\", "\\\\").replace('"', '\\"').replace("'", "''")
+    return value.replace("\\", "\\\\").replace('"', '\\"').replace("'", "''").replace("&quot;", '"')


def htc_write_attribs(stream, attrs):
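
Taken together with the translate_job_cmds change above, this lets a config value carry a literal double quote into the HTCondor submit file: raw quotes are escaped, while the &quot; sequence survives escaping and is decoded back to a quote at the end. A runnable copy of the updated function demonstrating the effect on the glidein example from quickstart.rst:

    def htc_escape(value):
        # Copy of the updated one-liner above, for a standalone demo.
        return (value.replace("\\", "\\\\").replace('"', '\\"')
                     .replace("'", "''").replace("&quot;", '"'))

    print(htc_escape('say "hi"'))
    # -> say \"hi\"   (raw quotes are escaped for HTCondor)
    print(htc_escape("(GLIDEIN_NAME == &quot;test_gname&quot;)"))
    # -> (GLIDEIN_NAME == "test_gname")   (&quot; becomes a literal quote)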
11 changes: 10 additions & 1 deletion python/lsst/ctrl/bps/wms/pegasus/pegasus_service.py
@@ -227,6 +227,15 @@ def create_job(self, generic_workflow, gwf_job, peg_files):
        else:
            _LOG.warning("Job %s does not have any arguments", gwf_job.name)

+        if gwf_job.request_memory:  # MB
+            job.addProfile(Profile(Namespace.CONDOR, "request_memory", gwf_job.request_memory))
+        if gwf_job.request_cpus:  # cores
+            job.addProfile(Profile(Namespace.CONDOR, "request_cpus", gwf_job.request_cpus))
+        if gwf_job.request_disk:  # MB
+            job.addProfile(Profile(Namespace.CONDOR, "request_disk", gwf_job.request_disk))
+        if gwf_job.priority:
+            job.addProfile(Profile(Namespace.CONDOR, "priority", gwf_job.priority))
+
        # Add extra job attributes
        for key, value in gwf_job.profile.items():
            job.addProfile(Profile(Namespace.CONDOR, key, value))
@@ -422,7 +431,7 @@ def _write_properties_file(self, out_prefix, filenames):
            print(f"pegasus.catalog.replica.file={filenames['replica']}", file=outfh)

            print("# This tells Pegasus where to find the Transformation Catalog.", file=outfh)
-            print("pegasus.catalog.transformation=Text")
+            print("pegasus.catalog.transformation=Text", file=outfh)
            print(f"pegasus.catalog.transformation.file={filenames['transformation']}", file=outfh)

            print("# Run Pegasus in shared file system mode.", file=outfh)