Implemented a single qgraph file per workflow #30

Merged: 4 commits, May 28, 2021
python/lsst/ctrl/bps/wms/panda/edgenode/cmd_line_decoder.py (5 additions, 1 deletion; mode 100644 → 100755)
@@ -7,5 +7,9 @@
import sys
import binascii
cmdline = str(binascii.unhexlify(sys.argv[1]).decode())
cmdline = cmdline.replace("${IN/L}", sys.argv[2])
dataparams = sys.argv[2].split("+")
cmdline = cmdline.replace("${filename}", dataparams[0])
if len(dataparams) > 2:
cmdline = cmdline.replace("${qgraph-id}", dataparams[1])
cmdline = cmdline.replace("${qgraph-node-id}", dataparams[2])
print(cmdline)
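
For reference, a minimal standalone sketch (not part of the diff) of how the decoder above is exercised. The command-line template and the "+"-separated pseudo input file name are invented for this illustration; in production the template comes from the BPS-generated job and the pseudo file name is substituted by PanDA. The sketch uses three "+"-separated fields to match the indices the decoder reads; the exact field layout produced in this PR is defined by create_pseudo_input_file_name in idds_tasks.py below.

import binascii

# Hypothetical hex-encoded command-line template, as the workflow would store it.
template = "pipetask run -b ${filename} --qgraph-id ${qgraph-id} --qgraph-node-id ${qgraph-node-id}"
hexed = binascii.hexlify(template.encode()).decode()

# Hypothetical pseudo input file name: qgraph file, build id, node number.
pseudo_input = "run_qgraph.pickle+some-build-id+17"

# Same decoding steps as cmd_line_decoder.py above.
cmdline = binascii.unhexlify(hexed).decode()
dataparams = pseudo_input.split("+")
cmdline = cmdline.replace("${filename}", dataparams[0])
if len(dataparams) > 2:
    cmdline = cmdline.replace("${qgraph-id}", dataparams[1])
    cmdline = cmdline.replace("${qgraph-node-id}", dataparams[2])
print(cmdline)
# pipetask run -b run_qgraph.pickle --qgraph-id some-build-id --qgraph-node-id 17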
python/lsst/ctrl/bps/wms/panda/edgenode/sw_runner (1 addition, 0 deletions)
@@ -2,6 +2,7 @@
# This is a starter script that initializes the basic Rubin software environment inside the container and executes
# the actual command line after decoding it from the hex-encoded string
cd /tmp;
ls /;
Reviewer comment (Collaborator) on the line above:

Just double checking that this was intended to be committed (as opposed to accidental committing of a temporary debugging line).

Author reply (Contributor):

Right, this line is removed.

export HOME=/tmp;
export S3_ENDPOINT_URL=https://storage.googleapis.com
export AWS_ACCESS_KEY_ID=$(</cloudcreds/SINGULARITYENV_AWS_ACCESS_KEY_ID)
python/lsst/ctrl/bps/wms/panda/idds_tasks.py (50 additions, 21 deletions; mode 100644 → 100755)
@@ -1,6 +1,7 @@
"""

"""
import os.path
from dataclasses import dataclass


@@ -39,10 +40,12 @@ def __init__(self, bps_workflow, config):
self.bps_workflow = bps_workflow
self.bps_config = config
self.tasks_inputs = {}
self.jobs_steps = {}
self.tasks_steps = {}
self.himem_tasks = set(config.get("himem_steps"))
self.computing_queue = config.get("computing_queue")
self.computing_queue_himem = config.get("computing_queue_himem")

self.qgraph_file = os.path.basename(config['bps_defined']['run_qgraph_file'])
_, v = config.search("maxwalltime", opt={"default": 90000})
self.maxwalltime = v
_, v = config.search("maxattempt", opt={"default": 5})
@@ -89,21 +92,20 @@ def define_tasks(self):
tasks_dependency_map = self.split_map_over_tasks(raw_dependency_map)
init_task_cmd_line = self.bps_workflow.nodes['pipetaskInit']['job'].cmdline
other_task_cmd_line = self.pick_non_init_cmdline()

for task_step, jobs in tasks_dependency_map.items():
for task_name, jobs in tasks_dependency_map.items():
task = RubinTask()
task.step = task_step
task.name = self.define_task_name(task.step)
task.queue = self.computing_queue_himem if task_step in self.himem_tasks else self.computing_queue
task.step = task_name
task.name = task.step
task.queue = self.computing_queue_himem if self.tasks_steps[task_name] \
in self.himem_tasks else self.computing_queue
task.lfns = list(jobs)
task.local_pfns = self.tasks_inputs[task.name]
task.maxattempt = self.maxattempt
task.maxwalltime = self.maxwalltime

# We take the command line only from the first job because PanDA uses late binding:
# the command line is identical for every job in the task, except for the processing
# file name, which PanDA substitutes.
if task_step == 'pipetaskInit':
if self.tasks_steps[task_name] == 'pipetaskInit':
task.executable = init_task_cmd_line
else:
task.executable = other_task_cmd_line
@@ -128,17 +130,19 @@ def add_dependencies(self, tasks, tasks_dependency_map):
"""
for task in tasks:
jobs = tasks_dependency_map[task.step]
dependencies = []
task.dependencies = []
for job, job_dependency in jobs.items():
job_dep = {
"name": job,
"submitted": False,
"dependencies": [{"task": dependency[0], "inputname":
dependency[1][0], "available": False} for dependency in
job_dependency.items()]
}
dependencies.append(job_dep)
task.dependencies = dependencies
input_files_dependencies = []
for taskname, files in job_dependency.items():
for file in files:
input_files_dependencies.append({"task": taskname,
"inputname": file, "available": False})
job_dep["dependencies"] = input_files_dependencies
task.dependencies.append(job_dep)
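
For illustration only (not part of the diff, with made-up task and file names): after this change, each entry appended to task.dependencies has the shape below, with one dependency record per upstream input file rather than only the first file per upstream task as before.

task_dependencies_example = [
    {
        "name": "run_qgraph.pickle+job_A+some-build-id+12",  # pseudo input file name of this job
        "submitted": False,
        "dependencies": [
            {"task": "task_isr", "inputname": "run_qgraph.pickle+job_B+some-build-id+3", "available": False},
            {"task": "task_isr", "inputname": "run_qgraph.pickle+job_C+some-build-id+4", "available": False},
        ],
    },
]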

def create_raw_jobs_dependency_map(self):
""" Compute the DAG nodes dependency map (node - list of nodes) for each node in the workflow DAG
@@ -150,12 +154,17 @@ def create_raw_jobs_dependency_map(self):
"""
dependency_map = {}
for edge in self.bps_workflow.in_edges():
dependency_map.setdefault(edge[1], []).append(edge[0])
dependency_map.setdefault(self.create_pseudo_input_file_name(edge[1]), []).\
append(self.create_pseudo_input_file_name(edge[0]))
self.jobs_steps[self.create_pseudo_input_file_name(edge[1])] = \
self.bps_workflow.get_job(edge[1]).label
all_nodes = list(self.bps_workflow.nodes())
nodes_from_edges = set(list(dependency_map.keys()))
extra_nodes = [node for node in all_nodes if node not in nodes_from_edges]
for node in extra_nodes:
dependency_map.setdefault(node, [])
dependency_map.setdefault(self.create_pseudo_input_file_name(node), [])
self.jobs_steps[self.create_pseudo_input_file_name(node)] = \
self.bps_workflow.get_job(node).label
return dependency_map

def split_map_over_tasks(self, raw_dependency_map):
@@ -181,12 +190,11 @@ def split_map_over_tasks(self, raw_dependency_map):
"""
tasks_dependency_map = {}
for job, dependency in raw_dependency_map.items():
file_name = self.get_input_file(job)
file_local_src = self.bps_workflow.nodes.get(job).get("inputs")[file_name].src_uri
tasks_dependency_map.setdefault(self.get_task_by_job_name(job), {})[file_name] = \
tasks_dependency_map.setdefault(self.define_task_name(self.jobs_steps[job]), {})[job] = \
self.split_dependencies_by_tasks(dependency)
self.tasks_inputs.setdefault(self.define_task_name(
self.get_task_by_job_name(job)), []).append(file_local_src)
self.get_task_by_job_name(job)), []).append(job)
self.tasks_steps[self.define_task_name(self.jobs_steps[job])] = self.jobs_steps[job]
return tasks_dependency_map

def get_task_by_job_name(self, job_name):
@@ -209,7 +217,7 @@ def split_dependencies_by_tasks(self, dependencies):
dependencies_by_tasks = {}
for dependency in dependencies:
dependencies_by_tasks.setdefault(self.define_task_name(
self.get_task_by_job_name(dependency)), []).append(self.get_input_file(dependency))
self.jobs_steps[dependency]), []).append(dependency)
return dependencies_by_tasks

def get_input_file(self, job_name):
@@ -225,3 +233,24 @@ def get_input_file(self, job_name):
quantum graph file name
"""
return next(iter(self.bps_workflow.nodes.get(job_name).get("inputs")))

def create_pseudo_input_file_name(self, job_name):
""" Creates the pseudo input file name to provide exact location of data to be processed in terms
of pickle file and node

Parameters
----------
job_name: `str`
the name of the node in workflow DAG

Returns
-------
pseudo input file name
"""
qgraph_node_ids = self.bps_workflow.nodes.get(job_name).get("job").qgraph_node_ids
if qgraph_node_ids:
pseudo_input_file_name = self.qgraph_file+"+"+job_name + "+" + qgraph_node_ids[0].buildId + \
"+" + str(qgraph_node_ids[0].number)
Reviewer comment (Collaborator) on the lines above:

We agreed this was fine for this ticket, but just pointing it out for future. By limiting the node_ids to a single value, this code works today but definitely will not work as soon as a single GenericWorkflow job executes more than one Quantum.

else:
pseudo_input_file_name = self.qgraph_file+"+"+job_name
return pseudo_input_file_name
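
To make the hand-off to the edge-node decoder concrete, here is a minimal sketch (not part of the diff) with invented values showing the string this method returns; cmd_line_decoder.py later splits it back apart on "+". As the reviewer notes above, only the first quantum-graph node id is encoded.

# All values below are hypothetical.
qgraph_file = "run_qgraph.pickle"      # os.path.basename of the run_qgraph_file from bps_defined
job_name = "task_isr_903342_61"        # name of the node in the generic workflow DAG
build_id = "1622158200.123"            # qgraph_node_ids[0].buildId
node_number = 17                       # qgraph_node_ids[0].number

pseudo_input_file_name = qgraph_file + "+" + job_name + "+" + build_id + "+" + str(node_number)
print(pseudo_input_file_name)
# run_qgraph.pickle+task_isr_903342_61+1622158200.123+17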
python/lsst/ctrl/bps/wms/panda/panda_service.py (25 additions, 10 deletions; mode 100644 → 100755)
@@ -22,12 +22,13 @@
import os
import logging
import binascii
import concurrent.futures

from lsst.ctrl.bps.wms_service import BaseWmsWorkflow, BaseWmsService
from lsst.ctrl.bps.wms.panda.idds_tasks import IDDSWorkflowGenerator
from lsst.daf.butler import ButlerURI
from idds.workflow.workflow import Workflow as IDDS_client_workflow
from idds.doma.workflow.domalsstwork import DomaLSSTWork
from idds.doma.workflow.domapandawork import DomaPanDAWork
import idds.common.constants as idds_constants
import idds.common.utils as idds_utils
import pandatools.idds_api
@@ -112,19 +113,22 @@ def submit(self, workflow):
idds_client_workflow = IDDS_client_workflow()

for idx, task in enumerate(workflow.generated_tasks):
work = DomaLSSTWork(
work = DomaPanDAWork(
executable=self.add_decoder_prefix(task.executable),
primary_input_collection={'scope': 'pseudo_dataset',
'name': 'pseudo_input_collection#' + str(idx)},
output_collections=[{'scope': 'pseudo_dataset',
'name': 'pseudo_output_collection#' + str(idx)}],
log_collections=[], dependency_map=task.dependencies,
task_name=task.name,
task_queue=task.queue)
task_queue=task.queue,
task_log={"destination": "local", "value": "log.tgz", "dataset": "PandaJob_#{pandaid}/",
"token": "local", "param_type": "log", "type": "template"}
)
idds_client_workflow.add_work(work)
idds_request = {
'scope': 'workflow',
'name': idds_client_workflow.get_name(),
'name': workflow.name,
'requester': 'panda',
'request_type': idds_constants.RequestType.Workflow,
'transform_tag': 'workflow',
Expand All @@ -137,9 +141,10 @@ def submit(self, workflow):
}
primary_init_work = idds_client_workflow.get_primary_initial_collection()
if primary_init_work:
idds_request['scope'] = primary_init_work['scope']
idds_request['name'] = primary_init_work['name']
c = pandatools.idds_api.get_api(idds_utils.json_dumps)
idds_request['scope'] = primary_init_work.scope

c = pandatools.idds_api.get_api(idds_utils.json_dumps,
idds_host=self.config.get('idds_server'), compress=True)
request_id = c.add_request(**idds_request)
_LOG.info("Submitted into iDDs with request id=%i", request_id)
workflow.run_id = request_id
@@ -196,8 +201,7 @@ def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_cla
idds_workflow.generated_tasks = workflow_generator.define_tasks()
cloud_prefix = config['bucket'] + '/' + \
config['payload_folder'] + '/' + config['workflowName'] + '/'
for task in idds_workflow.generated_tasks:
cls.copy_pickles_into_cloud(task.local_pfns, cloud_prefix)
cls.copy_pickles_into_cloud([config['bps_defined']['run_qgraph_file']], cloud_prefix)
Reviewer comment (Collaborator) on the line above:

This is very fragile code as opposed to checking the inputs for the jobs. I expect that this section will be revisited in the upcoming execution butler work. We should talk then to figure out if there are GenericWorkflow issues that need to be fixed so that this can be written more generically.

_LOG.debug("panda dag attribs %s", generic_workflow.run_attrs)
return idds_workflow

@@ -213,11 +217,22 @@ def copy_pickles_into_cloud(local_pfns, cloud_prefix):
cloud_prefix: `str`
Path on the cloud, including the access protocol and bucket name, where the files are placed
"""

copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
future_file_copy = []
for src_path in local_pfns:
src = ButlerURI(src_path)
target_base_uri = ButlerURI(cloud_prefix)

# Instantiate the S3 clients explicitly here to work around this issue:
# https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
target_base_uri.exists()
src.exists()

target = target_base_uri.join(os.path.basename(src_path))
target.transfer_from(src)
future_file_copy.append(copy_executor.submit(target.transfer_from, src))
for future in concurrent.futures.as_completed(future_file_copy):
future.result()
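
For context, a minimal sketch of the same submit-then-wait ThreadPoolExecutor pattern used above, written against plain local files so it runs without ButlerURI or cloud credentials (the function name and arguments are invented for this sketch). Calling result() on each completed future re-raises any exception from the corresponding copy, and the with block shuts the pool down once all copies finish.

import concurrent.futures
import os
import shutil

def copy_files_in_parallel(local_paths, target_dir, max_workers=10):
    """Copy files concurrently and propagate the first failure."""
    os.makedirs(target_dir, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(shutil.copy, path, target_dir) for path in local_paths]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # raises here if that copy failed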

def write(self, out_prefix):
"""