Skip to content

Commit

Permalink
fix: fix type error in job status checking if sacct fails
Browse files (browse the repository at this point in the history)
  • Loading branch information
johanneskoester committed Feb 24, 2024
1 parent 7e3de33 commit 6a197ae
Showing 1 changed file with 48 additions and 45 deletions.
93 changes: 48 additions & 45 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -199,6 +199,7 @@ async def check_active_jobs(

active_jobs_ids = {job_info.external_jobid for job_info in active_jobs}
active_jobs_seen_by_sacct = set()
missing_sacct_status = set()

# We use this sacct syntax for argument 'starttime' to keep it compatible
# with slurm < 20.11
Expand Down Expand Up @@ -245,53 +246,55 @@ async def check_active_jobs(
self.logger.debug(f"missing_sacct_status are: {missing_sacct_status}")
if not missing_sacct_status:
break
if i >= status_attempts - 1:
self.logger.warning(
f"Unable to get the status of all active_jobs that should be "
f"in slurmdbd, even after {status_attempts} attempts.\n"
f"The jobs with the following slurm job ids were previously seen "
"by sacct, but sacct doesn't report them any more:\n"
f"{missing_sacct_status}\n"
f"Please double-check with your slurm cluster administrator, that "
"slurmdbd job accounting is properly set up.\n"
)

any_finished = False
for j in active_jobs:
# the job probably didn't make it into slurmdbd yet, so
# `sacct` doesn't return it
if j.external_jobid not in status_of_jobs:
# but the job should still be queueing or running and
# appear in slurmdbd (and thus `sacct` output) later
yield j
continue
status = status_of_jobs[j.external_jobid]
if status == "COMPLETED":
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
elif status == "UNKNOWN":
# the job probably does not exist anymore, but 'sacct' did not work
# so we assume it is finished
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
elif status in fail_stati:
msg = (
f"SLURM-job '{j.external_jobid}' failed, SLURM status is: "
f"'{status}'"
)
self.report_job_error(j, msg=msg, aux_logs=[j.aux["slurm_logfile"]])
active_jobs_seen_by_sacct.remove(j.external_jobid)
else: # still running?
yield j

if not any_finished:
self.next_seconds_between_status_checks = min(
self.next_seconds_between_status_checks + 10, max_sleep_time
if missing_sacct_status:
self.logger.warning(
f"Unable to get the status of all active jobs that should be "
f"in slurmdbd, even after {status_attempts} attempts.\n"
f"The jobs with the following slurm job ids were previously seen "
"by sacct, but sacct doesn't report them any more:\n"
f"{missing_sacct_status}\n"
f"Please double-check with your slurm cluster administrator, that "
"slurmdbd job accounting is properly set up.\n"
)
else:
self.next_seconds_between_status_checks = None

if status_of_jobs is not None:
any_finished = False
for j in active_jobs:
# the job probably didn't make it into slurmdbd yet, so
# `sacct` doesn't return it
if j.external_jobid not in status_of_jobs:
# but the job should still be queueing or running and
# appear in slurmdbd (and thus `sacct` output) later
yield j
continue
status = status_of_jobs[j.external_jobid]
if status == "COMPLETED":
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
elif status == "UNKNOWN":
# the job probably does not exist anymore, but 'sacct' did not work
# so we assume it is finished
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
elif status in fail_stati:
msg = (
f"SLURM-job '{j.external_jobid}' failed, SLURM status is: "
f"'{status}'"
)
self.report_job_error(j, msg=msg, aux_logs=[j.aux["slurm_logfile"]])
active_jobs_seen_by_sacct.remove(j.external_jobid)
else: # still running?
yield j

if not any_finished:
self.next_seconds_between_status_checks = min(
self.next_seconds_between_status_checks + 10, max_sleep_time
)
else:
self.next_seconds_between_status_checks = None

def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
# Cancel all active jobs.
Expand Down

0 comments on commit 6a197ae

Please sign in to comment.