Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: always wait for input files before starting jobs, also upon local execution and within group jobs. This should add further robustness against NFS latency issues. (#1486)

* fix: always wait for input files before starting jobs, also upon local execution and within group jobs. This should add further robustness against NFS latency issues.

* try downgrading irods client
  • Loading branch information
johanneskoester committed Mar 16, 2022
1 parent adae8f1 commit cab2adb
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 42 deletions.
2 changes: 1 addition & 1 deletion snakemake/__init__.py
Expand Up @@ -599,6 +599,7 @@ def snakemake(
check_envvars=not lint, # for linting, we do not need to check whether requested envvars exist
all_temp=all_temp,
local_groupid=local_groupid,
latency_wait=latency_wait,
)
success = True

Expand Down Expand Up @@ -771,7 +772,6 @@ def snakemake(
archive=archive,
delete_all_output=delete_all_output,
delete_temp_output=delete_temp_output,
latency_wait=latency_wait,
wait_for_files=wait_for_files,
detailed_summary=detailed_summary,
nolock=not lock,
Expand Down
19 changes: 1 addition & 18 deletions snakemake/executors/__init__.py
Expand Up @@ -65,7 +65,6 @@ def __init__(
quiet=False,
printshellcmds=False,
printthreads=True,
latency_wait=3,
keepincomplete=False,
keepmetadata=True,
):
Expand All @@ -75,7 +74,7 @@ def __init__(
self.printreason = printreason
self.printshellcmds = printshellcmds
self.printthreads = printthreads
self.latency_wait = latency_wait
self.latency_wait = workflow.latency_wait
self.keepincomplete = keepincomplete
self.keepmetadata = keepmetadata

Expand Down Expand Up @@ -221,7 +220,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
assume_shared_fs=True,
keepincomplete=False,
keepmetadata=False,
Expand All @@ -232,7 +230,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand Down Expand Up @@ -424,7 +421,6 @@ def __init__(
quiet=False,
printshellcmds=False,
use_threads=False,
latency_wait=3,
cores=1,
keepincomplete=False,
keepmetadata=True,
Expand All @@ -435,7 +431,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand Down Expand Up @@ -659,7 +654,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand All @@ -680,7 +674,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
assume_shared_fs=assume_shared_fs,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down Expand Up @@ -957,7 +950,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
restart_times=0,
assume_shared_fs=True,
max_status_checks_per_second=1,
Expand Down Expand Up @@ -989,7 +981,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1322,7 +1313,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
restart_times=0,
assume_shared_fs=True,
keepincomplete=False,
Expand All @@ -1336,7 +1326,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1432,7 +1421,6 @@ def __init__(
printshellcmds=False,
drmaa_args="",
drmaa_log_dir=None,
latency_wait=3,
cluster_config=None,
restart_times=0,
assume_shared_fs=True,
Expand All @@ -1448,7 +1436,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
restart_times=restart_times,
assume_shared_fs=assume_shared_fs,
Expand Down Expand Up @@ -1635,7 +1622,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand Down Expand Up @@ -1663,7 +1649,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
local_input=local_input,
restart_times=restart_times,
Expand Down Expand Up @@ -2092,7 +2077,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
local_input=None,
restart_times=None,
max_status_checks_per_second=1,
Expand Down Expand Up @@ -2147,7 +2131,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
local_input=local_input,
restart_times=restart_times,
exec_job=exec_job,
Expand Down
2 changes: 0 additions & 2 deletions snakemake/executors/ga4gh_tes.py
Expand Up @@ -28,7 +28,6 @@ def __init__(
printreason=False,
quiet=False,
printshellcmds=False,
latency_wait=3,
cluster_config=None,
local_input=None,
restart_times=None,
Expand Down Expand Up @@ -83,7 +82,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
local_input=local_input,
restart_times=restart_times,
Expand Down
2 changes: 0 additions & 2 deletions snakemake/executors/google_lifesciences.py
Expand Up @@ -51,7 +51,6 @@ def __init__(
regions=None,
location=None,
cache=False,
latency_wait=3,
local_input=None,
restart_times=None,
exec_job=None,
Expand Down Expand Up @@ -127,7 +126,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
restart_times=restart_times,
exec_job=exec_job,
assume_shared_fs=False,
Expand Down
4 changes: 4 additions & 0 deletions snakemake/jobs.py
Expand Up @@ -21,6 +21,7 @@
_IOFile,
is_flagged,
get_flag_value,
wait_for_files,
)
from snakemake.utils import format, listfiles
from snakemake.exceptions import RuleException, ProtectedOutputException, WorkflowError
Expand Down Expand Up @@ -764,6 +765,9 @@ def prepare(self):
if self.benchmark:
self.benchmark.prepare()

# wait for input files
wait_for_files(self.input, latency_wait=self.dag.workflow.latency_wait)

if not self.is_shadow:
return

Expand Down
15 changes: 0 additions & 15 deletions snakemake/scheduler.py
Expand Up @@ -94,7 +94,6 @@ def __init__(
keepgoing=False,
max_jobs_per_second=None,
max_status_checks_per_second=100,
latency_wait=3,
greediness=1.0,
force_use_threads=False,
assume_shared_fs=True,
Expand Down Expand Up @@ -169,7 +168,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
)
elif touch:
self._executor = TouchExecutor(
Expand All @@ -178,7 +176,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
)
elif cluster or cluster_sync or (drmaa is not None):
if not workflow.immediate_submit:
Expand All @@ -191,7 +188,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down Expand Up @@ -219,7 +215,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
assume_shared_fs=assume_shared_fs,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -240,7 +235,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
assume_shared_fs=assume_shared_fs,
max_status_checks_per_second=max_status_checks_per_second,
Expand All @@ -255,7 +249,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -269,7 +262,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cluster_config=cluster_config,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -283,7 +275,6 @@ def __init__(
quiet=quiet,
printshellcmds=printshellcmds,
use_threads=use_threads,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand All @@ -300,7 +291,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
)
Expand All @@ -312,7 +302,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
)

Expand All @@ -327,7 +316,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
preemption_default=preemption_default,
preemptible_rules=preemptible_rules,
)
Expand All @@ -339,7 +327,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
cores=local_cores,
keepincomplete=keepincomplete,
)
Expand All @@ -351,7 +338,6 @@ def __init__(
printreason=printreason,
quiet=quiet,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
tes_url=tes,
container_image=container_image,
)
Expand All @@ -365,7 +351,6 @@ def __init__(
quiet=quiet,
printshellcmds=printshellcmds,
use_threads=use_threads,
latency_wait=latency_wait,
cores=cores,
keepincomplete=keepincomplete,
keepmetadata=keepmetadata,
Expand Down
8 changes: 5 additions & 3 deletions snakemake/workflow.py
Expand Up @@ -149,6 +149,7 @@ def __init__(
max_threads=None,
all_temp=False,
local_groupid="local",
latency_wait=3,
):
"""
Create the controller.
Expand Down Expand Up @@ -234,6 +235,7 @@ def __init__(
self.all_temp = all_temp
self.scheduler = None
self.local_groupid = local_groupid
self.latency_wait = latency_wait

_globals = globals()
_globals["workflow"] = self
Expand Down Expand Up @@ -599,7 +601,6 @@ def execute(
delete_all_output=False,
delete_temp_output=False,
detailed_summary=False,
latency_wait=3,
wait_for_files=None,
nolock=False,
unlock=False,
Expand Down Expand Up @@ -701,7 +702,9 @@ def files(items):

if wait_for_files is not None:
try:
snakemake.io.wait_for_files(wait_for_files, latency_wait=latency_wait)
snakemake.io.wait_for_files(
wait_for_files, latency_wait=self.latency_wait
)
except IOError as e:
logger.error(str(e))
return False
Expand Down Expand Up @@ -1024,7 +1027,6 @@ def files(items):
container_image=container_image,
printreason=printreason,
printshellcmds=printshellcmds,
latency_wait=latency_wait,
greediness=greediness,
force_use_threads=force_use_threads,
assume_shared_fs=self.assume_shared_fs,
Expand Down
2 changes: 1 addition & 1 deletion test-environment.yml
Expand Up @@ -38,7 +38,7 @@ dependencies:
- ratelimiter
- configargparse
- appdirs
- python-irodsclient
- python-irodsclient <1.1.2 # bug in 1.1.2 leads to KeyError
- cwltool
- jsonschema
- pandas
Expand Down

0 comments on commit cab2adb

Please sign in to comment.