Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 30 additions & 27 deletions dvc/command/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,45 +31,50 @@ def _show(self, target, commands, outs, locked):
else:
logger.info(stage.path_in_repo)

def __build_graph(self, target, commands, outs):
def _build_graph(self, target, commands, outs):
import networkx
from dvc.stage import Stage
from dvc.repo.graph import get_pipeline

stage = Stage.load(self.repo, target)
G = get_pipeline(self.repo.pipelines, stage)
target_stage = Stage.load(self.repo, target)
G = get_pipeline(self.repo.pipelines, target_stage)

nodes = []
for stage in G:
nodes = set()
for stage in networkx.dfs_preorder_nodes(G, target_stage):
if commands:
if stage.cmd is None:
continue
nodes.append(stage.cmd)
nodes.add(stage.cmd)
elif outs:
for out in stage.outs:
nodes.append(str(out))
nodes.add(str(out))
for dep in stage.deps:
nodes.add(str(dep))
else:
nodes.append(stage.relpath)
nodes.add(stage.relpath)

edges = []
for from_stage, to_stage in G.edges():
if commands:
if to_stage.cmd is None:
continue
edges.append((from_stage.cmd, to_stage.cmd))
elif outs:
for from_out in from_stage.outs:
for to_out in to_stage.outs:
edges.append((str(from_out), str(to_out)))
else:
edges.append((from_stage.relpath, to_stage.relpath))

return nodes, edges, networkx.is_tree(G)
if outs:
for stage in networkx.dfs_preorder_nodes(G, target_stage):
for dep in stage.deps:
for out in stage.outs:
edges.append((str(out), str(dep)))
else:
for from_stage, to_stage in networkx.dfs_edges(G, target_stage):
if commands:
if to_stage.cmd is None:
continue
edges.append((from_stage.cmd, to_stage.cmd))
else:
edges.append((from_stage.relpath, to_stage.relpath))

return list(nodes), edges, networkx.is_tree(G)

def _show_ascii(self, target, commands, outs):
from dvc.dagascii import draw

nodes, edges, _ = self.__build_graph(target, commands, outs)
nodes, edges, _ = self._build_graph(target, commands, outs)

if not nodes:
return
Expand All @@ -79,7 +84,7 @@ def _show_ascii(self, target, commands, outs):
def _show_dependencies_tree(self, target, commands, outs):
from treelib import Tree

nodes, edges, is_tree = self.__build_graph(target, commands, outs)
nodes, edges, is_tree = self._build_graph(target, commands, outs)
if not nodes:
return
if not is_tree:
Expand All @@ -100,12 +105,12 @@ def _show_dependencies_tree(self, target, commands, outs):
observe_list.pop(0)
tree.show()

def __write_dot(self, target, commands, outs):
def _write_dot(self, target, commands, outs):
import io
import networkx
from networkx.drawing.nx_pydot import write_dot

_, edges, _ = self.__build_graph(target, commands, outs)
_, edges, _ = self._build_graph(target, commands, outs)
edges = [edge[::-1] for edge in edges]

simple_g = networkx.DiGraph()
Expand All @@ -126,9 +131,7 @@ def run(self):
target, self.args.commands, self.args.outs
)
elif self.args.dot:
self.__write_dot(
target, self.args.commands, self.args.outs
)
self._write_dot(target, self.args.commands, self.args.outs)
elif self.args.tree:
self._show_dependencies_tree(
target, self.args.commands, self.args.outs
Expand Down
22 changes: 22 additions & 0 deletions tests/func/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from dvc.main import main
from dvc.command.pipeline import CmdPipelineShow
from tests.basic_env import TestDvc
from tests.func.test_repro import TestRepro
from tests.func.test_repro import TestReproChangedDeepData
Expand Down Expand Up @@ -98,6 +99,27 @@ def test_dot_commands(self):
self.assertEqual(ret, 0)


def test_disconnected_stage(tmp_dir, dvc):
tmp_dir.dvc_gen({"base": "base"})

dvc.add("base")
dvc.run(deps=["base"], outs=["derived1"], cmd="echo derived1 > derived1")
dvc.run(deps=["base"], outs=["derived2"], cmd="echo derived2 > derived2")
final_stage = dvc.run(
deps=["derived1"], outs=["final"], cmd="echo final > final"
)

command = CmdPipelineShow([])
# Need to test __build_graph directly
nodes, edges, is_tree = command._build_graph(
final_stage.path, commands=False, outs=True
)

assert set(nodes) == {"final", "derived1", "base"}
assert edges == [("final", "derived1"), ("derived1", "base")]
assert is_tree is True


def test_print_locked_stages(tmp_dir, dvc, caplog):
tmp_dir.dvc_gen({"foo": "foo content", "bar": "bar content"})
dvc.lock_stage("foo.dvc")
Expand Down