fix: ensure lazy evaluation of resource functions/callables (this also entails, for now, a removal of the thread statistics in the yellow job stats table); further, added some clarifying sentences about resource function evaluation to the docs (#2356)

### Description

This is a test for lazy resource evaluation which currently fails.

### QC
<!-- Make sure that you can tick the boxes below. -->

* [x] The PR contains a test case for the changes or the changes are
already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or
this is not necessary (e.g. if the change does neither modify the
language nor the behavior or functionalities of Snakemake).

---------

Co-authored-by: Johannes Köster <johannes.koester@uni-due.de>
christopher-schroeder and johanneskoester committed Jul 20, 2023
1 parent f770984 commit 4c591b7
Showing 8 changed files with 55 additions and 13 deletions.
7 changes: 6 additions & 1 deletion docs/snakefiles/rules.rst
@@ -359,9 +359,14 @@ Apart from making Snakemake aware of hybrid-computing architectures (e.g. with a
If no limits are given, the resources are ignored in local execution.

Resources can have any arbitrary name, and must be assigned ``int`` or ``str`` values.
They can also be callables that return ``int``, ``str`` or ``None`` values.
In case of ``None``, the resource is considered to be unset (i.e. ignored) in the rule.

Resources can also be callables (e.g. functions or lambda expressions) that return ``int``, ``str`` or ``None`` values.
The signature of the callable must be ``callable(wildcards [, input] [, threads] [, attempt])`` (``input``, ``threads``, and ``attempt`` are optional parameters).
Such callables are evaluated immediately before the job is executed (or printed during a dry-run).

Since the callables can take e.g. ``input`` as an argument, they can for example be used to obtain the size of an input file and infer the amount of memory needed for the job.
In order to make this work with a dry-run, where the input files are not yet present, Snakemake automatically converts a ``FileNotFoundError`` that is raised by the callable into a placeholder called ``<TBD>`` that will be displayed during dry-run in such a case.

The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--retries``).
This is handy when executing a Snakemake workflow in a cluster environment, where jobs can e.g. fail because of too limited resources.
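As an illustration of the documentation text above, here is a minimal sketch of a resource callable that derives memory from the size of the first input file and scales it with `attempt`. The helper `evaluate_for_dry_run` is hypothetical and only imitates the described conversion of a `FileNotFoundError` into `<TBD>`; it is not Snakemake's implementation. The resource name `mem_mb`, the factor of two, and the 1000 MB floor are assumptions chosen for the example.

```python
import os

TBD = "<TBD>"  # placeholder text named in the docs; Snakemake's internal representation is not shown here


def get_mem_mb(wildcards, input, attempt):
    """Resource callable: twice the input size in MB, at least 1000, scaled by attempt."""
    # os.path.getsize raises FileNotFoundError if the input does not exist yet.
    size_mb = os.path.getsize(input[0]) / 1_000_000
    return max(1000, int(2 * size_mb)) * attempt


def evaluate_for_dry_run(func, *args, **kwargs):
    """Hypothetical stand-in for the dry-run behavior: a missing file yields the placeholder."""
    try:
        return func(*args, **kwargs)
    except FileNotFoundError:
        return TBD


if __name__ == "__main__":
    # The input is absent, as it would be in a dry-run before upstream jobs have run.
    print(evaluate_for_dry_run(get_mem_mb, {}, ["results/not-yet-there.txt"], attempt=1))
```

In a rule, such a function would be referenced as `resources: mem_mb=get_mem_mb`, the same way `test=get_resources` is used in the test Snakefile of this commit.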
9 changes: 0 additions & 9 deletions snakemake/dag.py
@@ -2460,17 +2460,10 @@ def stats(self):
         rules.update(job.rule for job in self.needrun_jobs())
         rules.update(job.rule for job in self.finished_jobs)

-        max_threads = defaultdict(int)
-        min_threads = defaultdict(lambda: sys.maxsize)
-        for job in chain(self.needrun_jobs(), self.finished_jobs):
-            max_threads[job.rule] = max(max_threads[job.rule], job.threads)
-            min_threads[job.rule] = min(min_threads[job.rule], job.threads)
         rows = [
             {
                 "job": rule.name,
                 "count": count,
-                "min threads": min_threads[rule],
-                "max threads": max_threads[rule],
             }
             for rule, count in sorted(
                 rules.most_common(), key=lambda item: item[0].name
@@ -2480,8 +2473,6 @@ def stats(self):
             {
                 "job": "total",
                 "count": sum(rules.values()),
-                "min threads": min(min_threads.values()),
-                "max threads": max(max_threads.values()),
             }
         )
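
One way to read the removal above: with resources evaluated lazily, reading `job.threads` for every job while building the table would presumably force that evaluation early. The sketch below uses hypothetical stand-ins (`Job`, `stats_rows`), not Snakemake's classes, to show a table built from rule counts alone, with a counter confirming that no thread value is computed.

```python
from collections import Counter


class Job:
    """Hypothetical job whose thread count is computed on each access."""

    evaluated = 0  # class-wide count of thread evaluations, for demonstration only

    def __init__(self, rule, threads_func):
        self.rule = rule
        self._threads_func = threads_func

    @property
    def threads(self):
        Job.evaluated += 1
        return self._threads_func()


def stats_rows(jobs):
    """Build per-rule count rows plus a total row without touching job.threads."""
    counts = Counter(job.rule for job in jobs)
    rows = [{"job": rule, "count": count} for rule, count in sorted(counts.items())]
    rows.append({"job": "total", "count": sum(counts.values())})
    return rows


jobs = [Job("bar", lambda: 1), Job("foo", lambda: 2), Job("foo", lambda: 2)]
rows = stats_rows(jobs)
```

Because `stats_rows` only reads `job.rule`, `Job.evaluated` stays at zero after the table is built.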

2 changes: 1 addition & 1 deletion snakemake/jobs.py
@@ -372,7 +372,7 @@ def threads(self):
     def params(self):
         if self._params is None:
             self._params = self.rule.expand_params(
-                self.wildcards_dict, self.input, self.output, self.resources
+                self.wildcards_dict, self.input, self.output, self
             )
         return self._params

18 changes: 16 additions & 2 deletions snakemake/rules.py
@@ -752,6 +752,13 @@ def apply_input_function(

         _aux_params = get_input_function_aux_params(func, aux_params)

+        # call any callables in _aux_params
+        # This way, we enable to delay the evaluation of expensive
+        # aux params until they are actually needed.
+        for name, value in list(_aux_params.items()):
+            if callable(value):
+                _aux_params[name] = value()
+
         try:
             value = func(Wildcards(fromdict=wildcards), **_aux_params)
             if isinstance(value, types.GeneratorType):
@@ -933,7 +940,7 @@ def handle_incomplete_checkpoint(exception):

         return input, mapping, dependencies, incomplete

-    def expand_params(self, wildcards, input, output, resources, omit_callable=False):
+    def expand_params(self, wildcards, input, output, job, omit_callable=False):
         def concretize_param(p, wildcards, is_from_callable):
             if not is_from_callable:
                 if isinstance(p, str):
@@ -956,6 +963,13 @@ def handle_incomplete_checkpoint(exception):
                     "Please add the output of the respective checkpoint to the rule inputs."
                 )

+        # We make sure that resources are only evaluated if a param function
+        # actually needs them by turning them into callables and delegating their
+        # evaluation to a later stage that only happens if the param function
+        # requests access to resources or threads.
+        resources = lambda: job.resources
+        threads = lambda: job.resources._cores
+
         params = Params()
         try:
             # When applying wildcards to params, the return type need not be
@@ -974,7 +988,7 @@ def handle_incomplete_checkpoint(exception):
                         "input": input._plainstrings(),
                         "resources": resources,
                         "output": output._plainstrings(),
-                        "threads": resources._cores,
+                        "threads": threads,
                     },
                     incomplete_checkpoint_func=handle_incomplete_checkpoint,
                 )
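
The deferral introduced in the `snakemake/rules.py` hunks can be sketched in plain Python. This is a simplified stand-in, not the code in that file: `select_aux_params` and `call_param_function` are hypothetical names, and filtering by `inspect.signature` is an assumption about how the requested parameters are determined. The idea it illustrates is the one stated in the added comments: expensive values are passed as zero-argument callables and resolved only if the param function names them.

```python
import inspect


def select_aux_params(func, aux_params):
    """Keep only the auxiliary parameters that func names in its signature."""
    requested = inspect.signature(func).parameters
    return {name: value for name, value in aux_params.items() if name in requested}


def call_param_function(func, wildcards, **aux_params):
    """Call func, resolving callable aux params only if func asked for them."""
    selected = select_aux_params(func, aux_params)
    for name, value in list(selected.items()):
        if callable(value):
            selected[name] = value()  # deferred evaluation happens here
    return func(wildcards, **selected)


evaluations = []  # records each time the expensive value is actually computed


def expensive_resources():
    evaluations.append("resources")
    return {"mem_mb": 4000}


# A param function that ignores resources never triggers their evaluation ...
call_param_function(
    lambda wildcards, input: input, {}, input=["a.txt"], resources=expensive_resources
)
assert evaluations == []

# ... while one that requests them does.
result = call_param_function(
    lambda wildcards, resources: resources["mem_mb"], {}, resources=expensive_resources
)
assert result == 4000 and evaluations == ["resources"]
```

A consequence of this design is that any auxiliary value that happens to be callable is invoked before being passed on, so values meant to be handed over as functions would need separate handling.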
28 changes: 28 additions & 0 deletions tests/test_lazy_resources/Snakefile
@@ -0,0 +1,28 @@
+import os
+
+rule all:
+    input:
+        "results/bar.txt",
+
+rule foo:
+    output:
+        "results/foo.txt"
+    shell:
+        "touch {output}"
+
+def get_resources(wc):
+    # usually, anything that raises a FileNotFoundError if the file is not present is fine here, even in dryrun.
+    # However, since this test case shall ensure that the function is evaluated just before the actual execution
+    # of the job, we additionally assert that the file is present.
+    assert os.path.isfile("results/foo.txt"), "bug: resource function is not evaluated in a lazy way, just before job execution"
+    return os.path.getsize("results/foo.txt")
+
+rule bar:
+    input:
+        "results/foo.txt"
+    output:
+        "results/bar.txt"
+    resources:
+        test=get_resources
+    shell:
+        "touch {output}"
Empty file.
Empty file.
4 changes: 4 additions & 0 deletions tests/tests.py
@@ -1953,6 +1953,10 @@ def test_github_issue1498():
     run(dpath("test_github_issue1498"))


+def test_lazy_resources():
+    run(dpath("test_lazy_resources"))
+
+
 def test_cleanup_metadata_fail():
     run(dpath("test09"), cleanup_metadata=["xyz"])
