fix: merging of pipe groups when multiple rules are chained together via pipes (#1173)

* when handling pipes, process jobs in BFS order

handle_pipes() depends on the jobs being in BFS order;
otherwise, it won't merge groups together properly (see #975).

* create test for #975 - multiple piped rules

* create output for multiple pipes test

* fix formatting and filter finished jobs

* register test_pipes_multiple with nosetests in tests.py

* perf: use mergeable CandidateGroup objects instead of topological sorting of the jobs (faster).
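
A minimal sketch (not Snakemake code; the Group class below merely mirrors the CandidateGroup added in this commit) of why mutable, mergeable group objects make the visiting order irrelevant: if jobs a and b have already received separate fresh groups by the time their common pipe consumer c is processed, the two candidate groups are simply merged rather than the jobs having to be re-sorted first.

    import uuid

    class Group:
        # stand-in for CandidateGroup: identity is a mutable id
        def __init__(self):
            self.id = str(uuid.uuid4())

        def merge(self, other):
            # adopt the other group's id, so both objects now denote one group
            self.id = other.id

    # jobs a and b were visited first and each received a fresh group
    group_a, group_b = Group(), Group()
    # job c consumes pipes from both a and b, so the candidates must be unified
    group_b.merge(group_a)
    assert group_a.id == group_b.id  # all involved jobs resolve to a single group id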

Co-authored-by: Johannes Köster <johannes.koester@tu-dortmund.de>
aryarm and johanneskoester committed Sep 24, 2021
1 parent 51e5afc commit de91d2ccf53bd844b4dbf4f64dd087f4ee935be5
Showing 5 changed files with 69 additions and 8 deletions.
@@ -1224,6 +1224,8 @@ def postprocess(self, update_needrun=True):
def handle_pipes(self):
"""Use pipes to determine job groups. Check if every pipe has exactly
one consumer"""

visited = set()
for job in self.needrun_jobs:
candidate_groups = set()
if job.group is not None:
@@ -1283,22 +1285,31 @@ def handle_pipes(self):
            continue

        if len(candidate_groups) > 1:
            if all(isinstance(group, CandidateGroup) for group in candidate_groups):
                # all candidates are mergeable CandidateGroups: collapse them into one
                group = candidate_groups.pop()
                for g in candidate_groups:
                    g.merge(group)
            else:
                raise WorkflowError(
                    "An output file is marked as "
                    "pipe, but consuming jobs "
                    "are part of conflicting "
                    "groups.",
                    rule=job.rule,
                )
        elif candidate_groups:
            # extend the candidate group to all involved jobs
            group = candidate_groups.pop()
        else:
            # generate a random unique group name
            group = CandidateGroup()  # str(uuid.uuid4())
        job.group = group
        visited.add(job)
        for j in all_depending:
            j.group = group
            visited.add(j)

    for job in visited:
        job.group = job.group.id if isinstance(job.group, CandidateGroup) else job.group

def _ready(self, job):
"""Return whether the given job is ready to execute."""
@@ -2181,3 +2192,17 @@ def __str__(self):

def __len__(self):
    return self._len


class CandidateGroup:
    """A mergeable placeholder for a job group: groups with the same id are equal."""

    def __init__(self):
        self.id = str(uuid.uuid4())

    def __eq__(self, other):
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)

    def merge(self, other):
        # adopt the other group's id so that both objects denote the same group
        self.id = other.id
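
A quick usage sketch of the merge semantics, assuming the CandidateGroup class exactly as defined above: equality and hashing go by id, so two merged groups compare equal and collapse to one member in a set of candidates.

    g1, g2 = CandidateGroup(), CandidateGroup()
    g2.merge(g1)
    assert g1 == g2
    assert len({g1, g2}) == 1  # the merged pair counts as a single candidate group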
@@ -289,6 +289,7 @@ def group(self):

@group.setter
def group(self, group):
    self._group = group

@property
@@ -0,0 +1,27 @@
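# regression test for issue #975: rule a pipes into rule b for each i in {0, 1},
# and rule c collects the output of both pipe chains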
shell.executable("bash")

rule all:
    input:
        "test.out"

rule a:
    output:
        pipe("testa.{i}.txt")
    shell:
        "echo {wildcards.i} > {output}"

rule b:
    input:
        rules.a.output
    output:
        pipe("testb.{i}.txt")
    shell:
        "cat {input} > {output}"

rule c:
    input:
        expand(rules.b.output, i=range(2))
    output:
        "test.out"
    shell:
        "cat {input} > {output}"
@@ -0,0 +1,2 @@
0
1
@@ -832,6 +832,12 @@ def test_pipes():
    run(dpath("test_pipes"))


@skip_on_windows
def test_pipes_multiple():
    # see github issue #975
    run(dpath("test_pipes_multiple"))


def test_pipes_fail():
    run(dpath("test_pipes_fail"), shouldfail=True)
