Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions dvc/command/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from dvc.command.base import CmdBase, append_doc_link, fix_subparsers
from dvc.exceptions import DvcException


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -32,6 +31,31 @@ def _show(self, target, commands, outs, locked):
else:
logger.info(stage.addressing)

@staticmethod
def _build_output_graph(G, target_stage):
import networkx
from itertools import product

nodes = {str(out) for out in target_stage.outs}
edges = []

for from_stage, to_stage in networkx.edge_dfs(G, target_stage):
from_stage_deps = {dep.path_info.parts for dep in from_stage.deps}
to_outs = {
to_out
for to_out in to_stage.outs
if to_out.path_info.parts in from_stage_deps
}
from_outs = {
from_out
for from_out in from_stage.outs
if str(from_out) in nodes
}
nodes |= {str(to_out) for to_out in to_outs}
for from_out, to_out in product(from_outs, to_outs):
edges.append((str(from_out), str(to_out)))
return nodes, edges

def _build_graph(self, target, commands=False, outs=False):
import networkx
from dvc import dvcfile
Expand All @@ -48,10 +72,7 @@ def _build_graph(self, target, commands=False, outs=False):
if stage.cmd is None:
continue
nodes.add(stage.cmd)
elif outs:
for out in stage.outs:
nodes.add(str(out))
else:
elif not outs:
nodes.add(stage.addressing)

edges = []
Expand All @@ -60,13 +81,12 @@ def _build_graph(self, target, commands=False, outs=False):
if to_stage.cmd is None:
continue
edges.append((from_stage.cmd, to_stage.cmd))
elif outs:
for from_out in from_stage.outs:
for to_out in to_stage.outs:
edges.append((str(from_out), str(to_out)))
else:
elif not outs:
edges.append((from_stage.addressing, to_stage.addressing))

if outs:
nodes, edges = self._build_output_graph(G, target_stage)

return list(nodes), edges, networkx.is_tree(G)

def _show_ascii(self, target, commands, outs):
Expand Down
31 changes: 28 additions & 3 deletions tests/func/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def test_split_pipeline(tmp_dir, scm, dvc):
)

command = CmdPipelineShow([])
nodes, edges, is_tree = command._build_graph(
nodes, edges, _ = command._build_graph(
stage.path, commands=False, outs=True
)
assert set(nodes) == {"data", "data_train", "data_valid", "result"}
Expand Down Expand Up @@ -320,11 +320,36 @@ def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy):
run_copy("foo", "bar", name="copy-foo-bar")
run_copy("bar", "foobar", single_stage=True)
command = CmdPipelineShow([])
nodes, edges, is_tree = command._build_graph("foobar.dvc")
nodes, edges, _ = command._build_graph("foobar.dvc")
assert set(nodes) == {"dvc.yaml:copy-foo-bar", "foobar.dvc"}
assert set(edges) == {
("foobar.dvc", "dvc.yaml:copy-foo-bar"),
}

nodes, edges, is_tree = command._build_graph("dvc.yaml:copy-foo-bar")
nodes, *_ = command._build_graph("dvc.yaml:copy-foo-bar")
assert set(nodes) == {"dvc.yaml:copy-foo-bar"}


def test_pipeline_multi_outputs_stages(dvc):
dvc.run(
outs=["alice", "bob"],
cmd="echo alice>alice && echo bob>bob",
single_stage=True,
)
dvc.run(
deps=["alice"],
outs=["mary", "mike"],
cmd="echo mary>mary && echo mike>mike",
single_stage=True,
)
stage = dvc.run(
deps=["mary"],
outs=["carol"],
cmd="echo carol>carol",
single_stage=True,
)

command = CmdPipelineShow([])
nodes, edges, _ = command._build_graph(stage.path, outs=True)
assert set(nodes) == {"alice", "mary", "carol"}
assert set(edges) == {("carol", "mary"), ("mary", "alice")}