Skip to content

Commit

Permalink
Merge pull request #36 from lsst/tickets/DM-30965
Browse files Browse the repository at this point in the history
Initial implementation of lazy commands creation in PanDA plugin
  • Loading branch information
SergeyPod committed Jul 14, 2021
2 parents c386b2b + bf87aa9 commit e9969b4
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 115 deletions.
126 changes: 126 additions & 0 deletions python/lsst/ctrl/bps/wms/panda/cmd_line_embedder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# This file is part of ctrl_bps.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os
import logging


_LOG = logging.getLogger(__name__)


class CommandLineEmbedder:
    """Embed static values into the pipeline execution command line.

    Values that are constant across a task are substituted directly into
    the command line, and submission side environment variables are
    resolved.

    Parameters
    ----------
    config : `~lsst.ctrl.bps.BPSConfig`
        BPS configuration that includes the list of dynamic
        (unique per job) and submission side resolved variables.
    """

    def __init__(self, config):
        # Names whose "{name}" placeholders are left in the command line.
        self.leave_placeholder_params = config.get("placeholderParams", ['qgraphNodeId', 'qgraphId'])
        # Names whose "<ENV:name>" placeholders are resolved from the
        # environment of the submitting process.
        self.submit_side_resolved = config.get("submitSideResolvedParams", ['USER'])

    def replace_static_parameters(self, cmd_line, lazy_vars):
        """Substitute the static lazy parameters in the command line.

        Static parameters are the same for every job in the workflow and
        can be defined once. This offloads the edge node processing and
        reduces the number of parameters transferred together with a job.

        Parameters
        ----------
        cmd_line : `str`
            Command line to be processed.
        lazy_vars : `dict`
            Lazy variables and their values.

        Returns
        -------
        cmd_line : `str`
            Processed command line.
        """
        for param_name, param_val in lazy_vars.items():
            if param_name not in self.leave_placeholder_params:
                # str() keeps non-string values (e.g. integers) from
                # raising TypeError inside str.replace.
                cmd_line = cmd_line.replace('{' + param_name + '}', str(param_val))
        return cmd_line

    def resolve_submission_side_env_vars(self, cmd_line):
        """Substitute the parameters resolved on the submission side.

        Parameters
        ----------
        cmd_line : `str`
            Command line to be processed.

        Returns
        -------
        cmd_line : `str`
            Processed command line.
        """
        for param in self.submit_side_resolved:
            value = os.getenv(param)
            if value:
                cmd_line = cmd_line.replace('<ENV:' + param + '>', value)
            else:
                # Unset or empty variables leave their placeholder in place.
                _LOG.info("Expected parameter %s is not found in the environment variables", param)
        return cmd_line

    def attach_pseudo_file_params(self, lazy_vars):
        """Build the suffix needed to finalize creation of a pseudo file name.

        Parameters
        ----------
        lazy_vars : `dict`
            Values to be substituted.

        Returns
        -------
        file_suffix : `str`
            Pseudo input file name suffix made of ``+name:value`` items,
            one per placeholder parameter; a missing value is rendered as
            an empty string.
        """
        return "".join(
            f"+{item}:{lazy_vars.get(item, '')}" for item in self.leave_placeholder_params
        )

    def substitute_command_line(self, bps_file_name, cmd_line, lazy_vars, job_name):
        """Preprocess the command line.

        Only the parameters which are job / environment dependent are left
        for the edge node evaluation.

        Parameters
        ----------
        bps_file_name : `str`
            Input file name proposed by BPS.
        cmd_line : `str`
            Command line containing all lazy placeholders.
        lazy_vars : `dict`
            Lazy parameter names and values.
        job_name : `str`
            Job name proposed by BPS.

        Returns
        -------
        cmd_line : `str`
            Processed command line.
        file_name : `str`
            Job pseudo input file name.
        """
        cmd_line = self.replace_static_parameters(cmd_line, lazy_vars)
        cmd_line = self.resolve_submission_side_env_vars(cmd_line)
        file_name = bps_file_name + "+" + job_name + self.attach_pseudo_file_params(lazy_vars)
        return cmd_line, file_name
41 changes: 23 additions & 18 deletions python/lsst/ctrl/bps/wms/panda/conf_example/example_panda.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
pipelineYaml: ${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/conf_example/HSC-PANDA.yaml
pipelineYaml: /opt/lsst/software/HSC-PANDA.yaml
project: dev
campaign: "configuration_example"
submitPath: ${HOME}/submit/{outCollection}
container_obs_panda_edge_node_dir: /opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/ctrl_bps/21.0.0-18-gf2cd492+6c749b2ca5/python/lsst/ctrl/bps/wms/panda/edgenode
container_CTRL_MPEXEC_DIR: /opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/ctrl_mpexec/21.0.0-30-g82f2559+c64cb64c6b/

computing_cloud: LSST

#This setting supersedes cloud and memory requirements
#computeSite: DOMA_LSST_GOOGLE_TEST_HIMEM

maxwalltime: 90000
requestMemory: 2000
maxattempt: 1

whenSaveJobQgraph: "NEVER"
idds_server: "https://aipanda015.cern.ch:443/idds"

placeholderParams: ['qgraphNodeId', 'qgraphId']
submitSideResolvedParams: ['USER']

payload:
runInit: true
payloadName: rc2
Expand All @@ -24,33 +29,33 @@ payload:
# tracts (smallest to largest): 9615, 9697, 9813
#dataQuery: "tract = 9813 and instrument='HSC' and skymap='hsc_rings_v1'"
#dataQuery: "tract = 9615 and patch=30 and instrument='HSC' and skymap='hsc_rings_v1'"

#Small (~1000 jobs) workflow
#dataQuery: "tract = 9615 and patch=30 and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'"

#Very small (~150 jobs) workflow
dataQuery: "tract = 9615 and patch=30 and detector IN (10..20) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r', 'i')"

#Very small (~2 jobs) workflow
#dataQuery: "tract = 9615 and patch=30 and detector IN (10..11) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r')"


# ~30k quanta
#dataQuery: "tract = 9615 and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'"
# ~150k quanta
#dataQuery: "(tract = 9615 or tract=9697 or (tract=9813 and patch not in (0, 8, 9, 63, 72, 73, 80))) and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'"


sw_image: "docker://spodolsky/centos:7-stack-lsst_distrib-w_2021_21"
bucket: "s3://butler-us-central1-panda-dev/hsc"
sw_image: "spodolsky/centos:7-stack-lsst_distrib-d_2021_07_05-debug_lazy_v1"
fileDistributionEndPoint: "s3://butler-us-central1-panda-dev/hsc"
s3_endpoint_url: "https://storage.googleapis.com"
payload_folder: payload
runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "{container_obs_panda_edge_node_dir}/sw_runner _cmd_line_ ${{IN/L}} {container_obs_panda_edge_node_dir}" >&2;'
runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;setup lsst_distrib;\${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/sw_runner _cmd_line_ " >&2;'


pipetask:

pipetaskInit:
runQuantumCommand: "{container_CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --init-only --register-dataset-types --qgraph {bucket}/{payload_folder}/{uniqProcName}/${{filename}} --clobber-partial-outputs --no-versions --skip-existing"
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --init-only --register-dataset-types --qgraph {fileDistributionEndPoint}/{payload_folder}/{uniqProcName}/<FILE:runQgraphFile> --clobber-outputs --no-versions --skip-existing"

#This setting supersedes cloud and memory requirements
#computeSite: DOMA_LSST_GOOGLE_TEST

measure:
requestMemory: 8129
forcedPhotCcd:
Expand All @@ -60,23 +65,23 @@ pipetask:
mergeMeasurements:
requestMemory: 4096
writeObjectTable:
requestMemory: 20000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml
requestMemory: 12000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml
consolidateObjectTable:
requestMemory: 20000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml
requestMemory: 12000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml
assembleCoadd:
requestMemory: 8192
makeWarp:
requestMemory: 20000 #reduced from 85000: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml
requestMemory: 12000 #reduced from 85000: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml
jointcal:
requestMemory: 20000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml
requestMemory: 12000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml
deblend:
requestMemory: 3000
skyCorr:
requestMemory: 11500
fgcmBuildStarsTable:
requestMemory: 8192
fgcmFitCycle:
requestMemory: 20000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step1.yaml
requestMemory: 12000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step1.yaml
fgcmOutputProducts:
requestMemory: 8192

Expand All @@ -85,5 +90,5 @@ requestCpus: 1
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService
clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering
createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
runQuantumCommand: '{container_CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {bucket}/{payload_folder}/{uniqProcName}/${{filename}} --qgraph-id ${{qgraph-id}} --qgraph-node-id ${{qgraph-node-id}} --clobber-partial-outputs --skip-existing --no-versions'
runQuantumCommand: '${CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {fileDistributionEndPoint}/{payload_folder}/{uniqProcName}/<FILE:runQgraphFile> --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --skip-existing --no-versions'
templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"
41 changes: 31 additions & 10 deletions python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
#!/usr/bin/python
"""
This file is needed to decode the command line string sent from the BPS
plugin -> PanDA -> Edge node cluster management -> Edge node -> Container.
Once the command line is hexified it can contain symbols often stripped such as
',",` and multiline commands.
plugin -> PanDA -> Edge node cluster management
-> Edge node -> Container. This file is not a part
of the BPS but a part of the payload wrapper.
It decodes the hexified command line.
"""
import os
import re
import sys
import binascii
# Decode the hex-encoded command line passed as the first argument.
cmdline = str(binascii.unhexlify(sys.argv[1]).decode())
# Split the second argument on "+"; the first segment replaces ${filename}.
dataparams = sys.argv[2].split("+")
cmdline = cmdline.replace("${filename}", dataparams[0])
# NOTE(review): indentation is not visible in this view; the two replace
# calls below presumably form the body of this `if` - confirm. The guard
# checks for more than two segments while index 3 is also read - verify
# that callers always supply at least four segments.
if len(dataparams) > 2:
cmdline = cmdline.replace("${qgraph-id}", dataparams[2])
cmdline = cmdline.replace("${qgraph-node-id}", dataparams[3])
# Emit the resulting command line on stdout.
print(cmdline)


def replace_environment_vars(cmdline):
    """
    Expands ``<ENV:NAME>`` placeholders using the process environment.
    Placeholders whose variable is unset or empty are left untouched.
    :param cmdline: `str`
    Command line possibly containing ``<ENV:NAME>`` placeholders
    :return: cmdline: `str`
    Command line with the resolvable placeholders substituted
    """
    for name in re.findall(r'<ENV:(.*?)>', cmdline):
        resolved = os.getenv(name)
        if not resolved:
            # Nothing to substitute; keep the placeholder as is.
            continue
        placeholder = '<ENV:' + name + '>'
        cmdline = cmdline.replace(placeholder, resolved)
    return cmdline


def substitute_job_params(cmd_line, data_params):
    """
    Replaces ``{key}`` placeholders with the values of ``key:value`` items.
    Items without a ``:`` separator (for example a bare job name) carry no
    key and are skipped; only the first ``:`` separates key from value, so
    values may themselves contain colons.
    :param cmd_line: `str`
    Command line possibly containing ``{key}`` placeholders
    :param data_params: iterable of `str`
    Items of the form ``key:value``
    :return: cmd_line: `str`
    Processed command line
    """
    for key_value_pair in data_params:
        key, separator, value = key_value_pair.partition(":")
        if not separator:
            continue
        cmd_line = cmd_line.replace("{" + key + "}", value)
    return cmd_line


def main():
    """
    Decodes the hexified command line given as the first argument, fills in
    the placeholders using the ``+`` separated second argument and prints
    the resulting command line to stdout.
    """
    cmd_line = binascii.unhexlify(sys.argv[1]).decode()
    data_params = sys.argv[2].split("+")
    cmd_line = replace_environment_vars(cmd_line)
    # The first segment is the file name substituted for the qgraph file.
    cmd_line = cmd_line.replace("<FILE:runQgraphFile>", data_params[0])
    cmd_line = substitute_job_params(cmd_line, data_params[1:])
    print(cmd_line)


if __name__ == "__main__":
    main()
14 changes: 8 additions & 6 deletions python/lsst/ctrl/bps/wms/panda/edgenode/sw_runner
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/bin/bash
# This is a starter script needed to initialize basic Rubin software environment inside the container and execute
# the actual command line after decoding from hexed strings
#
# Positional arguments, as used below:
#   $1, $2 - forwarded unchanged to cmd_line_decoder.py
#   $3     - directory holding cmd_line_decoder.py (used by the /usr/bin/python invocations only)
# NOTE(review): this listing appears to interleave pre- and post-change lines of a diff
# (HOME is exported twice and the decoder is invoked through two different paths) -
# confirm which lines belong to the current script.
cd /tmp;
ls /;
export HOME=/tmp;
# Point HOME at the directory named by PILOT_HOME and work from there.
export HOME=${PILOT_HOME}
cd ${HOME}
export S3_ENDPOINT_URL=https://storage.googleapis.com
. /opt/lsst/software/stack/loadLSST.bash;
DIR="$(dirname "${BASH_SOURCE[0]}")" # Get the directory name
DIR="$(realpath "${DIR}")" # Resolve its full path
unset PYTHONPATH
# Source loadLSST.bash from a path resolved relative to this script's own directory.
. ${DIR}/../../../../../../../../../../../../loadLSST.bash;
setup lsst_distrib;
# Print the decoded command line, then run the decoder again and eval its output
# with " >&2" appended to the command string.
echo "$(/usr/bin/python $3/cmd_line_decoder.py $1 $2)"
eval "$(/usr/bin/python $3/cmd_line_decoder.py $1 $2) >&2"
# Same echo/eval pair, with the decoder located through CTRL_BPS_DIR and run with python3.
echo "$(python3 $CTRL_BPS_DIR/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py $1 $2)"
eval "$(python3 $CTRL_BPS_DIR/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py $1 $2) >&2"

0 comments on commit e9969b4

Please sign in to comment.