
Initial implementation of lazy commands creation in PanDA plugin #36

Merged

merged 1 commit into master from tickets/DM-30965 on Jul 14, 2021

Conversation

SergeyPod (Contributor):

No description provided.

@timj (Member) left a comment:

Some quick comments. You should always be thinking about which code is general-purpose and which is PanDA-specific.


class CommandLineEmbedder:
    """
    Class embeds static (constant across a task) values into the pipeline execution command line
timj (Member):

This seems like something that is completely generic and not PanDA-specific at all. Is the issue here that BPS is using a default syntax that HTCondor understands internally, but which PanDA needs code to expand? Either way, this surely should be in a generic part of BPS so that other plugins can use it. Is there PanDA-specific code here?

MichelleGower (Collaborator):

See the comment below about the <ENV syntax being plugin-specific. Also, currently the HTCondor/Pegasus plugins don't treat the ids differently from other values. We could have the code that builds the GenericWorkflow ensure that only variables whose values differ within a workflow are left, but there hasn't been any request to do so.


Parameters
----------
cmd_line: `str` command line to be processed
timj (Member):

Please use proper syntax for numpy docs. You need to do something like:

cmd_line : `str`
    Command line string to be processed.

envs_to_replace = re.findall(r'<ENV:(.*?)>', cmdline)
for env_var in envs_to_replace:
    if os.getenv(env_var):
        cmdline = cmdline.replace('<ENV:'+env_var+'>', os.getenv(env_var))
timj (Member):

I'm confused because it looks like this env var replacement code is not using the envvar replacement code from earlier. Surely there should only be one place in BPS that knows how to replace the special BPS <ENV syntax with the environment variable value.

SergeyPod (Contributor Author):

This part is executed on the compute node; the other parts are executed on the submission side, so they run in different locations.

timj (Member):

Time of execution does not explain logic duplication though. It seems to me that env-replacement is a generic concept that deserves a single implementation location.

MichelleGower (Collaborator):

Replacing the <ENV syntax for inside jobs is plugin-specific (e.g., HTCondor wants parens instead of braces). Otherwise we should just have BPS put normal shell syntax in the GenericWorkflow command lines.
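For illustration, a minimal sketch of the kind of plugin-specific token translation being described here; the function name and mapping are hypothetical, not BPS API:

import re

def translate_env_tokens(cmd_line, style):
    # Hypothetical helper: rewrite the BPS <ENV:NAME> token into a
    # backend-specific form: 'shell' -> ${NAME}, 'htcondor' -> $(NAME).
    replacements = {"shell": r"${\1}", "htcondor": r"$(\1)"}
    return re.sub(r"<ENV:(.*?)>", replacements[style], cmd_line)

# translate_env_tokens("<ENV:CTRL_MPEXEC_DIR>/bin/pipetask", "htcondor")
# returns "$(CTRL_MPEXEC_DIR)/bin/pipetask"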

SergeyPod (Contributor Author):

We can't initialize the BPS code in the lightweight execution wrapper; this is why this code is separated. Also, the submission side replaces the lazy parameters that are static or must be initialized on the submission node, while the edge node performs the actual lazy-variable resolution.

timj (Member):

The docstring at the top should be very clear about why this file doesn't depend on any code from BPS.

My comment about <ENV syntax is that this is implementing the generic solution of replacing the ENV string with the value of the environment variable.

SergeyPod (Contributor Author):

Updated file header


if len(qgraphNodeId) > 0:
    cmd_line = cmd_line.replace("{qgraphNodeId}", qgraphNodeId)
print(cmd_line)
timj (Member):

If this is an actual executable python file we should document at the top that it's deliberately not importing any bps code for a reason. I guess we need to think about how plugins can publish their own additional commands and whether they can go in the path.

SergeyPod (Contributor Author):

You are right; everything under the edgenode directory is the wrapper code executed on the edge node.

@@ -0,0 +1,94 @@
import os
timj (Member):

You need to put the proper file headers at the top of each file.

SergeyPod (Contributor Author):

Done, I added the file header to this file.


def replace_static_parameters(self, cmd_line, lazy_vars):
    """ Substitutes the lazy parameters in the command line which are static,
    the same for every job in the workflow and could be defined once. This function offloads the edge
timj (Member):

This comment is too long. I think this means that bps has not enabled the 79 character limit for docstrings in setup.cfg.

MichelleGower (Collaborator):

It would be nice if there was some project-centric place for these common pieces. We copied it from some place at the beginning, and obviously should do so again. Feel free to do that on this ticket or I'll do it on my ticket.

MichelleGower (Collaborator):

I just checked with max-doc-length. There will be several places in regular bps code that need to be changed, because I mistakenly thought the 79-character maximum was only for docstrings and that code comments could be the same length as the code.

MichelleGower (Collaborator):

Made a ticket to update the setup.cfg file and fix the problems (https://jira.lsstcorp.org/browse/DM-31043).
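For context, the setup.cfg knobs under discussion would look roughly like this (a sketch; the exact values adopted are being handled on DM-31043):

[flake8]
max-line-length = 110
max-doc-length = 79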


file_suffix = ""
for item in self.leave_placeholder_params:
    file_suffix += '+'+item+':'+lazy_vars.get(item, '')
timj (Member):

add spaces around +

SergeyPod (Contributor Author):

Added
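For reference, the spaced form of that line would read:

file_suffix += '+' + item + ':' + lazy_vars.get(item, '')

An f-string, file_suffix += f'+{item}:{lazy_vars.get(item, "")}', would also work.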

"""

copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
future_file_copy = []
for src_path in local_pfns:
src = ButlerURI(src_path)
target_base_uri = ButlerURI(cloud_prefix)
target_base_uri = ButlerURI(file_placement_path)

# S3 clients explicitly instantiate here to overpass this
# https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
target_base_uri.exists()
src.exists()

target = target_base_uri.join(os.path.basename(src_path))
timj (Member):

don't use os.path.basename if you are using ButlerURI already. Use target_base_uri.join(src.basename())
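A hedged sketch of how the copy loop presumably continues, folding in timj's basename suggestion; ButlerURI.transfer_from is assumed to be the transfer API here:

    target = target_base_uri.join(src.basename())
    # Submit each copy to the thread pool and keep the future
    future_file_copy.append(
        copy_executor.submit(target.transfer_from, src, transfer="copy"))

# Block until every copy has finished
concurrent.futures.wait(future_file_copy)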

@MichelleGower (Collaborator) left a comment:

Double check that all files have the LSST header (https://developer.lsst.io/python/style.html?highlight=license#each-python-file-must-contain-the-standard-license-preamble) and docstrings. Multiple questions, which might just be resolved by more comments. It's not clear whether there is still an issue with the GenericWorkflow's lazy command lines or whether it just wasn't clear how to use them. Since it was actually running, I leave it to @timj how much has to be changed this round before merging.

fgcmOutputProducts:
requestMemory: 8192

saveDot: False
requestCpus: 1
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService
clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering
-createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
runQuantumCommand: '{container_CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --extend-run --skip-init-writes --qgraph {bucket}/{payload_folder}/{uniqProcName}/${{filename}} --qgraph-id ${{qgraph-id}} --qgraph-node-id ${{qgraph-node-id}} --clobber-partial-outputs --skip-existing --no-versions'
+createQuantumGraph: '<ENV:CTRL_MPEXEC_DIR>/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} --qgraph-dot {qgraphFile}.dot'
MichelleGower (Collaborator):

BPS will make the conversion to <ENV: in the GenericWorkflow. The user's yaml should have ${CTRL_MPEXEC_DIR}. See doc/lsst.ctrl.bps/pipelines_check.yaml

for env_var in envs_to_replace:
    if os.getenv(env_var):
        cmdline = cmdline.replace('<ENV:'+env_var+'>', os.getenv(env_var))
return cmdline
MichelleGower (Collaborator):

Does the "<ENV" not cause problems if left in the cmdline? I was wondering why there wasn't an else or replace.

SergeyPod (Contributor Author):

This is a failing scenario: if the environment variable is not set, the unresolved token is left in the command line and the job fails.

data_params = sys.argv[2].split("+")
cmd_line = replace_environment_vars(cmd_line)
cmd_line = cmd_line.replace("<FILE:runQgraphFile>", data_params[0])
qgraphId = data_params[2].split(":")[1]
MichelleGower (Collaborator):

Explicitly assuming that there will be a qgraphId and a qgraphNodeId is hard-coded behavior; it will mean the plugin needs to be changed if the command line changes.
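For illustration, a hedged sketch of a more generic alternative that parses the '+'-joined key:value pairs into a dict instead of indexing fixed positions (names here are made up):

import sys

# Recover "key:value" pairs generically rather than relying on their
# position in the '+'-joined argument built on the submission side.
data_params = {}
for item in sys.argv[2].split("+"):
    key, sep, value = item.partition(":")
    if sep:
        data_params[key] = value

qgraph_id = data_params.get("qgraphId")  # None if absent, rather than an IndexError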

s3_endpoint_url: "https://storage.googleapis.com"
payload_folder: payload
-runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "{container_obs_panda_edge_node_dir}/sw_runner _cmd_line_ ${{IN/L}} {container_obs_panda_edge_node_dir}" >&2;'
+runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;setup lsst_distrib;\<ENV:CTRL_BPS_DIR_REPLACE>/python/lsst/ctrl/bps/wms/panda/edgenode/sw_runner _cmd_line_ " >&2;'
MichelleGower (Collaborator):

What is "<ENV:CTRL_BPS_DIR_REPLACE>" for? (<ENV: should be internal bps/plugin syntax not seen by user).

SergeyPod (Contributor Author):

As per the discussion, this parameter was replaced with an ordinary environment variable.

s3_endpoint_url: "https://storage.googleapis.com"
payload_folder: payload
-runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "{container_obs_panda_edge_node_dir}/sw_runner _cmd_line_ ${{IN/L}} {container_obs_panda_edge_node_dir}" >&2;'
+runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;setup lsst_distrib;\<ENV:CTRL_BPS_DIR_REPLACE>/python/lsst/ctrl/bps/wms/panda/edgenode/sw_runner _cmd_line_ " >&2;'


pipetask:
MichelleGower (Collaborator):

At some point soon, it would be good if this example wasn't a real science pipeline. We don't want to keep updating pipeline values in our examples every time the pipeline changes (e.g., the requestMemory changes).

@@ -1,18 +1,24 @@
-pipelineYaml: ${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/conf_example/HSC-PANDA.yaml
+pipelineYaml: /opt/lsst/software/HSC-PANDA.yaml
MichelleGower (Collaborator):

This is an existing path on the IDF submit host(s)?

SergeyPod (Contributor Author):

yes


# This setting supersedes cloud and memory requirements
#computeSite: DOMA_LSST_GOOGLE_TEST_HIMEM

maxwalltime: 90000
requestMemory: 2000
maxattempt: 1

whenSaveJobQgraph: "NEVER"
MichelleGower (Collaborator):

Just a heads up, planning on making this the default value in the execution butler version so it won't need to be here too much longer.

SergeyPod (Contributor Author):

Thanks for the comment

@@ -237,24 +242,3 @@ def get_input_file(self, job_name):
        quantum graph file name
        """
        return next(iter(self.bps_workflow.nodes.get(job_name).get("inputs")))
MichelleGower (Collaborator):

Another heads up: this is one of the lines that won't work after the execution butler changes, because it isn't using the API and the internal details are changing (self.bps_workflow.get_job_inputs(job_name, data=True)). Not sure if you need just the GenericWorkflowFile names or the whole objects.

@@ -198,38 +201,38 @@ def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_cla
idds_workflow = cls(generic_workflow.name, config)
workflow_generator = IDDSWorkflowGenerator(generic_workflow, config)
idds_workflow.generated_tasks = workflow_generator.define_tasks()
-cloud_prefix = config['bucket'] + '/' + \
+file_placement_path = config['fileDistributionEndPoint'] + '/' + \
MichelleGower (Collaborator):

could use os.path.join.

SergeyPod (Contributor Author):

fixed
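A minimal sketch of the os.path.join form being suggested, using the config keys from the excerpt:

import os

file_placement_path = os.path.join(
    config['fileDistributionEndPoint'],
    config['payload_folder'],
    config['workflowName'],
)
# Note: unlike the string concatenation in the excerpt, os.path.join
# does not append the trailing '/'.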

    config['payload_folder'] + '/' + config['workflowName'] + '/'
-cls.copy_pickles_into_cloud([config['bps_defined']['run_qgraph_file']], cloud_prefix)
+cls.copy_files_for_distribution([config['bps_defined']['run_qgraph_file']], file_placement_path)
MichelleGower (Collaborator):

Which files need to be copied should be determined from the GenericWorkflow itself, not by hardcoding a specific one from the config.

SergeyPod (Contributor Author):

@MichelleGower, how can I extract this list of files to be used from the GenericWorkflow? Iterate over jobs and collect input file names? Or is there a more compact way? Thanks.

SergeyPod (Contributor Author):

I see it in the snippet you've sent me. Thanks.
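A hedged sketch of the iterate-and-collect approach, using the .nodes access visible in the excerpt above and the get_job_inputs call MichelleGower quoted earlier; the actual snippet she sent may differ:

# Collect the input files of every job in the workflow; the result may
# contain duplicates, so dedup by name if only unique files are needed.
files_to_copy = []
for job_name in generic_workflow.nodes:
    files_to_copy.extend(generic_workflow.get_job_inputs(job_name, data=True))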

"""

for param in self.submit_side_resolved:
cmd_line = cmd_line.replace('<ENV:'+param+'>', os.getenv(param))
timj (Member):

This will fail if the env var does not exist, but it might be preferable to at least catch it and report which environment variable you expected to exist, because otherwise "replace() argument 2 must be str, not None" is a bit opaque.

SergeyPod (Contributor Author):

Fixed
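A minimal sketch of the guarded form timj is asking for (the committed fix may differ):

for param in self.submit_side_resolved:
    value = os.getenv(param)
    if value is None:
        # Name the missing variable instead of letting replace() raise
        raise RuntimeError(f"Expected environment variable {param} to be set")
    cmd_line = cmd_line.replace('<ENV:' + param + '>', value)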

def replace_environment_vars(cmdline):
    envs_to_replace = re.findall(r'<ENV:(.*?)>', cmdline)
    for env_var in envs_to_replace:
        if os.getenv(env_var):
timj (Member):

consider using the walrus operator here:

if (value := os.getenv(env_var)):
    cmdLine = cmdLine.replace(..., value)

SergeyPod (Contributor Author):

Thanks for the suggestion, fixed.
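Putting the thread's suggestions together, the fixed helper plausibly looks something like this (a sketch, not necessarily the committed version):

import os
import re

def replace_environment_vars(cmdline):
    """Replace each <ENV:NAME> token with the value of NAME, when it is set."""
    for env_var in re.findall(r'<ENV:(.*?)>', cmdline):
        if (value := os.getenv(env_var)):
            cmdline = cmdline.replace('<ENV:' + env_var + '>', value)
    return cmdline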

@SergeyPod (Contributor Author) left a comment:

Comments implemented


"""
def __init__(self, config):
self.leave_placeholder_params = config.get("placeholderParams", ['qgraphNodeId', 'qgraphId'])
SergeyPod (Contributor Author):

Yes, they only vary across a task. They should not be resolved at the submission step; placeholders should be left for them for further processing on the edge-node side. They are also not environment variables, but they contribute to the actual command-line construction. They also contribute to the pseudo input list for a task, which is constructed at the submission step.
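To illustrate the two-stage resolution described here (all values are made up):

Before submission-side substitution:
    pipetask run -b {butlerConfig} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId}
After replace_static_parameters on the submission side (placeholders kept for the edge node):
    pipetask run -b s3://bucket/butler.yaml --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId}
The edge node then resolves {qgraphId} and {qgraphNodeId} per job.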

"""
def __init__(self, config):
self.leave_placeholder_params = config.get("placeholderParams", ['qgraphNodeId', 'qgraphId'])
self.submit_side_resolved = config.get("submitSideResolvedParams", ['USER'])
SergeyPod (Contributor Author):

I would agree, but I also think we should still have the opportunity to resolve some variables at the submission step.


for param_name, param_val in lazy_vars.items():
if param_name not in self.leave_placeholder_params:
cmd_line = cmd_line.replace('{'+param_name+'}', param_val)
SergeyPod (Contributor Author):

Done

"""

for param in self.submit_side_resolved:
cmd_line = cmd_line.replace('<ENV:'+param+'>', os.getenv(param))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


Parameters
----------
lazy_vars: `dict` of values of to be substituted
SergeyPod (Contributor Author):

This is a dictionary; in my subjective opinion, "values" is more associated with a list...


@SergeyPod force-pushed the tickets/DM-30965 branch 4 times, most recently from de527c5 to bf87aa9 (July 14, 2021 20:47)
@SergeyPod merged commit e9969b4 into master on Jul 14, 2021
@SergeyPod deleted the tickets/DM-30965 branch on July 14, 2021 20:50