fix: improved error handling for cluster status scripts and smarter job selector choice in case of cluster submission (use greedy for single jobs). (#1142)

* fix: improved error handling for cluster status scripts and smarter job selector choice in case of cluster submission (use greedy for single jobs).

* refactor: fmt

* temporarily deactivate greedy fallback in case of a single job (see if this is the reason for the failure in GLS).

* handle missing file in job reward.
johanneskoester committed Aug 27, 2021
1 parent 68c13fd commit 48d2dd9
Showing 15 changed files with 143 additions and 21 deletions.
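
At the heart of the change is stricter handling of the user-supplied cluster status command: it is called with the external job id as its only argument and must print exactly one line containing "running", "success", or "failed". A minimal sketch of such a script (illustrative only, not part of this commit):

    #!/usr/bin/env python3
    # Minimal sketch of a cluster status script as the executor expects it:
    # it receives the external job id as its first argument and must print
    # exactly one line containing "running", "success", or "failed".
    # A real script would query the cluster scheduler; this one does not.
    import sys

    jobid = sys.argv[1]  # would be used to query the scheduler in a real script
    print("success")
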
47 changes: 34 additions & 13 deletions snakemake/executors/__init__.py
@@ -712,7 +712,7 @@ def __init__(
self.active_jobs = list()
self.lock = threading.Lock()
self.wait = True
self.wait_thread = threading.Thread(target=self._wait_for_jobs)
self.wait_thread = threading.Thread(target=self._wait_thread)
self.wait_thread.daemon = True
self.wait_thread.start()

@@ -722,6 +722,12 @@ def __init__(
max_calls=self.max_status_checks_per_second, period=1
)

def _wait_thread(self):
try:
self._wait_for_jobs()
except Exception as e:
self.workflow.scheduler.executor_error_callback(e)

def shutdown(self):
with self.lock:
self.wait = False
@@ -1065,21 +1071,18 @@ def _wait_for_jobs(self):
success = "success"
failed = "failed"
running = "running"
status_cmd_kills = set()
if self.statuscmd is not None:

def job_status(job):
def job_status(job, valid_returns=["running", "success", "failed"]):
try:
# this command shall return "success", "failed" or "running"
return (
subprocess.check_output(
"{statuscmd} {jobid}".format(
jobid=job.jobid, statuscmd=self.statuscmd
),
shell=True,
)
.decode()
.split("\n")[0]
)
ret = subprocess.check_output(
"{statuscmd} {jobid}".format(
jobid=job.jobid, statuscmd=self.statuscmd
),
shell=True,
).decode()
except subprocess.CalledProcessError as e:
if e.returncode < 0:
# Ignore SIGINT and all other issues due to signals
@@ -1088,13 +1091,31 @@ def job_status(job):
# snakemake.
# Snakemake will handle the signal in
# the main process.
pass
status_cmd_kills.add(e.returncode)
if len(status_cmd_kills) > 10:
logger.info(
"Cluster status command {} was killed >10 times with signal(s) {} "
"(if this happens unexpectedly during your workflow execution, "
"have a closer look.).".format(
self.statuscmd, ",".join(status_cmd_kills)
)
)
status_cmd_kills.clear()
else:
raise WorkflowError(
"Failed to obtain job status. "
"See above for error message."
)

ret = ret.strip().split("\n")
if len(ret) != 1 or ret[0] not in valid_returns:
raise WorkflowError(
"Cluster status command {} returned {} but just a single line with one of {} is expected.".format(
self.statuscmd, "\\n".join(ret), ",".join(valid_returns)
)
)
return ret[0]

else:

def job_status(job):
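
The status-check logic added above boils down to the following standalone sketch; STATUS_CMD, VALID_RETURNS, and query_job_status are illustrative placeholder names, not Snakemake API:

    import subprocess

    STATUS_CMD = "./status.sh"  # stand-in for the user-supplied status command
    VALID_RETURNS = ("running", "success", "failed")

    def query_job_status(jobid):
        # Run the status command and insist on exactly one output line
        # containing one of the accepted states, as in the validation above.
        out = subprocess.check_output(
            "{} {}".format(STATUS_CMD, jobid), shell=True
        ).decode()
        lines = out.strip().split("\n")
        if len(lines) != 1 or lines[0] not in VALID_RETURNS:
            raise ValueError(
                "status command printed {!r}; expected a single line with "
                "one of {}".format(out, ", ".join(VALID_RETURNS))
            )
        return lines[0]
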
36 changes: 31 additions & 5 deletions snakemake/scheduler.py
@@ -140,8 +140,10 @@ def __init__(
self._lock = threading.Lock()

self._errors = False
self._executor_error = None
self._finished = False
self._job_queue = None
self._last_job_selection_empty = False
self._submit_callback = self._noop
self._finish_callback = partial(
self._proceed,
@@ -391,6 +393,12 @@ def __init__(
pass
self._open_jobs.release()

def executor_error_callback(self, exception):
with self._lock:
self._executor_error = exception
# next scheduling round to catch and raise error
self._open_jobs.release()

@property
def stats(self):
try:
@@ -435,16 +443,20 @@ def schedule(self):
needrun = set(self.open_jobs)
running = list(self.running)
errors = self._errors
executor_error = self._executor_error
user_kill = self._user_kill

# handle errors
if user_kill or (not self.keepgoing and errors):
if user_kill or (not self.keepgoing and errors) or executor_error:
if user_kill == "graceful":
logger.info(
"Will exit after finishing " "currently running jobs."
)

if not running:
if executor_error:
print_exception(executor_error, self.workflow.linemaps)

if executor_error or not running:
logger.info("Shutting down, this might take some time.")
self._executor.shutdown()
if not user_kill:
@@ -474,7 +486,11 @@ def schedule(self):
"Ready jobs ({}):\n\t".format(len(needrun))
+ "\n\t".join(map(str, needrun))
)

if not self._last_job_selection_empty:
logger.info("Select jobs to execute...")
run = self.job_selector(needrun)
self._last_job_selection_empty = not run

logger.debug(
"Selected jobs ({}):\n\t".format(len(run))
@@ -624,7 +640,11 @@ def job_selector_ilp(self, jobs):
from pulp import lpSum
from stopit import ThreadingTimeout as Timeout, TimeoutException

logger.info("Select jobs to execute...")
if len(jobs) == 1:
logger.debug(
"Using greedy selector because only single job has to be scheduled."
)
return self.job_selector_greedy(jobs)

with self._lock:
if not self.resources["_cores"]:
@@ -895,8 +915,14 @@ def job_reward(self, job):
temp_size = 0
input_size = 0
else:
temp_size = self.dag.temp_size(job)
input_size = job.inputsize
try:
temp_size = self.dag.temp_size(job)
input_size = job.inputsize
except FileNotFoundError:
# If the file is not yet present, this shall not affect the
# job selection.
temp_size = 0
input_size = 0

# Usually, this should guide the scheduler to first schedule all jobs
# that remove the largest temp file, then the second largest and so on.
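
Two of the scheduler changes above condense to a short sketch: the ILP selector now falls back to the greedy selector when only a single job is ready, and the reward computation tolerates input files that do not exist yet. The helper names below are illustrative, not the real methods:

    def select_jobs(ready_jobs, greedy_selector, ilp_selector):
        # Solving an ILP for a single ready job is unnecessary overhead,
        # so the greedy selector is used in that case.
        if len(ready_jobs) == 1:
            return greedy_selector(ready_jobs)
        return ilp_selector(ready_jobs)

    def job_sizes(job, dag):
        # Input files of cluster jobs may not exist yet on the submitting
        # machine; a missing file must not break job selection.
        try:
            return dag.temp_size(job), job.inputsize
        except FileNotFoundError:
            return 0, 0
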
7 changes: 4 additions & 3 deletions snakemake/workflow.py
@@ -217,6 +217,7 @@ def __init__(
self.check_envvars = check_envvars
self.max_threads = max_threads
self.all_temp = all_temp
self.scheduler = None

_globals = globals()
_globals["workflow"] = self
@@ -953,7 +954,7 @@ def files(items):
self.persistence.conda_cleanup_envs()
return True

scheduler = JobScheduler(
self.scheduler = JobScheduler(
self,
dag,
local_cores=local_cores,
@@ -1053,7 +1054,7 @@ def files(items):
if not dryrun and not no_hooks:
self._onstart(logger.get_logfile())

success = scheduler.schedule()
success = self.scheduler.schedule()

if not immediate_submit and not dryrun:
dag.cleanup_workdir()
@@ -1069,7 +1070,7 @@ def files(items):
logger.remove_logfile()
else:
if stats:
scheduler.stats.to_json(stats)
self.scheduler.stats.to_json(stats)
logger.logfile_hint()
if not dryrun and not no_hooks:
self._onsuccess(logger.get_logfile())
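
The workflow.py change stores the JobScheduler on the workflow object so that executors can report fatal errors back to the scheduler from their wait threads. Reduced to a sketch with simplified stand-in classes (not the real ones):

    class Workflow:
        def __init__(self):
            self.scheduler = None  # set once the JobScheduler is created

    class Scheduler:
        def __init__(self, workflow):
            workflow.scheduler = self
            self._executor_error = None

        def executor_error_callback(self, exc):
            # Called from an executor's wait thread; the stored error is
            # raised in the next scheduling round of the main thread.
            self._executor_error = exc
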
32 changes: 32 additions & 0 deletions tests/test_cluster_statusscript/Snakefile.nonstandard
@@ -0,0 +1,32 @@
from snakemake import shell

chromosomes = [1,2,3,4,5]

envvars:
"TESTVAR"



rule all:
input: 'test.predictions', 'test.2.inter2'

rule compute1:
input: '{name}.in'
output: ['{name}.%s.inter'%c for c in chromosomes]
params: prefix="{name}"
run:
for out in output:
shell('(cat {input[0]} && echo "Part {out}") > {out}')

rule compute2:
input: '{name}.{chromosome}.inter'
output: '{name}.{chromosome}.inter2'
params: test="a=b"
threads: workflow.cores * 0.5
shell: 'echo copy; cp {input[0]} {output[0]}'

rule gather:
input: ['{name}.%s.inter2'%c for c in chromosomes]
output: '{name}.predictions'
run:
shell('cat {} > {}'.format(' '.join(input), output[0]))
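
The test below drives this Snakefile through the mock qsub and status.sh scripts. A direct programmatic invocation would look roughly like the following, assuming the snakemake() keyword names mirror the --cluster/--cluster-status CLI options:

    import os
    from snakemake import snakemake

    os.environ["TESTVAR"] = "test"  # required by the envvars directive above
    # Hypothetical direct call mirroring the run() helper in tests.py;
    # the keyword names are assumed, not taken from this commit.
    snakemake(
        "Snakefile.nonstandard",
        cluster="./qsub",
        cluster_status="./status.sh",
        cores=3,
    )
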
2 changes: 2 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.1.inter
@@ -0,0 +1,2 @@
testz0r
Part test.1.inter
2 changes: 2 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.2.inter
@@ -0,0 +1,2 @@
testz0r
Part test.2.inter
2 changes: 2 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.3.inter
@@ -0,0 +1,2 @@
testz0r
Part test.3.inter
10 changes: 10 additions & 0 deletions tests/test_cluster_statusscript/expected-results/test.predictions
@@ -0,0 +1,10 @@
testz0r
Part test.1.inter
testz0r
Part test.2.inter
testz0r
Part test.3.inter
testz0r
Part test.4.inter
testz0r
Part test.5.inter
7 changes: 7 additions & 0 deletions tests/test_cluster_statusscript/qsub
@@ -0,0 +1,7 @@
#!/bin/bash
echo `date` >> qsub.log
tail -n1 $1 >> qsub.log
# simulate printing of job id by a random number
echo $RANDOM
cat $1 >> qsub.log
sh $1
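
This mock submission script logs the call, prints a random number that stands in for the job id reported by a real cluster (and later handed to the status script), and runs the job synchronously. A hypothetical Python counterpart, for illustration:

    #!/usr/bin/env python3
    # Log the job script, print a fake job id, and run the job script
    # synchronously so the status script can immediately report success.
    import random
    import subprocess
    import sys

    jobscript = sys.argv[1]
    with open("qsub.log", "a") as log:
        with open(jobscript) as js:
            log.write(js.read())
    print(random.randint(1, 32767))
    subprocess.run(["sh", jobscript], check=False)
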
1 change: 1 addition & 0 deletions tests/test_cluster_statusscript/status.sh
@@ -0,0 +1 @@
echo success
1 change: 1 addition & 0 deletions tests/test_cluster_statusscript/test.in
@@ -0,0 +1 @@
testz0r
11 changes: 11 additions & 0 deletions tests/tests.py
@@ -104,6 +104,17 @@ def test14():
run(dpath("test14"), snakefile="Snakefile.nonstandard", cluster="./qsub")


@skip_on_windows
def test_cluster_statusscript():
os.environ["TESTVAR"] = "test"
run(
dpath("test_cluster_statusscript"),
snakefile="Snakefile.nonstandard",
cluster="./qsub",
cluster_status="./status.sh",
)


def test15():
run(dpath("test15"))

