Skip to content
Permalink
Browse files
fix: bug in pipe group handling that led to multiple assignments of t…
…he same group id to different groups; bug that accidentally added already running groups to the list of ready jobs (issue #1331) (#1332)

* issue 1331

* Update Snakefile

* Update Snakefile

* fix: bug in pipe group handling that led to multiple assignments of the same group id to different groups; bug that accidentally added already running groups to the list of ready jobs

* fmt

* skip on win

Co-authored-by: Johannes Köster <johannes.koester@tu-dortmund.de>
  • Loading branch information
Maarten-vd-Sande and johanneskoester committed Feb 18, 2022
1 parent 7189183 commit 1a9b483a6c675315d74bff791502c2bdd74609c1
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 6 deletions.
@@ -1149,7 +1149,6 @@ def _update_group_components(self):
for groupid, conn_components in groups_by_id.items():
n_components = self.workflow.group_components.get(groupid, 1)
if n_components > 1:
print(n_components)
for chunk in group_into_chunks(n_components, conn_components):
if len(chunk) > 1:
primary = chunk[0]
@@ -1180,7 +1179,8 @@ def update_ready(self, jobs=None):
else:
group = self._group[job]
group.finalize()
candidate_groups.add(group)
if group not in self._running:
candidate_groups.add(group)

self._ready_jobs.update(
group
@@ -1286,8 +1286,9 @@ def handle_pipes(self):

if len(candidate_groups) > 1:
if all(isinstance(group, CandidateGroup) for group in candidate_groups):
group = candidate_groups.pop()
for g in candidate_groups:
g.merge(group)
group.merge(g)
else:
raise WorkflowError(
"An output file is marked as "
@@ -1301,15 +1302,20 @@ def handle_pipes(self):
group = candidate_groups.pop()
else:
# generate a random unique group name
group = CandidateGroup() # str(uuid.uuid4())
group = CandidateGroup()

# set group for job and all downstreams
job.group = group
visited.add(job)
for j in all_depending:
j.group = group
visited.add(j)

# convert candidate groups to plain string IDs
for job in visited:
job.group = group.id if isinstance(group, CandidateGroup) else group
job.group = (
job.group.id if isinstance(job.group, CandidateGroup) else job.group
)

def _ready(self, job):
"""Return whether the given job is ready to execute."""
@@ -530,6 +530,7 @@ def schedule(self):

def _finish_jobs(self):
# must be called from within lock
# clear the global tofinish such that parallel calls do not interfere
for job in self._tofinish:
if self.handle_job_success:
try:
@@ -539,7 +540,7 @@ def _finish_jobs(self):
# we do the same as in case of errors during execution
print_exception(e, self.workflow.linemaps)
self._handle_error(job)
return
continue

if self.update_resources:
# normal jobs have len=1, group jobs have len>1
@@ -0,0 +1,35 @@
# Regression workflow for issue #1331: six independent sample chains of
# trimming (checkpoint) -> align (pipe output) -> sort. The pipe() output of
# "align" forces align and sort into a pipe group; running several such
# groups concurrently exercises the group-id assignment and ready-job
# bookkeeping that the associated fix addresses. The "sleep 1" calls widen
# the window in which the (nondeterministic) scheduling bug can appear.
rule all:
    input:
        [
            "aligned_and_sort/1.txt",
            "aligned_and_sort/2.txt",
            "aligned_and_sort/3.txt",
            "aligned_and_sort/4.txt",
            "aligned_and_sort/5.txt",
            "aligned_and_sort/6.txt",
        ]


# Checkpoint so the DAG is re-evaluated mid-run, interleaving with the
# pipe groups below.
checkpoint trimming:
    output:
        "trimmed/{sample}.txt"
    shell:
        "touch {output}; sleep 1"


# Emits a pipe() output: align and its consumer (sort) must run together
# as one pipe group.
rule align:
    input:
        "trimmed/{sample}.txt"
    output:
        pipe("aligned/{sample}.txt")
    shell:
        "touch {output}; sleep 1"


rule sort:
    input:
        "aligned/{sample}.txt"
    output:
        "aligned_and_sort/{sample}.txt"
    shell:
        "touch {output}; sleep 1"
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
@@ -1455,6 +1455,13 @@ def test_modules_ruledeps_inheritance():
run(dpath("test_modules_ruledeps_inheritance"))


@skip_on_windows
def test_issue1331():
    # The underlying scheduling race is nondeterministic, so a single run is
    # not guaranteed to trigger it; repeat the workflow to raise the odds.
    for _ in range(10):
        run(dpath("test_issue1331"), cores=4)


@skip_on_windows
def test_conda_named():
    # Run the test_conda_named workflow with conda integration enabled
    # (use_conda=True); skipped on Windows.
    run(dpath("test_conda_named"), use_conda=True)

0 comments on commit 1a9b483

Please sign in to comment.