diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index e8a3fd301f..f9d80f76c1 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -8,15 +8,26 @@ logger = logging.getLogger(__name__) +def _stage_repr(stage): + from dvc.stage import PipelineStage + + return ( + "{}:{}".format(stage.relpath, stage.name) + if isinstance(stage, PipelineStage) + else stage.relpath + ) + + class CmdPipelineShow(CmdBase): def _show(self, target, commands, outs, locked): import networkx - from dvc.dvcfile import Dvcfile + from dvc import dvcfile + from dvc.utils import parse_target - stage = Dvcfile(self.repo, target).load() - G = self.repo.graph + path, name = parse_target(target) + stage = dvcfile.Dvcfile(self.repo, path).load_one(name) + G = self.repo.pipeline_graph stages = networkx.dfs_postorder_nodes(G, stage) - if locked: stages = [s for s in stages if s.locked] @@ -29,14 +40,16 @@ def _show(self, target, commands, outs, locked): for out in stage.outs: logger.info(str(out)) else: - logger.info(stage.path_in_repo) + logger.info(_stage_repr(stage)) - def _build_graph(self, target, commands, outs): + def _build_graph(self, target, commands=False, outs=False): import networkx - from dvc.dvcfile import Dvcfile + from dvc import dvcfile from dvc.repo.graph import get_pipeline + from dvc.utils import parse_target - target_stage = Dvcfile(self.repo, target).load() + path, name = parse_target(target) + target_stage = dvcfile.Dvcfile(self.repo, path).load_one(name) G = get_pipeline(self.repo.pipelines, target_stage) nodes = set() @@ -49,7 +62,7 @@ def _build_graph(self, target, commands, outs): for out in stage.outs: nodes.add(str(out)) else: - nodes.add(stage.relpath) + nodes.add(_stage_repr(stage)) edges = [] for from_stage, to_stage in networkx.edge_dfs(G, target_stage): @@ -62,7 +75,7 @@ def _build_graph(self, target, commands, outs): for to_out in to_stage.outs: edges.append((str(from_out), str(to_out))) else: - edges.append((from_stage.relpath, to_stage.relpath)) + edges.append((_stage_repr(from_stage), _stage_repr(to_stage))) return list(nodes), edges, networkx.is_tree(G) @@ -150,7 +163,7 @@ def run(self): pipelines = self.repo.pipelines for pipeline in pipelines: for stage in pipeline: - logger.info(stage.relpath) + logger.info(_stage_repr(stage)) if len(pipeline) != 0: logger.info("=" * 80) logger.info("{} pipelines total".format(len(pipelines))) diff --git a/dvc/command/run.py b/dvc/command/run.py index ced17d7354..0cf2730e83 100644 --- a/dvc/command/run.py +++ b/dvc/command/run.py @@ -51,6 +51,7 @@ def run(self): outs_persist=self.args.outs_persist, outs_persist_no_cache=self.args.outs_persist_no_cache, always_changed=self.args.always_changed, + name=self.args.name, ) except DvcException: logger.exception("failed to run command") @@ -96,6 +97,7 @@ def add_parser(subparsers, parent_parser): default=[], help="Declare dependencies for reproducible cmd.", ) + run_parser.add_argument("-n", "--name", help=argparse.SUPPRESS) run_parser.add_argument( "-o", "--outs", diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py index d760909404..dd7cb94430 100644 --- a/dvc/dvcfile.py +++ b/dvc/dvcfile.py @@ -2,11 +2,14 @@ import re import logging +from funcy import project + import dvc.prompt as prompt from voluptuous import MultipleInvalid from dvc import dependency, output +from dvc.exceptions import DvcException from dvc.stage.exceptions import ( StageFileBadNameError, StageFileDoesNotExistError, @@ -29,6 +32,39 @@ TAG_REGEX = r"^(?P.*)@(?P[^\\/@:]*)$" +class MultiStageFileLoadError(DvcException): + def __init__(self): + super().__init__("Cannot load multi-stage file.") + + +def _serialize_stage(stage): + outs_bucket = {} + for o in stage.outs: + bucket_key = ["metrics"] if o.metric else ["outs"] + + if not o.metric and o.persist: + bucket_key += ["persist"] + if not o.use_cache: + bucket_key += ["no_cache"] + key = "_".join(bucket_key) + outs_bucket[key] = outs_bucket.get(key, []) + [o.def_path] + + return { + stage.name: { + key: value + for key, value in { + stage.PARAM_CMD: stage.cmd, + stage.PARAM_WDIR: stage.resolve_wdir(), + stage.PARAM_DEPS: [d.def_path for d in stage.deps], + **outs_bucket, + stage.PARAM_LOCKED: stage.locked, + stage.PARAM_ALWAYS_CHANGED: stage.always_changed, + }.items() + if value + } + } + + class Dvcfile: def __init__(self, repo, path): self.repo = repo @@ -77,9 +113,61 @@ def _get_path_tag(s): return s, None return match.group("path"), match.group("tag") - def dump(self, stage): + @property + def lockfile(self): + return os.path.splitext(self.path)[0] + ".lock" + + def dump(self, stage, update_dvcfile=False): """Dumps given stage appropriately in the dvcfile.""" - self.dump_single_stage(stage) + if not hasattr(stage, "name"): + self.dump_single_stage(stage) + return + + self.dump_lockfile(stage) + if update_dvcfile and not stage.is_data_source: + self.dump_multistage_dvcfile(stage) + + from .stage import Stage, create_stage + + for out in stage.outs: + if not out.use_cache: + continue + s = create_stage( + Stage, + stage.repo, + os.path.join(stage.wdir, out.def_path + DVC_FILE_SUFFIX), + wdir=stage.wdir, + ) + s.outs = [out] + s.md5 = s._compute_md5() + Dvcfile(s.repo, s.path).dump_single_stage(s) + + def dump_lockfile(self, stage): + from . import lockfile + + lockfile.dump(self.repo, self.lockfile, stage) + self.repo.scm.track_file(relpath(self.lockfile)) + + def dump_multistage_dvcfile(self, stage): + from dvc.utils.stage import parse_stage_for_update, dump_stage_file + from dvc.schema import COMPILED_MULTI_STAGE_SCHEMA + + path = self.path + if not os.path.exists(path): + open(path, "w+").close() + + with open(path, "r") as fd: + data = parse_stage_for_update(fd.read(), path) + + if not self.is_multi_stage(data): + raise MultiStageFileLoadError + + # handle this in Stage::dumpd() + data["stages"] = data.get("stages", {}) + data["stages"].update(_serialize_stage(stage)) + + dump_stage_file(path, COMPILED_MULTI_STAGE_SCHEMA(data)) + self.repo.scm.track_file(relpath(path)) def dump_single_stage(self, stage): self.check_dvc_filename(self.path) @@ -108,10 +196,7 @@ def dump_single_stage(self, stage): self.repo.scm.track_file(relpath(self.path)) - def load(self): - """Loads single stage.""" - from dvc.stage import Stage - + def _load(self): # it raises the proper exceptions by priority: # 1. when the file doesn't exists # 2. filename is not a DVC-file @@ -123,27 +208,71 @@ def load(self): with self.repo.tree.open(self.path) as fd: stage_text = fd.read() d = parse_stage(stage_text, self.path) + return d, stage_text + + def load_one(self, target=None): + data, raw = self._load() + if not self.is_multi_stage(data): + if target: + logger.warning( + "Ignoring target name '%s' as it's a single stage file.", + target, + ) + return self._load_single_stage(data, raw) - Dvcfile.validate(d, fname=relpath(self.path)) - path = os.path.abspath(self.path) - - stage = Stage( - repo=self.repo, - path=path, - wdir=os.path.abspath( - os.path.join( - os.path.dirname(path), d.get(Stage.PARAM_WDIR, ".") + if not target: + raise DvcException( + "No target provided for multi-stage file '{}'.".format( + self.path ) - ), - cmd=d.get(Stage.PARAM_CMD), - md5=d.get(Stage.PARAM_MD5), - locked=d.get(Stage.PARAM_LOCKED, False), - tag=self.tag, - always_changed=d.get(Stage.PARAM_ALWAYS_CHANGED, False), - # We store stage text to apply updates to the same structure - stage_text=stage_text, + ) + + if not self.has_stage(name=target, data=data): + raise DvcException( + "Target '{}' does not exist " + "inside '{}' multi-stage file.".format(target, self.path) + ) + + stages = self._load_multi_stage( + {"stages": {target: self._get_stage_data(target, data)}} + ) + assert stages + return stages[0] + + @staticmethod + def _get_stage_data(name, data): + return data.get("stages", {}).get(name) + + def has_stage(self, name, data=None): + if not data: + data, _ = self._load() + return bool(self._get_stage_data(name, data)) + + def load(self): + """Loads single stage.""" + data, raw = self._load() + if not self.is_multi_stage(data): + return self._load_single_stage(data, raw) + + raise MultiStageFileLoadError + + def load_all(self): + data, raw = self._load() + return ( + [self._load_single_stage(data, raw)] + if not self.is_multi_stage(data) + else self._load_multi_stage(data) ) + def _load_single_stage(self, d: dict, stage_text: str): + from dvc.stage import Stage, loads_from + + path = os.path.abspath(self.path) + wdir = os.path.abspath( + os.path.join(os.path.dirname(path), d.get(Stage.PARAM_WDIR, ".")) + ) + stage = loads_from(Stage, self.repo, path, wdir, d) + stage._stage_text, stage.tag = stage_text, self.tag stage.deps = dependency.loadd_from( stage, d.get(Stage.PARAM_DEPS) or [] ) @@ -151,22 +280,105 @@ def load(self): return stage + def load_multi(self): + data, _ = self._load() + if self.is_multi_stage(data): + return self._load_multi_stage(data) + raise DvcException( + "Cannot load multiple stages from single stage file." + ) + + def _load_multi_stage(self, data): + from . import lockfile + from .stage import PipelineStage, Stage, loads_from + + stages = [] + path = os.path.abspath(self.path) + lock_data = lockfile.load(self.repo, self.lockfile) + for stage_name, d in data.get("stages", {}).items(): + lock_stage_data = lock_data.get(stage_name, {}) + wdir = os.path.abspath( + os.path.join( + os.path.dirname(path), d.get(Stage.PARAM_WDIR, ".") + ) + ) + stage = loads_from(PipelineStage, self.repo, path, wdir, d) + stage.name = stage_name + stage.cmd_changed = lock_stage_data.get(Stage.PARAM_CMD) != d.get( + Stage.PARAM_CMD + ) + + stage._fill_stage_dependencies(**project(d, ["deps"])) + stage._fill_stage_outputs(**d) + stages.append(stage) + + for dep in stage.deps: + dep.info[dep.remote.PARAM_CHECKSUM] = lock_stage_data.get( + Stage.PARAM_DEPS, {} + ).get(dep.def_path) + + if stage.cmd_changed: + continue + + for out in stage.outs: + out.info[out.remote.PARAM_CHECKSUM] = lock_stage_data.get( + Stage.PARAM_OUTS, {} + ).get(out.def_path) + + return stages + @staticmethod - def validate(d, fname=None): - from dvc.stage.schema import SINGLE_STAGE_SCHEMA + def validate_single_stage(d, fname=None): + from dvc.schema import COMPILED_SINGLE_STAGE_SCHEMA + + try: + COMPILED_SINGLE_STAGE_SCHEMA(d) + except MultipleInvalid as exc: + raise StageFileFormatError(fname, exc) + + @staticmethod + def validate_multi_stage(d, fname=None): + from dvc.schema import COMPILED_MULTI_STAGE_SCHEMA try: - SINGLE_STAGE_SCHEMA(d) + COMPILED_MULTI_STAGE_SCHEMA(d) except MultipleInvalid as exc: raise StageFileFormatError(fname, exc) + @staticmethod + def validate(d, fname=None): + Dvcfile.validate_single_stage(d, fname) + + def is_multi_stage(self, d=None): + # TODO: maybe the following heuristics is enough? + if d is None: + d = self._load()[0] + check_multi_stage = d.get("stages") or not d + exc = None + if check_multi_stage: + try: + self.validate_multi_stage(d, self.path) + return True + except StageFileFormatError as _exc: + exc = _exc + + try: + self.validate_single_stage(d, self.path) + return False + except StageFileFormatError: + if check_multi_stage: + raise exc + + self.validate_multi_stage(d, self.path) + return True + def overwrite_with_prompt(self, force=False): if not self.exists(): return msg = ( "'{}' already exists. Do you wish to run the command and " - "overwrite it?".format(self.path) + "overwrite it?".format(relpath(self.path)) ) if not (force or prompt.confirm(msg)): raise StageFileAlreadyExistsError(self.path) diff --git a/dvc/lockfile.py b/dvc/lockfile.py new file mode 100644 index 0000000000..5f39d1e5c7 --- /dev/null +++ b/dvc/lockfile.py @@ -0,0 +1,82 @@ +import json + +from typing import TYPE_CHECKING +from collections import OrderedDict + +from dvc.exceptions import DvcException +from dvc.schema import COMPILED_LOCKFILE_SCHEMA +from voluptuous import MultipleInvalid + +if TYPE_CHECKING: + from dvc.repo import Repo + from dvc.stage import PipelineStage + + +class LockfileCorruptedError(DvcException): + def __init__(self, path): + super().__init__("Lockfile '{}' is corrupted.".format(path)) + + +def serialize_stage(stage: "PipelineStage") -> OrderedDict: + assert stage.cmd + assert stage.name + + deps = OrderedDict( + [ + (dep.def_path, dep.remote.get_checksum(dep.path_info),) + for dep in stage.deps + if dep.remote.get_checksum(dep.path_info) + ] + ) + outs = OrderedDict( + [ + (out.def_path, out.remote.get_checksum(out.path_info),) + for out in stage.outs + if out.remote.get_checksum(out.path_info) + ] + ) + return OrderedDict( + [ + ( + stage.name, + OrderedDict( + [("cmd", stage.cmd), ("deps", deps,), ("outs", outs)] + ), + ) + ] + ) + + +def exists(repo: "Repo", path: str) -> bool: + return repo.tree.exists(path) + + +def read(repo: "Repo", path: str) -> dict: + with repo.tree.open(path) as f: + return json.load(f, object_pairs_hook=OrderedDict) + + +def write(repo: "Repo", path: str, data: dict) -> dict: + with repo.tree.open(path, "w+") as f: + json.dump(data, f) + + +def load(repo: "Repo", path: str) -> dict: + if not exists(repo, path): + return {} + try: + return COMPILED_LOCKFILE_SCHEMA(read(repo, path)) + except MultipleInvalid: + raise LockfileCorruptedError(path) + + +def dump(repo: "Repo", path: str, stage: "PipelineStage"): + stage_data = serialize_stage(stage) + + if not exists(repo, path): + data = stage_data + else: + data = read(repo, path) + data.update(stage_data) + + write(repo, path, COMPILED_LOCKFILE_SCHEMA(data)) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 79414c4e59..39920cf7ca 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -174,7 +174,7 @@ def _ignore(self): self.scm.ignore_list(flist) - def check_modified_graph(self, new_stages): + def check_modified_graph(self, new_stages, old_stages=None): """Generate graph including the new stage to check for errors""" # Building graph might be costly for the ones with many DVC-files, # so we provide this undocumented hack to skip it. See [1] for @@ -189,7 +189,29 @@ def check_modified_graph(self, new_stages): # # [1] https://github.com/iterative/dvc/issues/2671 if not getattr(self, "_skip_graph_checks", False): - self._collect_graph(self.stages + new_stages) + self._collect_graph((old_stages or self.stages) + new_stages) + + def _collect_inside(self, path, graph): + import networkx as nx + + stages = nx.dfs_postorder_nodes(graph or self.pipeline_graph) + return [stage for stage in stages if path_isin(stage.path, path)] + + def collect_for_pipelines( + self, path=None, name=None, recursive=False, graph=None + ): + from ..dvcfile import Dvcfile + + if not path: + return list(graph) if graph else self.pipeline_stages + + path = os.path.abspath(path) + if recursive and os.path.isdir(path): + return self._collect_inside(path, graph or self.pipeline_graph) + + dvcfile = Dvcfile(self, path) + dvcfile.check_file_exists() + return [dvcfile.load_one(name)] def collect(self, target, with_deps=False, recursive=False, graph=None): import networkx as nx @@ -201,8 +223,7 @@ def collect(self, target, with_deps=False, recursive=False, graph=None): target = os.path.abspath(target) if recursive and os.path.isdir(target): - stages = nx.dfs_postorder_nodes(graph or self.graph) - return [stage for stage in stages if path_isin(stage.path, target)] + return self._collect_inside(target, graph or self.graph) stage = Dvcfile(self, target).load() @@ -388,18 +409,17 @@ def _collect_graph(self, stages=None): overlapping.extend(outs.values(prefix=dep_key)) G.add_edges_from((stage, out.stage) for out in overlapping) - check_acyclic(G) return G @cached_property def graph(self): - return self._collect_graph() + return self._collect_graph(self.stages) @cached_property def pipelines(self): - return get_pipelines(self.graph) + return get_pipelines(self.pipeline_graph) @cached_property def stages(self): @@ -412,9 +432,24 @@ def stages(self): NOTE: For large repos, this could be an expensive operation. Consider using some memoization. """ - from ..dvcfile import Dvcfile + return self._collect_stages()[0] + + @cached_property + def pipeline_stages(self): + return self._collect_stages()[1] + + @cached_property + def pipeline_graph(self): + return self._collect_graph(self.pipeline_stages) - stages = [] + def _collect_stages(self): + from dvc.dvcfile import Dvcfile + from dvc.stage import PipelineStage + + pipeline_stages = [] + single_stages = [] + output_stages = [] + ignored_outs = [] outs = set() for root, dirs, files in self.tree.walk(self.root_dir): @@ -422,16 +457,49 @@ def stages(self): path = os.path.join(root, fname) if not Dvcfile.is_valid_filename(path): continue - stage = Dvcfile(self, path).load() - stages.append(stage) + stgs = Dvcfile(self, path).load_all() + ignored_outs.extend( + out + for stage in stgs + if isinstance(stage, PipelineStage) + for out in stage.outs + ) - for out in stage.outs: - if out.scheme == "local": - outs.add(out.fspath) + for stage in stgs: + stages = ( + output_stages + if stage.is_data_source + else pipeline_stages + ) + stages.append(stage) + if not ( + isinstance(stage, PipelineStage) + or stage.is_data_source + ): + single_stages.append(stage) + for out in stage.outs: + if out.scheme == "local": + outs.add(out.fspath) dirs[:] = [d for d in dirs if os.path.join(root, d) not in outs] - return stages + # DVC files are generated by multi-stage for data management. + # We need to ignore those stages for pipelines_stages, but still + # should be collected for output stages. + _output_stages = [ + stage + for stage in output_stages + if all( + stage.outs and out.fspath != stage.outs[0].fspath + for out in ignored_outs + ) + ] + # Old single-stages are used for both outputs and pipelines + # so they go into both buckets: pipeline_stages and output stages. + return ( + output_stages + single_stages, + pipeline_stages + _output_stages, + ) def find_outs_by_path(self, path, outs=None, recursive=False, strict=True): if not outs: @@ -528,3 +596,5 @@ def _reset(self): self.__dict__.pop("stages", None) self.__dict__.pop("pipelines", None) self.__dict__.pop("dvcignore", None) + self.__dict__.pop("pipeline_graph", None) + self.__dict__.pop("pipeline_stages", None) diff --git a/dvc/repo/add.py b/dvc/repo/add.py index bc31bb72e8..c597418083 100644 --- a/dvc/repo/add.py +++ b/dvc/repo/add.py @@ -115,7 +115,7 @@ def _find_all_targets(repo, target, recursive): def _create_stages(repo, targets, fname, pbar=None): - from dvc.stage import Stage + from dvc.stage import Stage, create_stage stages = [] @@ -126,7 +126,7 @@ def _create_stages(repo, targets, fname, pbar=None): unit="file", ): path, wdir, out = resolve_paths(repo, out) - stage = Stage.create(repo, fname or path, wdir=wdir, outs=[out]) + stage = create_stage(Stage, repo, fname or path, wdir=wdir, outs=[out]) if stage: Dvcfile(repo, stage.path).overwrite_with_prompt(force=True) diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index 7a78cebda4..32cdf3de7d 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -10,7 +10,7 @@ @scm_context def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): from dvc.dvcfile import Dvcfile - from dvc.stage import Stage + from dvc.stage import Stage, create_stage out = resolve_output(url, out) path, wdir, out = resolve_paths(self, out) @@ -19,8 +19,14 @@ def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): if os.path.exists(url) and path_isin(os.path.abspath(url), self.root_dir): url = relpath(url, wdir) - stage = Stage.create( - self, fname or path, wdir=wdir, deps=[url], outs=[out], erepo=erepo, + stage = create_stage( + Stage, + self, + fname or path, + wdir=wdir, + deps=[url], + outs=[out], + erepo=erepo, ) if stage is None: diff --git a/dvc/repo/lock.py b/dvc/repo/lock.py index ca7f7c4107..d9f8d5f83a 100644 --- a/dvc/repo/lock.py +++ b/dvc/repo/lock.py @@ -3,11 +3,13 @@ @locked def lock(self, target, unlock=False): - from ..dvcfile import Dvcfile + from .. import dvcfile + from dvc.utils import parse_target - dvcfile = Dvcfile(self, target) - stage = dvcfile.load() + path, target = parse_target(target) + dvcfile = dvcfile.Dvcfile(self, path) + stage = dvcfile.load_one(target) stage.locked = False if unlock else True - dvcfile.dump(stage) + dvcfile.dump(stage, update_dvcfile=True) return stage diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index bb31368c02..2e9697c886 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -61,7 +61,8 @@ def reproduce( all_pipelines=False, **kwargs ): - from ..dvcfile import Dvcfile + from .. import dvcfile + from dvc.utils import parse_target if not target and not all_pipelines: raise InvalidArgumentError( @@ -72,14 +73,15 @@ def reproduce( if not interactive: kwargs["interactive"] = self.config["core"].get("interactive", False) - active_graph = _get_active_graph(self.graph) + active_graph = _get_active_graph(self.pipeline_graph) active_pipelines = get_pipelines(active_graph) + path, name = parse_target(target) if pipeline or all_pipelines: if all_pipelines: pipelines = active_pipelines else: - stage = Dvcfile(self, target).load() + stage = dvcfile.Dvcfile(self, path).load_one(name) pipelines = [get_pipeline(active_pipelines, stage)] targets = [] @@ -88,7 +90,9 @@ def reproduce( if pipeline.in_degree(stage) == 0: targets.append(stage) else: - targets = self.collect(target, recursive=recursive, graph=active_graph) + targets = self.collect_for_pipelines( + path, name=name, recursive=recursive, graph=active_graph + ) ret = [] for target in targets: diff --git a/dvc/repo/run.py b/dvc/repo/run.py index 61feb84c52..274b956993 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -2,41 +2,61 @@ from . import locked from .scm_context import scm_context +from ..exceptions import DvcException +from ..utils import relpath + +from funcy import first, concat + + +def _get_file_path(**kwargs): + from dvc.dvcfile import DVC_FILE_SUFFIX, DVC_FILE + + out = first( + concat( + kwargs.get("outs", []), + kwargs.get("outs_no_cache", []), + kwargs.get("metrics", []), + kwargs.get("metrics_no_cache", []), + kwargs.get("outs_persist", []), + kwargs.get("outs_persist_no_cache", []), + ) + ) + + return ( + os.path.basename(os.path.normpath(out)) + DVC_FILE_SUFFIX + if out + else DVC_FILE + ) @locked @scm_context def run(self, fname=None, no_exec=False, **kwargs): - from dvc.stage import Stage - from dvc.dvcfile import Dvcfile, DVC_FILE_SUFFIX, DVC_FILE - - outs = ( - kwargs.get("outs", []) - + kwargs.get("outs_no_cache", []) - + kwargs.get("metrics", []) - + kwargs.get("metrics_no_cache", []) - + kwargs.get("outs_persist", []) - + kwargs.get("outs_persist_no_cache", []) - ) + from dvc.stage import PipelineStage, Stage, create_stage + from dvc.dvcfile import DVC_FILE, Dvcfile - if outs: - base = os.path.basename(os.path.normpath(outs[0])) - path = base + DVC_FILE_SUFFIX - else: - path = DVC_FILE + stage_cls, path = PipelineStage, fname or DVC_FILE + if not kwargs.get("name"): + kwargs.pop("name", None) + stage_cls, path = Stage, fname or _get_file_path(**kwargs) - stage = Stage.create(self, fname or path, **kwargs) + stage = create_stage(stage_cls, repo=self, path=path, **kwargs) if stage is None: return None - dvcfile = Dvcfile(self, stage.path) - dvcfile.overwrite_with_prompt(force=kwargs.get("overwrite", True)) - - self.check_modified_graph([stage]) - + dvcfile = Dvcfile(self, path) + if dvcfile.exists() and not dvcfile.is_multi_stage(): + if stage_cls == PipelineStage: + raise DvcException( + "'{}' is a single-stage dvcfile. Please use " + "`-f ` and try again.`.".format( + relpath(dvcfile.path) + ) + ) + dvcfile.overwrite_with_prompt(force=kwargs.get("overwrite", True)) + + self.check_modified_graph([stage], self.pipeline_stages) if not no_exec: stage.run(no_commit=kwargs.get("no_commit", False)) - - dvcfile.dump(stage) - + dvcfile.dump(stage, update_dvcfile=True) return stage diff --git a/dvc/schema.py b/dvc/schema.py new file mode 100644 index 0000000000..fab8f17a00 --- /dev/null +++ b/dvc/schema.py @@ -0,0 +1,41 @@ +from dvc.stage.params import StageParams, OutputParams +from dvc import dependency, output + +from voluptuous import Any, Schema, Optional, Required + + +STAGES = "stages" +SINGLE_STAGE_SCHEMA = { + StageParams.PARAM_MD5: output.CHECKSUM_SCHEMA, + StageParams.PARAM_CMD: Any(str, None), + StageParams.PARAM_WDIR: Any(str, None), + StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None), + StageParams.PARAM_OUTS: Any([output.SCHEMA], None), + StageParams.PARAM_LOCKED: bool, + StageParams.PARAM_META: object, + StageParams.PARAM_ALWAYS_CHANGED: bool, +} + +LOCK_FILE_STAGE_SCHEMA = { + Required(StageParams.PARAM_CMD): str, + Required(StageParams.PARAM_DEPS): {str: output.CHECKSUM_SCHEMA}, + Required(StageParams.PARAM_OUTS): {str: output.CHECKSUM_SCHEMA}, +} +LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA} + +SINGLE_PIPELINE_STAGE_SCHEMA = { + str: { + StageParams.PARAM_CMD: str, + Optional(StageParams.PARAM_WDIR): str, + Optional(StageParams.PARAM_DEPS): [str], + Optional(StageParams.PARAM_LOCKED): bool, + Optional(StageParams.PARAM_META): object, + Optional(StageParams.PARAM_ALWAYS_CHANGED): bool, + **{Optional(p.value): [str] for p in OutputParams}, + } +} +MULTI_STAGE_SCHEMA = {STAGES: SINGLE_PIPELINE_STAGE_SCHEMA} + +COMPILED_SINGLE_STAGE_SCHEMA = Schema(SINGLE_STAGE_SCHEMA) +COMPILED_MULTI_STAGE_SCHEMA = Schema(MULTI_STAGE_SCHEMA) +COMPILED_LOCKFILE_SCHEMA = Schema(LOCKFILE_SCHEMA) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index c2c1c1d9c6..bd313107df 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -7,6 +7,8 @@ from itertools import chain +from funcy import project + import dvc.dependency as dependency import dvc.output as output import dvc.prompt as prompt @@ -22,17 +24,66 @@ MissingDep, MissingDataSource, ) -from . import schema +from . import params from dvc.utils import dict_md5 from dvc.utils import fix_env from dvc.utils import relpath from dvc.utils.fs import path_isin - +from .params import OutputParams logger = logging.getLogger(__name__) -class Stage(schema.StageParams): +def loads_from(cls, repo, path, wdir, data): + kw = { + "repo": repo, + "path": path, + "wdir": wdir, + **project( + data, + [ + Stage.PARAM_CMD, + Stage.PARAM_LOCKED, + Stage.PARAM_ALWAYS_CHANGED, + Stage.PARAM_MD5, + "name", + ], + ), + } + return cls(**kw) + + +def create_stage(cls, repo, path, **kwargs): + from dvc.dvcfile import Dvcfile + + wdir = os.path.abspath(kwargs.get("wdir", None) or os.curdir) + path = os.path.abspath(path) + Dvcfile.check_dvc_filename(path) + cls._check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir")) + cls._check_stage_path(repo, os.path.dirname(path)) + + stage = loads_from(cls, repo, path, wdir, kwargs) + stage._fill_stage_outputs(**kwargs) + stage._fill_stage_dependencies(**kwargs) + stage._check_circular_dependency() + stage._check_duplicated_arguments() + + if stage and stage.dvcfile.exists(): + has_persist_outs = any(out.persist for out in stage.outs) + ignore_build_cache = ( + kwargs.get("ignore_build_cache", False) or has_persist_outs + ) + if has_persist_outs: + logger.warning("Build cache is ignored when persisting outputs.") + + if not ignore_build_cache and stage.can_be_skipped: + logger.info("Stage is cached, skipping.") + return None + + return stage + + +class Stage(params.StageParams): def __init__( self, repo, @@ -46,6 +97,7 @@ def __init__( tag=None, always_changed=False, stage_text=None, + dvcfile=None, ): if deps is None: deps = [] @@ -63,6 +115,7 @@ def __init__( self.tag = tag self.always_changed = always_changed self._stage_text = stage_text + self._dvcfile = dvcfile @property def path(self): @@ -72,6 +125,26 @@ def path(self): def path(self, path): self._path = path + @property + def dvcfile(self): + if self.path and self._dvcfile and self.path == self._dvcfile.path: + return self._dvcfile + + if not self.path: + raise DvcException( + "Stage does not have any path set " + "and is detached from dvcfile." + ) + + from dvc.dvcfile import Dvcfile + + self._dvcfile = Dvcfile(self.repo, self.path) + return self._dvcfile + + @dvcfile.setter + def dvcfile(self, dvcfile): + self._dvcfile = dvcfile + def __repr__(self): return "Stage: '{path}'".format( path=self.relpath if self.path else "No path" @@ -82,7 +155,7 @@ def __hash__(self): def __eq__(self, other): return ( - isinstance(other, Stage) + self.__class__ == other.__class__ and self.repo is other.repo and self.path_in_repo == other.path_in_repo ) @@ -173,18 +246,19 @@ def _changed_md5(self): @rwlocked(read=["deps", "outs"]) def changed(self): + if self._changed(): + logger.warning("{} changed.".format(self)) + return True + + logger.debug("{} didn't change.".format(self)) + return False + + def _changed(self): # Short-circuit order: stage md5 is fast, deps are expected to change - ret = ( + return ( self._changed_md5() or self._changed_deps() or self._changed_outs() ) - if ret: - logger.warning("Stage '{}' changed.".format(self.relpath)) - else: - logger.debug("Stage '{}' didn't change.".format(self.relpath)) - - return ret - @rwlocked(write=["outs"]) def remove_outs(self, ignore_remove=False, force=False): """Used mainly for `dvc remove --outs` and :func:`Stage.reproduce`.""" @@ -273,6 +347,9 @@ def can_be_skipped(self): self.is_cached and not self.is_callback and not self.always_changed ) + def reload(self): + return self.dvcfile.load() + @property def is_cached(self): """ @@ -280,9 +357,8 @@ def is_cached(self): """ from dvc.remote.local import RemoteLOCAL from dvc.remote.s3 import RemoteS3 - from dvc.dvcfile import Dvcfile - old = Dvcfile(self.repo, self.path).load() + old = self.reload() if old._changed_outs(): return False @@ -319,66 +395,11 @@ def is_cached(self): return True - @staticmethod - def create(repo, path, **kwargs): - from dvc.dvcfile import Dvcfile - - wdir = kwargs.get("wdir", None) or os.curdir - - wdir = os.path.abspath(wdir) - path = os.path.abspath(path) - - Dvcfile.check_dvc_filename(path) - - Stage._check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir")) - Stage._check_stage_path(repo, os.path.dirname(path)) - - stage = Stage( - repo=repo, - path=path, - wdir=wdir, - cmd=kwargs.get("cmd", None), - locked=kwargs.get("locked", False), - always_changed=kwargs.get("always_changed", False), - ) - - stage._fill_stage_outputs(**kwargs) - stage._fill_stage_dependencies(**kwargs) - - stage._check_circular_dependency() - stage._check_duplicated_arguments() - - Dvcfile.check_dvc_filename(path) - - dvcfile = Dvcfile(stage.repo, stage.path) - if dvcfile.exists(): - has_persist_outs = any(out.persist for out in stage.outs) - ignore_build_cache = ( - kwargs.get("ignore_build_cache", False) or has_persist_outs - ) - if has_persist_outs: - logger.warning( - "Build cache is ignored when persisting outputs." - ) - - if not ignore_build_cache and stage.can_be_skipped: - logger.info("Stage is cached, skipping.") - return None - - return stage - def _fill_stage_outputs(self, **kwargs): assert not self.outs self.outs = [] - for key in [ - "outs", - "metrics", - "outs_persist", - "outs_no_cache", - "metrics_no_cache", - "outs_persist_no_cache", - ]: + for key in (p.value for p in OutputParams): self.outs += output.loads_from( self, kwargs.get(key, []), @@ -400,18 +421,19 @@ def _fix_outs_deps_path(self, wdir): if out.is_in_repo: out.def_path = relpath(out.path_info, wdir) - def dumpd(self): + def resolve_wdir(self): rel_wdir = relpath(self.wdir, os.path.dirname(self.path)) + return ( + pathlib.PurePath(rel_wdir).as_posix() if rel_wdir != "." else None + ) - wdir = pathlib.PurePath(rel_wdir).as_posix() - wdir = wdir if wdir != "." else None - + def dumpd(self): return { key: value for key, value in { Stage.PARAM_MD5: self.md5, Stage.PARAM_CMD: self.cmd, - Stage.PARAM_WDIR: wdir, + Stage.PARAM_WDIR: self.resolve_wdir(), Stage.PARAM_LOCKED: self.locked, Stage.PARAM_DEPS: [d.dumpd() for d in self.deps], Stage.PARAM_OUTS: [o.dumpd() for o in self.outs], @@ -606,7 +628,11 @@ def run(self, dry=False, no_commit=False, force=False): ) ) if not dry: - if not force and self._already_cached(): + if ( + not force + and not self._changed_md5() + and self._already_cached() + ): self.outs[0].checkout() else: self.deps[0].download(self.outs[0]) @@ -709,13 +735,9 @@ def status(self, check_updates=False): return {} def _already_cached(self): - return ( - not self.changed_md5() - and all(not dep.changed() for dep in self.deps) - and all( - not out.changed_cache() if out.use_cache else not out.changed() - for out in self.outs - ) + return all(not dep.changed() for dep in self.deps) and all( + not out.changed_cache() if out.use_cache else not out.changed() + for out in self.outs ) def get_all_files_number(self, filter_info=None): @@ -732,3 +754,34 @@ def get_used_cache(self, *args, **kwargs): cache.update(out.get_used_cache(*args, **kwargs)) return cache + + +class PipelineStage(Stage): + def __init__(self, name=None, **kwargs): + super().__init__(**kwargs) + self.name = name + self.cmd_changed = False + + def __eq__(self, other): + return super().__eq__(other) and self.name == other.name + + def __hash__(self): + return hash((self.path_in_repo, self.name)) + + def __repr__(self): + return "Stage: '{path}:{name}'".format( + path=self.relpath if self.path else "No path", name=self.name + ) + + def _changed(self): + if self.cmd_changed: + logger.warning("'cmd' of {} has changed.".format(self)) + + return self.cmd_changed or self._changed_deps() or self._changed_outs() + + def reload(self): + return self.dvcfile.load_one(self.name) + + @property + def is_cached(self): + return self.dvcfile.has_stage(name=self.name) and super().is_cached diff --git a/dvc/stage/params.py b/dvc/stage/params.py new file mode 100644 index 0000000000..6ea8aa65f7 --- /dev/null +++ b/dvc/stage/params.py @@ -0,0 +1,21 @@ +from enum import Enum + + +class StageParams: + PARAM_MD5 = "md5" + PARAM_CMD = "cmd" + PARAM_WDIR = "wdir" + PARAM_DEPS = "deps" + PARAM_OUTS = "outs" + PARAM_LOCKED = "locked" + PARAM_META = "meta" + PARAM_ALWAYS_CHANGED = "always_changed" + + +class OutputParams(Enum): + PERSIST = "outs_persist" + PERSIST_NO_CACHE = "outs_persist_no_cache" + METRICS_NO_CACHE = "metrics_no_cache" + METRICS = "metrics" + NO_CACHE = "outs_no_cache" + OUTS = "outs" diff --git a/dvc/stage/schema.py b/dvc/stage/schema.py deleted file mode 100644 index dc811c447d..0000000000 --- a/dvc/stage/schema.py +++ /dev/null @@ -1,28 +0,0 @@ -from voluptuous import Any, Schema -from dvc import dependency -from dvc import output - - -class StageParams: - PARAM_MD5 = "md5" - PARAM_CMD = "cmd" - PARAM_WDIR = "wdir" - PARAM_DEPS = "deps" - PARAM_OUTS = "outs" - PARAM_LOCKED = "locked" - PARAM_META = "meta" - PARAM_ALWAYS_CHANGED = "always_changed" - - -SCHEMA = { - StageParams.PARAM_MD5: output.CHECKSUM_SCHEMA, - StageParams.PARAM_CMD: Any(str, None), - StageParams.PARAM_WDIR: Any(str, None), - StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None), - StageParams.PARAM_OUTS: Any([output.SCHEMA], None), - StageParams.PARAM_LOCKED: bool, - StageParams.PARAM_META: object, - StageParams.PARAM_ALWAYS_CHANGED: bool, -} - -SINGLE_STAGE_SCHEMA = Schema(SCHEMA) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 26fd62da66..e824cf095e 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -23,6 +23,7 @@ LOCAL_CHUNK_SIZE = 2 ** 20 # 1 MB LARGE_FILE_SIZE = 2 ** 30 # 1 GB LARGE_DIR_SIZE = 100 +TARGET_REGEX = re.compile(r"^(?P.*):(?P[^\\/@:]*)$") def dos2unix(data): @@ -374,3 +375,16 @@ def format_link(link): return "<{blue}{link}{nc}>".format( blue=colorama.Fore.CYAN, link=link, nc=colorama.Fore.RESET ) + + +def parse_target(target, default="Dvcfile"): + if not target: + return None, None + + match = TARGET_REGEX.match(target) + if not match: + return target, None + path, name = match.group("path"), match.group("name") + if not path: + logger.warning("Assuming file to be '%s'", default) + return path or default, name diff --git a/tests/func/test_dvcfile.py b/tests/func/test_dvcfile.py new file mode 100644 index 0000000000..e1160f946d --- /dev/null +++ b/tests/func/test_dvcfile.py @@ -0,0 +1,165 @@ +import pytest + +from dvc.dvcfile import Dvcfile +from dvc.exceptions import DvcException +from dvc.stage import Stage +from dvc.stage.exceptions import StageFileDoesNotExistError + + +def test_run_load_one_for_multistage(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + stage1 = dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + outs_persist_no_cache=["foo2"], + always_changed=True, + ) + stage2 = Dvcfile(dvc, "Dvcfile").load_one("copy-foo-foo2") + assert stage1 == stage2 + foo_out = stage2.outs[0] + assert stage2.cmd == "cp foo foo2" + assert stage2.name == "copy-foo-foo2" + assert foo_out.def_path == "foo2" + assert foo_out.persist + assert not foo_out.use_cache + assert stage2.deps[0].def_path == "foo" + assert dvc.reproduce(":copy-foo-foo2") + + +def test_run_load_one_for_multistage_non_existing(tmp_dir, dvc): + with pytest.raises(StageFileDoesNotExistError): + Dvcfile(dvc, "Dvcfile").load_one("copy-foo-foo2") + + +def test_run_load_one_for_multistage_non_existing_stage_name(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + stage = dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + metrics=["foo2"], + always_changed=True, + ) + with pytest.raises(DvcException): + # TODO: Better exception + Dvcfile(dvc, stage.path).load_one("random-name") + + +def test_run_load_one_on_single_stage(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + stage = dvc.run( + cmd="cp foo foo2", deps=["foo"], metrics=["foo2"], always_changed=True, + ) + Dvcfile(dvc, stage.path).load_one("random-name") + Dvcfile(dvc, stage.path).load_one() + + +def test_has_stage_with_name(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + metrics=["foo2"], + always_changed=True, + ) + dvcfile = Dvcfile(dvc, "Dvcfile") + assert dvcfile.has_stage("copy-foo-foo2") + assert not dvcfile.has_stage("copy") + + +def test_load_all_multistage(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + stage1 = dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + metrics=["foo2"], + always_changed=True, + ) + stages = Dvcfile(dvc, "Dvcfile").load_all() + assert len(stages) == 1 + assert stages[0] == stage1 + + tmp_dir.gen("bar", "bar") + stage2 = dvc.run( + cmd="cp bar bar2", + deps=["bar"], + name="copy-bar-bar2", + metrics=["bar2"], + always_changed=True, + ) + assert set(Dvcfile(dvc, "Dvcfile").load_all()) == {stage2, stage1} + + +def test_load_all_singlestage(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + stage1 = dvc.run( + cmd="cp foo foo2", deps=["foo"], metrics=["foo2"], always_changed=True, + ) + stages = Dvcfile(dvc, "foo2.dvc").load_all() + assert len(stages) == 1 + assert stages == [stage1] + + +def test_load_singlestage(tmp_dir, dvc): + tmp_dir.gen("foo", "foo") + stage1 = dvc.run( + cmd="cp foo foo2", deps=["foo"], metrics=["foo2"], always_changed=True, + ) + assert Dvcfile(dvc, "foo2.dvc").load() == stage1 + + +def test_load_multistage(tmp_dir, dvc): + from dvc.dvcfile import MultiStageFileLoadError + + tmp_dir.gen("foo", "foo") + dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + metrics=["foo2"], + always_changed=True, + ) + with pytest.raises(MultiStageFileLoadError): + Dvcfile(dvc, "Dvcfile").load() + + +def test_is_multistage(tmp_dir, dvc): + tmp_dir.gen({"foo": "foo", "bar": "bar"}) + stage1 = dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + metrics=["foo2"], + always_changed=True, + ) + assert Dvcfile(dvc, stage1.path).is_multi_stage() + stage2 = dvc.run( + cmd="cp bar bar2", deps=["bar"], metrics=["bar2"], always_changed=True, + ) + assert not Dvcfile(dvc, stage2.path).is_multi_stage() + + +def test_stage_collection(tmp_dir, dvc): + tmp_dir.gen( + { + "dir": {"file1": "file1", "file2": "file2"}, + "foo": "foo", + "bar": "bar", + } + ) + (stage1,) = dvc.add("dir") + stage2 = dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + metrics=["foo2"], + always_changed=True, + ) + stage3 = dvc.run( + cmd="cp bar bar2", deps=["bar"], metrics=["bar2"], always_changed=True, + ) + assert {s for s in dvc.pipeline_stages} == {stage3, stage2, stage1} + assert {s for s in dvc.stages} == {Stage(dvc, "foo2.dvc"), stage1, stage3} diff --git a/tests/func/test_pipeline.py b/tests/func/test_pipeline.py index 51ee639fb8..4431a86f20 100644 --- a/tests/func/test_pipeline.py +++ b/tests/func/test_pipeline.py @@ -1,7 +1,7 @@ import logging from dvc.main import main -from dvc.command.pipeline import CmdPipelineShow +from dvc.command.pipeline import CmdPipelineShow, CmdPipelineList from tests.basic_env import TestDvc from tests.func.test_repro import TestRepro from tests.func.test_repro import TestReproChangedDeepData @@ -266,3 +266,46 @@ def test_split_pipeline(tmp_dir, scm, dvc): ("data_train", "data"), ("data_valid", "data"), } + + +def test_pipeline_list_show_multistage(tmp_dir, dvc, run_copy, caplog): + tmp_dir.gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + run_copy("bar", "foobar") + command = CmdPipelineShow([]) + + caplog.clear() + with caplog.at_level(logging.INFO, "dvc"): + command._show("foobar.dvc", False, False, False) + output = caplog.text.splitlines() + assert "Dvcfile:copy-foo-bar" in output[0] + assert "foobar.dvc" in output[1] + + caplog.clear() + with caplog.at_level(logging.INFO, "dvc"): + command._show("Dvcfile:copy-foo-bar", False, False, False) + assert "Dvcfile:copy-foo-bar" in caplog.text + assert "foobar.dvc" not in caplog.text + + command = CmdPipelineList([]) + caplog.clear() + with caplog.at_level(logging.INFO, "dvc"): + command.run() + assert "Dvcfile:copy-foo-bar" in caplog.text + assert "foobar.dvc" in caplog.text + assert "1 pipelines in total" + + +def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy): + tmp_dir.gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + run_copy("bar", "foobar") + command = CmdPipelineShow([]) + nodes, edges, is_tree = command._build_graph("foobar.dvc") + assert set(nodes) == {"Dvcfile:copy-foo-bar", "foobar.dvc"} + assert set(edges) == { + ("foobar.dvc", "Dvcfile:copy-foo-bar"), + } + + nodes, edges, is_tree = command._build_graph("Dvcfile:copy-foo-bar") + assert set(nodes) == {"Dvcfile:copy-foo-bar"} diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index bdaa0544ff..f00d071e14 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -50,7 +50,17 @@ from tests.utils.httpd import StaticFileServer, ContentMD5Handler -class TestRepro(TestDvc): +class SingleStageRun: + def _run(self, **kwargs): + kwargs.pop("name", None) + return self.dvc.run(**kwargs) + + @staticmethod + def _get_stage_target(stage): + return stage.path + + +class TestRepro(SingleStageRun, TestDvc): def setUp(self): super().setUp() @@ -61,11 +71,12 @@ def setUp(self): self.file1 = "file1" self.file1_stage = self.file1 + ".dvc" - self.dvc.run( + self.stage = self._run( fname=self.file1_stage, outs=[self.file1], deps=[self.FOO, self.CODE], cmd="python {} {} {}".format(self.CODE, self.FOO, self.file1), + name="run1", ) @@ -73,18 +84,24 @@ class TestReproFail(TestRepro): def test(self): os.unlink(self.CODE) - ret = main(["repro", self.file1_stage]) + ret = main(["repro", self._get_stage_target(self.stage)]) self.assertNotEqual(ret, 0) -class TestReproCyclicGraph(TestDvc): +class TestReproCyclicGraph(SingleStageRun, TestDvc): def test(self): - self.dvc.run( - deps=[self.FOO], outs=["bar.txt"], cmd="echo bar > bar.txt" + self._run( + deps=[self.FOO], + outs=["bar.txt"], + cmd="echo bar > bar.txt", + name="copybarbar.txt", ) - self.dvc.run( - deps=["bar.txt"], outs=["baz.txt"], cmd="echo baz > baz.txt" + self._run( + deps=["bar.txt"], + outs=["baz.txt"], + cmd="echo baz > baz.txt", + name="copybazbaz.txt", ) stage_dump = { @@ -174,7 +191,7 @@ def test_nested(self): # NOTE: os.walk() walks in a sorted order and we need dir2 subdirs to # be processed before dir1 to load error.dvc first. - self.dvc.stages = [ + self.dvc.pipeline_stages = [ nested_stage, Dvcfile(self.dvc, error_stage_path).load(), ] @@ -208,7 +225,10 @@ def test_similar_paths(self): self.fail("should not raise StagePathAsOutputError") -class TestReproDepUnderDir(TestDvc): +# TODO: Test ^ for multistage + + +class TestReproDepUnderDir(SingleStageRun, TestDvc): def test(self): stages = self.dvc.add(self.DATA_DIR) self.assertEqual(len(stages), 1) @@ -216,12 +236,12 @@ def test(self): self.assertTrue(self.dir_stage is not None) self.file1 = "file1" - self.file1_stage = self.file1 + ".dvc" - self.dvc.run( - fname=self.file1_stage, + stage = self._run( + fname=self.file1 + ".dvc", outs=[self.file1], deps=[self.DATA, self.CODE], cmd="python {} {} {}".format(self.CODE, self.DATA, self.file1), + name="copy-data-file1", ) self.assertTrue(filecmp.cmp(self.file1, self.DATA, shallow=False)) @@ -229,12 +249,12 @@ def test(self): os.unlink(self.DATA) shutil.copyfile(self.FOO, self.DATA) - stages = self.dvc.reproduce(self.file1_stage) + stages = self.dvc.reproduce(self._get_stage_target(stage)) self.assertEqual(len(stages), 2) self.assertTrue(filecmp.cmp(self.file1, self.FOO, shallow=False)) -class TestReproDepDirWithOutputsUnderIt(TestDvc): +class TestReproDepDirWithOutputsUnderIt(SingleStageRun, TestDvc): def test(self): stages = self.dvc.add(self.DATA) self.assertEqual(len(stages), 1) @@ -244,23 +264,26 @@ def test(self): self.assertEqual(len(stages), 1) self.assertTrue(stages[0] is not None) - stage = self.dvc.run(fname="Dvcfile", deps=[self.DATA, self.DATA_SUB]) + stage = self.dvc.run( + fname="dvcfile2.dvc", deps=[self.DATA, self.DATA_SUB] + ) self.assertTrue(stage is not None) file1 = "file1" file1_stage = file1 + ".dvc" - stage = self.dvc.run( + stage = self._run( fname=file1_stage, deps=[self.DATA_DIR], outs=[file1], cmd="python {} {} {}".format(self.CODE, self.DATA, file1), + name="copy-data-file1", ) self.assertTrue(stage is not None) os.unlink(self.DATA) shutil.copyfile(self.FOO, self.DATA) - stages = self.dvc.reproduce(file1_stage) + stages = self.dvc.reproduce(self._get_stage_target(stage)) self.assertEqual(len(stages), 2) @@ -275,17 +298,22 @@ def test(self): ) with open(code_file, "w+") as fd: fd.write(code) - self.dvc.run( - fname=stage_file, outs=[out], cmd="python {}".format(code_file) + stage = self._run( + fname=stage_file, + outs=[out], + cmd="python {}".format(code_file), + name="uuid", ) - stages = self.dvc.reproduce(stage_file) + stages = self.dvc.reproduce(self._get_stage_target(stage)) self.assertEqual(len(stages), 1) class TestReproForce(TestRepro): def test(self): - stages = self.dvc.reproduce(self.file1_stage, force=True) + stages = self.dvc.reproduce( + self._get_stage_target(self.stage), force=True + ) self.assertEqual(len(stages), 2) @@ -293,7 +321,7 @@ class TestReproChangedCode(TestRepro): def test(self): self.swap_code() - stages = self.dvc.reproduce(self.file1_stage) + stages = self.dvc.reproduce(self._get_stage_target(self.stage)) self.assertTrue(filecmp.cmp(self.file1, self.BAR, shallow=False)) self.assertEqual(len(stages), 1) @@ -311,7 +339,7 @@ class TestReproChangedData(TestRepro): def test(self): self.swap_foo_with_bar() - stages = self.dvc.reproduce(self.file1_stage) + stages = self.dvc.reproduce(self._get_stage_target(self.stage)) self.assertTrue(filecmp.cmp(self.file1, self.BAR, shallow=False)) self.assertEqual(len(stages), 2) @@ -325,19 +353,21 @@ class TestReproDry(TestReproChangedData): def test(self): self.swap_foo_with_bar() - stages = self.dvc.reproduce(self.file1_stage, dry=True) + stages = self.dvc.reproduce( + self._get_stage_target(self.stage), dry=True + ) self.assertTrue(len(stages), 2) self.assertFalse(filecmp.cmp(self.file1, self.BAR, shallow=False)) - ret = main(["repro", "--dry", self.file1_stage]) + ret = main(["repro", "--dry", self._get_stage_target(self.stage)]) self.assertEqual(ret, 0) self.assertFalse(filecmp.cmp(self.file1, self.BAR, shallow=False)) class TestReproUpToDate(TestRepro): def test(self): - ret = main(["repro", self.file1_stage]) + ret = main(["repro", self._get_stage_target(self.stage)]) self.assertEqual(ret, 0) @@ -383,18 +413,18 @@ def setUp(self): super().setUp() self.file2 = "file2" - self.file2_stage = self.file2 + ".dvc" - self.dvc.run( - fname=self.file2_stage, + self.stage = self._run( + fname=self.file2 + ".dvc", outs=[self.file2], deps=[self.file1, self.CODE], cmd="python {} {} {}".format(self.CODE, self.file1, self.file2), + name="copy-file-file2", ) def test(self): self.swap_foo_with_bar() - stages = self.dvc.reproduce(self.file2_stage) + stages = self.dvc.reproduce(self._get_stage_target(self.stage)) self.assertTrue(filecmp.cmp(self.file1, self.BAR, shallow=False)) self.assertTrue(filecmp.cmp(self.file2, self.BAR, shallow=False)) @@ -450,16 +480,18 @@ def test(self): class TestReproPipeline(TestReproChangedDeepData): def test(self): stages = self.dvc.reproduce( - self.file1_stage, force=True, pipeline=True + self._get_stage_target(self.stage), force=True, pipeline=True ) self.assertEqual(len(stages), 3) def test_cli(self): - ret = main(["repro", "--pipeline", "-f", self.file1_stage]) + ret = main( + ["repro", "--pipeline", "-f", self._get_stage_target(self.stage)] + ) self.assertEqual(ret, 0) -class TestReproPipelines(TestDvc): +class TestReproPipelines(SingleStageRun, TestDvc): def setUp(self): super().setUp() @@ -474,31 +506,27 @@ def setUp(self): self.assertTrue(self.bar_stage is not None) self.file1 = "file1" - self.file1_stage = self.file1 + ".dvc" - self.dvc.run( - fname=self.file1_stage, + self.file1_stage = self.dvc.run( + fname=self.file1 + ".dvc", outs=[self.file1], deps=[self.FOO, self.CODE], cmd="python {} {} {}".format(self.CODE, self.FOO, self.file1), ) self.file2 = "file2" - self.file2_stage = self.file2 + ".dvc" - self.dvc.run( - fname=self.file2_stage, + self.file2_stage = self._run( + fname=self.file2 + ".dvc", outs=[self.file2], deps=[self.BAR, self.CODE], cmd="python {} {} {}".format(self.CODE, self.BAR, self.file2), + name="copy-BAR-file2", ) def test(self): stages = self.dvc.reproduce(all_pipelines=True, force=True) self.assertEqual(len(stages), 4) - names = [stage.relpath for stage in stages] - self.assertTrue(self.foo_stage.relpath in names) - self.assertTrue(self.bar_stage.relpath in names) - self.assertTrue(self.file1_stage in names) - self.assertTrue(self.file2_stage in names) + self.assertTrue(self.file1_stage in stages) + self.assertTrue(self.file2_stage in stages) def test_cli(self): ret = main(["repro", "-f", "-P"]) @@ -508,24 +536,24 @@ def test_cli(self): class TestReproLocked(TestReproChangedData): def test(self): file2 = "file2" - file2_stage = file2 + ".dvc" - self.dvc.run( - fname=file2_stage, + file2_stage = self._run( + fname=file2 + ".dvc", outs=[file2], deps=[self.file1, self.CODE], cmd="python {} {} {}".format(self.CODE, self.file1, file2), + name="copy-file1-file2", ) self.swap_foo_with_bar() - ret = main(["lock", file2_stage]) + ret = main(["lock", self._get_stage_target(file2_stage)]) self.assertEqual(ret, 0) - stages = self.dvc.reproduce(file2_stage) + stages = self.dvc.reproduce(self._get_stage_target(file2_stage)) self.assertEqual(len(stages), 0) - ret = main(["unlock", file2_stage]) + ret = main(["unlock", self._get_stage_target(file2_stage)]) self.assertEqual(ret, 0) - stages = self.dvc.reproduce(file2_stage) + stages = self.dvc.reproduce(self._get_stage_target(file2_stage)) self.assertTrue(filecmp.cmp(self.file1, self.BAR, shallow=False)) self.assertTrue(filecmp.cmp(file2, self.BAR, shallow=False)) self.assertEqual(len(stages), 3) @@ -538,29 +566,29 @@ def test_non_existing(self): self.assertNotEqual(ret, 0) -class TestReproLockedCallback(TestDvc): +class TestReproLockedCallback(SingleStageRun, TestDvc): def test(self): file1 = "file1" file1_stage = file1 + ".dvc" # NOTE: purposefully not specifying dependencies # to create a callback stage. - stage = self.dvc.run( + stage = self._run( fname=file1_stage, outs=[file1], cmd="python {} {} {}".format(self.CODE, self.FOO, file1), + name="copy-FOO-file1", ) self.assertTrue(stage is not None) - self.assertEqual(stage.relpath, file1_stage) - stages = self.dvc.reproduce(file1_stage) + stages = self.dvc.reproduce(self._get_stage_target(stage)) self.assertEqual(len(stages), 1) - self.dvc.lock_stage(file1_stage) - stages = self.dvc.reproduce(file1_stage) + self.dvc.lock_stage(self._get_stage_target(stage)) + stages = self.dvc.reproduce(self._get_stage_target(stage)) self.assertEqual(len(stages), 0) - self.dvc.lock_stage(file1_stage, unlock=True) - stages = self.dvc.reproduce(file1_stage) + self.dvc.lock_stage(self._get_stage_target(stage), unlock=True) + stages = self.dvc.reproduce(self._get_stage_target(stage)) self.assertEqual(len(stages), 1) @@ -569,12 +597,13 @@ def test(self): """ Check that locking/unlocking doesn't affect stage state """ - self.dvc.lock_stage(self.file1_stage) - stages = self.dvc.reproduce(self.file1_stage) + target = self._get_stage_target(self.stage) + self.dvc.lock_stage(target) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 0) - self.dvc.lock_stage(self.file1_stage, unlock=True) - stages = self.dvc.reproduce(self.file1_stage) + self.dvc.lock_stage(target, unlock=True) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 0) @@ -610,11 +639,13 @@ def test(self): class TestReproPhony(TestReproChangedData): def test(self): - stage = self.dvc.run(deps=[self.file1]) + stage = self._run( + cmd="cat " + self.file1, deps=[self.file1], name="no_cmd?" + ) self.swap_foo_with_bar() - self.dvc.reproduce(stage.path) + self.dvc.reproduce(self._get_stage_target(stage)) self.assertTrue(filecmp.cmp(self.file1, self.BAR, shallow=False)) @@ -624,7 +655,7 @@ def test(self): os.unlink(self.FOO) with self.assertRaises(ReproductionError): - self.dvc.reproduce(self.file1_stage) + self.dvc.reproduce(self._get_stage_target(self.stage)) class TestReproDataSource(TestReproChangedData): @@ -637,12 +668,11 @@ def test(self): self.assertEqual(stages[0].outs[0].checksum, file_md5(self.BAR)[0]) -class TestReproChangedDir(TestDvc): +class TestReproChangedDir(SingleStageRun, TestDvc): def test(self): file_name = "file" shutil.copyfile(self.FOO, file_name) - stage_name = "dir.dvc" dir_name = "dir" dir_code = "dir.py" code = ( @@ -653,24 +683,25 @@ def test(self): with open(dir_code, "w+") as fd: fd.write(code.format(dir_name, file_name, dir_name, file_name)) - self.dvc.run( - fname=stage_name, + stage = self._run( outs=[dir_name], deps=[file_name, dir_code], cmd="python {}".format(dir_code), + name="copy-in-dir", ) + target = self._get_stage_target(stage) - stages = self.dvc.reproduce(stage_name) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 0) os.unlink(file_name) shutil.copyfile(self.BAR, file_name) - stages = self.dvc.reproduce(stage_name) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 1) -class TestReproChangedDirData(TestDvc): +class TestReproChangedDirData(SingleStageRun, TestDvc): def test(self): dir_name = "dir" dir_code = "dir_code.py" @@ -681,32 +712,35 @@ def test(self): "shutil.copytree(sys.argv[1], sys.argv[2])" ) - stage = self.dvc.run( + stage = self._run( outs=[dir_name], deps=[self.DATA_DIR, dir_code], cmd="python {} {} {}".format(dir_code, self.DATA_DIR, dir_name), + name="copy-dir", ) + target = self._get_stage_target(stage) + self.assertTrue(stage is not None) - stages = self.dvc.reproduce(stage.path) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 0) with open(self.DATA_SUB, "a") as fd: fd.write("add") - stages = self.dvc.reproduce(stage.path) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 1) self.assertTrue(stages[0] is not None) # Check that dvc indeed registers changed output dir shutil.move(self.BAR, dir_name) - stages = self.dvc.reproduce(stage.path) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 1) self.assertTrue(stages[0] is not None) # Check that dvc registers mtime change for the directory. System.hardlink(self.DATA_SUB, self.DATA_SUB + ".lnk") - stages = self.dvc.reproduce(stage.path) + stages = self.dvc.reproduce(target) self.assertEqual(len(stages), 1) self.assertTrue(stages[0] is not None) @@ -729,7 +763,7 @@ def test(self): ret = main(["status"]) self.assertEqual(ret, 0) - ret = main(["repro", self.file1_stage]) + ret = main(["repro", self._get_stage_target(self.stage)]) self.assertEqual(ret, 0) ret = main(["repro", "non-existing-file"]) @@ -1206,33 +1240,41 @@ def test(self): self.assertEqual(os.getenv("SHELL"), fd.read().strip()) -class TestReproAllPipelines(TestDvc): +class TestReproAllPipelines(SingleStageRun, TestDvc): def test(self): stages = [ - self.dvc.run( + self._run( fname="start.dvc", outs=["start.txt"], cmd="echo start > start.txt", + name="start", ), - self.dvc.run( + self._run( fname="middle.dvc", deps=["start.txt"], outs=["middle.txt"], cmd="echo middle > middle.txt", + name="middle", ), - self.dvc.run( + self._run( fname="final.dvc", deps=["middle.txt"], outs=["final.txt"], cmd="echo final > final.txt", + name="final", ), - self.dvc.run( + self._run( fname="disconnected.dvc", outs=["disconnected.txt"], cmd="echo other > disconnected.txt", + name="disconnected", ), ] + from dvc.state import StateNoop + + self.dvc.state = StateNoop() + with patch.object( Stage, "reproduce", side_effect=stages ) as mock_reproduce: @@ -1244,21 +1286,26 @@ def test(self): class TestReproNoCommit(TestRepro): def test(self): remove(self.dvc.cache.local.cache_dir) - ret = main(["repro", self.file1_stage, "--no-commit"]) + ret = main( + ["repro", self._get_stage_target(self.stage), "--no-commit"] + ) self.assertEqual(ret, 0) self.assertFalse(os.path.exists(self.dvc.cache.local.cache_dir)) class TestReproAlreadyCached(TestRepro): def test(self): - run_out = self.dvc.run( + stage = self._run( fname="datetime.dvc", deps=[], outs=["datetime.txt"], cmd='python -c "import time; print(time.time())" > datetime.txt', - ).outs[0] - - repro_out = self.dvc.reproduce(target="datetime.dvc")[0].outs[0] + name="datetime", + ) + run_out = stage.outs[0] + repro_out = self.dvc.reproduce(self._get_stage_target(stage))[0].outs[ + 0 + ] self.assertNotEqual(run_out.checksum, repro_out.checksum) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py new file mode 100644 index 0000000000..86852a9e69 --- /dev/null +++ b/tests/func/test_repro_multistage.py @@ -0,0 +1,412 @@ +import os +from textwrap import dedent + +import pytest + +from dvc.stage import PipelineStage +from tests.func import test_repro + +from dvc.main import main + + +COPY_SCRIPT_FORMAT = dedent( + """\ + import sys + import shutil + shutil.copyfile({}, {}) +""" +) +COPY_SCRIPT = COPY_SCRIPT_FORMAT.format("sys.argv[1]", "sys.argv[2]") + + +class MultiStageRun: + def _run(self, **kwargs): + assert kwargs.get("name") + kwargs.pop("fname", None) + # ignore fname for now + return self.dvc.run(**kwargs) + + @staticmethod + def _get_stage_target(stage): + return stage.path + ":" + stage.name + + +class TestReproFailMultiStage(MultiStageRun, test_repro.TestReproFail): + pass + + +class TestReproCyclicGraphMultiStage( + MultiStageRun, test_repro.TestReproCyclicGraph +): + # TODO: Also test with new-style forced dump + pass + + +class TestReproUnderDirMultiStage( + MultiStageRun, test_repro.TestReproDepUnderDir +): + pass + + +class TestReproDepDirWithOutputsUnderItMultiStage( + MultiStageRun, test_repro.TestReproDepDirWithOutputsUnderIt +): + pass + + +class TestReproNoDepsMultiStage(MultiStageRun, test_repro.TestReproNoDeps): + pass + + +class TestReproForceMultiStage(MultiStageRun, test_repro.TestReproForce): + pass + + +class TestReproChangedCodeMultiStage( + MultiStageRun, test_repro.TestReproChangedCode +): + pass + + +class TestReproChangedDataMultiStage( + MultiStageRun, test_repro.TestReproChangedData +): + pass + + +class TestReproDry(MultiStageRun, test_repro.TestReproDry): + pass + + +class TestReproUpToDateMultiStage(MultiStageRun, test_repro.TestReproUpToDate): + pass + + +class TestReproChangedDeepDataMultiStage( + MultiStageRun, test_repro.TestReproChangedDeepData +): + pass + + +class TestReproPipelineMultiStage(MultiStageRun, test_repro.TestReproPipeline): + pass + + +class TestReproPipelinesMultiStage( + MultiStageRun, test_repro.TestReproPipelines +): + pass + + +class TestReproLockedMultiStage(MultiStageRun, test_repro.TestReproLocked): + pass + + +class TestReproLockedCallbackMultiStage( + MultiStageRun, test_repro.TestReproLockedCallback +): + pass + + +class TestReproLockedUnchangedMultiStage( + MultiStageRun, test_repro.TestReproLockedUnchanged +): + pass + + +class TestReproPhonyMultiStage(MultiStageRun, test_repro.TestReproPhony): + pass + + +class TestCmdReproMultiStage(MultiStageRun, test_repro.TestCmdRepro): + pass + + +class TestReproAllPipelinesMultiStage( + MultiStageRun, test_repro.TestReproAllPipelines +): + pass + + +class TestReproNoCommit(MultiStageRun, test_repro.TestReproNoCommit): + pass + + +class TestNonExistingOutputMultiStage( + MultiStageRun, test_repro.TestNonExistingOutput +): + pass + + +class TestReproAlreadyCachedMultiStage( + MultiStageRun, test_repro.TestReproAlreadyCached +): + pass + + +class TestReproChangedDirMultiStage( + MultiStageRun, test_repro.TestReproChangedDir +): + pass + + +class TestReproChangedDirDataMultiStage( + MultiStageRun, test_repro.TestReproChangedDirData +): + pass + + +def test_non_existing_stage_name(tmp_dir, dvc, run_copy): + from dvc.exceptions import DvcException + + tmp_dir.gen("file1", "file1") + run_copy("file1", "file2", name="copy-file1-file2") + + with pytest.raises(DvcException): + dvc.lock_stage(":copy-file1-file3") + + assert main(["lock", ":copy-file1-file3"]) != 0 + + +# TODO: TestReproWorkingDirectoryAsOutput + + +def test_downstream(tmp_dir, dvc): + # The dependency graph should look like this: + # + # E + # / \ + # D F + # / \ \ + # B C G + # \ / + # A + # + assert main(["run", "-n", "A-gen", "-o", "A", "echo A>A"]) == 0 + assert main(["run", "-n", "B-gen", "-d", "A", "-o", "B", "echo B>B"]) == 0 + assert main(["run", "-d", "A", "-o", "C", "echo C>C"]) == 0 + assert ( + main( + ["run", "-n", "D-gen", "-d", "B", "-d", "C", "-o", "D", "echo D>D"] + ) + == 0 + ) + assert main(["run", "-o", "G", "echo G>G"]) == 0 + assert main(["run", "-n", "F-gen", "-d", "G", "-o", "F", "echo F>F"]) == 0 + assert main(["run", "-d", "D", "-d", "F", "-o", "E", "echo E>E"]) == 0 + + # We want the evaluation to move from B to E + # + # E + # / + # D + # / + # B + # + evaluation = dvc.reproduce("Dvcfile:B-gen", downstream=True, force=True) + + assert len(evaluation) == 3 + assert ( + isinstance(evaluation[0], PipelineStage) + and evaluation[0].relpath == "Dvcfile" + and evaluation[0].name == "B-gen" + ) + assert ( + isinstance(evaluation[1], PipelineStage) + and evaluation[1].relpath == "Dvcfile" + and evaluation[1].name == "D-gen" + ) + assert ( + not isinstance(evaluation[2], PipelineStage) + and evaluation[2].relpath == "E.dvc" + ) + + +def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy): + from dvc.dvcfile import Dvcfile + + tmp_dir.gen("foo", "foo") + stage = run_copy("foo", "bar", fname="copy-process.dvc", name="copy-file") + target = "copy-process.dvc:copy-file" + assert not dvc.reproduce(target) + + stage.cmd = " ".join(stage.cmd.split()) # change cmd spacing by two + Dvcfile(dvc, "copy-process.dvc").dump_multistage_dvcfile(stage) + + assert dvc.reproduce(target)[0] == stage + + +def test_repro_when_new_deps_is_added_in_dvcfile(tmp_dir, dvc, run_copy): + from dvc.dvcfile import Dvcfile + from dvc.utils.stage import dump_stage_file + + tmp_dir.gen("copy.py", COPY_SCRIPT) + tmp_dir.gen({"foo": "foo", "bar": "bar"}) + stage = dvc.run( + cmd="python copy.py {} {}".format("foo", "foobar"), + outs=["foobar"], + deps=["foo"], + fname="copy-process.dvc", + name="copy-file", + ) + target = "copy-process.dvc:copy-file" + assert not dvc.reproduce(target) + + dvcfile = Dvcfile(dvc, stage.path) + data, _ = dvcfile._load() + data["stages"]["copy-file"]["deps"] += ["copy.py"] + dump_stage_file(stage.path, data) + + assert dvc.reproduce(target)[0] == stage + + +def test_repro_when_new_outs_is_added_in_dvcfile(tmp_dir, dvc): + from dvc.dvcfile import Dvcfile + from dvc.utils.stage import dump_stage_file + + tmp_dir.gen("copy.py", COPY_SCRIPT) + tmp_dir.gen({"foo": "foo", "bar": "bar"}) + stage = dvc.run( + cmd="python copy.py {} {}".format("foo", "foobar"), + outs=[], # scenario where user forgot to add + deps=["foo"], + fname="copy-process.dvc", + name="copy-file", + ) + target = "copy-process.dvc:copy-file" + assert not dvc.reproduce(target) + + dvcfile = Dvcfile(dvc, stage.path) + data, _ = dvcfile._load() + data["stages"]["copy-file"]["outs"] = ["foobar"] + dump_stage_file(stage.path, data) + + assert dvc.reproduce(target)[0] == stage + + +def test_repro_when_new_deps_is_moved(tmp_dir, dvc): + from dvc.dvcfile import Dvcfile + from dvc.utils.stage import dump_stage_file + + tmp_dir.gen("copy.py", COPY_SCRIPT) + tmp_dir.gen({"foo": "foo", "bar": "foo"}) + stage = dvc.run( + cmd="python copy.py {} {}".format("foo", "foobar"), + outs=["foobar"], + deps=["foo"], + fname="copy-process.dvc", + name="copy-file", + ) + target = "copy-process.dvc:copy-file" + assert not dvc.reproduce(target) + + tmp_dir.gen("copy.py", COPY_SCRIPT_FORMAT.format("'bar'", "'foobar'")) + from shutil import move + + move("foo", "bar") + + dvcfile = Dvcfile(dvc, stage.path) + data, _ = dvcfile._load() + data["stages"]["copy-file"]["deps"] = ["bar"] + dump_stage_file(stage.path, data) + + assert dvc.reproduce(target)[0] == stage + + +def test_repro_when_new_out_overlaps_others_stage_outs(tmp_dir, dvc): + from dvc.utils.stage import dump_stage_file + from dvc.exceptions import OverlappingOutputPathsError + + tmp_dir.gen({"dir": {"file1": "file1"}, "foo": "foo"}) + dvc.add("dir") + dump_stage_file( + "Dvcfile", + { + "stages": { + "run-copy": { + "cmd": "python copy {} {}".format("foo", "dir/foo"), + "deps": ["foo"], + "outs": ["dir/foo"], + } + } + }, + ) + with pytest.raises(OverlappingOutputPathsError): + dvc.reproduce(":run-copy") + + +def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc): + from dvc.utils.stage import dump_stage_file + from dvc.exceptions import ReproductionError + + tmp_dir.gen("copy.py", COPY_SCRIPT) + tmp_dir.gen("foo", "foo") + dump_stage_file( + "Dvcfile", + { + "stages": { + "run-copy": { + "cmd": "python copy.py {} {}".format("foo", "foobar"), + "deps": ["foo", "bar"], + "outs": ["foobar"], + } + } + }, + ) + with pytest.raises(ReproductionError): + dvc.reproduce(":run-copy") + + +def test_repro_when_new_outs_added_does_not_exist(tmp_dir, dvc): + from dvc.utils.stage import dump_stage_file + from dvc.exceptions import ReproductionError + + tmp_dir.gen("copy.py", COPY_SCRIPT) + tmp_dir.gen("foo", "foo") + dump_stage_file( + "Dvcfile", + { + "stages": { + "run-copy": { + "cmd": "python copy {} {}".format("foo", "foobar"), + "deps": ["foo"], + "outs": ["foobar", "bar"], + } + } + }, + ) + with pytest.raises(ReproductionError): + dvc.reproduce(":run-copy") + + +def test_repro_when_lockfile_gets_deleted(tmp_dir, dvc): + from dvc.utils.stage import dump_stage_file + + tmp_dir.gen("copy.py", COPY_SCRIPT) + tmp_dir.gen("foo", "foo") + dump_stage_file( + "Dvcfile", + { + "stages": { + "run-copy": { + "cmd": "python copy.py {} {}".format("foo", "foobar"), + "deps": ["foo"], + "outs": ["foobar"], + } + } + }, + ) + assert dvc.reproduce(":run-copy") + assert os.path.exists("Dvcfile.lock") + assert os.path.exists("foobar.dvc") + + assert not dvc.reproduce(":run-copy") + os.unlink("Dvcfile.lock") + stages = dvc.reproduce(":run-copy") + assert ( + stages + and stages[0].relpath == "Dvcfile" + and stages[0].name == "run-copy" + ) + + assert os.path.exists("foobar.dvc") diff --git a/tests/func/test_run_multistage.py b/tests/func/test_run_multistage.py new file mode 100644 index 0000000000..574efc7b67 --- /dev/null +++ b/tests/func/test_run_multistage.py @@ -0,0 +1,180 @@ +import pytest +import os + + +def test_run_with_name(tmp_dir, dvc, run_copy): + from dvc.stage import PipelineStage + from dvc.dvcfile import DVC_FILE, DVC_FILE_SUFFIX + + tmp_dir.dvc_gen("foo", "foo") + assert not os.path.exists(DVC_FILE) + stage = run_copy("foo", "bar", name="copy-foo-to-bar") + assert isinstance(stage, PipelineStage) + assert stage.name == "copy-foo-to-bar" + assert os.path.exists(DVC_FILE) + assert os.path.exists(DVC_FILE + ".lock") + assert os.path.exists("foo" + DVC_FILE_SUFFIX) + + +def test_run_with_multistage_and_single_stage(tmp_dir, dvc, run_copy): + from dvc.stage import PipelineStage, Stage + + tmp_dir.dvc_gen("foo", "foo") + stage1 = run_copy("foo", "foo1") + stage2 = run_copy("foo1", "foo2", name="copy-foo1-foo2") + stage3 = run_copy("foo2", "foo3") + + assert isinstance(stage2, PipelineStage) + assert isinstance(stage1, Stage) + assert isinstance(stage3, Stage) + assert stage2.name == "copy-foo1-foo2" + + +def test_run_multi_stage_repeat(tmp_dir, dvc, run_copy): + from dvc.stage import PipelineStage + from dvc.dvcfile import Dvcfile, DVC_FILE, DVC_FILE_SUFFIX + + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "foo1", name="copy-foo-foo1") + run_copy("foo1", "foo2", name="copy-foo1-foo2") + run_copy("foo2", "foo3") + + stages = Dvcfile(dvc, DVC_FILE).load_multi() + assert len(stages) == 2 + assert all(isinstance(stage, PipelineStage) for stage in stages) + assert set(stage.name for stage in stages) == { + "copy-foo-foo1", + "copy-foo1-foo2", + } + assert all( + os.path.exists(file + DVC_FILE_SUFFIX) + for file in ["foo1", "foo2", "foo3"] + ) + + +def test_multi_stage_try_writing_on_single_stage_file(tmp_dir, dvc, run_copy): + from dvc.exceptions import DvcException + from dvc.dvcfile import MultiStageFileLoadError + + tmp_dir.dvc_gen("foo") + dvc.run(cmd="echo foo", deps=["foo"]) + + with pytest.raises(DvcException): + run_copy("foo", "foo2", name="copy-foo1-foo2") + + run_copy("foo", "foo2", name="copy-foo1-foo2", fname="DIFFERENT-FILE.dvc") + + with pytest.raises(MultiStageFileLoadError): + run_copy("foo2", "foo3", fname="DIFFERENT-FILE.dvc") + + +def test_multi_stage_run_cached(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + + run_copy("foo", "foo2", name="copy-foo1-foo2") + stage2 = run_copy("foo", "foo2", name="copy-foo1-foo2") + + assert stage2 is None + + +def test_multistage_dump_on_non_cached_outputs(tmp_dir, dvc): + from dvc.dvcfile import DVC_FILE_SUFFIX + + tmp_dir.dvc_gen("foo") + dvc.run( + cmd="cp foo foo1", + deps=["foo"], + name="copy-foo1-foo2", + outs_no_cache=["foo1"], + ) + assert not os.path.exists("foo1" + DVC_FILE_SUFFIX) + + +def test_multistage_with_wdir(tmp_dir, dvc): + from dvc.dvcfile import Dvcfile + + tmp_dir.gen({"dir": {"foo": "foo", "bar": "bar"}}) + stage = dvc.run( + cmd="cp foo foo1", + deps=["foo"], + name="copy-foo1-foo2", + outs=["foo1"], + wdir="dir", + ) + + data, _ = Dvcfile(dvc, stage.path)._load() + assert "dir" == data["stages"]["copy-foo1-foo2"]["wdir"] + + +def test_multistage_always_changed(tmp_dir, dvc): + from dvc.dvcfile import Dvcfile + + tmp_dir.gen({"foo": "foo", "bar": "bar"}) + stage = dvc.run( + cmd="cp foo foo1", + deps=["foo"], + name="copy-foo1-foo2", + outs=["foo1"], + always_changed=True, + ) + + data, _ = Dvcfile(dvc, stage.path)._load() + assert data["stages"]["copy-foo1-foo2"]["always_changed"] + + +def test_graph(tmp_dir, dvc): + from dvc.exceptions import CyclicGraphError + + tmp_dir.gen({"foo": "foo", "bar": "bar"}) + + dvc.run(deps=["foo"], outs=["bar"], cmd="echo foo > bar", name="1") + + dvc.run(deps=["bar"], outs=["baz"], cmd="echo bar > baz", name="2") + + with pytest.raises(CyclicGraphError): + dvc.run(deps=["baz"], outs=["foo"], cmd="echo baz > foo", name="3") + + +def test_run_dump_on_multistage(tmp_dir, dvc): + from dvc.dvcfile import Dvcfile + + tmp_dir.gen({"dir": {"foo": "foo", "bar": "bar"}}) + dvc.run( + cmd="cp foo foo1", + deps=["foo"], + name="copy-foo-foo1", + outs=["foo1"], + wdir="dir", + ) + data, _ = Dvcfile(dvc, "Dvcfile")._load() + assert data == { + "stages": { + "copy-foo-foo1": { + "cmd": "cp foo foo1", + "wdir": "dir", + "deps": ["foo"], + "outs": ["foo1"], + } + } + } + + dvc.run( + cmd="cp foo foo2", + deps=["foo"], + name="copy-foo-foo2", + wdir="dir", + outs_persist=["foo2"], + always_changed=True, + ) + assert Dvcfile(dvc, "Dvcfile")._load()[0] == { + "stages": { + "copy-foo-foo2": { + "cmd": "cp foo foo2", + "deps": ["foo"], + "outs_persist": ["foo2"], + "always_changed": True, + "wdir": "dir", + }, + **data["stages"], + } + } diff --git a/tests/func/test_run.py b/tests/func/test_run_single_stage.py similarity index 100% rename from tests/func/test_run.py rename to tests/func/test_run_single_stage.py diff --git a/tests/unit/command/test_run.py b/tests/unit/command/test_run.py index 02fcab48ea..fef8e8be30 100644 --- a/tests/unit/command/test_run.py +++ b/tests/unit/command/test_run.py @@ -6,6 +6,8 @@ def test_run(mocker, dvc): cli_args = parse_args( [ "run", + "--name", + "nam", "--deps", "deps", "--outs", @@ -61,6 +63,7 @@ def test_run(mocker, dvc): no_commit=True, always_changed=True, cmd="command", + name="nam", ) @@ -86,6 +89,7 @@ def test_run_args_from_cli(mocker, dvc): no_commit=False, always_changed=False, cmd="echo foo", + name=None, ) @@ -111,4 +115,5 @@ def test_run_args_with_spaces(mocker, dvc): no_commit=False, always_changed=False, cmd='echo "foo bar"', + name=None, ) diff --git a/tests/unit/test_lockfile.py b/tests/unit/test_lockfile.py new file mode 100644 index 0000000000..58f2f5c47d --- /dev/null +++ b/tests/unit/test_lockfile.py @@ -0,0 +1,78 @@ +from dvc.stage import PipelineStage +from dvc import lockfile +import json +import pytest + + +def test_stage_dump_no_outs_deps(tmp_dir, dvc): + stage = PipelineStage(name="s1", repo=dvc, path="path", cmd="command") + + lockfile.dump(dvc, "path.lock", stage) + assert lockfile.load(dvc, "path.lock") == { + "s1": {"cmd": "command", "deps": {}, "outs": {}} + } + + +def test_stage_dump_when_already_exists(tmp_dir, dvc): + data = {"s1": {"cmd": "command", "deps": {}, "outs": {}}} + with open("path.lock", "w+") as f: + json.dump(data, f) + stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command2") + + lockfile.dump(dvc, "path.lock", stage) + assert lockfile.load(dvc, "path.lock") == { + **data, + "s2": {"cmd": "command2", "deps": {}, "outs": {}}, + } + + +def test_stage_dump_with_deps_and_outs(tmp_dir, dvc): + data = { + "s1": { + "cmd": "command", + "deps": {"1.txt": "checksum"}, + "outs": {"2.txt": "checksum"}, + } + } + with open("path.lock", "w+") as f: + json.dump(data, f) + + stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command2") + lockfile.dump(dvc, "path.lock", stage) + assert lockfile.load(dvc, "path.lock") == { + **data, + "s2": {"cmd": "command2", "deps": {}, "outs": {}}, + } + + +def test_stage_overwrites_if_already_exists(tmp_dir, dvc): + stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command2") + lockfile.dump(dvc, "path.lock", stage) + stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command3") + lockfile.dump(dvc, "path.lock", stage) + assert lockfile.load(dvc, "path.lock") == { + "s2": {"cmd": "command3", "deps": {}, "outs": {}}, + } + + +def test_load_when_lockfile_does_not_exist(tmp_dir, dvc): + assert {} == lockfile.load(dvc, "dvcfile.lock") + + +@pytest.mark.parametrize( + "corrupt_data", + [ + {"s1": {"cmd": "command", "outs": {}}}, + {"s1": {"outs": {}}}, + {"s1": {"cmd": "command", "deps": {}}}, + {"s1": {}}, + {"s1": {"cmd": "command", "outs": {"file": "checksum"}}}, + {"s1": {"cmd": "command", "deps": {"file": "checksum"}}}, + ], +) +def test_load_when_lockfile_is_corrupted(tmp_dir, dvc, corrupt_data): + with open("Dvcfile.lock", "w+") as f: + json.dump(corrupt_data, f) + with pytest.raises(lockfile.LockfileCorruptedError) as exc_info: + lockfile.load(dvc, "Dvcfile.lock") + assert "Dvcfile.lock" in str(exc_info.value)