Implemented requirements defined in DM-32675 #84

Merged: 1 commit merged on Dec 16, 2021
2 changes: 1 addition & 1 deletion config/bps_idf.yaml
@@ -30,5 +30,5 @@ runQuantumCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask {runPreCmdOpts} run -b {butl


#this is a series of setup commands preceding the actual core SW execution
runnerCommand: 'logDir=/tmp/panda/${PANDAID}; mkdir ${logDir}; logFile=${logDir}/${REALTIME_LOGFILES}; touch ${logFile}; chmod ugo+w ${logFile}; ln -s ${logFile} ${PWD}/; ls -l ${PWD}/; docker run -v ${logFile}:/tmp/${REALTIME_LOGFILES} --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) --env S3_ENDPOINT_URL=${S3_ENDPOINT_URL} {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;cd /tmp;ls -al;setup lsst_distrib;pwd;python3 \${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py _cmd_line_ " >&2;'
runnerCommand: 'logDir=/tmp/panda/${PANDAID}; mkdir ${logDir}; logFile=${logDir}/${REALTIME_LOGFILES}; touch ${logFile}; chmod ugo+w ${logFile}; ln -s ${logFile} ${PWD}/; ls -l ${PWD}/; docker run --rm -v ${logFile}:/tmp/${REALTIME_LOGFILES} --network host --privileged --env AWS_ACCESS_KEY_ID=$(</credentials/AWS_ACCESS_KEY_ID) --env AWS_SECRET_ACCESS_KEY=$(</credentials/AWS_SECRET_ACCESS_KEY) --env PGPASSWORD=$(</credentials/PGPASSWORD) --env S3_ENDPOINT_URL=${S3_ENDPOINT_URL} {sw_image} /bin/bash -c "source /opt/lsst/software/stack/loadLSST.bash;cd /tmp;ls -al;setup lsst_distrib;pwd;python3 \${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py _cmd_line_ " >&2;'
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService
4 changes: 4 additions & 0 deletions doc/changes/DM-32675.feature.rst
@@ -0,0 +1,4 @@
* Large tasks (> 30k jobs) are now split into chunks
* Updated iDDS API usage to the most recent version
* Updated iDDS API initialization to force the PanDA proxy to use the IAM user name for the submitted workflow
* Added a limit on the number of characters in the task pseudo inputs
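The first entry above is the core of this change set: a task that would contain more jobs than the configurable maxJobsPerTask limit (default 30000, per the idds_tasks.py diff below) is split into chunks, and each job is routed to a chunk by hashing its PanDA pseudo input name. A minimal sketch of that rule under those assumptions; assign_chunks is an illustrative name, not part of the PR:

```python
def assign_chunks(pseudo_inputs, max_jobs_per_task=30000):
    """Group job pseudo inputs into chunks, mirroring the splitting rule
    added in idds_tasks.py (illustrative sketch, not the PR code)."""
    n_jobs = len(pseudo_inputs)
    if n_jobs <= max_jobs_per_task:
        return {0: list(pseudo_inputs)}  # small task: keep everything together
    # Ceiling division written without math.ceil, as in the PR.
    n_chunks = -(-n_jobs // max_jobs_per_task)
    chunks = {}
    for pseudo_input in pseudo_inputs:
        # hash() of a str is randomized per interpreter run unless
        # PYTHONHASHSEED is fixed, so chunk membership is only stable
        # within a single submission.
        chunks.setdefault(hash(pseudo_input) % n_chunks, []).append(pseudo_input)
    return chunks
```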
128 changes: 105 additions & 23 deletions python/lsst/ctrl/bps/wms/panda/idds_tasks.py
@@ -55,11 +55,11 @@ class RubinTask:
"""Computing queue where the task to be submitted"""
executable: str = None
"""The task command line to be executed"""
maxwalltime: int = None
max_walltime: int = None
"""Maximum allowed walltime in seconds"""
maxattempt: int = None
max_attempt: int = None
"""Maximum number of jobs attempts in a task"""
maxrss: int = None
max_rss: int = None
"""Maximum size of RAM to be used by a job"""
cloud: str = None
"""Computing cloud in CRIC registry where the task should
@@ -95,15 +95,13 @@ class IDDSWorkflowGenerator:
def __init__(self, bps_workflow, config):
self.bps_workflow = bps_workflow
self.bps_config = config
self.tasks_inputs = {}
self.jobs_steps = {}
self.tasks_steps = {}
self.tasks_cmd_lines = {}
self.dag_end_tasks = set()
_, v = config.search("maxwalltime", opt={"default": 90000})
self.maxwalltime = v
_, v = config.search("maxattempt", opt={"default": 5})
self.maxattempt = v
self.number_of_retries = {}
_, self.max_walltime = config.search("maxWalltime", opt={"default": 90000})
_, self.max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": 30000})

def define_task_name(self, step):
"""Return task name as a combination of the workflow name (unique
@@ -150,31 +148,112 @@ def define_tasks(self):
tasks = []
raw_dependency_map = self.create_raw_jobs_dependency_map()
tasks_dependency_map = self.split_map_over_tasks(raw_dependency_map)
for task_name, jobs in tasks_dependency_map.items():
tasks_dependency_map_chunked = self.split_tasks_into_chunks(tasks_dependency_map)
for task_name, jobs in tasks_dependency_map_chunked.items():
task = RubinTask()
task.step = task_name
task.name = task.step
picked_job_name = next(filter(
lambda job_name: self.bps_workflow.get_job(job_name).label
== self.tasks_steps[task_name],
self.bps_workflow))
== self.tasks_steps[task_name], self.bps_workflow))
bps_node = self.bps_workflow.get_job(picked_job_name)
task.queue = bps_node.queue
task.cloud = bps_node.compute_site
task.jobs_pseudo_inputs = list(jobs)
task.maxattempt = self.maxattempt
task.maxwalltime = self.maxwalltime
task.maxrss = bps_node.request_memory
task.max_attempt = self.number_of_retries.get(task_name, 3)
task.max_walltime = self.max_walltime
task.max_rss = bps_node.request_memory
task.executable = self.tasks_cmd_lines[task_name]
task.files_used_by_task = self.fill_input_files(task_name)
task.is_final = False
task.is_dag_end = self.tasks_steps[task_name] in self.dag_end_tasks
tasks.append(task)
self.add_dependencies(tasks, tasks_dependency_map)
self.add_dependencies(tasks, tasks_dependency_map_chunked)
final_task = self.get_final_task()
tasks.append(final_task)
return tasks
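For orientation, here is a trimmed stand-in for the RubinTask container that define_tasks fills, reconstructed from the hunks above. Only the renamed resource fields (maxwalltime/maxattempt/maxrss became max_walltime/max_attempt/max_rss) and the fields set in the loop are shown; the defaults are assumptions and this is not the full class definition:

```python
from dataclasses import dataclass, field


@dataclass
class RubinTaskSketch:
    """Trimmed, illustrative stand-in for idds_tasks.RubinTask."""
    name: str = None
    step: str = None
    queue: str = None            # computing queue where the task is submitted
    cloud: str = None            # computing cloud in the CRIC registry
    executable: str = None       # task command line
    max_walltime: int = None     # maximum allowed walltime, in seconds
    max_attempt: int = None      # maximum number of job attempts in a task
    max_rss: int = None          # maximum RAM for a job (from request_memory)
    jobs_pseudo_inputs: list = field(default_factory=list)
    files_used_by_task: list = field(default_factory=list)
    is_final: bool = False
    is_dag_end: bool = False
```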

def split_tasks_into_chunks(self, tasks_dependency_map):
"""If a task is going to contain jobs whose number is above a
threshold this function splits such a large tasks into chunks.

Parameters
----------
tasks_dependency_map : `dict`
Dependencies dictionary with a task name as the key and job
dependencies as the value. The latter dict has a job's input
parameters (PanDA pseudo file name) as the key and, as the value,
a dict of (upstream task name) - (its PanDA pseudo file names)
pairs that defines the dependencies of the job.

Returns
-------
tasks_dependency_map : `dict`
Dependencies dictionary with tasks split into chunks where needed.
"""
tasks_dependency_map_chunked = {}
tasks_chunked = {}

"""At this step only tasks names are updated to distribute
tasks over chunks
"""
for task_name, dependencies in tasks_dependency_map.items():
n_jobs_in_task = len(dependencies)
if n_jobs_in_task > self.max_jobs_per_task:
n_chunks = -(-n_jobs_in_task//self.max_jobs_per_task)
for pseudo_input, dependency in dependencies.items():
chunk_id = hash(pseudo_input) % n_chunks
task_name_chunked = self.get_task_name_with_chunk(task_name, chunk_id)
tasks_dependency_map_chunked.setdefault(task_name_chunked,
{})[pseudo_input] \
= dependency

Collaborator: Since using task_name + "chunk" + str(chunk_id) in multiple places, please make it a variable or a function so if we need to change it we can safely do it in one place. Could use f-string. If zero-padding the chunk_id will make them show up in an order that would be helpful to someone monitoring the run, please consider doing that.

Contributor Author: done
self.tasks_steps[task_name_chunked] = \
self.tasks_steps[task_name]
self.tasks_cmd_lines[task_name_chunked] = \
self.tasks_cmd_lines[task_name]
Collaborator: No updates for self.tasks_inputs? Are those actually used anywhere? If not, they should be removed from the rest of the code.

Contributor Author: I've removed this data structure, was an artefact
tasks_chunked[task_name] = n_chunks
else:
tasks_dependency_map_chunked[task_name] = dependencies

"""This block propagates chunking over upstream dependencies
records.
"""
tasks_dependency_map_chunked_updated_dep = {}
Collaborator: Need comment here briefly describing next block of code.

Contributor Author: Done

for task, dependencies in tasks_dependency_map_chunked.items():
for pseudo_input, dependency in dependencies.items():
updated_dependencies = {}
for upstream_task_name, pseudo_inputs in dependency.items():
if upstream_task_name in tasks_chunked:
for upstream_pseudo_input in pseudo_inputs:
chunk_id = hash(upstream_pseudo_input) % tasks_chunked[upstream_task_name]
task_name_chunked = self.get_task_name_with_chunk(upstream_task_name, chunk_id)
chunked_task_name = task_name_chunked
updated_dependencies.setdefault(chunked_task_name, []).\
append(upstream_pseudo_input)
else:
updated_dependencies.setdefault(upstream_task_name, []).extend(pseudo_inputs)
tasks_dependency_map_chunked_updated_dep.setdefault(task, {}).setdefault(pseudo_input, {})\
.update(updated_dependencies)
return tasks_dependency_map_chunked_updated_dep
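A toy illustration of the transform this method performs, with made-up task and pseudo-input names and the job limit imagined to be 2. The dictionary shapes follow the docstring above; the actual chunk ids depend on hash(), so the result shown is just one possible outcome:

```python
# Before chunking: "wf_isr" holds 3 jobs (over the imagined limit of 2),
# and the single "wf_calib" job depends on two of the isr jobs.
tasks_dependency_map = {
    "wf_isr": {"isr_in1": {}, "isr_in2": {}, "isr_in3": {}},
    "wf_calib": {"calib_in1": {"wf_isr": ["isr_in1", "isr_in3"]}},
}

# One possible chunked result: "wf_isr" becomes "wf_isr_chunk_<n>" entries,
# and the upstream reference inside "wf_calib" is rewritten to point at the
# chunk that actually holds the referenced pseudo inputs.
tasks_dependency_map_chunked = {
    "wf_isr_chunk_0": {"isr_in1": {}, "isr_in3": {}},
    "wf_isr_chunk_1": {"isr_in2": {}},
    "wf_calib": {"calib_in1": {"wf_isr_chunk_0": ["isr_in1", "isr_in3"]}},
}
```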

def get_task_name_with_chunk(self, task_name, chunk_id):
"""Concatenates file name and chunk ID

Parameters
----------
task_name : `str`
The name of the task

chunk_id : `int`
ID of the chunk

Returns
-------
task_name : `str`
Concatenated task name
"""
return f"{task_name}_chunk_{chunk_id}"

def get_final_task(self):
"""If final job exists in generic workflow, create DAG final task

@@ -204,9 +283,9 @@ def get_final_task(self):
task.dependencies = [{"name": "pure_pseudoinput+qgraphNodeId:+qgraphId:",
"submitted": False, "dependencies": []}]

task.maxattempt = self.maxattempt
task.maxwalltime = self.maxwalltime
task.maxrss = final_job.request_memory
task.max_attempt = self.number_of_retries.get(task.name, 3)
task.max_walltime = self.max_walltime
task.max_rss = final_job.request_memory
task.files_used_by_task = [bash_file]
task.is_final = True
task.is_dag_end = False
@@ -267,8 +346,13 @@ def create_raw_jobs_dependency_map(self):
cmd_line, pseudo_file_name = \
cmd_line_embedder.substitute_command_line(gwjob.executable.src_uri + ' ' + gwjob.arguments,
gwjob.cmdvals, job_name)
if len(pseudo_file_name) > 4000:
raise NameError('job pseudo input file name contains more than 4000 symbols. '
'Can not proceed.')

self.tasks_cmd_lines[self.define_task_name(gwjob.label)] = cmd_line
self.jobs_steps[pseudo_file_name] = gwjob.label
self.number_of_retries[self.define_task_name(gwjob.label)] = gwjob.number_of_retries
dependency_map[pseudo_file_name] = []
predecessors = self.bps_workflow.predecessors(job_name)
for parent_name in predecessors:
@@ -315,11 +399,9 @@ def split_map_over_tasks(self, raw_dependency_map):
"""
tasks_dependency_map = {}
for job, dependency in raw_dependency_map.items():
tasks_dependency_map.setdefault(self.define_task_name(self.jobs_steps[job]), {})[job] = \
self.split_dependencies_by_tasks(dependency)
self.tasks_inputs.setdefault(self.define_task_name(
self.get_task_by_job_name(job)), []).append(job)
self.tasks_steps[self.define_task_name(self.jobs_steps[job])] = self.jobs_steps[job]
task_name = self.define_task_name(self.jobs_steps[job])
tasks_dependency_map.setdefault(task_name, {})[job] = self.split_dependencies_by_tasks(dependency)
self.tasks_steps[task_name] = self.jobs_steps[job]
return tasks_dependency_map

def get_task_by_job_name(self, job_name):
30 changes: 8 additions & 22 deletions python/lsst/ctrl/bps/wms/panda/panda_service.py
@@ -27,9 +27,8 @@
from lsst.ctrl.bps.wms_service import BaseWmsWorkflow, BaseWmsService
from lsst.ctrl.bps.wms.panda.idds_tasks import IDDSWorkflowGenerator
from lsst.daf.butler import ButlerURI
from idds.workflow.workflow import Workflow as IDDS_client_workflow, AndCondition
from idds.doma.workflow.domapandawork import DomaPanDAWork
import idds.common.constants as idds_constants
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow, AndCondition
from idds.doma.workflowv2.domapandawork import DomaPanDAWork
import idds.common.utils as idds_utils
import pandaclient.idds_api

@@ -120,7 +119,7 @@ def submit(self, workflow):
workflow : `lsst.ctrl.bps.BaseWorkflow`
A single PanDA iDDS workflow to submit
"""
idds_client_workflow = IDDS_client_workflow()
idds_client_workflow = IDDS_client_workflow(name=workflow.name)
files = self.copy_files_for_distribution(workflow.generated_tasks,
self.config['fileDistributionEndPoint'])
DAG_end_work = []
@@ -140,7 +139,7 @@
task_log={"destination": "local", "value": "log.tgz", "dataset": "PandaJob_#{pandaid}/",
"token": "local", "param_type": "log", "type": "template"},
encode_command_line=True,
task_rss=task.maxrss,
task_rss=task.max_rss,
task_cloud=task.cloud,
)
idds_client_workflow.add_work(work)
@@ -155,24 +154,11 @@
conditions.append(work.is_terminated)
and_cond = AndCondition(conditions=conditions, true_works=[DAG_final_work])
idds_client_workflow.add_condition(and_cond)

idds_request = {
'scope': 'workflow',
'name': workflow.name,
'requester': 'panda',
'request_type': idds_constants.RequestType.Workflow,
'transform_tag': 'workflow',
'status': idds_constants.RequestStatus.New,
'priority': 0,
'lifetime': 30,
'workload_id': idds_client_workflow.get_workload_id(),
'request_metadata': {'workload_id': idds_client_workflow.get_workload_id(),
'workflow': idds_client_workflow}
}
c = pandaclient.idds_api.get_api(idds_utils.json_dumps,
idds_host=self.config.get('iddsServer'), compress=True)
request_id = c.add_request(**idds_request)
_LOG.info("Submitted into iDDs with request id=%i", request_id)
idds_host=self.config.get('idds_server'),
compress=True, manager=True)
request_id = c.submit(idds_client_workflow, username=None, use_dataset_name=False)
_LOG.info("Submitted into iDDs with request id=%s", str(request_id))
workflow.run_id = request_id
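This is where the iDDS client change lands: the hand-built idds_request dict and c.add_request call are gone, replaced by a manager-mode client that accepts the workflow object directly. A condensed recap of the new call sequence, wrapped in a helper for illustration; the server URL is a placeholder and the populated workflow is assumed to be built earlier in submit():

```python
import idds.common.utils as idds_utils
import pandaclient.idds_api


def submit_to_idds(idds_client_workflow, idds_server="https://idds.example.org"):
    """Condensed recap of the new submission path (illustrative wrapper)."""
    # Manager-mode client, as in the updated panda_service.py.
    c = pandaclient.idds_api.get_api(idds_utils.json_dumps,
                                     idds_host=idds_server,
                                     compress=True, manager=True)
    # The workflow object itself is submitted; no request dict is built by hand.
    return c.submit(idds_client_workflow, username=None, use_dataset_name=False)
```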

@staticmethod