Skip to content
Permalink
Browse files
feat: add flag ensure that allows to annotate that certain output f…
…iles should be non-empty or agree with a given checksum (#1651)

* feat: add flag for marking output files as expected to be non empty.

* docs

* test case

* update docs

* update tests

* update test

* fixes

* docs

* fix error handling

* ensure that testcase runs properly on windows
  • Loading branch information
johanneskoester committed May 16, 2022
1 parent 1ae85c6 commit 76f69d9e21b9c9a9a01198862b66284bc3942d20
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 5 deletions.
@@ -1036,6 +1036,61 @@ The timestamp of such files is ignored and always assumed to be older than any o
Here, this means that the file ``path/to/outputfile`` will not be triggered for re-creation after it has been generated once, even when the input file is modified in the future.
Note that any flag that forces re-creation of files still also applies to files marked as ``ancient``.

.. _snakefiles_ensure::

Ensuring output file properties like non-emptyness or checksum compliance
-------------------------------------------------------------------------

It is possible to annotate certain additional criteria for output files to be ensured after they have been generated successfully.
For example, this can be used to check for output files to be non-empty, or to compare them against a given sha256 checksum.
If this functionality is used, Snakemake will check such annotated files before considering a job to be successfull.
Non-emptyness can be checked as follows:

.. code-block:: python
rule NAME:
output:
ensure("test.txt", non_empty=True)
shell:
"somecommand {output}"
Above, the output file ``test.txt`` is marked as non-empty.
If the command ``somecommand`` happens to generate an empty output,
the job will fail with an error listing the unexpected empty file.

A sha256 checksum can be compared as follows:

.. code-block:: python
my_checksum = "u98a9cjsd98saud090923ßkpoasköf9ß32"
rule NAME:
output:
ensure("test.txt", sha256=my_checksum)
shell:
"somecommand {output}"
In addition to providing the checksum as plain string, it is possible to provide a pointer to a function (similar to :ref:`input functions <snakefiles_input-functions>`).
The function has to accept a single argument that will be the wildcards object generated from the application of the rule to create some requested output files:

.. code-block:: python
def get_checksum(wildcards):
# e.g., look up the checksum with the value of the wildcard sample
# in some dictionary
return my_checksums[wildcards.sample]
rule NAME:
output:
ensure("test/{sample}.txt", sha256=get_checksum)
shell:
"somecommand {output}"
Note that you can also use `lambda expressions <https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions>`_ instead of full function definitions.

Often, it is a good idea to combine ``ensure`` annotations with :ref:`retry definitions <snakefiles_retries>`, e.g. for retrying upon invalid checksums or empty files.

Shadow rules
------------

@@ -1069,7 +1124,7 @@ Consider running with the ``--cleanup-shadow`` argument every now and then
to remove any remaining shadow directories from aborted jobs.
The base shadow directory can be changed with the ``--shadow-prefix`` command line argument.

.. _snakefiles-retries:
.. _snakefiles_retries:

Defining retries for fallible rules
-----------------------------------
@@ -1087,6 +1142,8 @@ For such cases, it is possible to defined a number of automatic retries for each
shell:
"curl https://some.unreliable.server/test.txt > {output}"
Often, it is a good idea to combine retry functionality with :ref:`ensure annotations <snakefiles_ensure>`, e.g. for retrying upon invalid checksums or empty files.

Note that it is also possible to define retries globally (via the ``--retries`` command line option, see :ref:`all_options`).
The local definition of the rule thereby overwrites the global definition.

@@ -17,7 +17,14 @@
import uuid
import math

from snakemake.io import PeriodicityDetector, wait_for_files, is_flagged, IOFile
from snakemake.io import (
PeriodicityDetector,
get_flag_value,
is_callable,
wait_for_files,
is_flagged,
IOFile,
)
from snakemake.jobs import Reason, JobFactory, GroupJobFactory, Job
from snakemake.exceptions import MissingInputException
from snakemake.exceptions import MissingRuleException, AmbiguousRuleException
@@ -479,6 +486,55 @@ def missing_temp(self, job):
return True
return False

def handle_ensure(self, job, expanded_output):
ensured_output = {
f: get_flag_value(f, "ensure")
for f in expanded_output
if is_flagged(f, "ensure")
}
# handle non_empty
empty_output = [
f
for f, ensure in ensured_output.items()
if ensure["non_empty"] and f.size == 0
]
if empty_output:
raise WorkflowError(
"Detected unexpected empty output files. "
"Something went wrong in the rule without "
"an error being reported:\n{}".format("\n".join(empty_output)),
rule=job.rule,
)

# handle checksum
def is_not_same_checksum(f, checksum):
if checksum is None:
return False
if is_callable(checksum):
try:
checksum = checksum(job.wildcards)
except Exception as e:
raise WorkflowError(
"Error calling checksum function provided to ensure marker.",
e,
rule=job.rule,
)
return not f.is_same_checksum(checksum, force=True)

checksum_failed_output = [
f
for f, ensure in ensured_output.items()
if is_not_same_checksum(f, ensure.get("sha256"))
]
if checksum_failed_output:
raise WorkflowError(
"Output files have checksums that differ from the expected ones "
"defined in the workflow:\n{}".format(
"\n".join(checksum_failed_output)
),
rule=job.rule,
)

def check_and_touch_output(
self,
job,
@@ -508,13 +564,16 @@ def check_and_touch_output(
)

# Ensure that outputs are of the correct type (those flagged with directory()
# are directories and not files and vice versa). We can't check for remote objects
# are directories and not files and vice versa). We can't check for remote objects.
for f in expanded_output:
if (f.is_directory and not f.remote_object and not os.path.isdir(f)) or (
not f.remote_object and os.path.isdir(f) and not f.is_directory
):
raise ImproperOutputException(job, [f])

# Handle ensure flags
self.handle_ensure(job, expanded_output)

# It is possible, due to archive expansion or cluster clock skew, that
# the files appear older than the input. But we know they must be new,
# so touch them to update timestamps. This also serves to touch outputs
@@ -605,8 +605,8 @@ def checksum(self, force=False):
else:
return None

def is_same_checksum(self, other_checksum):
checksum = self.checksum()
def is_same_checksum(self, other_checksum, force=False):
checksum = self.checksum(force=force)
if checksum is None or other_checksum is None:
# if no checksum available or files too large, not the same
return False
@@ -1114,6 +1114,10 @@ def touch(value):
return flag(value, "touch")


def ensure(value, non_empty=False, sha256=None):
return flag(value, "ensure", {"non_empty": non_empty, "sha256": sha256})


def unpack(value):
return flag(value, "unpack")

@@ -560,6 +560,7 @@ def _set_inoutput_item(self, item, output=False, name=None):
"touch",
"pipe",
"service",
"ensure",
]:
logger.warning(
"The flag '{}' used in rule {} is only valid for outputs, not inputs.".format(
@@ -55,6 +55,7 @@
repeat,
report,
multiext,
ensure,
IOFile,
sourcecache_entry,
)
@@ -0,0 +1,31 @@
shell.executable("bash")

rule a:
output:
ensure("test.txt", non_empty=True)
shell:
"touch {output}"


rule b:
output:
ensure("test2.txt", non_empty=True)
shell:
"echo test > {output}"


sha256 = "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"


rule c:
output:
ensure("test3.txt", sha256=sha256)
shell:
"echo -n test > {output}"


rule d:
output:
ensure("test4.txt", sha256=lambda w: sha256)
shell:
"echo -n test2 > {output}"
@@ -0,0 +1 @@
test
@@ -1616,6 +1616,18 @@ def test_github_issue1389():
run(dpath("test_github_issue1389"), resources={"foo": 4}, shouldfail=True)


def test_ensure_nonempty_fail():
run(dpath("test_ensure"), targets=["a"], shouldfail=True)


def test_ensure_success():
run(dpath("test_ensure"), targets=["b", "c"])


def test_ensure_checksum_fail():
run(dpath("test_ensure"), targets=["d"], shouldfail=True)


@skip_on_windows
def test_github_issue1261():
run(dpath("test_github_issue1261"), shouldfail=True, check_results=True)

0 comments on commit 76f69d9

Please sign in to comment.