Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: fixed temp file deletion for group jobs (#1487)
  • Loading branch information
johanneskoester committed Mar 16, 2022
1 parent 3f03c5d commit d030443
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 15 deletions.
7 changes: 7 additions & 0 deletions snakemake/dag.py
Expand Up @@ -605,6 +605,11 @@ def handle_temp(self, job):
if self.notemp:
return

if job.is_group():
for j in job:
self.handle_temp(j)
return

is_temp = lambda f: is_flagged(f, "temp")

# handle temp input
Expand Down Expand Up @@ -1466,6 +1471,8 @@ def finish(self, job, update_dynamic=True):
self.create_conda_envs()
potential_new_ready_jobs = True

self.handle_temp(job)

return potential_new_ready_jobs

def new_job(self, rule, targetfile=None, format_wildcards=None):
Expand Down
2 changes: 1 addition & 1 deletion snakemake/executors/__init__.py
Expand Up @@ -818,7 +818,7 @@ def format_job(self, pattern, job, **kwargs):
wait_for_files=[repr(f) for f in wait_for_files],
)
job_specific_args = ""
if job.is_group:
if job.is_group():
job_specific_args = f"--local-groupid {job.jobid}"

format_p = partial(
Expand Down
15 changes: 1 addition & 14 deletions snakemake/jobs.py
Expand Up @@ -1042,7 +1042,6 @@ def postprocess(
upload_remote=True,
handle_log=True,
handle_touch=True,
handle_temp=True,
error=False,
ignore_missing_output=False,
assume_shared_fs=True,
Expand Down Expand Up @@ -1083,11 +1082,6 @@ def postprocess(
"({}). Please ensure write permissions for the "
"directory {}".format(e, self.dag.workflow.persistence.path)
)
if handle_temp:
# temp handling has to happen after calling finished(),
# because we need to access temp output files to record
# start and end times.
self.dag.handle_temp(self)

@property
def name(self):
Expand Down Expand Up @@ -1409,14 +1403,7 @@ def cleanup(self):

def postprocess(self, error=False, **kwargs):
for job in self.jobs:
job.postprocess(handle_temp=False, error=error, **kwargs)
# Handle temp after per-job postprocess.
# This is necessary because group jobs are not topologically sorted,
# and we might otherwise delete a temp input file before it has been
# postprocessed by the outputting job in the same group.
if not error:
for job in self.jobs:
self.dag.handle_temp(job)
job.postprocess(error=error, **kwargs)
# remove all pipe and service outputs since all jobs of this group are done and the
# outputs are no longer needed
for job in self.jobs:
Expand Down

0 comments on commit d030443

Please sign in to comment.