Skip to content
Permalink
Browse files
fix: efficient job status checking when using DRMAA API (this should …
…yield much better parallelization and performance when using --drmaa) (#1156)

* fix drmaa job status checking to not wait but return immediately. Handle suspended statuses via extra log messages.

* refactor

* fix potential key error when removing suspended message that has never existed
  • Loading branch information
johanneskoester committed Sep 3, 2021
1 parent 48d2dd9 commit ac004cb19cebd4efb5e38f6039861a2810c702ff
Showing 1 changed file with 31 additions and 11 deletions.
@@ -1402,6 +1402,8 @@ def shutdown(self):
def _wait_for_jobs(self):
import drmaa

suspended_msg = set()

while True:
with self.lock:
if not self.wait:
@@ -1412,9 +1414,7 @@ def _wait_for_jobs(self):
for active_job in active_jobs:
with self.status_rate_limiter:
try:
retval = self.session.wait(
active_job.jobid, drmaa.Session.TIMEOUT_NO_WAIT
)
retval = self.session.jobStatus(active_job.jobid)
except drmaa.ExitTimeoutException as e:
# job still active
still_running.append(active_job)
@@ -1427,20 +1427,40 @@ def _wait_for_jobs(self):
os.remove(active_job.jobscript)
active_job.error_callback(active_job.job)
continue
# job exited
os.remove(active_job.jobscript)
if (
not retval.wasAborted
and retval.hasExited
and retval.exitStatus == 0
):
if retval == drmaa.JobState.DONE:
os.remove(active_job.jobscript)
active_job.callback(active_job.job)
else:
elif retval == drmaa.JobState.FAILED:
os.remove(active_job.jobscript)
self.print_job_error(active_job.job)
self.print_cluster_job_error(
active_job, self.dag.jobid(active_job.job)
)
active_job.error_callback(active_job.job)
else:
# still running
still_running.append(active_job)

def handle_suspended(by):
if active_job.job.jobid not in suspended_msg:
logger.warning(
"Job {} (DRMAA id: {}) was suspended by {}.".format(
active_job.job.jobid, active_job.jobid, by
)
)
suspended_msg.add(active_job.job.jobid)

if retval == drmaa.JobState.USER_SUSPENDED:
handle_suspended("user")
elif retval == drmaa.JobState.SYSTEM_SUSPENDED:
handle_suspended("system")
else:
try:
suspended_msg.remove(active_job.job.jobid)
except KeyError:
# there was nothing to remove
pass

with self.lock:
self.active_jobs.extend(still_running)
sleep()

0 comments on commit ac004cb

Please sign in to comment.