fix: correct handling of exceptions in input functions that are gener…
…ators (#1536)

* fix: correct handling of exceptions in input functions that are generators

* fmt
johanneskoester committed Mar 30, 2022
1 parent 5b394c0 commit d9a56aa
Showing 6 changed files with 40 additions and 2 deletions.
2 changes: 1 addition & 1 deletion snakemake/
Expand Up @@ -510,7 +510,7 @@ class CheckSumMismatchException(WorkflowError):
class IncompleteCheckpointException(Exception):
def __init__(self, rule, targetfile):
"The requested checkpoint output is not yet created."
"The requested checkpoint output is not yet created. "
"If you see this error, you have likely tried to use "
"checkpoint output outside of an input function, or "
"you have tried to call an input function directly "
4 changes: 3 additions & 1 deletion snakemake/executors/
Expand Up @@ -465,6 +465,9 @@ def get_python_executable(self):
def get_envvar_declarations(self):
return ""

def get_job_args(self, job, **kwargs):
return f"{super().get_job_args(job, **kwargs)} --quiet"

def run(self, job, callback=None, submit_callback=None, error_callback=None):

Expand Down Expand Up @@ -581,7 +584,6 @@ def run_group_job(self, job):

def spawn_job(self, job):
cmd = self.format_job_exec(job)
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as e:
6 changes: 6 additions & 0 deletions snakemake/
Expand Up @@ -5,6 +5,7 @@

import os
import re
import types
from snakemake.path_modifier import PATH_MODIFIER_FLAG
import sys
import inspect
Expand Down Expand Up @@ -753,6 +754,11 @@ def apply_input_function(

value = func(Wildcards(fromdict=wildcards), **_aux_params)
if isinstance(value, types.GeneratorType):
# generators should be immediately collected here,
# otherwise we would miss any exceptions and
# would have to capture them again later.
value = list(value)
except IncompleteCheckpointException as e:
value = incomplete_checkpoint_func(e)
incomplete = True
25 changes: 25 additions & 0 deletions tests/test_github_issue261/Snakefile
@@ -0,0 +1,25 @@
import random

checkpoint random:
target = 'test1/{target}/{config}.txt'

with open(, 'w') as ftg:
for i in range(2, 10):

rule process:

def genetate_inputs(wildcards):
with checkpoints.random.get(**wildcards).output[0].open() as fran:
for line in fran.readlines():
yield line.rstrip()

rule generate:
Empty file.
5 changes: 5 additions & 0 deletions tests/
Expand Up @@ -1567,6 +1567,11 @@ def test_incomplete_params():
run(dpath("test_incomplete_params"), dryrun=True, printshellcmds=True)

def test_github_issue261():
run(dpath("test_github_issue261"), targets=["test1/target1/config1.done"])

@skip_on_windows # no pipe support on windows
def test_pipe_depend():
run(dpath("test_pipe_depend"), shouldfail=True)
0 comments on commit d9a56aa

