Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 16 additions & 30 deletions dvc/command/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

import argparse
import logging
from dvc.utils.compat import str

from dvc.command.base import append_doc_link
from dvc.command.base import CmdBase
from dvc.command.base import fix_subparsers
from dvc.command.base import CmdBase, append_doc_link, fix_subparsers
from dvc.exceptions import DvcException
from dvc.utils import relpath
from dvc.utils.compat import str


logger = logging.getLogger(__name__)
Expand All @@ -21,38 +18,32 @@ def _show(self, target, commands, outs, locked):

stage = Stage.load(self.repo, target)
G = self.repo.graph
stages = networkx.get_node_attributes(G, "stage")
node = relpath(stage.path, self.repo.root_dir)
nodes = networkx.dfs_postorder_nodes(G, node)
stages = networkx.dfs_postorder_nodes(G, stage)

if locked:
nodes = [n for n in nodes if stages[n].locked]
stages = [s for s in stages if s.locked]

for n in nodes:
for stage in stages:
if commands:
if stages[n].cmd is None:
if stage.cmd is None:
continue
logger.info(stages[n].cmd)
logger.info(stage.cmd)
elif outs:
for out in stages[n].outs:
for out in stage.outs:
logger.info(str(out))
else:
logger.info(n)
logger.info(stage.path_in_repo)

def __build_graph(self, target, commands, outs):
import networkx
from dvc.stage import Stage
from dvc.repo.graph import get_pipeline

stage = Stage.load(self.repo, target)
node = relpath(stage.path, self.repo.root_dir)

G = get_pipeline(self.repo.pipelines, node)
stages = networkx.get_node_attributes(G, "stage")
G = get_pipeline(self.repo.pipelines, stage)

nodes = []
for n in G:
stage = stages[n]
for stage in G:
if commands:
if stage.cmd is None:
continue
Expand All @@ -64,9 +55,7 @@ def __build_graph(self, target, commands, outs):
nodes.append(stage.relpath)

edges = []
for e in G.edges():
from_stage = stages[e[0]]
to_stage = stages[e[1]]
for from_stage, to_stage in G.edges():
if commands:
if to_stage.cmd is None:
continue
Expand Down Expand Up @@ -163,14 +152,11 @@ def run(self):

class CmdPipelineList(CmdBase):
def run(self):
import networkx

pipelines = self.repo.pipelines
for p in pipelines:
stages = networkx.get_node_attributes(p, "stage")
for stage in stages:
logger.info(stage)
if len(stages) != 0:
for pipeline in pipelines:
for stage in pipeline:
logger.info(stage.relpath)
if len(pipeline) != 0:
logger.info("=" * 80)
logger.info("{} pipelines total".format(len(pipelines)))

Expand Down
30 changes: 11 additions & 19 deletions dvc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,13 @@ class OutputDuplicationError(DvcException):
"""

def __init__(self, output, stages):
assert isinstance(output, str) or isinstance(output, builtin_str)
assert isinstance(stages, list)
assert all(
isinstance(stage, str) or isinstance(stage, builtin_str)
for stage in stages
)
assert isinstance(output, (str, builtin_str))
assert all(hasattr(stage, "relpath") for stage in stages)
msg = (
"file/directory '{}' is specified as an output in more than one "
"stage: {}\n"
"This is not allowed. Consider using a different output name."
).format(output, "\n ".join(stages))
).format(output, "\n ".join(s.relpath for s in stages))
super(OutputDuplicationError, self).__init__(msg)


Expand All @@ -75,21 +71,17 @@ class StagePathAsOutputError(DvcException):
an output of another stage.

Args:
cwd (str): path to the directory.
fname (str): path to the DVC-file that has cwd specified as an
output.
stage (Stage): a stage that is in some other stages output
output (str): an output covering the stage above
"""

def __init__(self, wdir, fname):
assert isinstance(wdir, str) or isinstance(wdir, builtin_str)
assert isinstance(fname, str) or isinstance(fname, builtin_str)
msg = (
"current working directory '{cwd}' is specified as an output in "
"'{fname}'. Use another CWD to prevent any data removal.".format(
cwd=wdir, fname=fname
def __init__(self, stage, output):
assert isinstance(output, (str, builtin_str))
super(StagePathAsOutputError, self).__init__(
"'{stage}' is within an output '{output}' of another stage".format(
stage=stage.relpath, output=output
)
)
super(StagePathAsOutputError, self).__init__(msg)


class CircularDependencyError(DvcException):
Expand Down Expand Up @@ -167,7 +159,7 @@ def __init__(self):
class CyclicGraphError(DvcException):
def __init__(self, stages):
assert isinstance(stages, list)
stages = "\n".join("\t- {}".format(stage) for stage in stages)
stages = "\n".join("\t- {}".format(stage.relpath) for stage in stages)
msg = (
"you've introduced a cycle in your pipeline that involves "
"the following stages:"
Expand Down
68 changes: 23 additions & 45 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,22 @@
from contextlib import contextmanager
from functools import wraps
from itertools import chain
from dvc.utils.compat import FileNotFoundError, fspath_py35, open as _open

from funcy import cached_property

from .graph import check_acyclic
from .graph import get_pipeline
from .graph import get_pipelines
from .graph import get_stages
from dvc.config import Config
from dvc.exceptions import FileMissingError
from dvc.exceptions import NotDvcRepoError
from dvc.exceptions import OutputNotFoundError
from dvc.exceptions import (
FileMissingError,
NotDvcRepoError,
OutputNotFoundError,
)
from dvc.ignore import DvcIgnoreFilter
from dvc.path_info import PathInfo
from dvc.remote.base import RemoteActionNotImplemented
from dvc.utils import relpath
from dvc.utils.fs import path_isin
from dvc.utils.compat import FileNotFoundError
from dvc.utils.compat import fspath_py35
from dvc.utils.compat import open as _open
from .graph import check_acyclic, get_pipeline, get_pipelines


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -190,32 +187,20 @@ def collect(self, target, with_deps=False, recursive=False, graph=None):
G = graph or self.graph

if not target:
return get_stages(G)
return list(G)

target = os.path.abspath(target)

if recursive and os.path.isdir(target):
attrs = nx.get_node_attributes(G, "stage")
nodes = [node for node in nx.dfs_postorder_nodes(G)]

ret = []
for node in nodes:
stage = attrs[node]
if path_isin(stage.path, target):
ret.append(stage)
return ret
stages = nx.dfs_postorder_nodes(G)
return [stage for stage in stages if path_isin(stage.path, target)]

stage = Stage.load(self, target)
if not with_deps:
return [stage]

node = relpath(stage.path, self.root_dir)
pipeline = get_pipeline(get_pipelines(G), node)

return [
pipeline.node[n]["stage"]
for n in nx.dfs_postorder_nodes(pipeline, node)
]
pipeline = get_pipeline(get_pipelines(G), stage)
return list(nx.dfs_postorder_nodes(pipeline, stage))

def collect_granular(self, target, *args, **kwargs):
if not target:
Expand Down Expand Up @@ -332,15 +317,15 @@ def _collect_graph(self, stages=None):
)

G = nx.DiGraph()
stages = stages or self.collect_stages()
stages = stages or self.stages
stages = [stage for stage in stages if stage]
outs = {}

for stage in stages:
for out in stage.outs:
if out.path_info in outs:
stages = [stage.relpath, outs[out.path_info].stage.relpath]
raise OutputDuplicationError(str(out), stages)
dup_stages = [stage, outs[out.path_info].stage]
raise OutputDuplicationError(str(out), dup_stages)
outs[out.path_info] = out

for stage in stages:
Expand All @@ -353,23 +338,19 @@ def _collect_graph(self, stages=None):
stage_path_info = PathInfo(stage.path)
for p in chain([stage_path_info], stage_path_info.parents):
if p in outs:
raise StagePathAsOutputError(stage.wdir, stage.relpath)
raise StagePathAsOutputError(stage, str(outs[p]))

for stage in stages:
node = relpath(stage.path, self.root_dir)

G.add_node(node, stage=stage)
G.add_node(stage)

for dep in stage.deps:
if dep.path_info is None:
continue

for out in outs:
if out.overlaps(dep.path_info):
dep_stage = outs[out].stage
dep_node = relpath(dep_stage.path, self.root_dir)
G.add_node(dep_node, stage=dep_stage)
G.add_edge(node, dep_node)
for out_path_info, out in outs.items():
if out_path_info.overlaps(dep.path_info):
G.add_node(out.stage)
G.add_edge(stage, out.stage)

check_acyclic(G)

Expand All @@ -394,7 +375,8 @@ def filter_dirs(dname):

return list(filter(filter_dirs, dirs))

def collect_stages(self):
@cached_property
def stages(self):
"""
Walks down the root directory looking for Dvcfiles,
skipping the directories that are related with
Expand Down Expand Up @@ -426,10 +408,6 @@ def collect_stages(self):

return stages

@cached_property
def stages(self):
return get_stages(self.graph)

def find_outs_by_path(self, path, outs=None, recursive=False, strict=True):
if not outs:
outs = [out for stage in self.stages for out in stage.outs]
Expand Down
6 changes: 0 additions & 6 deletions dvc/repo/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,3 @@ def get_pipelines(G):
import networkx as nx

return [G.subgraph(c).copy() for c in nx.weakly_connected_components(G)]


def get_stages(G):
import networkx

return list(networkx.get_node_attributes(G, "stage").values())
Loading