Commit fb400af

Merge pull request #84 from lsst/tickets/DM-32675
Implemented requirements defined in DM-32675
SergeyPod committed Dec 16, 2021
2 parents 1ed7923 + 371b9e3 commit fb400af
Showing 4 changed files with 118 additions and 46 deletions.
2 changes: 1 addition & 1 deletion config/bps_idf.yaml
@@ -30,5 +30,5 @@ runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {runPreCmdOpts} run -b {butl


# This is a series of setup commands preceding the actual core SW execution
-runnerCommand: 'logDir=/tmp/panda/${PANDAID}; mkdir ${logDir}; logFile=${logDir}/${REALTIME_LOGFILES}; touch ${logFile}; chmod ugo+w ${logFile}; ln -s ${logFile} ${PWD}/; ls -l ${PWD}/; docker run -v ${logFile}:/tmp/${REALTIME_LOGFILES} --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) --env S3_ENDPOINT_URL=${S3_ENDPOINT_URL} {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;cd /tmp;ls -al;setup lsst_distrib;pwd;python3 \${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py _cmd_line_ " >&2;'
+runnerCommand: 'logDir=/tmp/panda/${PANDAID}; mkdir ${logDir}; logFile=${logDir}/${REALTIME_LOGFILES}; touch ${logFile}; chmod ugo+w ${logFile}; ln -s ${logFile} ${PWD}/; ls -l ${PWD}/; docker run --rm -v ${logFile}:/tmp/${REALTIME_LOGFILES} --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) --env S3_ENDPOINT_URL=${S3_ENDPOINT_URL} {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;cd /tmp;ls -al;setup lsst_distrib;pwd;python3 \${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py _cmd_line_ " >&2;'
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService
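For context: the only change to runnerCommand above is the added --rm flag, which tells Docker to delete the container once it exits, so repeated payload runs on a worker node do not accumulate stopped containers.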
4 changes: 4 additions & 0 deletions doc/changes/DM-32675.feature.rst
@@ -0,0 +1,4 @@
+* Large tasks (> 30k jobs) are split into chunks (see the sketch after this list)
+* Updated iDDS API usage for the most recent version
+* Updated iDDS API initialization to force the PanDA proxy to use the IAM user name for the submitted workflow
+* Added a limit on the number of characters in the task pseudo inputs
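For reference, the chunking added in this ticket reduces to a ceiling division plus a hash-based assignment. Below is a minimal, self-contained Python sketch of that scheme; the assign_chunks helper, the sample task name, and the job inputs are hypothetical illustrations, with the 30000-job threshold taken from the maxJobsPerTask default introduced in idds_tasks.py:

    MAX_JOBS_PER_TASK = 30000  # default for maxJobsPerTask in this commit

    def assign_chunks(task_name, pseudo_inputs, max_jobs=MAX_JOBS_PER_TASK):
        """Distribute a task's jobs over chunks of at most max_jobs each."""
        n_jobs = len(pseudo_inputs)
        if n_jobs <= max_jobs:
            return {task_name: list(pseudo_inputs)}
        # Ceiling division without math.ceil: -(-a // b) == ceil(a / b).
        n_chunks = -(-n_jobs // max_jobs)
        chunks = {}
        for pseudo_input in pseudo_inputs:
            # hash() is stable within one interpreter run, which is all the
            # generator needs: jobs and the upstream references to them are
            # hashed in the same process.
            chunk_id = hash(pseudo_input) % n_chunks
            chunks.setdefault(f"{task_name}_chunk_{chunk_id}", []).append(pseudo_input)
        return chunks

    # e.g. assign_chunks("task1", [f"job{i}" for i in range(70000)]) yields
    # three "task1_chunk_N" lists of roughly 23000 jobs each.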
128 changes: 105 additions & 23 deletions python/lsst/ctrl/bps/wms/panda/idds_tasks.py
@@ -55,11 +55,11 @@ class RubinTask:
"""Computing queue where the task to be submitted"""
executable: str = None
"""The task command line to be executed"""
maxwalltime: int = None
max_walltime: int = None
"""Maximum allowed walltime in seconds"""
maxattempt: int = None
max_attempt: int = None
"""Maximum number of jobs attempts in a task"""
maxrss: int = None
max_rss: int = None
"""Maximum size of RAM to be used by a job"""
cloud: str = None
"""Computing cloud in CRIC registry where the task should
@@ -95,15 +95,13 @@ class IDDSWorkflowGenerator:
def __init__(self, bps_workflow, config):
self.bps_workflow = bps_workflow
self.bps_config = config
-self.tasks_inputs = {}
self.jobs_steps = {}
self.tasks_steps = {}
self.tasks_cmd_lines = {}
self.dag_end_tasks = set()
-_, v = config.search("maxwalltime", opt={"default": 90000})
-self.maxwalltime = v
-_, v = config.search("maxattempt", opt={"default": 5})
-self.maxattempt = v
+self.number_of_retries = {}
+_, self.max_walltime = config.search("maxWalltime", opt={"default": 90000})
+_, self.max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": 30000})
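# Note (annotation): the BPS config keys are camelCase ("maxWalltime",
# "maxJobsPerTask") while the attributes are snake_case, matching the
# renamed RubinTask fields.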

def define_task_name(self, step):
"""Return task name as a combination of the workflow name (unique
@@ -150,31 +148,112 @@ def define_tasks(self):
tasks = []
raw_dependency_map = self.create_raw_jobs_dependency_map()
tasks_dependency_map = self.split_map_over_tasks(raw_dependency_map)
-for task_name, jobs in tasks_dependency_map.items():
+tasks_dependency_map_chunked = self.split_tasks_into_chunks(tasks_dependency_map)
+for task_name, jobs in tasks_dependency_map_chunked.items():
task = RubinTask()
task.step = task_name
task.name = task.step
picked_job_name = next(filter(
lambda job_name: self.bps_workflow.get_job(job_name).label
-== self.tasks_steps[task_name],
-self.bps_workflow))
+== self.tasks_steps[task_name], self.bps_workflow))
bps_node = self.bps_workflow.get_job(picked_job_name)
task.queue = bps_node.queue
task.cloud = bps_node.compute_site
task.jobs_pseudo_inputs = list(jobs)
-task.maxattempt = self.maxattempt
-task.maxwalltime = self.maxwalltime
-task.maxrss = bps_node.request_memory
+task.max_attempt = self.number_of_retries.get(task_name, 3)
+task.max_walltime = self.max_walltime
+task.max_rss = bps_node.request_memory
task.executable = self.tasks_cmd_lines[task_name]
task.files_used_by_task = self.fill_input_files(task_name)
task.is_final = False
task.is_dag_end = self.tasks_steps[task_name] in self.dag_end_tasks
tasks.append(task)
-self.add_dependencies(tasks, tasks_dependency_map)
+self.add_dependencies(tasks, tasks_dependency_map_chunked)
final_task = self.get_final_task()
tasks.append(final_task)
return tasks

def split_tasks_into_chunks(self, tasks_dependency_map):
"""If a task is going to contain more jobs than the configured
threshold, split such a large task into chunks.

Parameters
----------
tasks_dependency_map : `dict`
Dependencies dictionary with task names as keys and job
dependencies as values. The latter dict has a job's input
parameters (PanDA pseudo file name) as its key and a dict of
(upstream task name) - (its PanDA pseudo file name) pairs,
which defines the dependencies of a job, as its value.

Returns
-------
tasks_dependency_map : `dict`
Dependencies dictionary with tasks chunked where needed.
"""
tasks_dependency_map_chunked = {}
tasks_chunked = {}

"""At this step only tasks names are updated to distribute
tasks over chunks
"""
for task_name, dependencies in tasks_dependency_map.items():
n_jobs_in_task = len(dependencies)
if n_jobs_in_task > self.max_jobs_per_task:
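# Ceiling division: -(-a // b) == ceil(a / b) for positive integers.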
n_chunks = -(-n_jobs_in_task//self.max_jobs_per_task)
for pseudo_input, dependency in dependencies.items():
chunk_id = hash(pseudo_input) % n_chunks
task_name_chunked = self.get_task_name_with_chunk(task_name, chunk_id)
tasks_dependency_map_chunked.setdefault(task_name_chunked, {})[pseudo_input] = dependency
self.tasks_steps[task_name_chunked] = self.tasks_steps[task_name]
self.tasks_cmd_lines[task_name_chunked] = self.tasks_cmd_lines[task_name]
tasks_chunked[task_name] = n_chunks
else:
tasks_dependency_map_chunked[task_name] = dependencies

"""This block propagates chunking over upstream dependencies
records.
"""
tasks_dependency_map_chunked_updated_dep = {}
for task, dependencies in tasks_dependency_map_chunked.items():
for pseudo_input, dependency in dependencies.items():
updated_dependencies = {}
for upstream_task_name, pseudo_inputs in dependency.items():
if upstream_task_name in tasks_chunked:
for upstream_pseudo_input in pseudo_inputs:
chunk_id = hash(upstream_pseudo_input) % tasks_chunked[upstream_task_name]
task_name_chunked = self.get_task_name_with_chunk(upstream_task_name, chunk_id)
updated_dependencies.setdefault(task_name_chunked, []).append(upstream_pseudo_input)
else:
updated_dependencies.setdefault(upstream_task_name, []).extend(pseudo_inputs)
tasks_dependency_map_chunked_updated_dep.setdefault(task, {}).setdefault(pseudo_input, {}).update(updated_dependencies)
return tasks_dependency_map_chunked_updated_dep
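To make the re-pointing above concrete, here is a toy walk-through under the same hashing scheme; the task and input names ("taskA", "in1", "in2") are hypothetical and this snippet is not part of the diff:

    # Suppose upstream "taskA" was split into 2 chunks and a downstream job
    # depends on two of its pseudo inputs.
    tasks_chunked = {"taskA": 2}
    dependency = {"taskA": ["in1", "in2"]}
    updated_dependencies = {}
    for upstream_task, pseudo_inputs in dependency.items():
        if upstream_task in tasks_chunked:
            for pseudo_input in pseudo_inputs:
                chunk_id = hash(pseudo_input) % tasks_chunked[upstream_task]
                updated_dependencies.setdefault(
                    f"{upstream_task}_chunk_{chunk_id}", []).append(pseudo_input)
        else:
            updated_dependencies.setdefault(upstream_task, []).extend(pseudo_inputs)
    # Each dependency now names the concrete chunked task, e.g.
    # {"taskA_chunk_0": ["in2"], "taskA_chunk_1": ["in1"]}; the exact split
    # varies per run because str hashing is salted per interpreter process.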

def get_task_name_with_chunk(self, task_name, chunk_id):
"""Concatenate a task name and a chunk ID.

Parameters
----------
task_name : `str`
The name of the task.
chunk_id : `int`
ID of the chunk.

Returns
-------
task_name : `str`
Concatenated task name.
"""
return f"{task_name}_chunk_{chunk_id}"

def get_final_task(self):
"""If final job exists in generic workflow, create DAG final task
@@ -204,9 +283,9 @@ def get_final_task(self):
task.dependencies = [{"name": "pure_pseudoinput+qgraphNodeId:+qgraphId:",
"submitted": False, "dependencies": []}]

-task.maxattempt = self.maxattempt
-task.maxwalltime = self.maxwalltime
-task.maxrss = final_job.request_memory
+task.max_attempt = self.number_of_retries.get(task.name, 3)
+task.max_walltime = self.max_walltime
+task.max_rss = final_job.request_memory
task.files_used_by_task = [bash_file]
task.is_final = True
task.is_dag_end = False
@@ -267,8 +346,13 @@ def create_raw_jobs_dependency_map(self):
cmd_line, pseudo_file_name = \
cmd_line_embedder.substitute_command_line(gwjob.executable.src_uri + ' ' + gwjob.arguments,
gwjob.cmdvals, job_name)
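# Annotation: the new guard below rejects over-long pseudo file names up
# front; the 4000-character cap presumably mirrors a PanDA-side limit on
# the stored job input string.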
+if len(pseudo_file_name) > 4000:
+raise NameError('job pseudo input file name contains more than 4000 characters. '
+'Cannot proceed.')

self.tasks_cmd_lines[self.define_task_name(gwjob.label)] = cmd_line
self.jobs_steps[pseudo_file_name] = gwjob.label
+self.number_of_retries[self.define_task_name(gwjob.label)] = gwjob.number_of_retries
dependency_map[pseudo_file_name] = []
predecessors = self.bps_workflow.predecessors(job_name)
for parent_name in predecessors:
@@ -315,11 +399,9 @@ def split_map_over_tasks(self, raw_dependency_map):
"""
tasks_dependency_map = {}
for job, dependency in raw_dependency_map.items():
-tasks_dependency_map.setdefault(self.define_task_name(self.jobs_steps[job]), {})[job] = \
-self.split_dependencies_by_tasks(dependency)
-self.tasks_inputs.setdefault(self.define_task_name(
-self.get_task_by_job_name(job)), []).append(job)
-self.tasks_steps[self.define_task_name(self.jobs_steps[job])] = self.jobs_steps[job]
+task_name = self.define_task_name(self.jobs_steps[job])
+tasks_dependency_map.setdefault(task_name, {})[job] = self.split_dependencies_by_tasks(dependency)
+self.tasks_steps[task_name] = self.jobs_steps[job]
return tasks_dependency_map

def get_task_by_job_name(self, job_name):
30 changes: 8 additions & 22 deletions python/lsst/ctrl/bps/wms/panda/panda_service.py
@@ -27,9 +27,8 @@
from lsst.ctrl.bps.wms_service import BaseWmsWorkflow, BaseWmsService
from lsst.ctrl.bps.wms.panda.idds_tasks import IDDSWorkflowGenerator
from lsst.daf.butler import ButlerURI
-from idds.workflow.workflow import Workflow as IDDS_client_workflow, AndCondition
-from idds.doma.workflow.domapandawork import DomaPanDAWork
-import idds.common.constants as idds_constants
+from idds.workflowv2.workflow import Workflow as IDDS_client_workflow, AndCondition
+from idds.doma.workflowv2.domapandawork import DomaPanDAWork
import idds.common.utils as idds_utils
import pandaclient.idds_api

@@ -120,7 +119,7 @@ def submit(self, workflow):
workflow : `lsst.ctrl.bps.BaseWorkflow`
A single PanDA iDDS workflow to submit
"""
-idds_client_workflow = IDDS_client_workflow()
+idds_client_workflow = IDDS_client_workflow(name=workflow.name)
files = self.copy_files_for_distribution(workflow.generated_tasks,
self.config['fileDistributionEndPoint'])
DAG_end_work = []
@@ -140,7 +139,7 @@
task_log={"destination": "local", "value": "log.tgz", "dataset": "PandaJob_#{pandaid}/",
"token": "local", "param_type": "log", "type": "template"},
encode_command_line=True,
-task_rss=task.maxrss,
+task_rss=task.max_rss,
task_cloud=task.cloud,
)
idds_client_workflow.add_work(work)
@@ -155,24 +154,11 @@
conditions.append(work.is_terminated)
and_cond = AndCondition(conditions=conditions, true_works=[DAG_final_work])
idds_client_workflow.add_condition(and_cond)

-idds_request = {
-'scope': 'workflow',
-'name': workflow.name,
-'requester': 'panda',
-'request_type': idds_constants.RequestType.Workflow,
-'transform_tag': 'workflow',
-'status': idds_constants.RequestStatus.New,
-'priority': 0,
-'lifetime': 30,
-'workload_id': idds_client_workflow.get_workload_id(),
-'request_metadata': {'workload_id': idds_client_workflow.get_workload_id(),
-'workflow': idds_client_workflow}
-}
c = pandaclient.idds_api.get_api(idds_utils.json_dumps,
-idds_host=self.config.get('iddsServer'), compress=True)
-request_id = c.add_request(**idds_request)
-_LOG.info("Submitted into iDDs with request id=%i", request_id)
+idds_host=self.config.get('idds_server'),
+compress=True, manager=True)
+request_id = c.submit(idds_client_workflow, username=None, use_dataset_name=False)
+_LOG.info("Submitted into iDDS with request id=%s", str(request_id))
workflow.run_id = request_id

@staticmethod
