From 4c591b72b31d6c6c36b43f1d7773d8317352fbc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christopher=20Schr=C3=B6der?= Date: Fri, 21 Jul 2023 00:03:44 +0200 Subject: [PATCH] fix: ensure lazy evaluation of resource functions/callables (this also entails, for now, a removal of the thread statistics in the yellow job stats table); further, added some clarifying sentences about resource function evaluation to the docs (#2356) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Description This is a test for lazy resource evaluation which currently fails. ### QC * [x] The PR contains a test case for the changes or the changes are already covered by an existing test case. * [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake). --------- Co-authored-by: Johannes Köster --- docs/snakefiles/rules.rst | 7 ++++- snakemake/dag.py | 9 ------ snakemake/jobs.py | 2 +- snakemake/rules.py | 18 ++++++++++-- tests/test_lazy_resources/Snakefile | 28 +++++++++++++++++++ .../expected-results/results/bar.txt | 0 .../expected-results/results/foo.txt | 0 tests/tests.py | 4 +++ 8 files changed, 55 insertions(+), 13 deletions(-) create mode 100644 tests/test_lazy_resources/Snakefile create mode 100644 tests/test_lazy_resources/expected-results/results/bar.txt create mode 100644 tests/test_lazy_resources/expected-results/results/foo.txt diff --git a/docs/snakefiles/rules.rst b/docs/snakefiles/rules.rst index 1d09703d8..44445aee1 100644 --- a/docs/snakefiles/rules.rst +++ b/docs/snakefiles/rules.rst @@ -359,9 +359,14 @@ Apart from making Snakemake aware of hybrid-computing architectures (e.g. with a If no limits are given, the resources are ignored in local execution. Resources can have any arbitrary name, and must be assigned ``int`` or ``str`` values. -They can also be callables that return ``int``, ``str`` or ``None`` values. In case of ``None``, the resource is considered to be unset (i.e. ignored) in the rule. + +Resources can also be callables (e.g. functions or lambda expressions) that return ``int``, ``str`` or ``None`` values. The signature of the callable must be ``callable(wildcards [, input] [, threads] [, attempt])`` (``input``, ``threads``, and ``attempt`` are optional parameters). +Such callables are evaluated immediately before the job is executed (or printed during a dry-run). + +Since the callables can take e.g. ``input`` as an argument, they can for example be used to obtain the size of an input file and infer the amount of memory needed for the job. +In order to make this work with a dry-run, where the input files are not yet present, Snakemake automatically converts a ``FileNotFoundError`` that is raised by the callable into a placeholder called ```` that will be displayed during dry-run in such a case. The parameter ``attempt`` allows us to adjust resources based on how often the job has been restarted (see :ref:`all_options`, option ``--retries``). This is handy when executing a Snakemake workflow in a cluster environment, where jobs can e.g. fail because of too limited resources. diff --git a/snakemake/dag.py b/snakemake/dag.py index 7d3a65844..87e6b5fb8 100755 --- a/snakemake/dag.py +++ b/snakemake/dag.py @@ -2460,17 +2460,10 @@ def stats(self): rules.update(job.rule for job in self.needrun_jobs()) rules.update(job.rule for job in self.finished_jobs) - max_threads = defaultdict(int) - min_threads = defaultdict(lambda: sys.maxsize) - for job in chain(self.needrun_jobs(), self.finished_jobs): - max_threads[job.rule] = max(max_threads[job.rule], job.threads) - min_threads[job.rule] = min(min_threads[job.rule], job.threads) rows = [ { "job": rule.name, "count": count, - "min threads": min_threads[rule], - "max threads": max_threads[rule], } for rule, count in sorted( rules.most_common(), key=lambda item: item[0].name @@ -2480,8 +2473,6 @@ def stats(self): { "job": "total", "count": sum(rules.values()), - "min threads": min(min_threads.values()), - "max threads": max(max_threads.values()), } ) diff --git a/snakemake/jobs.py b/snakemake/jobs.py index 4aef853ea..3c428b2e7 100644 --- a/snakemake/jobs.py +++ b/snakemake/jobs.py @@ -372,7 +372,7 @@ def threads(self): def params(self): if self._params is None: self._params = self.rule.expand_params( - self.wildcards_dict, self.input, self.output, self.resources + self.wildcards_dict, self.input, self.output, self ) return self._params diff --git a/snakemake/rules.py b/snakemake/rules.py index 52a68c7da..765d15230 100644 --- a/snakemake/rules.py +++ b/snakemake/rules.py @@ -752,6 +752,13 @@ def apply_input_function( _aux_params = get_input_function_aux_params(func, aux_params) + # call any callables in _aux_params + # This way, we enable to delay the evaluation of expensive + # aux params until they are actually needed. + for name, value in list(_aux_params.items()): + if callable(value): + _aux_params[name] = value() + try: value = func(Wildcards(fromdict=wildcards), **_aux_params) if isinstance(value, types.GeneratorType): @@ -933,7 +940,7 @@ def handle_incomplete_checkpoint(exception): return input, mapping, dependencies, incomplete - def expand_params(self, wildcards, input, output, resources, omit_callable=False): + def expand_params(self, wildcards, input, output, job, omit_callable=False): def concretize_param(p, wildcards, is_from_callable): if not is_from_callable: if isinstance(p, str): @@ -956,6 +963,13 @@ def handle_incomplete_checkpoint(exception): "Please add the output of the respective checkpoint to the rule inputs." ) + # We make sure that resources are only evaluated if a param function + # actually needs them by turning them into callables and delegating their + # evaluation to a later stage that only happens if the param function + # requests access to resources or threads. + resources = lambda: job.resources + threads = lambda: job.resources._cores + params = Params() try: # When applying wildcards to params, the return type need not be @@ -974,7 +988,7 @@ def handle_incomplete_checkpoint(exception): "input": input._plainstrings(), "resources": resources, "output": output._plainstrings(), - "threads": resources._cores, + "threads": threads, }, incomplete_checkpoint_func=handle_incomplete_checkpoint, ) diff --git a/tests/test_lazy_resources/Snakefile b/tests/test_lazy_resources/Snakefile new file mode 100644 index 000000000..0fa4fe87f --- /dev/null +++ b/tests/test_lazy_resources/Snakefile @@ -0,0 +1,28 @@ +import os + +rule all: + input: + "results/bar.txt", + +rule foo: + output: + "results/foo.txt" + shell: + "touch {output}" + +def get_resources(wc): + # usually, anything that raises a FileNotFoundError if the file is not present is fine here, even in dryrun. + # However, since this test case shall ensure that the function is evaluated just before the actual execution + # of the job, we additionally assert that the file is present. + assert os.path.isfile("results/foo.txt"), "bug: resource function is not evaluated in a lazy way, just before job execution" + return os.path.getsize("results/foo.txt") + +rule bar: + input: + "results/foo.txt" + output: + "results/bar.txt" + resources: + test=get_resources + shell: + "touch {output}" diff --git a/tests/test_lazy_resources/expected-results/results/bar.txt b/tests/test_lazy_resources/expected-results/results/bar.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_lazy_resources/expected-results/results/foo.txt b/tests/test_lazy_resources/expected-results/results/foo.txt new file mode 100644 index 000000000..e69de29bb diff --git a/tests/tests.py b/tests/tests.py index 562164304..f922b3d40 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -1953,6 +1953,10 @@ def test_github_issue1498(): run(dpath("test_github_issue1498")) +def test_lazy_resources(): + run(dpath("test_lazy_resources")) + + def test_cleanup_metadata_fail(): run(dpath("test09"), cleanup_metadata=["xyz"])