Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: merging of pipe groups when multiple rules are chained together via pipes #1173

Merged
merged 6 commits into from Sep 24, 2021

Conversation

aryarm
Copy link
Contributor

@aryarm aryarm commented Sep 12, 2021

Related to: #975 (and #349)

The problem

In versions of Snakemake up to (and including) v5.26.1, workflows where chained pipe rules get aggregated across their wildcards could be executed successfully. Here's an example.

rule all:
    input: "test.out"

rule a:
    output: pipe("testa.{i}.txt")
    shell: "echo {wildcards.i} > {output}"

rule b:
    input: rules.a.output
    output: pipe("testb.{i}.txt")
    shell: "cat {input} > {output}"

rule c:
    input: expand(rules.b.output, i=range(2))
    output: "test.out"
    shell: "cat {input} > {output}"

However, in v5.27.0 (ie the next version), a change was introduced that led to the following failure.

$ snakemake -j
Building DAG of jobs...
WorkflowError in line 8 of Snakefile:
An output file is marked as pipe, but consuming jobs are part of conflicting groups.

The change in question?

    @property
    def needrun_jobs(self):
        """ Jobs that need to be executed. """
-        for job in filter(
-            self.needrun,
-            self.bfs(self.dependencies, *self.targetjobs, stop=self.noneedrun_finished),
-        ):
-            yield job
+        return filterfalse(self.finished, self._needrun)

As a result, the code that handles the grouping of piped jobs in dag.py no longer iterates over jobs in the same order:

snakemake/snakemake/dag.py

Lines 1164 to 1167 in 2e2c63a

def handle_pipes(self):
"""Use pipes to determine job groups. Check if every pipe has exactly
one consumer"""
for job in self.needrun_jobs:

It turns out that the order really matters. If the jobs are not sorted topologically, the groups that are created for the jobs from each of rules a and b will not get properly merged. Instead, the jobs will be in separate groups, resulting in a WorkflowError about conflicting groups.
Unfortunately, the error happens somewhat non-deterministically. Since self._needrun is an unordered set(), the jobs it contains might appear in the right order occasionally just by chance.

The proposed solution

This PR revises the handle_pipes() code to force jobs to be processed in BFS order, and thus, resolves #975.

def handle_pipes(self):
    """Use pipes to determine job groups. Check if every pipe has exactly
    one consumer"""
-    for job in self.needrun_jobs:
+    needruns = filterfalse(
+        self.finished,
+        self.bfs(self.dependencies, *self.targetjobs, stop=self.noneedrun_finished),
+    )
+    for job in needruns:

Feel free to let me know if there's a better way to accomplish this. For example, I also considered changing the self._needrun set into some sort of ordered set. But I don't want to break anything, and this seemed like the most straightforward solution.

QC

  • I created a test case to cover the bug fix. The test is called test_pipes_multiple
  • Changes to the documentation are not needed in this case. This pull request fixes the code so that it can be consistent with the docs.

aryarm added 3 commits Sep 11, 2021
handle_pipes() depends on the jobs being in BFS order
otherwise, it won't merge groups together properly
see snakemake#975
@aryarm aryarm requested a review from johanneskoester as a code owner Sep 12, 2021
@aryarm aryarm changed the title restore merging of pipe groups fix: merging of pipe groups Sep 12, 2021
@aryarm aryarm changed the title fix: merging of pipe groups fix: merging of pipe groups when multiple rules are chained together via pipes Sep 19, 2021
@sonarcloud
Copy link

sonarcloud bot commented Sep 24, 2021

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

No Coverage information No Coverage information
No Duplication information No Duplication information

@johanneskoester
Copy link
Contributor

johanneskoester commented Sep 24, 2021

Thank you! I have modified this PR a bit, using an approach that does not need the BFS needruns. The reason for avoiding the BFS there was performance.

@johanneskoester johanneskoester merged commit de91d2c into snakemake:main Sep 24, 2021
6 checks passed
@pdagosto
Copy link

pdagosto commented Sep 28, 2021

Thanks Johannes and Arya - this fixed the problem I was having. I can run my workflow without problems in v 6.8.1

@aryarm aryarm deleted the fix/github-issue-975 branch Sep 29, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

workflow with piped output fails under versions 5.28 - 6.2
3 participants