implemented passing memory requirements, removed mandatory queue spec #33

Merged
merged 1 commit, Jun 22, 2021
24 changes: 24 additions & 0 deletions python/lsst/ctrl/bps/wms/panda/conf_example/HSC-PANDA.yaml
@@ -0,0 +1,24 @@
description: HSC-PANDA
instrument: lsst.obs.subaru.HyperSuprimeCam
imports:
  -
    location: "/opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/obs_subaru/21.0.0-33-g9da9dc4e+fb9fd5bfd4/pipelines/DRP.yaml"
    exclude:
      - fgcm
      - jointcal
      - skyCorr
tasks:
  makeWarp:
    class: lsst.pipe.tasks.makeCoaddTempExp.MakeWarpTask
    config:
      useGlobalExternalPhotoCalib: False
      doApplyExternalPhotoCalib: False
      doApplyExternalSkyWcs: False
      doApplySkyCorr: False
  assembleCoadd:
    class: lsst.pipe.tasks.assembleCoadd.CompareWarpAssembleCoaddTask
    config:
      useGlobalExternalPhotoCalib: False
      doApplyExternalPhotoCalib: False
      doApplyExternalSkyWcs: False
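For orientation, a minimal sketch of inspecting a pipeline definition like the one above. It uses plain PyYAML rather than the LSST pipeline loader, so the file path and key names simply mirror the example file; nothing else is assumed about the stack's API.

```python
# Minimal sketch: list the imports and task overrides in a pipeline
# YAML like the one above. Plain PyYAML, not the LSST pipeline loader.
import yaml

with open("HSC-PANDA.yaml") as f:
    pipeline = yaml.safe_load(f)

for imp in pipeline.get("imports", []):
    print(f"imports {imp['location']} excluding {imp.get('exclude', [])}")

for label, spec in pipeline.get("tasks", {}).items():
    print(f"task {label}: {spec['class']}")
    for option, value in spec.get("config", {}).items():
        print(f"  {option} = {value}")
```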

47 changes: 36 additions & 11 deletions python/lsst/ctrl/bps/wms/panda/conf_example/example_panda.yaml
@@ -1,15 +1,13 @@
-pipelineYaml: ${CTRL_BPS_DIR}/wms/panda/conf_example/HSC-PANDA.yaml
+pipelineYaml: ${CTRL_BPS_DIR}/python/lsst/ctrl/bps/wms/panda/conf_example/HSC-PANDA.yaml
project: dev
campaign: "configuration_example"
-submitPath: ${PWD}/submit/{outCollection}
+submitPath: ${HOME}/submit/{outCollection}
container_obs_panda_edge_node_dir: /opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/ctrl_bps/21.0.0-18-gf2cd492+6c749b2ca5/python/lsst/ctrl/bps/wms/panda/edgenode
container_CTRL_MPEXEC_DIR: /opt/lsst/software/stack/stack/miniconda3-py38_4.9.2-0.6.0/Linux64/ctrl_mpexec/21.0.0-30-g82f2559+c64cb64c6b/


-computing_queue: DOMA_LSST_GOOGLE_TEST
-computing_queue_himem: DOMA_LSST_GOOGLE_TEST_HIMEM
-himem_steps: ['makeWarp', 'assembleCoadd', 'deblend', 'measure', 'pipetaskInit']
+computing_cloud: LSST
maxwalltime: 90000
requestMemory: 2000
maxattempt: 1

whenSaveJobQgraph: "NEVER"
@@ -23,18 +21,19 @@ payload:
output: "u/${USER}/pipelines_check"
outCollection: "{output}/{timestamp}"



# tracts (smallest to largest): 9615, 9697, 9813
#dataQuery: "tract = 9813 and instrument='HSC' and skymap='hsc_rings_v1'"
#dataQuery: "tract = 9615 and patch=30 and instrument='HSC' and skymap='hsc_rings_v1'"

#Small (~1000 jobs) workflow
#dataQuery: "tract = 9615 and patch=30 and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'"

#Very small (~150 jobs) workflow
dataQuery: "tract = 9615 and patch=30 and detector IN (10..20) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r', 'i')"

#Very small (~2 jobs) workflow
#dataQuery: "tract = 9615 and patch=30 and detector IN (10..11) and instrument='HSC' and skymap='hsc_rings_v1' and band in ('r')"


# ~30k quanta
#dataQuery: "tract = 9615 and detector IN (0..103) and detector != 9 and instrument='HSC' and skymap='hsc_rings_v1'"
# ~150k quanta
@@ -49,12 +48,38 @@ payload:


pipetask:

  pipetaskInit:
    runQuantumCommand: "{container_CTRL_MPEXEC_DIR}/bin/pipetask run -b {butlerConfig} -i {inCollection} --output {output} --output-run {outCollection} --init-only --register-dataset-types --qgraph {bucket}/{payload_folder}/{uniqProcName}/${{filename}} --clobber-partial-outputs --no-versions --skip-existing"
+  measure:
+    requestMemory: 8129
+  forcedPhotCcd:
+    requestMemory: 8192
+  forcedPhotCoadd:
+    requestMemory: 8192
+  mergeMeasurements:
+    requestMemory: 4096
+  writeObjectTable:
+    requestMemory: 20000  # reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml
+  consolidateObjectTable:
+    requestMemory: 20000  # reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step3.yaml
  assembleCoadd:
-    requestMemory: 8
+    requestMemory: 8192
+  makeWarp:
+    requestMemory: 20000  # reduced from 85000: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml
+  jointcal:
+    requestMemory: 20000  # reduced from 49152: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step2.yaml
+  deblend:
+    requestMemory: 3000
+  skyCorr:
+    requestMemory: 11500
+  fgcmBuildStarsTable:
+    requestMemory: 8192
+  fgcmFitCycle:
+    requestMemory: 20000  # reduced from 24576: https://github.com/lsst-dm/bps-gen3-rc2/blob/master/bps-rc2_hsc_step1.yaml
+  fgcmOutputProducts:
+    requestMemory: 8192

submitPath: ${PWD}/submit/{outCollection}
saveDot: False
requestCpus: 1
wmsServiceClass: lsst.ctrl.bps.wms.panda.panda_service.PanDAService
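The per-task requestMemory entries above override the file-level default (2000 here). A hedged sketch of that resolution order, using a plain dict in place of the real BpsConfig search machinery:

```python
# Hedged sketch of per-task memory resolution: a plain-dict stand-in,
# not the real lsst.ctrl.bps BpsConfig API.
def request_memory(config: dict, task_label: str) -> int:
    per_task = config.get("pipetask", {}).get(task_label, {})
    # A task-level value wins; otherwise fall back to the file-level one.
    return per_task.get("requestMemory", config["requestMemory"])

config = {
    "requestMemory": 2000,
    "pipetask": {"makeWarp": {"requestMemory": 20000}},
}
assert request_memory(config, "makeWarp") == 20000  # per-task override
assert request_memory(config, "calibrate") == 2000  # file-level default
```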
18 changes: 11 additions & 7 deletions python/lsst/ctrl/bps/wms/panda/idds_tasks.py
@@ -14,8 +14,10 @@ class RubinTask:
    step: str = None
    queue: str = None
    executable: str = None
-    maxwalltime: int = None
-    maxattempt: int = None
+    maxwalltime: int = None  # Maximum allowed walltime in seconds
+    maxattempt: int = None  # Maximum number of job attempts in a task
+    maxrss: int = None  # Maximum size of RAM to be used by a job
+    cloud: str = None
    lfns: list = None
    local_pfns: list = None
    dependencies: list = None
Contributor:

Could you please add a brief description to each field of this data class? (It would be nice if the descriptions for maxwalltime and maxrss include information about the units they use, e.g., if the walltime needs to be specified in seconds, minutes, etc.)

Contributor Author:

Thanks, I've added the suggested comment.
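For reference, a sketch of the data class as it stands after this change. Only the field names and the maxwalltime/maxattempt/maxrss notes come from the diff and this thread; the remaining one-line descriptions are guesses from context, not the authors' documentation.

```python
from dataclasses import dataclass

@dataclass
class RubinTask:
    name: str = None          # task name registered with PanDA (guess)
    step: str = None          # pipeline step / task label (guess)
    queue: str = None         # PanDA queue the task is submitted to (guess)
    executable: str = None    # command line the jobs execute (guess)
    maxwalltime: int = None   # maximum allowed walltime, in seconds
    maxattempt: int = None    # maximum number of job attempts in a task
    maxrss: int = None        # maximum RAM a job may use
    cloud: str = None         # PanDA computing cloud (guess)
    lfns: list = None         # logical file names of inputs (guess)
    local_pfns: list = None   # physical file names (guess)
    dependencies: list = None # upstream tasks this task waits on (guess)
```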

@@ -42,9 +44,7 @@ def __init__(self, bps_workflow, config):
        self.tasks_inputs = {}
        self.jobs_steps = {}
        self.tasks_steps = {}
-        self.himem_tasks = set(config.get("himem_steps"))
-        self.computing_queue = config.get("computing_queue")
-        self.computing_queue_himem = config.get("computing_queue_himem")
+        self.computing_cloud = config.get("computing_cloud")
        self.qgraph_file = os.path.basename(config['bps_defined']['run_qgraph_file'])
        _, v = config.search("maxwalltime", opt={"default": 90000})
        self.maxwalltime = v
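The config.search call above returns a (found, value) pair and falls back to a default when the key is absent. A toy re-implementation of that pattern, not the real BpsConfig.search (which supports many more lookup options):

```python
# Toy stand-in for the (found, value) search-with-default pattern used
# above; not the real lsst.ctrl.bps BpsConfig.search.
def search(config: dict, key: str, opt: dict = None):
    opt = opt or {}
    if key in config:
        return True, config[key]
    return False, opt.get("default")

found, maxwalltime = search({"computing_cloud": "LSST"}, "maxwalltime",
                            opt={"default": 90000})
assert not found and maxwalltime == 90000  # default applied
```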
@@ -96,11 +96,15 @@ def define_tasks(self):
            task = RubinTask()
            task.step = task_name
            task.name = task.step
-            task.queue = self.computing_queue_himem if self.tasks_steps[task_name] \
-                in self.himem_tasks else self.computing_queue
+            bps_node = next(filter(lambda x: x['job'].label == self.tasks_steps[task_name],
+                                   self.bps_workflow.nodes.values()))['job']
Contributor:

If I got it right, it looks like, for each task, you iterate over all nodes of the generic workflow to find a node with a job having a specific label. Is there a reason why you can't create and use some kind of a "lookup table" (for example, a dictionary mapping task_steps keys onto these nodes)?

Contributor Author:

filter provides an iterator whose values are computed lazily, so the next call finds only a single BPS workflow node per task. The lookup table could do the same, I agree, but it would require an extra variable. I would leave it as it is now, if that is OK with you. Thank you!
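A small sketch of the two strategies discussed in this thread, with SimpleNamespace standing in for the generic-workflow job objects (neither snippet uses the real GenericWorkflow classes):

```python
from types import SimpleNamespace

nodes = {
    "n1": {"job": SimpleNamespace(label="makeWarp")},
    "n2": {"job": SimpleNamespace(label="measure")},
}

# As merged: filter() is lazy, so next() stops at the first matching
# node, but each task still triggers a fresh scan of the nodes.
job = next(filter(lambda x: x["job"].label == "measure",
                  nodes.values()))["job"]

# Reviewer's alternative: one pass to build a label -> job table, then
# O(1) lookups per task, at the cost of one extra dictionary.
jobs_by_label = {n["job"].label: n["job"] for n in nodes.values()}
job = jobs_by_label["measure"]
```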


+            task.queue = bps_node.compute_site
            task.lfns = list(jobs)
            task.maxattempt = self.maxattempt
            task.maxwalltime = self.maxwalltime
+            task.maxrss = bps_node.request_memory
+            task.cloud = self.computing_cloud

            # We take the commandline only from the first job because PanDA uses late binding and
            # command line for each job in task is equal to each other in exception to the processing
4 changes: 3 additions & 1 deletion python/lsst/ctrl/bps/wms/panda/panda_service.py
@@ -124,7 +124,9 @@ def submit(self, workflow):
                task_queue=task.queue,
                task_log={"destination": "local", "value": "log.tgz", "dataset": "PandaJob_#{pandaid}/",
                          "token": "local", "param_type": "log", "type": "template"},
-                encode_command_line=True
+                encode_command_line=True,
+                task_rss=task.maxrss,
+                task_cloud=task.cloud,
            )
            idds_client_workflow.add_work(work)
            idds_request = {
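End to end, the new fields travel from the submit YAML (computing_cloud, requestMemory) through RubinTask (cloud, maxrss) into the iDDS work object (task_cloud, task_rss). A hedged sketch of that flow; the work-object constructor name is not visible in this diff, so a stub stands in for it, and only the keyword names shown above are taken from the source:

```python
from types import SimpleNamespace

class DomaPanDAWorkStub:
    """Stand-in for the iDDS work class; records its keyword arguments."""
    def __init__(self, **kwargs):
        self.params = kwargs

# A populated RubinTask, faked for the example.
task = SimpleNamespace(queue="DOMA_LSST_GOOGLE_TEST", maxrss=8192, cloud="LSST")

work = DomaPanDAWorkStub(
    task_queue=task.queue,    # per-job compute_site, no himem queue split
    encode_command_line=True,
    task_rss=task.maxrss,     # memory request passed through to PanDA
    task_cloud=task.cloud,    # computing_cloud from the submit config
)
assert work.params["task_rss"] == 8192
```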