Initial implementation of lazy commands creation in PanDA plugin #36
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# This file is part of ctrl_bps. | ||
# | ||
# Developed for the LSST Data Management System. | ||
# This product includes software developed by the LSST Project | ||
# (https://www.lsst.org). | ||
# See the COPYRIGHT file at the top-level directory of this distribution | ||
# for details of code ownership. | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
import os | ||
import logging | ||
|
||
|
||
_LOG = logging.getLogger(__name__) | ||
|
||
|
||
class CommandLineEmbedder: | ||
""" | ||
Class embeds static (constant across a task) values | ||
into the pipeline execution command line | ||
and resolves submission side environment variables | ||
|
||
Parameters | ||
---------- | ||
config : `~lsst.ctrl.bps.BPSConfig` | ||
BPS configuration that includes the list of dynamic | ||
(unique per job) and submission-side resolved variables | ||
|
||
""" | ||
def __init__(self, config): | ||
self.leave_placeholder_params = config.get("placeholderParams", ['qgraphNodeId', 'qgraphId']) | ||
Review comment: This will require changes if the pipetask command lines change enough. And why are these two values special compared to others?
Review comment: Yes, they only vary across a task. They should not be resolved at the submission step; placeholders should be left for them for further processing on the edge node side. They are also not environment variables but contribute to the actual command line construction. They also contribute to the pseudo input list for a task, which is constructed at the submission step.
||
self.submit_side_resolved = config.get("submitSideResolvedParams", ['USER']) | ||
Review comment: I think we should just change the submission yaml examples to use 'operator' instead of 'user' to solve this problem.
Review comment: I would agree, but I also think we should still have the opportunity to resolve some variables at the submission step.
||
|
||
def replace_static_parameters(self, cmd_line, lazy_vars): | ||
""" Substitutes the lazy parameters in the command line which | ||
are static, i.e. the same for every job in the workflow, and can be | ||
defined once. This reduces the processing done on the edge node | ||
and the number of parameters transferred together with each job. | ||
|
||
|
||
Parameters | ||
---------- | ||
cmd_line: `str` command line to be processed | ||
Review comment: Please use proper syntax for numpy docs. You need to do something like:
|
||
lazy_vars: `dict` of lazy variables and its values | ||
|
||
Returns | ||
------- | ||
processed command line | ||
""" | ||
for param_name, param_val in lazy_vars.items(): | ||
if param_name not in self.leave_placeholder_params: | ||
cmd_line = cmd_line.replace('{'+param_name+'}', param_val) | ||
Review comment: Not sure that it will affect the cmd_line or not, but in the bps yaml, those "{value}" can actually contain f-string formats (e.g., {value:02}).
Review comment: If we are using f-string style logic here we should not be using simple string replace but instead should be calling
Review comment: Done
Review comment: I prefer to leave as is because format can raise exception
||
return cmd_line | ||
|
||
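The review thread on `replace_static_parameters` notes that placeholders in the BPS YAML may carry format specifications such as `{value:02}`, which a plain `str.replace` on `{name}` would not match, while `str.format` can raise on unknown or unbalanced fields. One possible middle ground, sketched here and not part of this PR, is to walk the template with `string.Formatter().parse` and format only the known static fields. The function name `partial_format` and its `keep` argument are hypothetical.

```python
import string


def partial_format(template, values, keep=()):
    """Format the known fields of ``template``; leave the rest untouched.

    Hypothetical helper for illustration only, not part of ctrl_bps.
    """
    formatter = string.Formatter()
    pieces = []
    # parse() yields (literal_text, field_name, format_spec, conversion).
    for literal, field, spec, conversion in formatter.parse(template):
        pieces.append(literal)
        if field is None:
            continue  # literal text that is not followed by a placeholder
        if field in values and field not in keep:
            value = formatter.convert_field(values[field], conversion)
            pieces.append(formatter.format_field(value, spec))
        else:
            # Rebuild the placeholder unchanged for later processing.
            placeholder = "{" + field
            if conversion:
                placeholder += "!" + conversion
            if spec:
                placeholder += ":" + spec
            pieces.append(placeholder + "}")
    return "".join(pieces)


print(partial_format(
    "run --visit {visit:06} --qgraph-id {qgraphId}",
    {"visit": 42, "qgraphId": "abc"},
    keep=("qgraphId",),
))
```

Two caveats apply to this sketch: `parse` still raises `ValueError` on unbalanced braces, and escaped braces (`{{`, `}}`) come back as single literal braces, so it is not a drop-in replacement for the `str.replace` approach kept in the PR.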
def resolve_submission_side_env_vars(self, cmd_line): | ||
""" Substitutes the lazy parameters in the command line | ||
which are defined and resolved on the submission side | ||
|
||
Parameters | ||
---------- | ||
cmd_line: `str` command line to be processed | ||
|
||
Returns | ||
------- | ||
processed command line | ||
""" | ||
|
||
for param in self.submit_side_resolved: | ||
if os.getenv(param): | ||
cmd_line = cmd_line.replace('<ENV:'+param+'>', os.getenv(param)) | ||
else: | ||
_LOG.info("Expected parameter {0} is not found in the environment variables".format(param)) | ||
return cmd_line | ||
|
||
def attach_pseudo_file_params(self, lazy_vars): | ||
""" Adds the parameters needed to finalize creation of a pseudo file | ||
|
||
Parameters | ||
---------- | ||
lazy_vars: `dict` of values to be substituted | ||
Review comment: Maybe call lazy_values?
Review comment: This is a dictionary; in my subjective opinion, "values" is more associated with a list...
||
|
||
Returns | ||
------- | ||
pseudo input file name suffix | ||
""" | ||
|
||
file_suffix = "" | ||
for item in self.leave_placeholder_params: | ||
file_suffix += '+' + item + ':' + lazy_vars.get(item, '') | ||
return file_suffix | ||
|
||
def substitute_command_line(self, bps_file_name, cmd_line, lazy_vars, job_name): | ||
""" Preprocesses the command line, leaving for edge node evaluation | ||
only the parameters which are job / environment dependent | ||
|
||
Parameters | ||
---------- | ||
bps_file_name: `str` input file name proposed by BPS | ||
cmd_line: `str` command line containing all lazy placeholders | ||
lazy_vars: `dict` of lazy parameter name/values | ||
job_name: `str` job name proposed by BPS | ||
|
||
Returns | ||
------- | ||
cmd_line: `str` | ||
processed command line | ||
file_name: `str` | ||
job pseudo input file name | ||
""" | ||
|
||
cmd_line = self.replace_static_parameters(cmd_line, lazy_vars) | ||
cmd_line = self.resolve_submission_side_env_vars(cmd_line) | ||
file_name = bps_file_name + "+" + job_name+self.attach_pseudo_file_params(lazy_vars) | ||
return cmd_line, file_name |
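The three steps of `substitute_command_line` above (static substitution, submission-side environment resolution, pseudo file name construction) can be sketched as one standalone function. This is a simplified illustration under stated assumptions: the module-level constants mirror the defaults from `__init__`, the `environ` argument is added here so the example does not depend on the real environment, and values are passed through `str()`, which the PR code does not do.

```python
import os

# Defaults copied from the PR; in the PR they come from the BPS config.
PLACEHOLDER_PARAMS = ["qgraphNodeId", "qgraphId"]
SUBMIT_SIDE_RESOLVED = ["USER"]


def substitute_command_line(bps_file_name, cmd_line, lazy_vars, job_name,
                            environ=None):
    """Standalone sketch of the submission-side preprocessing."""
    environ = os.environ if environ is None else environ

    # 1. Substitute static values; keep per-job placeholders for the edge node.
    for name, value in lazy_vars.items():
        if name not in PLACEHOLDER_PARAMS:
            cmd_line = cmd_line.replace("{" + name + "}", str(value))

    # 2. Resolve the environment variables known on the submission side.
    for name in SUBMIT_SIDE_RESOLVED:
        if name in environ:
            cmd_line = cmd_line.replace("<ENV:" + name + ">", environ[name])

    # 3. Encode the per-job values into the pseudo input file name.
    suffix = "".join(
        "+" + name + ":" + str(lazy_vars.get(name, ""))
        for name in PLACEHOLDER_PARAMS
    )
    return cmd_line, bps_file_name + "+" + job_name + suffix


cmd, file_name = substitute_command_line(
    "qg.pickle",
    "pipetask run -b {butlerConfig} --qgraph-id {qgraphId} -u <ENV:USER>",
    {"butlerConfig": "s3://b/butler.yaml", "qgraphId": "42", "qgraphNodeId": "7"},
    "job1",
    environ={"USER": "alice"},
)
print(cmd)
print(file_name)
```

The file and job names in the usage example are made up. The point is that `{qgraphId}` survives in the command line while its value travels in the file name suffix.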
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,23 @@ | ||
pipelineYaml: ${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/conf_example/HSC-PANDA.yaml | ||
pipelineYaml: /opt/lsst/software/HSC-PANDA.yaml | ||
Review comment: This is an existing path on the IDF submit host(s)?
Review comment: Yes.
||
project: dev | ||
campaign: "configuration_example" | ||
submitPath: ${HOME}/submit/{outCollection} | ||
container_obs_panda_edge_node_dir: /opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/ctrl_bps/21.0.0-18-gf2cd492+6c749b2ca5/python/lsst/ctrl/bps/wms/panda/edgenode | ||
container_CTRL_MPEXEC_DIR: /opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/ctrl_mpexec/21.0.0-30-g82f2559+c64cb64c6b/ | ||
|
||
computing_cloud: LSST | ||
|
||
#This setting supersedes the cloud and memory requirements | ||
#computeSite: DOMA_LSST_GOOGLE_TEST_HIMEM | ||
|
||
maxwalltime: 90000 | ||
requestMemory: 2000 | ||
maxattempt: 1 | ||
|
||
whenSaveJobQgraph: "NEVER" | ||
Review comment: Just a heads up, planning on making this the default value in the execution butler version so it won't need to be here too much longer.
Review comment: Thanks for the comment
||
idds_server: "https://aipanda015.cern.ch:443/idds" | ||
|
||
placeholderParams: ['qgraphNodeId', 'qgraphId'] | ||
submitSideResolvedParams: ['USER'] | ||
|
||
payload: | ||
runInit: true | ||
payloadName: rc2 | ||
|
@@ -24,33 +29,33 @@ payload: | |
# tracts (smallest to largest): 9615, 9697, 9813 | ||
#dataQuery: "tract = 9813 and instrument='HSC' and skymap='hsc_rings_v1'" | ||
#dataQuery: "tract = 9615 and patch=30 and instrument='HSC' and skymap='hsc_rings_v1'" | ||
|
||
#Small (~1000 jobs) workflow | ||
#dataQuery: "tract = 9615 and patch=30 and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'" | ||
|
||
#Very small (~150 jobs) workflow | ||
dataQuery: "tract = 9615 and patch=30 and detector IN (10..20) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r', 'i')" | ||
|
||
#Very small (~2 jobs) workflow | ||
#dataQuery: "tract = 9615 and patch=30 and detector IN (10..11) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r')" | ||
|
||
|
||
# ~30k quanta | ||
#dataQuery: "tract = 9615 and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'" | ||
# ~150k quanta | ||
#dataQuery: "(tract = 9615 or tract=9697 or (tract=9813 and patch not in (0, 8, 9, 63, 72, 73, 80))) and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'" | ||
|
||
|
||
sw_image: "docker://spodolsky/centos:7-stack-lsst_distrib-w_2021_21" | ||
bucket: "s3://butler-us-central1-panda-dev/hsc" | ||
sw_image: "spodolsky/centos:7-stack-lsst_distrib-d_2021_07_05-debug_lazy_v1" | ||
fileDistributionEndPoint: "s3://butler-us-central1-panda-dev/hsc" | ||
s3_endpoint_url: "https://storage.googleapis.com" | ||
payload_folder: payload | ||
runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "{container_obs_panda_edge_node_dir}/sw_runner _cmd_line_ ${{IN/L}} {container_obs_panda_edge_node_dir}" >&2;' | ||
runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;setup lsst_distrib;\${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/sw_runner _cmd_line_ " >&2;' | ||
|
||
|
||
pipetask: | ||
Review comment: At some point soon, it would be good if this example wasn't a real science pipeline. We don't want to keep updating pipeline values in our examples every time the pipeline changes (e.g., the requestMemory changes).
||
|
||
pipetaskInit: | ||
runQuantumCommand: "{container_CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --init-only --register-dataset-types --qgraph {bucket}/{payload_folder}/{uniqProcName}/${{filename}} --clobber-partial-outputs --no-versions --skip-existing" | ||
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --init-only --register-dataset-types --qgraph {fileDistributionEndPoint}/{payload_folder}/{uniqProcName}/<FILE:runQgraphFile> --clobber-outputs --no-versions --skip-existing" | ||
|
||
#This setting supersedes the cloud and memory requirements | ||
#computeSite: DOMA_LSST_GOOGLE_TEST | ||
|
||
measure: | ||
requestMemory: 8129 | ||
forcedPhotCcd: | ||
|
@@ -60,23 +65,23 @@ pipetask: | |
mergeMeasurements: | ||
requestMemory: 4096 | ||
writeObjectTable: | ||
requestMemory: 20000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml | ||
requestMemory: 12000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml | ||
consolidateObjectTable: | ||
requestMemory: 20000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml | ||
requestMemory: 12000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml | ||
assembleCoadd: | ||
requestMemory: 8192 | ||
makeWarp: | ||
requestMemory: 20000 #reduced from 85000: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml | ||
requestMemory: 12000 #reduced from 85000: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml | ||
jointcal: | ||
requestMemory: 20000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml | ||
requestMemory: 12000 #reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml | ||
deblend: | ||
requestMemory: 3000 | ||
skyCorr: | ||
requestMemory: 11500 | ||
fgcmBuildStarsTable: | ||
requestMemory: 8192 | ||
fgcmFitCycle: | ||
requestMemory: 20000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step1.yaml | ||
requestMemory: 12000 #reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step1.yaml | ||
fgcmOutputProducts: | ||
requestMemory: 8192 | ||
|
||
|
@@ -85,5 +90,5 @@ requestCpus: 1 | |
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService | ||
clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering | ||
createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot' | ||
runQuantumCommand: '{container_CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {bucket}/{payload_folder}/{uniqProcName}/${{filename}} --qgraph-id ${{qgraph-id}} --qgraph-node-id ${{qgraph-node-id}} --clobber-partial-outputs --skip-existing --no-versions' | ||
runQuantumCommand: '${CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {fileDistributionEndPoint}/{payload_folder}/{uniqProcName}/<FILE:runQgraphFile> --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --skip-existing --no-versions' | ||
templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}" |
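The updated `runQuantumCommand` mixes several placeholder syntaxes: `{name}` values, `${NAME}` shell-style variables, `<ENV:NAME>` markers and the `<FILE:runQgraphFile>` marker. As a reading aid, the following sketch lists which placeholders of each kind a command line contains. The regular expressions and the function name `classify_placeholders` are assumptions made for illustration and are not taken from ctrl_bps.

```python
import re

# Hypothetical patterns, one per placeholder syntax seen in the YAML above.
PLACEHOLDER_PATTERNS = {
    "env": re.compile(r"<ENV:([^>]+)>"),
    "file": re.compile(r"<FILE:([^>]+)>"),
    "shell": re.compile(r"\$\{(\w+)\}"),
    # A brace placeholder that is not preceded by "$".
    "lazy": re.compile(r"(?<!\$)\{(\w+)\}"),
}


def classify_placeholders(cmd_line):
    """Return the placeholder names found in ``cmd_line``, grouped by kind."""
    return {
        kind: pattern.findall(cmd_line)
        for kind, pattern in PLACEHOLDER_PATTERNS.items()
    }


example = (
    "${CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} "
    "--qgraph {fileDistributionEndPoint}/<FILE:runQgraphFile> "
    "--qgraph-id {qgraphId}"
)
for kind, names in classify_placeholders(example).items():
    print(kind, names)
```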
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,37 @@ | ||
#!/usr/bin/python | ||
""" | ||
This file is needed to decode the command line string sent from the BPS | ||
plugin -> PanDA -> Edge node cluster management -> Edge node -> Container. | ||
Once the command line hexified it can contain symbols often stripped such as | ||
',",` and multiline commands. | ||
plugin -> PanDA -> Edge node cluster management | ||
-> Edge node -> Container. This file is not a part | ||
of the BPS but a part of the payload wrapper. | ||
It decodes the hexified command line. | ||
""" | ||
import os | ||
import re | ||
import sys | ||
import binascii | ||
cmdline = str(binascii.unhexlify(sys.argv[1]).decode()) | ||
dataparams = sys.argv[2].split("+") | ||
cmdline = cmdline.replace("${filename}", dataparams[0]) | ||
if len(dataparams) > 2: | ||
cmdline = cmdline.replace("${qgraph-id}", dataparams[2]) | ||
cmdline = cmdline.replace("${qgraph-node-id}", dataparams[3]) | ||
print(cmdline) | ||
|
||
|
||
def replace_environment_vars(cmdline): | ||
Review comment: Missing docstring
Review comment: Fixed
||
""" | ||
Replaces placeholders with the values of the corresponding environment variables. | ||
:param cmdline: `str` | ||
Command line | ||
:return: cmdline: `str` | ||
Processed command line | ||
""" | ||
envs_to_replace = re.findall(r'<ENV:(.*?)>', cmdline) | ||
for env_var in envs_to_replace: | ||
if value := os.getenv(env_var): | ||
cmdline = cmdline.replace('<ENV:' + env_var + '>', value) | ||
return cmdline | ||
Review comment: Does the "<ENV" not cause problems if left in the cmdline? I was wondering why there wasn't an else or replace.
Review comment: This is a failing scenario.
||
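The thread above concerns `<ENV:...>` placeholders that remain in the command line when the variable is not set. If that case is meant to fail, one option is to fail explicitly in the decoder instead of passing the unresolved marker on to the shell. The variant below is a sketch of that idea, not the behavior of the PR: the function name, the `environ` argument and the choice of `KeyError` are assumptions.

```python
import os
import re

_ENV_PATTERN = re.compile(r"<ENV:(.*?)>")


def replace_environment_vars_strict(cmdline, environ=None):
    """Replace <ENV:NAME> markers and raise if any of them is unresolved."""
    environ = os.environ if environ is None else environ
    missing = []

    def _substitute(match):
        name = match.group(1)
        value = environ.get(name)
        if not value:
            # Remember the name and leave the marker in place for the message.
            missing.append(name)
            return match.group(0)
        return value

    result = _ENV_PATTERN.sub(_substitute, cmdline)
    if missing:
        raise KeyError(
            "Unresolved environment placeholders: " + ", ".join(sorted(set(missing)))
        )
    return result


print(replace_environment_vars_strict("echo <ENV:USER>", {"USER": "alice"}))
```

As in the PR code, an empty value is treated the same as an unset variable.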
|
||
|
||
cmd_line = str(binascii.unhexlify(sys.argv[1]).decode()) | ||
data_params = sys.argv[2].split("+") | ||
cmd_line = replace_environment_vars(cmd_line) | ||
cmd_line = cmd_line.replace("<FILE:runQgraphFile>", data_params[0]) | ||
for key_value_pair in data_params[1:]: | ||
(key, value) = key_value_pair.split(":") | ||
cmd_line = cmd_line.replace("{" + key + "}", value) | ||
print(cmd_line) | ||
Review comment: If this is an actual executable python file we should document at the top that it's deliberately not importing any bps code for a reason. I guess we need to think about how plugins can publish their own additional commands and whether they can go in the path.
Review comment: You are right, everything under edgenode is wrapper code executed on the edge node.
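To make the decoder's contract concrete, the following round trip hexifies a command line the way the decoder expects to receive it and then restores it from a pseudo input file name. It is a sketch under assumptions: `encode_cmd_line` and `decode_cmd_line` are hypothetical names, the example file and job names are invented, and `str.partition` is used instead of the PR's `split(":")` so that a segment without a colon is skipped and a value that itself contains a colon stays intact. Whether the PR code ever receives such segments is not stated on this page.

```python
import binascii


def encode_cmd_line(cmd_line):
    """Hexify a command line so quotes and newlines survive the transfer."""
    return binascii.hexlify(cmd_line.encode()).decode()


def decode_cmd_line(hex_cmd_line, pseudo_file_name):
    """Restore the command line and fill in the per-job values."""
    cmd_line = binascii.unhexlify(hex_cmd_line).decode()
    data_params = pseudo_file_name.split("+")
    cmd_line = cmd_line.replace("<FILE:runQgraphFile>", data_params[0])
    for item in data_params[1:]:
        key, separator, value = item.partition(":")
        if not separator:
            continue  # assumption: segments without "key:value" are ignored
        cmd_line = cmd_line.replace("{" + key + "}", value)
    return cmd_line


hex_cmd = encode_cmd_line(
    "pipetask run --qgraph <FILE:runQgraphFile> "
    "--qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId}"
)
decoded = decode_cmd_line(hex_cmd, "qg.pickle+job1+qgraphNodeId:7+qgraphId:42")
print(decoded)
```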
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,13 @@ | ||
#!/bin/bash | ||
# This is a starter script needed to initialize basic Rubin software environment inside the container and execute | ||
# the actual command line after decoding from hexed strings | ||
cd /tmp; | ||
ls /; | ||
export HOME=/tmp; | ||
export HOME=${PILOT_HOME} | ||
cd ${HOME} | ||
export S3_ENDPOINT_URL=https://storage.googleapis.com | ||
. /opt/lsst/software/stack/loadLSST.bash; | ||
DIR="$(dirname "${BASH_SOURCE[0]}")" # Get the directory name | ||
DIR="$(realpath "${DIR}")" # Resolve its full path | ||
unset PYTHONPATH | ||
. ${DIR}/../../../../../../../../../../../../loadLSST.bash; | ||
Review comment: Depending upon the number of subdirs in a container through subdirs not maintained by ctrl_bps seems really fragile.
Review comment: Isn't there an environment variable for the location of loadLSST.bash? In lsstsw it's
Review comment: Ignore me, obviously you are trying to initialize the code in the first place...
||
setup lsst_distrib; | ||
echo "$(/usr/bin/python $3/cmd_line_decoder.py $1 $2)" | ||
eval "$(/usr/bin/python $3/cmd_line_decoder.py $1 $2) >&2" | ||
echo "$(python3 $CTRL_BPS_DIR/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py $1 $2)" | ||
eval "$(python3 $CTRL_BPS_DIR/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py $1 $2) >&2" |
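The reviewers point out that locating `loadLSST.bash` through a fixed number of `..` segments is fragile. One alternative is to search the parent directories for the file instead of counting levels. The sketch below shows that search in Python only to illustrate the idea; it assumes an interpreter is available before the stack is set up, which the runner script does not guarantee, and `find_upwards` is a hypothetical name.

```python
from pathlib import Path


def find_upwards(start, file_name="loadLSST.bash"):
    """Return the nearest ``file_name`` in ``start`` or its parents, else None."""
    start = Path(start).resolve()
    for directory in [start, *start.parents]:
        candidate = directory / file_name
        if candidate.is_file():
            return candidate
    return None


# Example: search upwards from the directory containing this script.
print(find_upwards(Path.cwd()))
```

The same walk could be written as a shell loop in the runner itself, which would avoid the dependency on Python at that stage.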
Review comment: You need to put the proper file headers at the top of each file.
Review comment: Done, I added the file header to this file.