From 4a20876182fe9724dace0a14a189e29ad727b3c7 Mon Sep 17 00:00:00 2001 From: Charles Cowart Date: Mon, 4 Nov 2024 11:52:30 -0800 Subject: [PATCH] Updates based on production testing --- sequence_processing_pipeline/Job.py | 37 +++++++++++++------ .../templates/nuqc_job.sh | 1 + 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/sequence_processing_pipeline/Job.py b/sequence_processing_pipeline/Job.py index b650543e..fea3b6bf 100644 --- a/sequence_processing_pipeline/Job.py +++ b/sequence_processing_pipeline/Job.py @@ -9,6 +9,7 @@ import logging from inspect import stack import re +from collections import Counter, defaultdict class Job: @@ -263,24 +264,36 @@ def query_slurm(job_ids): jobs[unique_id] = state if unique_id != job_id: - child_jobs[unique_id] = job_id # job is a child job + # job is a child job + child_jobs[unique_id] = [job_id, state] return jobs, child_jobs while True: jobs, child_jobs = query_slurm(job_ids) - for jid in job_ids: - logging.debug("JOB %s: %s" % (jid, jobs[jid])) - if callback is not None: - callback(jid=jid, status=jobs[jid]) - - children = [x for x in child_jobs if child_jobs[x] == jid] - if len(children) == 0: - logging.debug("\tNO CHILDREN") - for cid in children: - logging.debug("\tCHILD JOB %s: %s" % (cid, jobs[cid])) - status = [jobs[x] in Job.slurm_status_not_running for x in job_ids] + status = [] + status_display = defaultdict(list) + + # check main jobs + for job in job_ids: + status.append(jobs[job] in Job.slurm_status_not_running) + status_display[job].append(jobs[job]) + + # check children + for child_job, (parent_job, state) in child_jobs.items(): + status_display[parent_job].append(state) + status.append(state in Job.slurm_status_not_running) + + if callback is not None: + status_text = [] + for parent_job, state in status_display.items(): + msg = ', '.join([f'{x}: {y}' for x, y in + Counter(state).most_common()]) + status_text.append(f'{parent_job}: {msg}') + + # job_id is contained in status_text so jid='' is fine. + callback(jid='', status=' -- '.join(status_text)) if set(status) == {True}: # all jobs either completed successfully or terminated. diff --git a/sequence_processing_pipeline/templates/nuqc_job.sh b/sequence_processing_pipeline/templates/nuqc_job.sh index 363b45f1..7ada81f4 100644 --- a/sequence_processing_pipeline/templates/nuqc_job.sh +++ b/sequence_processing_pipeline/templates/nuqc_job.sh @@ -12,6 +12,7 @@ ### Commented out for now, but there is a possibility it will be needed ### in the future. ###SBATCH --gres=node_jobs:{{gres_value}} +#SBATCH --constraint="amd" echo "---------------"