Merge pull request #47 from lsst/tickets/DM-31900
DM-31900: Compute site changes for PanDA
SergeyPod committed Sep 29, 2021
2 parents ca22f6f + af03a42 commit 679a4d4
Showing 5 changed files with 32 additions and 54 deletions.
@@ -1,13 +1,13 @@
includeConfigs:
- ${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/conf_example/pipelines_check_idf.yaml
- pipelines_check_idf.yaml


#PANDA plugin specific settings:
idds_server: "https://aipanda015.cern.ch:443/idds"
placeholderParams: ['qgraphNodeId', 'qgraphId']

#IDF PanDA specific settings:
computing_cloud: LSST
computeSite: LSST

#SLAC PanDA specific settings:
#computing_cloud: US
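
The rename from the plugin-specific `computing_cloud` to the generic `computeSite` key means the site can be read through the standard BPS config lookup, matching the `idds_tasks.py` changes below. A minimal sketch of that lookup; the `BpsConfig` import path is an assumption, and the default value is hypothetical:

```python
# A sketch, not part of this commit: reading the renamed key the same way
# idds_tasks.py reads other settings via config.search().
from lsst.ctrl.bps import BpsConfig  # assumed import path

config = BpsConfig("pipelines_check_idf.yaml")
# search() returns a (found, value) pair; the "LSST" default is hypothetical.
_, compute_site = config.search("computeSite", opt={"default": "LSST"})
print(compute_site)
```
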
@@ -24,52 +24,30 @@ campaign: quick
# the default.
executionButler:
whenCreate: "SUBMIT"
#USER executionButlerDir: "/my/exec/butler/dir" # if user provided, otherwise uses executionButlerTemplate
createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} --input {inCollection} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile}"
createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile} --skip-existing-in {outCollection}"

whenMerge: "ALWAYS"
implementation: JOB # JOB, WORKFLOW
concurrency_limit: db_limit

# What commands to run as part of the merge step:
command1: "${DAF_BUTLER_DIR}/bin/butler --log-level=VERBOSE transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"
command2: "${DAF_BUTLER_DIR}/bin/butler collection-chain {butlerConfig} {output} {outCollection} --mode=prepend"

# For --replace-run behavior need to run two collection-chain commands
# in addition to the transfer-datasets:
#command2: "${DAF_BUTLER_DIR}/bin/butler collection-chain {butlerConfig} {output} --mode=pop 1"
#command3: "${DAF_BUTLER_DIR}/bin/butler collection-chain {butlerConfig} {output} --mode=prepend {outCollection}"
mergePreCmdOpts: "{defaultPreCmdOpts}"
command1: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"
command2: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain {butlerConfig} {output} --flatten --mode=extend {inCollection}"
command3: "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain {butlerConfig} {output} --flatten --mode=prepend {outCollection}"
queue: "DOMA_LSST_GOOGLE_MERGE"

pipetask:
pipetaskInit:
# Notes: Declaring and chaining now happen within execution butler
# steps. So, this command no longer needs -o and must have
# --extend-run.
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} -i {inCollection} --output-run {outCollection} --init-only --register-dataset-types --qgraph {fileDistributionEndPoint}/{qgraphFile} --extend-run --clobber-outputs --no-versions"
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {initPreCmdOpts} run -b {butlerConfig} -i {inCollection} -o {output} --output-run {outCollection} --qgraph {fileDistributionEndPoint}/{qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --clobber-outputs --init-only --extend-run {extraInitOptions}"
forcedPhotCoadd:
queue: "DOMA_LSST_GOOGLE_TEST_HIMEM_NON_PREEMPT"

# Default commands and usage requests for creating QuantumGraph, running Quantum.
# Defaults to using full workflow QuantumGraph instead of per-job QuantumGraphs.
whenSaveJobQgraph: "NEVER"
createQuantumGraph: '${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -d "{dataQuery}" -b {butlerConfig} -i {inCollection} -p {pipelineYaml} -q {qgraphFile} {pipelineOptions}'
runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask --long-log run -b {butlerConfig} --output-run {outCollection} --qgraph {fileDistributionEndPoint}/{qgraphFile} --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId} --skip-init-writes --extend-run --clobber-outputs --skip-existing"

requestMemory: 2048
requestCpus: 1

#this is a series of setup commands preceding the actual core SW execution
runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) --env S3_ENDPOINT_URL=${S3_ENDPOINT_URL} {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;cd /tmp;ls -a;setup lsst_distrib;pwd;python3 \${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py _cmd_line_ " >&2;'
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService
clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clustering

# Template for bps filenames
submitPath: ${PWD}/submit/{outCollection}
qgraphFileTemplate: "{uniqProcName}.qgraph"
executionButlerTemplate: "{submitPath}/EXEC_REPO-{uniqProcName}"
subDirTemplate: "{label}/{tract}/{patch}/{visit.day_obs}/{exposure.day_obs}/{band}/{subfilter}/{physical_filter}/{visit}/{exposure}"
templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"

# Whether to output bps-specific intermediate files
saveDot: False
saveGenericWorkflow: False
saveClusteredQgraph: False

# Temporary backward-compatibility settings
useLazyCommands: True
useBpsShared: False
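
For readers unfamiliar with how BPS resolves these command templates: the `{placeholder}` tokens in `command1` through `command3` are substituted with run-specific values before execution. A rough illustration with hypothetical values (the `mergePreCmdOpts` expansion is assumed from the `--log-level=VERBOSE` option in the removed `command1`, and the collection names are made up; `butlerConfig` is taken from the payload file below):

```python
# Simplified stand-in for the substitution BPS performs on command templates.
template = (
    "${DAF_BUTLER_DIR}/bin/butler {mergePreCmdOpts} collection-chain "
    "{butlerConfig} {output} --flatten --mode=prepend {outCollection}"
)
values = {
    "mergePreCmdOpts": "--log-level=VERBOSE",  # assumed defaultPreCmdOpts expansion
    "butlerConfig": "s3://butler-us-central1-panda-dev/hsc/butler.yaml",
    "output": "u/me/pipelines_check",                      # hypothetical
    "outCollection": "u/me/pipelines_check/20210929T120000Z",  # hypothetical
}
for key, value in values.items():
    template = template.replace("{" + key + "}", value)
print(template)
```
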
@@ -2,15 +2,12 @@ pipelineYaml: "${OBS_SUBARU_DIR}/pipelines/DRP.yaml#processCcd"

payload:
payloadName: pipelines_check
runInit: true
output: "u/{operator}/{payload_name}"
outCollection: "{output}/{timestamp}"
butlerConfig: s3://butler-us-central1-panda-dev/hsc/butler.yaml
inCollection: HSC/calib,HSC/raw/all,refcats
dataQuery: "tract = 9615 and patch=30 and detector IN (10..11) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r')"
output: u/{operator}/pcheck_ebutler

sw_image: "spodolsky/centos:7-stack-lsst_distrib-d_2021_09_06"
sw_image: "lsstsqre/centos:7-stack-lsst_distrib-w_2021_39"
fileDistributionEndPoint: "s3://butler-us-central1-panda-dev/hsc/{payload_folder}/{uniqProcName}/"
s3_endpoint_url: "https://storage.googleapis.com"
payload_folder: payload
runner_command: 'docker run --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) --env S3_ENDPOINT_URL=${S3_ENDPOINT_URL} {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;cd /tmp;ls -a;setup lsst_distrib;pwd;python3 \${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py _cmd_line_ " >&2;'
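
The `output` and `outCollection` templates in the payload section compose the run collection from the operator name, payload name, and submission timestamp. A quick illustration with made-up values:

```python
# Made-up values for {operator} and {timestamp}; the composition follows
# the templates in the payload section above.
operator = "me"
payload_name = "pipelines_check"
timestamp = "20210929T120000Z"

output = f"u/{operator}/{payload_name}"   # u/me/pipelines_check
out_collection = f"{output}/{timestamp}"  # u/me/pipelines_check/20210929T120000Z
```
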
18 changes: 12 additions & 6 deletions python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py
@@ -28,8 +28,8 @@ def replace_placeholders(cmd_line, tag, replancements):


def replace_environment_vars(cmd_line):
"""
Replaces placeholders to the actual environment variables.
""" Replaces placeholders to the actual environment variables.
Parameters
----------
cmd_line : `str`
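
Only the docstrings change in these hunks. For context, the substitution the `replace_environment_vars` docstring describes works roughly as sketched below; the `<ENV:NAME>` tag format is an assumption based on the `replace_placeholders(cmd_line, tag, ...)` helper and is not shown in this diff:

```python
import os
import re

def replace_env_placeholders(cmd_line):
    """Sketch of the documented behavior; the <ENV:NAME> tag format is
    assumed, not taken from this diff."""
    for name in re.findall(r"<ENV:(.*?)>", cmd_line):
        if name in os.environ:
            cmd_line = cmd_line.replace(f"<ENV:{name}>", os.environ[name])
    return cmd_line
```
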
@@ -46,8 +46,8 @@ def replace_environment_vars(cmd_line):


def replace_files_placeholders(cmd_line, files):
"""
Replaces placeholders for files.
"""Replaces placeholders for files.
Parameters
----------
cmd_line : `str`
@@ -74,8 +74,8 @@ def replace_files_placeholders(cmd_line, files):


def deliver_input_files(src_path, files, skip_copy):
"""
Delivers input files needed for a job
"""Delivers input files needed for a job
Parameters
----------
src_path : `str`
@@ -116,7 +116,13 @@ def deliver_input_files(src_path, files, skip_copy):
cmd_line = str(binascii.unhexlify(sys.argv[1]).decode())
data_params = sys.argv[2].split("+")
cmd_line = replace_environment_vars(cmd_line)

"""This call replaces the pipetask command line placeholders
with actual data provided in the script call
in form placeholder1:file1+placeholder2:file2:...
"""
cmd_line = replace_files_placeholders(cmd_line, sys.argv[4])

for key_value_pair in data_params[1:]:
(key, value) = key_value_pair.split(":")
cmd_line = cmd_line.replace("{" + key + "}", value)
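
The final hunk wires placeholder substitution into the main script: file placeholders come from `sys.argv[4]`, while the loop replaces `{key}` tokens with values from the `+`-separated `sys.argv[2]` (the first field is the job's pseudo input and is skipped via `data_params[1:]`). A self-contained rerun of that loop with hypothetical arguments:

```python
# Hypothetical command line and argv content; the loop body matches the
# decoder code above. qgraphId/qgraphNodeId come from placeholderParams
# in the plugin config.
cmd_line = "pipetask run --qgraph-id {qgraphId} --qgraph-node-id {qgraphNodeId}"
data_params = "somejob+qgraphId:abc123+qgraphNodeId:7".split("+")
for key_value_pair in data_params[1:]:
    key, value = key_value_pair.split(":")
    cmd_line = cmd_line.replace("{" + key + "}", value)
print(cmd_line)  # pipetask run --qgraph-id abc123 --qgraph-node-id 7
```
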
9 changes: 4 additions & 5 deletions python/lsst/ctrl/bps/wms/panda/idds_tasks.py
@@ -100,7 +100,6 @@ def __init__(self, bps_workflow, config):
self.tasks_steps = {}
self.tasks_cmd_lines = {}
self.dag_end_tasks = set()
self.computing_cloud = config.get("computing_cloud")
_, v = config.search("maxwalltime", opt={"default": 90000})
self.maxwalltime = v
_, v = config.search("maxattempt", opt={"default": 5})
@@ -160,12 +159,12 @@ def define_tasks(self):
== self.tasks_steps[task_name],
self.bps_workflow))
bps_node = self.bps_workflow.get_job(picked_job_name)
task.queue = bps_node.compute_site
task.queue = bps_node.queue
task.cloud = bps_node.compute_site
task.jobs_pseudo_inputs = list(jobs)
task.maxattempt = self.maxattempt
task.maxwalltime = self.maxwalltime
task.maxrss = bps_node.request_memory
task.cloud = self.computing_cloud
task.executable = self.tasks_cmd_lines[task_name]
task.files_used_by_task = self.fill_input_files(task_name)
task.is_final = False
@@ -197,7 +196,8 @@ def get_final_task(self):

task.step = final_job.label
task.name = self.define_task_name(final_job.label)
task.queue = final_job.compute_site
task.queue = final_job.queue
task.cloud = final_job.compute_site
task.jobs_pseudo_inputs = []

# This string implements empty pattern for dependencies
@@ -207,7 +207,6 @@
task.maxattempt = self.maxattempt
task.maxwalltime = self.maxwalltime
task.maxrss = final_job.request_memory
task.cloud = self.computing_cloud
task.files_used_by_task = [bash_file]
task.is_final = True
task.is_dag_end = False
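
The net effect in `idds_tasks.py` is that a task's PanDA queue now follows the job's `queue` attribute and its cloud follows the per-job `compute_site`, replacing the global `computing_cloud` lookup removed from `__init__`. A minimal stand-in sketch; both classes and their default values are hypothetical simplifications of the real workflow and task objects:

```python
from dataclasses import dataclass

@dataclass
class JobStub:  # stand-in for a generic workflow job
    queue: str = "DOMA_LSST_GOOGLE_MERGE"  # hypothetical value
    compute_site: str = "LSST"             # hypothetical value

@dataclass
class TaskStub:  # stand-in for an iDDS task
    queue: str = ""
    cloud: str = ""

bps_node = JobStub()
task = TaskStub()
task.queue = bps_node.queue          # previously bps_node.compute_site
task.cloud = bps_node.compute_site   # previously self.computing_cloud
```
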
2 changes: 0 additions & 2 deletions python/lsst/ctrl/bps/wms/panda/panda_service.py
@@ -92,10 +92,8 @@ def add_decoder_prefix(self, cmd_line, distribution_path, files):
----------
cmd_line : `str`
UTF-8 based functional part of the command line
distribution_path : `str`
URI of path where all files are located for distribution
files `list` [`str`]
File names needed for a task
