From 1c348da772fc53d3e61ed26b74c055476c033372 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sat, 9 May 2020 23:37:43 +0800 Subject: [PATCH 1/5] fix #3588 remove improper edges from output DAG. --- dvc/command/pipeline.py | 33 +++++++++++++++++++-------------- tests/func/test_pipeline.py | 25 +++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index d788031660..76f2dc1a70 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -42,30 +42,35 @@ def _build_graph(self, target, commands=False, outs=False): target_stage = dvcfile.Dvcfile(self.repo, path).stages[name] G = get_pipeline(self.repo.pipelines, target_stage) + out_nodes = set() + if outs: + for stage in networkx.dfs_preorder_nodes(G, target_stage): + for out in stage.outs: + out_nodes.add(str(out)) + nodes = set() + edges = [] for stage in networkx.dfs_preorder_nodes(G, target_stage): if commands: if stage.cmd is None: continue nodes.add(stage.cmd) + for to_stage in G.neighbors(stage): + if to_stage.cmd: + edges.append((stage.cmd, to_stage.cmd)) elif outs: for out in stage.outs: - nodes.add(str(out)) + if stage == target_stage: + nodes.add(str(out)) + if str(out) in nodes: + for dep in stage.deps: + if str(dep) in out_nodes: + nodes.add(str(dep)) + edges.append((str(out), str(dep))) else: nodes.add(stage.addressing) - - edges = [] - for from_stage, to_stage in networkx.edge_dfs(G, target_stage): - if commands: - if to_stage.cmd is None: - continue - edges.append((from_stage.cmd, to_stage.cmd)) - elif outs: - for from_out in from_stage.outs: - for to_out in to_stage.outs: - edges.append((str(from_out), str(to_out))) - else: - edges.append((from_stage.addressing, to_stage.addressing)) + for to_stage in G.neighbors(stage): + edges.append((stage.addressing, to_stage.addressing)) return list(nodes), edges, networkx.is_tree(G) diff --git a/tests/func/test_pipeline.py b/tests/func/test_pipeline.py index eda4a14713..a222aeadce 100644 --- a/tests/func/test_pipeline.py +++ b/tests/func/test_pipeline.py @@ -328,3 +328,28 @@ def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy): nodes, edges, is_tree = command._build_graph("dvc.yaml:copy-foo-bar") assert set(nodes) == {"dvc.yaml:copy-foo-bar"} + + +def test_pipeline_multi_outputs_stages(dvc): + dvc.run( + outs=["alice", "bob"], + cmd="echo alice>alice && echo bob>bob", + single_stage=True, + ) + dvc.run( + deps=["alice"], + outs=["mary", "mike"], + cmd="echo mary>mary && echo mike>mike", + single_stage=True, + ) + stage = dvc.run( + deps=["mary"], + outs=["carol"], + cmd="echo carol>carol", + single_stage=True, + ) + + command = CmdPipelineShow([]) + nodes, edges, is_tree = command._build_graph(stage.path, outs=True) + assert set(nodes) == {"alice", "mary", "carol"} + assert set(edges) == {("carol", "mary"), ("mary", "alice")} From a454e788afe5b3f1242b499ea739a3866e48b89c Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sat, 9 May 2020 23:58:56 +0800 Subject: [PATCH 2/5] less code modification --- dvc/command/pipeline.py | 45 ++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index 76f2dc1a70..387a585ec0 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -42,35 +42,38 @@ def _build_graph(self, target, commands=False, outs=False): target_stage = dvcfile.Dvcfile(self.repo, path).stages[name] G = get_pipeline(self.repo.pipelines, target_stage) - out_nodes = set() - if outs: - for stage in networkx.dfs_preorder_nodes(G, target_stage): - for out in stage.outs: - out_nodes.add(str(out)) - nodes = set() - edges = [] for stage in networkx.dfs_preorder_nodes(G, target_stage): if commands: if stage.cmd is None: continue nodes.add(stage.cmd) - for to_stage in G.neighbors(stage): - if to_stage.cmd: - edges.append((stage.cmd, to_stage.cmd)) + elif not outs: + nodes.add(stage.addressing) + + edges = [] + for from_stage, to_stage in networkx.edge_dfs(G, target_stage): + if commands: + if to_stage.cmd is None: + continue + edges.append((from_stage.cmd, to_stage.cmd)) elif outs: - for out in stage.outs: - if stage == target_stage: - nodes.add(str(out)) - if str(out) in nodes: - for dep in stage.deps: - if str(dep) in out_nodes: - nodes.add(str(dep)) - edges.append((str(out), str(dep))) + from_stage_deps = set( + [dep.path_info.parts for dep in from_stage.deps] + ) + for from_out in from_stage.outs: + if ( + from_stage != target_stage + and str(from_out) not in nodes + ): + continue + for to_out in to_stage.outs: + if to_out.path_info.parts in from_stage_deps: + nodes.add(str(from_out)) + nodes.add(str(to_out)) + edges.append((str(from_out), str(to_out))) else: - nodes.add(stage.addressing) - for to_stage in G.neighbors(stage): - edges.append((stage.addressing, to_stage.addressing)) + edges.append((from_stage.addressing, to_stage.addressing)) return list(nodes), edges, networkx.is_tree(G) From 4fd2995c22b177fe28c42f4b55c0b8c52ac9ba84 Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sun, 10 May 2020 00:08:48 +0800 Subject: [PATCH 3/5] deepsource required --- dvc/command/pipeline.py | 6 +++--- tests/func/test_pipeline.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index 387a585ec0..8d71b7316d 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -58,9 +58,9 @@ def _build_graph(self, target, commands=False, outs=False): continue edges.append((from_stage.cmd, to_stage.cmd)) elif outs: - from_stage_deps = set( - [dep.path_info.parts for dep in from_stage.deps] - ) + from_stage_deps = { + dep.path_info.parts for dep in from_stage.deps + } for from_out in from_stage.outs: if ( from_stage != target_stage diff --git a/tests/func/test_pipeline.py b/tests/func/test_pipeline.py index a222aeadce..7d39ce30a0 100644 --- a/tests/func/test_pipeline.py +++ b/tests/func/test_pipeline.py @@ -275,7 +275,7 @@ def test_split_pipeline(tmp_dir, scm, dvc): ) command = CmdPipelineShow([]) - nodes, edges, is_tree = command._build_graph( + nodes, edges, _ = command._build_graph( stage.path, commands=False, outs=True ) assert set(nodes) == {"data", "data_train", "data_valid", "result"} @@ -320,13 +320,13 @@ def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy): run_copy("foo", "bar", name="copy-foo-bar") run_copy("bar", "foobar", single_stage=True) command = CmdPipelineShow([]) - nodes, edges, is_tree = command._build_graph("foobar.dvc") + nodes, edges, _ = command._build_graph("foobar.dvc") assert set(nodes) == {"dvc.yaml:copy-foo-bar", "foobar.dvc"} assert set(edges) == { ("foobar.dvc", "dvc.yaml:copy-foo-bar"), } - nodes, edges, is_tree = command._build_graph("dvc.yaml:copy-foo-bar") + nodes, *_ = command._build_graph("dvc.yaml:copy-foo-bar") assert set(nodes) == {"dvc.yaml:copy-foo-bar"} @@ -350,6 +350,6 @@ def test_pipeline_multi_outputs_stages(dvc): ) command = CmdPipelineShow([]) - nodes, edges, is_tree = command._build_graph(stage.path, outs=True) + nodes, edges, _ = command._build_graph(stage.path, outs=True) assert set(nodes) == {"alice", "mary", "carol"} assert set(edges) == {("carol", "mary"), ("mary", "alice")} From 106b165dd965829d33c3d092197fcfe2e9f80d4f Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sun, 10 May 2020 00:29:39 +0800 Subject: [PATCH 4/5] deepsource required --- dvc/command/pipeline.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index 8d71b7316d..f51cf346f2 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -32,6 +32,24 @@ def _show(self, target, commands, outs, locked): else: logger.info(stage.addressing) + def _build_output_graph(self, G, target_stage): + import networkx + + nodes = set() + edges = [] + for from_stage, to_stage in networkx.edge_dfs(G, target_stage): + from_stage_deps = {dep.path_info.parts for dep in from_stage.deps} + for from_out in from_stage.outs: + if from_stage != target_stage and str(from_out) not in nodes: + continue + for to_out in to_stage.outs: + if to_out.path_info.parts not in from_stage_deps: + continue + nodes.add(str(from_out)) + nodes.add(str(to_out)) + edges.append((str(from_out), str(to_out))) + return nodes, edges + def _build_graph(self, target, commands=False, outs=False): import networkx from dvc import dvcfile @@ -57,24 +75,12 @@ def _build_graph(self, target, commands=False, outs=False): if to_stage.cmd is None: continue edges.append((from_stage.cmd, to_stage.cmd)) - elif outs: - from_stage_deps = { - dep.path_info.parts for dep in from_stage.deps - } - for from_out in from_stage.outs: - if ( - from_stage != target_stage - and str(from_out) not in nodes - ): - continue - for to_out in to_stage.outs: - if to_out.path_info.parts in from_stage_deps: - nodes.add(str(from_out)) - nodes.add(str(to_out)) - edges.append((str(from_out), str(to_out))) - else: + elif not outs: edges.append((from_stage.addressing, to_stage.addressing)) + if outs: + nodes, edges = self._build_output_graph(G, target_stage) + return list(nodes), edges, networkx.is_tree(G) def _show_ascii(self, target, commands, outs): From ef3705dff824d44edd1aafce18555cce7e57001b Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Sun, 10 May 2020 00:55:25 +0800 Subject: [PATCH 5/5] refactoring for reducing cognitive complexity. --- dvc/command/pipeline.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index f51cf346f2..4d1f3b9cc3 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -4,7 +4,6 @@ from dvc.command.base import CmdBase, append_doc_link, fix_subparsers from dvc.exceptions import DvcException - logger = logging.getLogger(__name__) @@ -32,22 +31,29 @@ def _show(self, target, commands, outs, locked): else: logger.info(stage.addressing) - def _build_output_graph(self, G, target_stage): + @staticmethod + def _build_output_graph(G, target_stage): import networkx + from itertools import product - nodes = set() + nodes = {str(out) for out in target_stage.outs} edges = [] + for from_stage, to_stage in networkx.edge_dfs(G, target_stage): from_stage_deps = {dep.path_info.parts for dep in from_stage.deps} - for from_out in from_stage.outs: - if from_stage != target_stage and str(from_out) not in nodes: - continue - for to_out in to_stage.outs: - if to_out.path_info.parts not in from_stage_deps: - continue - nodes.add(str(from_out)) - nodes.add(str(to_out)) - edges.append((str(from_out), str(to_out))) + to_outs = { + to_out + for to_out in to_stage.outs + if to_out.path_info.parts in from_stage_deps + } + from_outs = { + from_out + for from_out in from_stage.outs + if str(from_out) in nodes + } + nodes |= {str(to_out) for to_out in to_outs} + for from_out, to_out in product(from_outs, to_outs): + edges.append((str(from_out), str(to_out))) return nodes, edges def _build_graph(self, target, commands=False, outs=False):