From 059e16179b158abe7ce9b30677d061c0722c109d Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Fri, 10 Apr 2020 14:29:04 +0545 Subject: [PATCH 1/4] refactor stage into dvcfile --- dvc/command/base.py | 6 +- dvc/command/pipeline.py | 8 +- dvc/dvcfile.py | 202 ++++++++++++ dvc/output/base.py | 4 +- dvc/repo/__init__.py | 16 +- dvc/repo/add.py | 10 +- dvc/repo/checkout.py | 8 +- dvc/repo/commit.py | 4 +- dvc/repo/get.py | 4 +- dvc/repo/imp_url.py | 9 +- dvc/repo/lock.py | 7 +- dvc/repo/metrics/modify.py | 4 +- dvc/repo/move.py | 5 +- dvc/repo/remove.py | 4 +- dvc/repo/reproduce.py | 9 +- dvc/repo/run.py | 6 +- dvc/repo/tag/add.py | 4 +- dvc/repo/tag/remove.py | 4 +- dvc/repo/update.py | 8 +- dvc/{stage.py => stage/__init__.py} | 471 +++++----------------------- dvc/stage/decorators.py | 50 +++ dvc/stage/exceptions.py | 94 ++++++ dvc/stage/schema.py | 28 ++ tests/func/test_add.py | 11 +- tests/func/test_checkout.py | 11 +- tests/func/test_commit.py | 2 +- tests/func/test_import.py | 9 +- tests/func/test_move.py | 18 +- tests/func/test_remove.py | 2 +- tests/func/test_repro.py | 68 ++-- tests/func/test_run.py | 33 +- tests/func/test_stage.py | 102 +++--- tests/func/test_update.py | 6 +- tests/unit/test_stage.py | 3 +- 34 files changed, 661 insertions(+), 569 deletions(-) create mode 100644 dvc/dvcfile.py rename dvc/{stage.py => stage/__init__.py} (64%) create mode 100644 dvc/stage/decorators.py create mode 100644 dvc/stage/exceptions.py create mode 100644 dvc/stage/schema.py diff --git a/dvc/command/base.py b/dvc/command/base.py index c4e1cbf1fa..30f8700c7a 100644 --- a/dvc/command/base.py +++ b/dvc/command/base.py @@ -42,11 +42,11 @@ def __init__(self, args): @property def default_targets(self): """Default targets for `dvc repro` and `dvc pipeline`.""" - from dvc.stage import Stage + from dvc.dvcfile import DVC_FILE - msg = "assuming default target '{}'.".format(Stage.STAGE_FILE) + msg = "assuming default target '{}'.".format(DVC_FILE) logger.warning(msg) - return [Stage.STAGE_FILE] + return [DVC_FILE] # Abstract methods that have to be implemented by any inheritance class def run(self): diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index a265f10e11..e8a3fd301f 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -11,9 +11,9 @@ class CmdPipelineShow(CmdBase): def _show(self, target, commands, outs, locked): import networkx - from dvc.stage import Stage + from dvc.dvcfile import Dvcfile - stage = Stage.load(self.repo, target) + stage = Dvcfile(self.repo, target).load() G = self.repo.graph stages = networkx.dfs_postorder_nodes(G, stage) @@ -33,10 +33,10 @@ def _show(self, target, commands, outs, locked): def _build_graph(self, target, commands, outs): import networkx - from dvc.stage import Stage + from dvc.dvcfile import Dvcfile from dvc.repo.graph import get_pipeline - target_stage = Stage.load(self.repo, target) + target_stage = Dvcfile(self.repo, target).load() G = get_pipeline(self.repo.pipelines, target_stage) nodes = set() diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py new file mode 100644 index 0000000000..b39deb9259 --- /dev/null +++ b/dvc/dvcfile.py @@ -0,0 +1,202 @@ +import os +import re +import logging + +import dvc.prompt as prompt + +from voluptuous import MultipleInvalid + +from dvc import dependency, output +from dvc.stage.exceptions import ( + StageFileBadNameError, + StageFileDoesNotExistError, + StageFileIsNotDvcFileError, + StageFileFormatError, + StageFileAlreadyExistsError, +) +from dvc.utils import relpath +from dvc.utils.collections import apply_diff +from dvc.utils.stage import ( + parse_stage_for_update, + dump_stage_file, + parse_stage, +) + +logger = logging.getLogger(__name__) + +DVC_FILE = "Dvcfile" +DVC_FILE_SUFFIX = ".dvc" +TAG_REGEX = r"^(?P.*)@(?P[^\\/@:]*)$" + + +class Dvcfile: + def __init__(self, repo, path): + self.repo = repo + self.path, self.tag = self._get_path_tag(path) + + def __repr__(self): + return "{}: {}".format(DVC_FILE, self.path) + + @classmethod + def is_valid_filename(cls, path): + return ( + path.endswith(DVC_FILE_SUFFIX) + or os.path.basename(path) == DVC_FILE + ) + + @classmethod + def is_stage_file(cls, path): + return os.path.isfile(path) and cls.is_valid_filename(path) + + @classmethod + def check_dvc_filename(cls, path): + if not cls.is_valid_filename(path): + raise StageFileBadNameError( + "bad DVC-file name '{}'. DVC-files should be named " + "'Dvcfile' or have a '.dvc' suffix (e.g. '{}.dvc').".format( + relpath(path), os.path.basename(path) + ) + ) + + def exists(self): + return self.repo.tree.exists(self.path) + + def check_file_exists(self): + if not self.exists(): + raise StageFileDoesNotExistError(self.path) + + def check_isfile(self): + if not self.repo.tree.isfile(self.path): + raise StageFileIsNotDvcFileError(self.path) + + @staticmethod + def _get_path_tag(s): + regex = re.compile(TAG_REGEX) + match = regex.match(s) + if not match: + return s, None + return match.group("path"), match.group("tag") + + def dump(self, stage): + """Dumps given stage appropriately in the dvcfile.""" + self.dump_single_stage(stage) + + def dump_single_stage(self, stage): + self.check_dvc_filename(self.path) + + logger.debug( + "Saving information to '{file}'.".format(file=relpath(self.path)) + ) + state = stage.dumpd() + + # When we load a stage we parse yaml with a fast parser, which strips + # off all the comments and formatting. To retain those on update we do + # a trick here: + # - reparse the same yaml text with a slow but smart ruamel yaml parser + # - apply changes to a returned structure + # - serialize it + if stage._stage_text is not None: + saved_state = parse_stage_for_update(stage._stage_text, self.path) + # Stage doesn't work with meta in any way, so .dumpd() doesn't + # have it. We simply copy it over. + if "meta" in saved_state: + state["meta"] = saved_state["meta"] + apply_diff(state, saved_state) + state = saved_state + + dump_stage_file(self.path, state) + + self.repo.scm.track_file(relpath(self.path)) + + def load(self): + """Loads single stage.""" + from dvc.stage import Stage + + # it raises the proper exceptions by priority: + # 1. when the file doesn't exists + # 2. filename is not a DVC-file + # 3. path doesn't represent a regular file + self.check_file_exists() + self.check_dvc_filename(self.path) + self.check_isfile() + + with self.repo.tree.open(self.path) as fd: + stage_text = fd.read() + d = parse_stage(stage_text, self.path) + + Dvcfile.validate(d, fname=relpath(self.path)) + path = os.path.abspath(self.path) + + stage = Stage( + repo=self.repo, + path=path, + wdir=os.path.abspath( + os.path.join( + os.path.dirname(path), d.get(Stage.PARAM_WDIR, ".") + ) + ), + cmd=d.get(Stage.PARAM_CMD), + md5=d.get(Stage.PARAM_MD5), + locked=d.get(Stage.PARAM_LOCKED, False), + tag=self.tag, + always_changed=d.get(Stage.PARAM_ALWAYS_CHANGED, False), + # We store stage text to apply updates to the same structure + stage_text=stage_text, + ) + + stage.deps = dependency.loadd_from( + stage, d.get(Stage.PARAM_DEPS) or [] + ) + stage.outs = output.loadd_from(stage, d.get(Stage.PARAM_OUTS) or []) + + return stage + + @staticmethod + def validate(d, fname=None): + from dvc.stage.schema import SINGLE_STAGE_SCHEMA + + try: + SINGLE_STAGE_SCHEMA(d) + except MultipleInvalid as exc: + raise StageFileFormatError(fname, exc) + + @classmethod + def create_stage(cls, repo, accompany_outs=False, **kwargs): + from dvc.stage import Stage + + stage = Stage.create(repo, accompany_outs=accompany_outs, **kwargs) + + ignore_build_cache = kwargs.get("ignore_build_cache", False) + # NOTE: remove outs before we check build cache + if kwargs.get("remove_outs", False): + logger.warning( + "--remove-outs is deprecated." + " It is now the default behavior," + " so there's no need to use this option anymore." + ) + stage.remove_outs(ignore_remove=False) + logger.warning("Build cache is ignored when using --remove-outs.") + ignore_build_cache = True + + dvcfile = Dvcfile(stage.repo, stage.path) + if dvcfile.exists(): + if any(out.persist for out in stage.outs): + logger.warning( + "Build cache is ignored when persisting outputs." + ) + ignore_build_cache = True + + if not ignore_build_cache and stage.can_be_skipped: + logger.info("Stage is cached, skipping.") + return None + + msg = ( + "'{}' already exists. Do you wish to run the command and " + "overwrite it?".format(stage.relpath) + ) + if not (kwargs.get("overwrite", True) or prompt.confirm(msg)): + raise StageFileAlreadyExistsError(stage.relpath) + + os.unlink(dvcfile.path) + + return stage diff --git a/dvc/output/base.py b/dvc/output/base.py index 573f7f06a6..879dd9a713 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -442,7 +442,7 @@ def get_used_cache(self, **kwargs): @classmethod def _validate_output_path(cls, path): - from dvc.stage import Stage + from dvc.dvcfile import Dvcfile - if Stage.is_valid_filename(path): + if Dvcfile.is_valid_filename(path): raise cls.IsStageFileError(path) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index e22b996749..8d50c54ded 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -193,7 +193,7 @@ def check_modified_graph(self, new_stages): def collect(self, target, with_deps=False, recursive=False, graph=None): import networkx as nx - from dvc.stage import Stage + from ..dvcfile import Dvcfile if not target: return list(graph) if graph else self.stages @@ -204,7 +204,7 @@ def collect(self, target, with_deps=False, recursive=False, graph=None): stages = nx.dfs_postorder_nodes(graph or self.graph) return [stage for stage in stages if path_isin(stage.path, target)] - stage = Stage.load(self, target) + stage = Dvcfile(self, target).load() # Optimization: do not collect the graph for a specific target if not with_deps: @@ -214,14 +214,14 @@ def collect(self, target, with_deps=False, recursive=False, graph=None): return list(nx.dfs_postorder_nodes(pipeline, stage)) def collect_granular(self, target, *args, **kwargs): - from dvc.stage import Stage + from ..dvcfile import Dvcfile if not target: return [(stage, None) for stage in self.stages] # Optimization: do not collect the graph for a specific .dvc target - if Stage.is_valid_filename(target) and not kwargs.get("with_deps"): - return [(Stage.load(self, target), None)] + if Dvcfile.is_valid_filename(target) and not kwargs.get("with_deps"): + return [(Dvcfile(self, target).load(), None)] try: (out,) = self.find_outs_by_path(target, strict=False) @@ -411,7 +411,7 @@ def stages(self): NOTE: For large repos, this could be an expensive operation. Consider using some memoization. """ - from dvc.stage import Stage + from ..dvcfile import Dvcfile stages = [] outs = set() @@ -419,9 +419,9 @@ def stages(self): for root, dirs, files in self.tree.walk(self.root_dir): for fname in files: path = os.path.join(root, fname) - if not Stage.is_valid_filename(path): + if not Dvcfile.is_valid_filename(path): continue - stage = Stage.load(self, path) + stage = Dvcfile(self, path).load() stages.append(stage) for out in stage.outs: diff --git a/dvc/repo/add.py b/dvc/repo/add.py index b0a0fa586c..30a582a977 100644 --- a/dvc/repo/add.py +++ b/dvc/repo/add.py @@ -4,6 +4,7 @@ import colorama from . import locked +from ..dvcfile import Dvcfile from ..exceptions import ( RecursiveAddingWhileUsingFilename, OverlappingOutputPathsError, @@ -11,7 +12,6 @@ from ..output.base import OutputDoesNotExistError from ..progress import Tqdm from ..repo.scm_context import scm_context -from ..stage import Stage from ..utils import LARGE_DIR_SIZE logger = logging.getLogger(__name__) @@ -85,7 +85,7 @@ def add(repo, targets, recursive=False, no_commit=False, fname=None): if not no_commit: stage.commit() - stage.dump() + Dvcfile(repo, stage.path).dump(stage) pbar_stages.update() stages_list += stages @@ -107,7 +107,7 @@ def _find_all_targets(repo, target, recursive): unit="file", ) if not repo.is_dvc_internal(fname) - if not Stage.is_stage_file(fname) + if not Dvcfile.is_stage_file(fname) if not repo.scm.belongs_to_scm(fname) if not repo.scm.is_tracked(fname) ] @@ -123,8 +123,8 @@ def _create_stages(repo, targets, fname, pbar=None): disable=len(targets) < LARGE_DIR_SIZE, unit="file", ): - stage = Stage.create( - repo, outs=[out], accompany_outs=True, fname=fname + stage = Dvcfile.create_stage( + repo, accompany_outs=True, outs=[out], fname=fname ) repo._reset() diff --git a/dvc/repo/checkout.py b/dvc/repo/checkout.py index 3a239bf1cb..0e4af1c3ba 100644 --- a/dvc/repo/checkout.py +++ b/dvc/repo/checkout.py @@ -2,8 +2,7 @@ import os from dvc.compat import fspath -from dvc.exceptions import CheckoutError -from dvc.exceptions import CheckoutErrorSuggestGit +from dvc.exceptions import CheckoutError, CheckoutErrorSuggestGit from dvc.progress import Tqdm from dvc.utils import relpath @@ -42,7 +41,10 @@ def _checkout( relink=False, recursive=False, ): - from dvc.stage import StageFileDoesNotExistError, StageFileBadNameError + from dvc.stage.exceptions import ( + StageFileBadNameError, + StageFileDoesNotExistError, + ) unused = [] stats = { diff --git a/dvc/repo/commit.py b/dvc/repo/commit.py index ceba50e597..738d93c1f9 100644 --- a/dvc/repo/commit.py +++ b/dvc/repo/commit.py @@ -1,4 +1,5 @@ from . import locked +from dvc.dvcfile import Dvcfile @locked @@ -7,4 +8,5 @@ def commit(self, target, with_deps=False, recursive=False, force=False): for stage in stages: stage.check_can_commit(force=force) stage.commit() - stage.dump() + + Dvcfile(self, stage.path).dump(stage) diff --git a/dvc/repo/get.py b/dvc/repo/get.py index 428e68f9af..81c90c7afb 100644 --- a/dvc/repo/get.py +++ b/dvc/repo/get.py @@ -4,7 +4,6 @@ import shortuuid from dvc.exceptions import DvcException -from dvc.stage import Stage from dvc.utils import resolve_output from dvc.utils.fs import remove from dvc.path_info import PathInfo @@ -23,10 +22,11 @@ def __init__(self): @staticmethod def get(url, path, out=None, rev=None): from dvc.external_repo import external_repo + from dvc.dvcfile import Dvcfile out = resolve_output(path, out) - if Stage.is_valid_filename(out): + if Dvcfile.is_valid_filename(out): raise GetDVCFileError() # Creating a directory right beside the output to make sure that they diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index d62f867788..fd0deba807 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -6,18 +6,17 @@ @locked_repo @scm_context def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): - from dvc.stage import Stage + from dvc.dvcfile import Dvcfile out = resolve_output(url, out) - - stage = Stage.create( + stage = Dvcfile.create_stage( self, cmd=None, deps=[url], outs=[out], - fname=fname, erepo=erepo, accompany_outs=True, + fname=fname, ) if stage is None: @@ -29,6 +28,6 @@ def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): stage.locked = locked - stage.dump() + Dvcfile(self, stage.path).dump(stage) return stage diff --git a/dvc/repo/lock.py b/dvc/repo/lock.py index 6da2576040..ca7f7c4107 100644 --- a/dvc/repo/lock.py +++ b/dvc/repo/lock.py @@ -3,10 +3,11 @@ @locked def lock(self, target, unlock=False): - from dvc.stage import Stage + from ..dvcfile import Dvcfile - stage = Stage.load(self, target) + dvcfile = Dvcfile(self, target) + stage = dvcfile.load() stage.locked = False if unlock else True - stage.dump() + dvcfile.dump(stage) return stage diff --git a/dvc/repo/metrics/modify.py b/dvc/repo/metrics/modify.py index acb547674f..05e7716670 100644 --- a/dvc/repo/metrics/modify.py +++ b/dvc/repo/metrics/modify.py @@ -1,3 +1,4 @@ +from dvc.dvcfile import Dvcfile from dvc.exceptions import DvcException from dvc.repo import locked @@ -37,4 +38,5 @@ def modify(repo, path, typ=None, xpath=None, delete=False): out.verify_metric() - out.stage.dump() + dvcfile = Dvcfile(repo, out.stage.path) + dvcfile.dump(out.stage) diff --git a/dvc/repo/move.py b/dvc/repo/move.py index d67a148660..4b27bc8175 100644 --- a/dvc/repo/move.py +++ b/dvc/repo/move.py @@ -33,6 +33,7 @@ def move(self, from_path, to_path): """ import dvc.output as Output from dvc.stage import Stage + from ..dvcfile import DVC_FILE_SUFFIX, Dvcfile from_out = Output.loads_from(Stage(self), [from_path])[0] assert from_out.scheme == "local" @@ -54,7 +55,7 @@ def move(self, from_path, to_path): stage.path = os.path.join( os.path.dirname(to_path), - os.path.basename(to_path) + Stage.STAGE_FILE_SUFFIX, + os.path.basename(to_path) + DVC_FILE_SUFFIX, ) stage.wdir = os.path.abspath( @@ -68,4 +69,4 @@ def move(self, from_path, to_path): out.move(to_out) stage.save() - stage.dump() + Dvcfile(self, stage.path).dump(stage) diff --git a/dvc/repo/remove.py b/dvc/repo/remove.py index 6c751e565d..1001d2c967 100644 --- a/dvc/repo/remove.py +++ b/dvc/repo/remove.py @@ -3,9 +3,9 @@ @locked def remove(self, target, outs_only=False): - from dvc.stage import Stage + from ..dvcfile import Dvcfile - stage = Stage.load(self, target) + stage = Dvcfile(self, target).load() if outs_only: stage.remove_outs(force=True) else: diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index b14245822b..bb31368c02 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -21,7 +21,10 @@ def _reproduce_stage(stage, **kwargs): return [] if not kwargs.get("dry", False): - stage.dump() + from ..dvcfile import Dvcfile + + dvcfile = Dvcfile(stage.repo, stage.path) + dvcfile.dump(stage) return [stage] @@ -58,7 +61,7 @@ def reproduce( all_pipelines=False, **kwargs ): - from dvc.stage import Stage + from ..dvcfile import Dvcfile if not target and not all_pipelines: raise InvalidArgumentError( @@ -76,7 +79,7 @@ def reproduce( if all_pipelines: pipelines = active_pipelines else: - stage = Stage.load(self, target) + stage = Dvcfile(self, target).load() pipelines = [get_pipeline(active_pipelines, stage)] targets = [] diff --git a/dvc/repo/run.py b/dvc/repo/run.py index e5d62ec872..74b5af5f10 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -5,9 +5,9 @@ @locked @scm_context def run(self, no_exec=False, **kwargs): - from dvc.stage import Stage + from dvc.dvcfile import Dvcfile - stage = Stage.create(self, **kwargs) + stage = Dvcfile.create_stage(self, **kwargs) if stage is None: return None @@ -17,6 +17,6 @@ def run(self, no_exec=False, **kwargs): if not no_exec: stage.run(no_commit=kwargs.get("no_commit", False)) - stage.dump() + Dvcfile(self, stage.path).dump(stage) return stage diff --git a/dvc/repo/tag/add.py b/dvc/repo/tag/add.py index 44f02d70cc..38875d6e1c 100644 --- a/dvc/repo/tag/add.py +++ b/dvc/repo/tag/add.py @@ -2,6 +2,7 @@ from copy import copy from dvc.repo import locked +from dvc.dvcfile import Dvcfile logger = logging.getLogger(__name__) @@ -19,4 +20,5 @@ def add(self, tag, target=None, with_deps=False, recursive=False): out.tags[tag] = copy(out.info) changed = True if changed: - stage.dump() + dvcfile = Dvcfile(self, stage.path) + dvcfile.dump(stage) diff --git a/dvc/repo/tag/remove.py b/dvc/repo/tag/remove.py index 91c8321cec..f8fb5638ce 100644 --- a/dvc/repo/tag/remove.py +++ b/dvc/repo/tag/remove.py @@ -1,6 +1,7 @@ import logging from dvc.repo import locked +from dvc.dvcfile import Dvcfile logger = logging.getLogger(__name__) @@ -18,4 +19,5 @@ def remove(self, tag, target=None, with_deps=False, recursive=False): del out.tags[tag] changed = True if changed: - stage.dump() + dvcfile = Dvcfile(self, stage.path) + dvcfile.dump(stage) diff --git a/dvc/repo/update.py b/dvc/repo/update.py index 44252d0057..f9bee48f3f 100644 --- a/dvc/repo/update.py +++ b/dvc/repo/update.py @@ -3,11 +3,13 @@ @locked def update(self, target, rev=None): - from dvc.stage import Stage + from ..dvcfile import Dvcfile + + dvcfile = Dvcfile(self, target) + stage = dvcfile.load() - stage = Stage.load(self, target) stage.update(rev) - stage.dump() + dvcfile.dump(stage) return stage diff --git a/dvc/stage.py b/dvc/stage/__init__.py similarity index 64% rename from dvc/stage.py rename to dvc/stage/__init__.py index c008cd3e80..81aabfa4b9 100644 --- a/dvc/stage.py +++ b/dvc/stage/__init__.py @@ -1,203 +1,37 @@ import pathlib import logging import os -import re import signal import subprocess import threading - -from functools import wraps from itertools import chain -from funcy import decorator - -from voluptuous import Any, Schema, MultipleInvalid import dvc.dependency as dependency import dvc.output as output import dvc.prompt as prompt from dvc.exceptions import CheckoutError, DvcException +from .decorators import rwlocked, unlocked_repo +from .exceptions import ( + StageCmdFailedError, + StagePathOutsideError, + StagePathNotFoundError, + StagePathNotDirectoryError, + StageCommitError, + StageUpdateError, + MissingDep, + MissingDataSource, +) +from . import schema from dvc.utils import dict_md5 from dvc.utils import fix_env from dvc.utils import relpath from dvc.utils.fs import path_isin -from dvc.utils.collections import apply_diff from dvc.utils.fs import contains_symlink_up_to -from dvc.utils.stage import dump_stage_file -from dvc.utils.stage import parse_stage -from dvc.utils.stage import parse_stage_for_update - logger = logging.getLogger(__name__) -class StageCmdFailedError(DvcException): - def __init__(self, stage): - msg = "stage '{}' cmd '{}' failed".format(stage.relpath, stage.cmd) - super().__init__(msg) - - -class StageFileFormatError(DvcException): - def __init__(self, fname, e): - msg = "DVC-file '{}' format error: {}".format(fname, str(e)) - super().__init__(msg) - - -class StageFileDoesNotExistError(DvcException): - def __init__(self, fname): - msg = "'{}' does not exist.".format(fname) - - sname = fname + Stage.STAGE_FILE_SUFFIX - if Stage.is_stage_file(sname): - msg += " Do you mean '{}'?".format(sname) - - super().__init__(msg) - - -class StageFileAlreadyExistsError(DvcException): - def __init__(self, relpath): - msg = "not overwriting '{}'".format(relpath) - super().__init__(msg) - - -class StageFileIsNotDvcFileError(DvcException): - def __init__(self, fname): - msg = "'{}' is not a DVC-file".format(fname) - - sname = fname + Stage.STAGE_FILE_SUFFIX - if Stage.is_stage_file(sname): - msg += " Do you mean '{}'?".format(sname) - - super().__init__(msg) - - -class StageFileBadNameError(DvcException): - pass - - -class StagePathOutsideError(DvcException): - pass - - -class StagePathNotFoundError(DvcException): - pass - - -class StagePathNotDirectoryError(DvcException): - pass - - -class StageCommitError(DvcException): - pass - - -class StageUpdateError(DvcException): - def __init__(self, path): - super().__init__( - "update is not supported for '{}' that is not an " - "import.".format(path) - ) - - -class MissingDep(DvcException): - def __init__(self, deps): - assert len(deps) > 0 - - if len(deps) > 1: - dep = "dependencies" - else: - dep = "dependency" - - msg = "missing '{}': {}".format(dep, ", ".join(map(str, deps))) - super().__init__(msg) - - -class MissingDataSource(DvcException): - def __init__(self, missing_files): - assert len(missing_files) > 0 - - source = "source" - if len(missing_files) > 1: - source += "s" - - msg = "missing data '{}': {}".format(source, ", ".join(missing_files)) - super().__init__(msg) - - -@decorator -def rwlocked(call, read=None, write=None): - import sys - from dvc.rwlock import rwlock - from dvc.dependency.repo import DependencyREPO - - if read is None: - read = [] - - if write is None: - write = [] - - stage = call._args[0] - - assert stage.repo.lock.is_locked - - def _chain(names): - return [ - item.path_info - for attr in names - for item in getattr(stage, attr) - # There is no need to lock DependencyREPO deps, as there is no - # corresponding OutputREPO, so we can't even write it. - if not isinstance(item, DependencyREPO) - ] - - cmd = " ".join(sys.argv) - - with rwlock(stage.repo.tmp_dir, cmd, _chain(read), _chain(write)): - return call() - - -def unlocked_repo(f): - @wraps(f) - def wrapper(stage, *args, **kwargs): - stage.repo.state.dump() - stage.repo.lock.unlock() - stage.repo._reset() - try: - ret = f(stage, *args, **kwargs) - finally: - stage.repo.lock.lock() - stage.repo.state.load() - return ret - - return wrapper - - -class Stage(object): - STAGE_FILE = "Dvcfile" - STAGE_FILE_SUFFIX = ".dvc" - - PARAM_MD5 = "md5" - PARAM_CMD = "cmd" - PARAM_WDIR = "wdir" - PARAM_DEPS = "deps" - PARAM_OUTS = "outs" - PARAM_LOCKED = "locked" - PARAM_META = "meta" - PARAM_ALWAYS_CHANGED = "always_changed" - - SCHEMA = { - PARAM_MD5: output.CHECKSUM_SCHEMA, - PARAM_CMD: Any(str, None), - PARAM_WDIR: Any(str, None), - PARAM_DEPS: Any([dependency.SCHEMA], None), - PARAM_OUTS: Any([output.SCHEMA], None), - PARAM_LOCKED: bool, - PARAM_META: object, - PARAM_ALWAYS_CHANGED: bool, - } - COMPILED_SCHEMA = Schema(SCHEMA) - - TAG_REGEX = r"^(?P.*)@(?P[^\\/@:]*)$" - +class Stage(schema.StageParams): def __init__( self, repo, @@ -218,7 +52,7 @@ def __init__( outs = [] self.repo = repo - self.path = path + self._path = path self.cmd = cmd self.wdir = wdir self.outs = outs @@ -229,6 +63,14 @@ def __init__( self.always_changed = always_changed self._stage_text = stage_text + @property + def path(self): + return self._path + + @path.setter + def path(self, path): + self._path = path + def __repr__(self): return "Stage: '{path}'".format( path=self.relpath if self.path else "No path" @@ -239,7 +81,7 @@ def __hash__(self): def __eq__(self, other): return ( - self.__class__ == other.__class__ + isinstance(other, Stage) and self.repo is other.repo and self.path_in_repo == other.path_in_repo ) @@ -257,17 +99,6 @@ def is_data_source(self): """Whether the DVC-file was created with `dvc add` or `dvc import`""" return self.cmd is None - @staticmethod - def is_valid_filename(path): - return ( - path.endswith(Stage.STAGE_FILE_SUFFIX) - or os.path.basename(path) == Stage.STAGE_FILE - ) - - @staticmethod - def is_stage_file(path): - return os.path.isfile(path) and Stage.is_valid_filename(path) - def changed_md5(self): return self.md5 != self._compute_md5() @@ -412,19 +243,14 @@ def update(self, rev=None): self.locked = locked @staticmethod - def validate(d, fname=None): - try: - Stage.COMPILED_SCHEMA(d) - except MultipleInvalid as exc: - raise StageFileFormatError(fname, exc) + def _stage_fname(outs, add): + from dvc.dvcfile import DVC_FILE, DVC_FILE_SUFFIX - @classmethod - def _stage_fname(cls, outs, add): if not outs: - return cls.STAGE_FILE + return DVC_FILE out = outs[0] - fname = out.path_info.name + cls.STAGE_FILE_SUFFIX + fname = out.path_info.name + DVC_FILE_SUFFIX if ( add @@ -459,6 +285,22 @@ def _check_stage_path(repo, path, is_wdir=False): error_msg.format("is outside of DVC repo") ) + def _check_and_set_wdir(self, wdir, is_wdir=True): + wdir = os.path.abspath(wdir) + Stage._check_stage_path(self.repo, wdir, is_wdir=is_wdir) + self.wdir = wdir + + def _check_and_set_path(self, fname): + path = os.path.abspath(fname) + Stage._check_stage_path(self.repo, os.path.dirname(path)) + self.path = path + + @property + def can_be_skipped(self): + return ( + self.is_cached and not self.is_callback and not self.always_changed + ) + @property def is_cached(self): """ @@ -466,8 +308,9 @@ def is_cached(self): """ from dvc.remote.local import RemoteLOCAL from dvc.remote.s3 import RemoteS3 + from dvc.dvcfile import Dvcfile - old = Stage.load(self.repo, self.path) + old = Dvcfile(self.repo, self.path).load() if old._changed_outs(): return False @@ -506,7 +349,7 @@ def is_cached(self): @staticmethod def create(repo, accompany_outs=False, **kwargs): - wdir = kwargs.get("wdir", None) or os.curdir + wdir = kwargs.get("wdir") or os.curdir fname = kwargs.get("fname", None) stage = Stage( @@ -517,174 +360,61 @@ def create(repo, accompany_outs=False, **kwargs): always_changed=kwargs.get("always_changed", False), ) - Stage._fill_stage_outputs(stage, **kwargs) - deps = dependency.loads_from( - stage, kwargs.get("deps", []), erepo=kwargs.get("erepo", None) - ) - params = dependency.loads_params(stage, kwargs.get("params", [])) - stage.deps = deps + params + stage._fill_stage_outputs(**kwargs) + stage._fill_stage_dependencies(**kwargs) stage._check_circular_dependency() stage._check_duplicated_arguments() - if not fname: - fname = Stage._stage_fname(stage.outs, accompany_outs) - stage._check_dvc_filename(fname) + fname = fname or Stage._stage_fname(stage.outs, accompany_outs) + + from dvc.dvcfile import Dvcfile + + Dvcfile.check_dvc_filename(fname) # Autodetecting wdir for add, we need to create outs first to do that, # so we start with wdir = . and remap out paths later. - if accompany_outs and kwargs.get("wdir") is None: + if accompany_outs and not kwargs.get("wdir"): wdir = os.path.dirname(fname) + stage._fix_outs_deps_path(wdir) - for out in chain(stage.outs, stage.deps): - if out.is_in_repo: - out.def_path = relpath(out.path_info, wdir) - - wdir = os.path.abspath(wdir) - path = os.path.abspath(fname) - - Stage._check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir")) - Stage._check_stage_path(repo, os.path.dirname(path)) - - stage.wdir = wdir - stage.path = path - - ignore_build_cache = kwargs.get("ignore_build_cache", False) - - # NOTE: remove outs before we check build cache - if kwargs.get("remove_outs", False): - logger.warning( - "--remove-outs is deprecated." - " It is now the default behavior," - " so there's no need to use this option anymore." - ) - stage.remove_outs(ignore_remove=False) - logger.warning("Build cache is ignored when using --remove-outs.") - ignore_build_cache = True - - if os.path.exists(path) and any(out.persist for out in stage.outs): - logger.warning("Build cache is ignored when persisting outputs.") - ignore_build_cache = True - - if os.path.exists(path): - if ( - not ignore_build_cache - and stage.is_cached - and not stage.is_callback - and not stage.always_changed - ): - logger.info("Stage is cached, skipping.") - return None - - msg = ( - "'{}' already exists. Do you wish to run the command and " - "overwrite it?".format(stage.relpath) - ) - - if not kwargs.get("overwrite", True) and not prompt.confirm(msg): - raise StageFileAlreadyExistsError(stage.relpath) - - os.unlink(path) + stage._check_and_set_wdir(wdir, is_wdir=kwargs.get("wdir", False)) + stage._check_and_set_path(fname) return stage - @staticmethod - def _fill_stage_outputs(stage, **kwargs): - stage.outs = output.loads_from( - stage, kwargs.get("outs", []), use_cache=True - ) - stage.outs += output.loads_from( - stage, kwargs.get("metrics", []), use_cache=True, metric=True - ) - stage.outs += output.loads_from( - stage, kwargs.get("outs_persist", []), use_cache=True, persist=True - ) - stage.outs += output.loads_from( - stage, kwargs.get("outs_no_cache", []), use_cache=False - ) - stage.outs += output.loads_from( - stage, - kwargs.get("metrics_no_cache", []), - use_cache=False, - metric=True, - ) - stage.outs += output.loads_from( - stage, - kwargs.get("outs_persist_no_cache", []), - use_cache=False, - persist=True, - ) - - @staticmethod - def _check_dvc_filename(fname): - if not Stage.is_valid_filename(fname): - raise StageFileBadNameError( - "bad DVC-file name '{}'. DVC-files should be named " - "'Dvcfile' or have a '.dvc' suffix (e.g. '{}.dvc').".format( - relpath(fname), os.path.basename(fname) - ) + def _fill_stage_outputs(self, **kwargs): + assert not self.outs + + self.outs = [] + for key in [ + "outs", + "metrics", + "outs_persist", + "outs_no_cache", + "metrics_no_cache", + "outs_persist_no_cache", + ]: + self.outs += output.loads_from( + self, + kwargs.get(key, []), + use_cache="no_cache" not in key, + persist="persist" in key, + metric="metrics" in key, ) - @staticmethod - def _check_file_exists(repo, fname): - if not repo.tree.exists(fname): - raise StageFileDoesNotExistError(fname) - - @staticmethod - def _check_isfile(repo, fname): - if not repo.tree.isfile(fname): - raise StageFileIsNotDvcFileError(fname) - - @classmethod - def _get_path_tag(cls, s): - regex = re.compile(cls.TAG_REGEX) - match = regex.match(s) - if not match: - return s, None - return match.group("path"), match.group("tag") - - @staticmethod - def load(repo, fname): - fname, tag = Stage._get_path_tag(fname) - - # it raises the proper exceptions by priority: - # 1. when the file doesn't exists - # 2. filename is not a DVC-file - # 3. path doesn't represent a regular file - Stage._check_file_exists(repo, fname) - Stage._check_dvc_filename(fname) - Stage._check_isfile(repo, fname) - - with repo.tree.open(fname) as fd: - stage_text = fd.read() - d = parse_stage(stage_text, fname) - - Stage.validate(d, fname=relpath(fname)) - path = os.path.abspath(fname) - - stage = Stage( - repo=repo, - path=path, - wdir=os.path.abspath( - os.path.join( - os.path.dirname(path), d.get(Stage.PARAM_WDIR, ".") - ) - ), - cmd=d.get(Stage.PARAM_CMD), - md5=d.get(Stage.PARAM_MD5), - locked=d.get(Stage.PARAM_LOCKED, False), - tag=tag, - always_changed=d.get(Stage.PARAM_ALWAYS_CHANGED, False), - # We store stage text to apply updates to the same structure - stage_text=stage_text, + def _fill_stage_dependencies(self, **kwargs): + assert not self.deps + self.deps = [] + self.deps += dependency.loads_from( + self, kwargs.get("deps", []), erepo=kwargs.get("erepo", None) ) + self.deps += dependency.loads_params(self, kwargs.get("params", [])) - stage.deps = dependency.loadd_from( - stage, d.get(Stage.PARAM_DEPS) or [] - ) - stage.outs = output.loadd_from(stage, d.get(Stage.PARAM_OUTS) or []) - - return stage + def _fix_outs_deps_path(self, wdir): + for out in chain(self.outs, self.deps): + if out.is_in_repo: + out.def_path = relpath(out.path_info, wdir) def dumpd(self): rel_wdir = relpath(self.wdir, os.path.dirname(self.path)) @@ -706,35 +436,6 @@ def dumpd(self): if value } - def dump(self): - fname = self.path - - self._check_dvc_filename(fname) - - logger.debug( - "Saving information to '{file}'.".format(file=relpath(fname)) - ) - state = self.dumpd() - - # When we load a stage we parse yaml with a fast parser, which strips - # off all the comments and formatting. To retain those on update we do - # a trick here: - # - reparse the same yaml text with a slow but smart ruamel yaml parser - # - apply changes to a returned structure - # - serialize it - if self._stage_text is not None: - saved_state = parse_stage_for_update(self._stage_text, fname) - # Stage doesn't work with meta in any way, so .dumpd() doesn't - # have it. We simply copy it over. - if "meta" in saved_state: - state["meta"] = saved_state["meta"] - apply_diff(state, saved_state) - state = saved_state - - dump_stage_file(fname, state) - - self.repo.scm.track_file(relpath(fname)) - def _compute_md5(self): from dvc.output.base import OutputBase @@ -1040,7 +741,7 @@ def get_all_files_number(self, filter_info=None): ) def get_used_cache(self, *args, **kwargs): - from .cache import NamedCache + from dvc.cache import NamedCache cache = NamedCache() for out in self._filter_outs(kwargs.get("filter_info")): diff --git a/dvc/stage/decorators.py b/dvc/stage/decorators.py new file mode 100644 index 0000000000..31a1f840ab --- /dev/null +++ b/dvc/stage/decorators.py @@ -0,0 +1,50 @@ +from funcy import decorator +from functools import wraps + + +@decorator +def rwlocked(call, read=None, write=None): + import sys + from dvc.rwlock import rwlock + from dvc.dependency.repo import DependencyREPO + + if read is None: + read = [] + + if write is None: + write = [] + + stage = call._args[0] + + assert stage.repo.lock.is_locked + + def _chain(names): + return [ + item.path_info + for attr in names + for item in getattr(stage, attr) + # There is no need to lock DependencyREPO deps, as there is no + # corresponding OutputREPO, so we can't even write it. + if not isinstance(item, DependencyREPO) + ] + + cmd = " ".join(sys.argv) + + with rwlock(stage.repo.tmp_dir, cmd, _chain(read), _chain(write)): + return call() + + +def unlocked_repo(f): + @wraps(f) + def wrapper(stage, *args, **kwargs): + stage.repo.state.dump() + stage.repo.lock.unlock() + stage.repo._reset() + try: + ret = f(stage, *args, **kwargs) + finally: + stage.repo.lock.lock() + stage.repo.state.load() + return ret + + return wrapper diff --git a/dvc/stage/exceptions.py b/dvc/stage/exceptions.py new file mode 100644 index 0000000000..f5211ec856 --- /dev/null +++ b/dvc/stage/exceptions.py @@ -0,0 +1,94 @@ +from dvc.exceptions import DvcException + + +class StageCmdFailedError(DvcException): + def __init__(self, stage): + msg = "stage '{}' cmd '{}' failed".format(stage.relpath, stage.cmd) + super().__init__(msg) + + +class StageFileFormatError(DvcException): + def __init__(self, fname, e): + msg = "DVC-file '{}' format error: {}".format(fname, str(e)) + super().__init__(msg) + + +class StageFileDoesNotExistError(DvcException): + def __init__(self, fname): + from dvc.dvcfile import DVC_FILE_SUFFIX, Dvcfile + + msg = "'{}' does not exist.".format(fname) + + sname = fname + DVC_FILE_SUFFIX + if Dvcfile.is_stage_file(sname): + msg += " Do you mean '{}'?".format(sname) + + super().__init__(msg) + + +class StageFileAlreadyExistsError(DvcException): + def __init__(self, relpath): + msg = "not overwriting '{}'".format(relpath) + super().__init__(msg) + + +class StageFileIsNotDvcFileError(DvcException): + def __init__(self, fname): + from dvc.dvcfile import Dvcfile, DVC_FILE_SUFFIX + + msg = "'{}' is not a DVC-file".format(fname) + + sname = fname + DVC_FILE_SUFFIX + if Dvcfile.is_stage_file(sname): + msg += " Do you mean '{}'?".format(sname) + + super().__init__(msg) + + +class StageFileBadNameError(DvcException): + pass + + +class StagePathOutsideError(DvcException): + pass + + +class StagePathNotFoundError(DvcException): + pass + + +class StagePathNotDirectoryError(DvcException): + pass + + +class StageCommitError(DvcException): + pass + + +class StageUpdateError(DvcException): + def __init__(self, path): + super().__init__( + "update is not supported for '{}' that is not an " + "import.".format(path) + ) + + +class MissingDep(DvcException): + def __init__(self, deps): + assert len(deps) > 0 + + dep = "dependencies" if len(deps) > 1 else "dependency" + msg = "missing '{}': {}".format(dep, ", ".join(map(str, deps))) + super().__init__(msg) + + +class MissingDataSource(DvcException): + def __init__(self, missing_files): + assert len(missing_files) > 0 + + source = "source" + if len(missing_files) > 1: + source += "s" + + msg = "missing data '{}': {}".format(source, ", ".join(missing_files)) + super().__init__(msg) diff --git a/dvc/stage/schema.py b/dvc/stage/schema.py new file mode 100644 index 0000000000..dc811c447d --- /dev/null +++ b/dvc/stage/schema.py @@ -0,0 +1,28 @@ +from voluptuous import Any, Schema +from dvc import dependency +from dvc import output + + +class StageParams: + PARAM_MD5 = "md5" + PARAM_CMD = "cmd" + PARAM_WDIR = "wdir" + PARAM_DEPS = "deps" + PARAM_OUTS = "outs" + PARAM_LOCKED = "locked" + PARAM_META = "meta" + PARAM_ALWAYS_CHANGED = "always_changed" + + +SCHEMA = { + StageParams.PARAM_MD5: output.CHECKSUM_SCHEMA, + StageParams.PARAM_CMD: Any(str, None), + StageParams.PARAM_WDIR: Any(str, None), + StageParams.PARAM_DEPS: Any([dependency.SCHEMA], None), + StageParams.PARAM_OUTS: Any([output.SCHEMA], None), + StageParams.PARAM_LOCKED: bool, + StageParams.PARAM_META: object, + StageParams.PARAM_ALWAYS_CHANGED: bool, +} + +SINGLE_STAGE_SCHEMA = Schema(SCHEMA) diff --git a/tests/func/test_add.py b/tests/func/test_add.py index 6a23ee832c..f6d8a1c22e 100644 --- a/tests/func/test_add.py +++ b/tests/func/test_add.py @@ -11,6 +11,7 @@ import dvc as dvc_module from dvc.cache import Cache +from dvc.dvcfile import DVC_FILE_SUFFIX from dvc.exceptions import DvcException, OverlappingOutputPathsError from dvc.exceptions import RecursiveAddingWhileUsingFilename from dvc.exceptions import StageFileCorruptedError @@ -354,7 +355,7 @@ def _test(self): ret = main(["add", os.path.join(self.link_name, self.data_file_name)]) self.assertEqual(0, ret) - stage_file = self.data_file_name + Stage.STAGE_FILE_SUFFIX + stage_file = self.data_file_name + DVC_FILE_SUFFIX self.assertTrue(os.path.exists(stage_file)) d = load_stage_file(stage_file) @@ -395,13 +396,13 @@ def is_symlink_true_below_dvc_root(path): self.assertEqual(0, ret) stage_file_path_on_data_below_symlink = ( - os.path.basename(self.DATA) + Stage.STAGE_FILE_SUFFIX + os.path.basename(self.DATA) + DVC_FILE_SUFFIX ) self.assertFalse( os.path.exists(stage_file_path_on_data_below_symlink) ) - stage_file_path = self.DATA + Stage.STAGE_FILE_SUFFIX + stage_file_path = self.DATA + DVC_FILE_SUFFIX self.assertTrue(os.path.exists(stage_file_path)) @@ -410,7 +411,7 @@ def test(self): ret = main(["add", self.FOO]) assert 0 == ret - foo_stage = relpath(self.FOO + Stage.STAGE_FILE_SUFFIX) + foo_stage = relpath(self.FOO + DVC_FILE_SUFFIX) # corrupt stage file with open(foo_stage, "a+") as file: @@ -678,7 +679,7 @@ def test_add_empty_files(tmp_dir, dvc, link): stages = tmp_dir.dvc_gen(file, "") assert (tmp_dir / file).exists() - assert (tmp_dir / (file + Stage.STAGE_FILE_SUFFIX)).exists() + assert (tmp_dir / (file + DVC_FILE_SUFFIX)).exists() assert os.path.exists(stages[0].outs[0].cache_path) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index e6b92111a2..20d59a5ee5 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -15,8 +15,11 @@ from dvc.remote.local import RemoteLOCAL from dvc.repo import Repo as DvcRepo from dvc.stage import Stage -from dvc.stage import StageFileBadNameError -from dvc.stage import StageFileDoesNotExistError +from dvc.dvcfile import Dvcfile, DVC_FILE_SUFFIX +from dvc.stage.exceptions import ( + StageFileDoesNotExistError, + StageFileBadNameError, +) from dvc.system import System from dvc.utils import relpath from dvc.utils.fs import walk_files @@ -219,7 +222,7 @@ def test(self): ret = main(["add", self.DATA_DIR]) self.assertEqual(0, ret) - stage_path = self.DATA_DIR + Stage.STAGE_FILE_SUFFIX + stage_path = self.DATA_DIR + DVC_FILE_SUFFIX stage = load_stage_file(stage_path) staged_files = self.outs_info(stage) @@ -583,7 +586,7 @@ def test_checkout_stats_on_failure(tmp_dir, dvc, scm): {"foo": "foo", "dir": {"subdir": {"file": "file"}}, "other": "other"}, commit="initial", ) - stage = Stage.load(dvc, "foo.dvc") + stage = Dvcfile(dvc, "foo.dvc").load() tmp_dir.dvc_gen({"foo": "foobar", "other": "other other"}, commit="second") # corrupt cache diff --git a/tests/func/test_commit.py b/tests/func/test_commit.py index 8cd254553a..26f0f0fa69 100644 --- a/tests/func/test_commit.py +++ b/tests/func/test_commit.py @@ -1,6 +1,6 @@ import pytest -from dvc.stage import StageCommitError +from dvc.stage.exceptions import StageCommitError from dvc.utils.stage import dump_stage_file from dvc.utils.stage import load_stage_file diff --git a/tests/func/test_import.py b/tests/func/test_import.py index 11324cb911..58a2406719 100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -8,7 +8,7 @@ from dvc.cache import Cache from dvc.exceptions import DownloadError, PathMissingError from dvc.config import NoRemoteError -from dvc.stage import Stage +from dvc.dvcfile import Dvcfile from dvc.system import System from dvc.utils.fs import makedirs, remove import dvc.data_cloud as cloud @@ -182,7 +182,7 @@ def test_pull_imported_stage(tmp_dir, dvc, erepo_dir): erepo_dir.dvc_gen("foo", "foo content", commit="create foo") dvc.imp(fspath(erepo_dir), "foo", "foo_imported") - dst_stage = Stage.load(dvc, "foo_imported.dvc") + dst_stage = Dvcfile(dvc, "foo_imported.dvc").load() dst_cache = dst_stage.outs[0].cache_path remove("foo_imported") @@ -232,7 +232,7 @@ def test_download_error_pulling_imported_stage(tmp_dir, dvc, erepo_dir): erepo_dir.dvc_gen("foo", "foo content", commit="create foo") dvc.imp(fspath(erepo_dir), "foo", "foo_imported") - dst_stage = Stage.load(dvc, "foo_imported.dvc") + dst_stage = Dvcfile(dvc, "foo_imported.dvc").load() dst_cache = dst_stage.outs[0].cache_path remove("foo_imported") @@ -295,7 +295,8 @@ def test_pull_no_rev_lock(erepo_dir, tmp_dir, dvc): stage = dvc.imp(fspath(erepo_dir), "foo", "foo_imported") assert "rev" not in stage.deps[0].def_repo stage.deps[0].def_repo.pop("rev_lock") - stage.dump() + + Dvcfile(dvc, stage.path).dump(stage) remove(stage.outs[0].cache_path) (tmp_dir / "foo_imported").unlink() diff --git a/tests/func/test_move.py b/tests/func/test_move.py index 416ea36827..f3996e0808 100644 --- a/tests/func/test_move.py +++ b/tests/func/test_move.py @@ -1,9 +1,9 @@ import os +from dvc.dvcfile import DVC_FILE_SUFFIX from dvc.exceptions import DvcException from dvc.exceptions import MoveNotDataSourceError from dvc.main import main -from dvc.stage import Stage from dvc.utils.stage import load_stage_file from tests.basic_env import TestDvc from tests.basic_env import TestDvcGit @@ -84,13 +84,13 @@ def test(self): class TestMoveFileToDirectory(TestDvc): def test(self): - foo_dvc_file = self.FOO + Stage.STAGE_FILE_SUFFIX + foo_dvc_file = self.FOO + DVC_FILE_SUFFIX ret = main(["add", self.FOO]) self.assertEqual(ret, 0) self.assertTrue(os.path.exists(foo_dvc_file)) new_foo_path = os.path.join(self.DATA_DIR, self.FOO) - new_foo_dvc_path = new_foo_path + Stage.STAGE_FILE_SUFFIX + new_foo_dvc_path = new_foo_path + DVC_FILE_SUFFIX ret = main(["move", self.FOO, new_foo_path]) self.assertEqual(ret, 0) @@ -102,13 +102,13 @@ def test(self): class TestMoveFileToDirectoryWithoutSpecifiedTargetName(TestDvc): def test(self): - foo_stage_file_path = self.FOO + Stage.STAGE_FILE_SUFFIX + foo_stage_file_path = self.FOO + DVC_FILE_SUFFIX ret = main(["add", self.FOO]) self.assertEqual(ret, 0) self.assertTrue(os.path.exists(foo_stage_file_path)) target_foo_path = os.path.join(self.DATA_DIR, self.FOO) - target_foo_stage_file_path = target_foo_path + Stage.STAGE_FILE_SUFFIX + target_foo_stage_file_path = target_foo_path + DVC_FILE_SUFFIX ret = main(["move", self.FOO, self.DATA_DIR]) self.assertEqual(ret, 0) @@ -135,7 +135,7 @@ def test(self): self.dvc.move(self.DATA_DIR, dir_name) - data_dir_stage = self.DATA_DIR + Stage.STAGE_FILE_SUFFIX + data_dir_stage = self.DATA_DIR + DVC_FILE_SUFFIX self.assertFalse(os.path.exists(self.DATA_DIR)) self.assertFalse(os.path.exists(data_dir_stage)) @@ -146,13 +146,13 @@ def test(self): ) self.assertTrue(os.path.exists(new_dir_name)) - self.assertTrue(os.path.isfile(new_dir_name + Stage.STAGE_FILE_SUFFIX)) + self.assertTrue(os.path.isfile(new_dir_name + DVC_FILE_SUFFIX)) self.assertEqual(set(os.listdir(new_dir_name)), orig_listdir) class TestMoveFileBetweenDirectories(TestDvc): def test(self): - data_stage_file = self.DATA + Stage.STAGE_FILE_SUFFIX + data_stage_file = self.DATA + DVC_FILE_SUFFIX ret = main(["add", self.DATA]) self.assertEqual(ret, 0) self.assertTrue(os.path.exists(data_stage_file)) @@ -164,7 +164,7 @@ def test(self): self.assertEqual(ret, 0) new_data_path = os.path.join(new_data_dir, os.path.basename(self.DATA)) - new_data_stage_file = new_data_path + Stage.STAGE_FILE_SUFFIX + new_data_stage_file = new_data_path + DVC_FILE_SUFFIX self.assertFalse(os.path.exists(self.DATA)) self.assertFalse(os.path.exists(data_stage_file)) diff --git a/tests/func/test_remove.py b/tests/func/test_remove.py index d2a622af3b..7df47300bc 100644 --- a/tests/func/test_remove.py +++ b/tests/func/test_remove.py @@ -5,7 +5,7 @@ from dvc.exceptions import DvcException from dvc.main import main from dvc.stage import Stage -from dvc.stage import StageFileDoesNotExistError +from dvc.stage.exceptions import StageFileDoesNotExistError from dvc.system import System from dvc.utils.fs import remove from tests.basic_env import TestDvc diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index af5a30435b..bdaa0544ff 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -28,7 +28,8 @@ from dvc.remote.local import RemoteLOCAL from dvc.repo import Repo as DvcRepo from dvc.stage import Stage -from dvc.stage import StageFileDoesNotExistError +from dvc.dvcfile import Dvcfile +from dvc.stage.exceptions import StageFileDoesNotExistError from dvc.system import System from dvc.utils import file_md5 from dvc.utils import relpath @@ -175,7 +176,7 @@ def test_nested(self): # be processed before dir1 to load error.dvc first. self.dvc.stages = [ nested_stage, - Stage.load(self.dvc, error_stage_path), + Dvcfile(self.dvc, error_stage_path).load(), ] with patch.object(self.dvc, "_reset"): # to prevent `stages` resetting @@ -1207,31 +1208,34 @@ def test(self): class TestReproAllPipelines(TestDvc): def test(self): - self.dvc.run( - fname="start.dvc", outs=["start.txt"], cmd="echo start > start.txt" - ) - - self.dvc.run( - fname="middle.dvc", - deps=["start.txt"], - outs=["middle.txt"], - cmd="echo middle > middle.txt", - ) - - self.dvc.run( - fname="final.dvc", - deps=["middle.txt"], - outs=["final.txt"], - cmd="echo final > final.txt", - ) - - self.dvc.run( - fname="disconnected.dvc", - outs=["disconnected.txt"], - cmd="echo other > disconnected.txt", - ) + stages = [ + self.dvc.run( + fname="start.dvc", + outs=["start.txt"], + cmd="echo start > start.txt", + ), + self.dvc.run( + fname="middle.dvc", + deps=["start.txt"], + outs=["middle.txt"], + cmd="echo middle > middle.txt", + ), + self.dvc.run( + fname="final.dvc", + deps=["middle.txt"], + outs=["final.txt"], + cmd="echo final > final.txt", + ), + self.dvc.run( + fname="disconnected.dvc", + outs=["disconnected.txt"], + cmd="echo other > disconnected.txt", + ), + ] - with patch.object(Stage, "reproduce") as mock_reproduce: + with patch.object( + Stage, "reproduce", side_effect=stages + ) as mock_reproduce: ret = main(["repro", "--all-pipelines"]) self.assertEqual(ret, 0) self.assertEqual(mock_reproduce.call_count, 4) @@ -1269,7 +1273,7 @@ def test_force_with_dependencies(self): ret = main(["repro", "--force", "datetime.dvc"]) self.assertEqual(ret, 0) - repro_out = Stage.load(self.dvc, "datetime.dvc").outs[0] + repro_out = Dvcfile(self.dvc, "datetime.dvc").load().outs[0] self.assertNotEqual(run_out.checksum, repro_out.checksum) @@ -1315,13 +1319,11 @@ def test(self): self.assertEqual(0, ret) self._caplog.clear() + + from dvc.dvcfile import DVC_FILE_SUFFIX + ret = main( - [ - "repro", - "--force", - "--metrics", - metrics_file + Stage.STAGE_FILE_SUFFIX, - ] + ["repro", "--force", "--metrics", metrics_file + DVC_FILE_SUFFIX] ) self.assertEqual(0, ret) diff --git a/tests/func/test_run.py b/tests/func/test_run.py index 8fcd7603ed..94c8130c09 100644 --- a/tests/func/test_run.py +++ b/tests/func/test_run.py @@ -14,17 +14,20 @@ from dvc.exceptions import OutputDuplicationError from dvc.exceptions import OverlappingOutputPathsError from dvc.exceptions import StagePathAsOutputError +from dvc.dvcfile import DVC_FILE_SUFFIX from dvc.main import main from dvc.output import OutputBase from dvc.output.base import OutputIsStageFileError from dvc.repo import Repo as DvcRepo -from dvc.stage import MissingDep from dvc.stage import Stage -from dvc.stage import StageFileAlreadyExistsError -from dvc.stage import StageFileBadNameError -from dvc.stage import StagePathNotDirectoryError -from dvc.stage import StagePathNotFoundError -from dvc.stage import StagePathOutsideError +from dvc.stage.exceptions import ( + StageFileAlreadyExistsError, + StageFileBadNameError, + StagePathOutsideError, + StagePathNotFoundError, + StagePathNotDirectoryError, + MissingDep, +) from dvc.system import System from dvc.utils import file_md5 from dvc.utils.stage import load_stage_file @@ -229,9 +232,7 @@ def test(self): with self.assertRaises(StagePathOutsideError): self.dvc.run( cmd="", - fname=os.path.join( - self.mkdtemp(), self.FOO + Stage.STAGE_FILE_SUFFIX - ), + fname=os.path.join(self.mkdtemp(), self.FOO + DVC_FILE_SUFFIX), ) def test_same_prefix(self): @@ -239,16 +240,14 @@ def test_same_prefix(self): path = "{}-{}".format(self._root_dir, uuid.uuid4()) os.mkdir(path) self.dvc.run( - cmd="", - fname=os.path.join(path, self.FOO + Stage.STAGE_FILE_SUFFIX), + cmd="", fname=os.path.join(path, self.FOO + DVC_FILE_SUFFIX), ) def test_not_found(self): with self.assertRaises(StagePathNotFoundError): path = os.path.join(self._root_dir, str(uuid.uuid4())) self.dvc.run( - cmd="", - fname=os.path.join(path, self.FOO + Stage.STAGE_FILE_SUFFIX), + cmd="", fname=os.path.join(path, self.FOO + DVC_FILE_SUFFIX), ) @@ -554,7 +553,7 @@ def test_fname_changes_path_and_wdir(self): dname = "dir" os.mkdir(os.path.join(self._root_dir, dname)) foo = os.path.join(dname, self.FOO) - fname = os.path.join(dname, "stage" + Stage.STAGE_FILE_SUFFIX) + fname = os.path.join(dname, "stage" + DVC_FILE_SUFFIX) stage = self.dvc.run( cmd="echo test > {}".format(foo), outs=[foo], fname=fname ) @@ -645,7 +644,7 @@ def outs_command(self): def _test(self): file = "file.txt" file_content = "content" - stage_file = file + Stage.STAGE_FILE_SUFFIX + stage_file = file + DVC_FILE_SUFFIX self.run_command(file, file_content) self.stage_should_contain_persist_flag(stage_file) @@ -715,8 +714,8 @@ def test(self): ) error_output = str(err.exception) - data_dir_stage = self.DATA_DIR + Stage.STAGE_FILE_SUFFIX - data_stage = os.path.basename(self.DATA) + Stage.STAGE_FILE_SUFFIX + data_dir_stage = self.DATA_DIR + DVC_FILE_SUFFIX + data_stage = os.path.basename(self.DATA) + DVC_FILE_SUFFIX self.assertIn("Paths for outs:\n", error_output) self.assertIn( diff --git a/tests/func/test_stage.py b/tests/func/test_stage.py index 64c934fb67..08e9b488a2 100644 --- a/tests/func/test_stage.py +++ b/tests/func/test_stage.py @@ -1,77 +1,70 @@ import os import tempfile - +import pytest from dvc.main import main from dvc.output.local import OutputLOCAL from dvc.remote.local import RemoteLOCAL from dvc.repo import Repo from dvc.stage import Stage -from dvc.stage import StageFileFormatError +from dvc.dvcfile import Dvcfile +from dvc.stage.exceptions import StageFileFormatError from dvc.utils.stage import dump_stage_file from dvc.utils.stage import load_stage_file from tests.basic_env import TestDvc -class TestSchema(TestDvc): - def _validate_fail(self, d): - with self.assertRaises(StageFileFormatError): - Stage.validate(d) +def test_cmd_obj(): + with pytest.raises(StageFileFormatError): + Dvcfile.validate({Stage.PARAM_CMD: {}}) + + +def test_cmd_none(): + Dvcfile.validate({Stage.PARAM_CMD: None}) -class TestSchemaCmd(TestSchema): - def test_cmd_object(self): - d = {Stage.PARAM_CMD: {}} - self._validate_fail(d) +def test_no_cmd(): + Dvcfile.validate({}) - def test_cmd_none(self): - d = {Stage.PARAM_CMD: None} - Stage.validate(d) - def test_no_cmd(self): - d = {} - Stage.validate(d) +def test_cmd_str(): + Dvcfile.validate({Stage.PARAM_CMD: "cmd"}) - def test_cmd_str(self): - d = {Stage.PARAM_CMD: "cmd"} - Stage.validate(d) +def test_object(): + with pytest.raises(StageFileFormatError): + Dvcfile.validate({Stage.PARAM_DEPS: {}}) -class TestSchemaDepsOuts(TestSchema): - def test_object(self): - d = {Stage.PARAM_DEPS: {}} - self._validate_fail(d) + with pytest.raises(StageFileFormatError): + Dvcfile.validate({Stage.PARAM_OUTS: {}}) - d = {Stage.PARAM_OUTS: {}} - self._validate_fail(d) - def test_none(self): - d = {Stage.PARAM_DEPS: None} - Stage.validate(d) +def test_none(): + Dvcfile.validate({Stage.PARAM_DEPS: None}) + Dvcfile.validate({Stage.PARAM_OUTS: None}) - d = {Stage.PARAM_OUTS: None} - Stage.validate(d) - def test_empty_list(self): - d = {Stage.PARAM_DEPS: []} - Stage.validate(d) +def test_empty_list(): + d = {Stage.PARAM_DEPS: []} + Dvcfile.validate(d) - d = {Stage.PARAM_OUTS: []} - Stage.validate(d) + d = {Stage.PARAM_OUTS: []} + Dvcfile.validate(d) - def test_list(self): - lst = [ - {OutputLOCAL.PARAM_PATH: "foo", RemoteLOCAL.PARAM_CHECKSUM: "123"}, - {OutputLOCAL.PARAM_PATH: "bar", RemoteLOCAL.PARAM_CHECKSUM: None}, - {OutputLOCAL.PARAM_PATH: "baz"}, - ] - d = {Stage.PARAM_DEPS: lst} - Stage.validate(d) - lst[0][OutputLOCAL.PARAM_CACHE] = True - lst[1][OutputLOCAL.PARAM_CACHE] = False - d = {Stage.PARAM_OUTS: lst} - Stage.validate(d) +def test_list(): + lst = [ + {OutputLOCAL.PARAM_PATH: "foo", RemoteLOCAL.PARAM_CHECKSUM: "123"}, + {OutputLOCAL.PARAM_PATH: "bar", RemoteLOCAL.PARAM_CHECKSUM: None}, + {OutputLOCAL.PARAM_PATH: "baz"}, + ] + d = {Stage.PARAM_DEPS: lst} + Dvcfile.validate(d) + + lst[0][OutputLOCAL.PARAM_CACHE] = True + lst[1][OutputLOCAL.PARAM_CACHE] = False + d = {Stage.PARAM_OUTS: lst} + Dvcfile.validate(d) class TestReload(TestDvc): @@ -88,9 +81,11 @@ def test(self): d[stage.PARAM_MD5] = md5 dump_stage_file(stage.relpath, d) - stage = Stage.load(self.dvc, stage.relpath) + dvcfile = Dvcfile(self.dvc, stage.relpath) + stage = dvcfile.load() + self.assertTrue(stage is not None) - stage.dump() + dvcfile.dump(stage) d = load_stage_file(stage.relpath) self.assertEqual(d[stage.PARAM_MD5], md5) @@ -111,7 +106,7 @@ def test_ignored_in_checksum(self): self.assertNotIn(Stage.PARAM_WDIR, d.keys()) with self.dvc.lock, self.dvc.state: - stage = Stage.load(self.dvc, stage.relpath) + stage = Dvcfile(self.dvc, stage.relpath).load() self.assertFalse(stage.changed()) @@ -162,7 +157,7 @@ def test_md5_ignores_comments(tmp_dir, dvc): with open(stage.path, "a") as f: f.write("# End comment\n") - new_stage = Stage.load(dvc, stage.path) + new_stage = Dvcfile(dvc, stage.path).load() assert not new_stage.changed_md5() @@ -175,8 +170,9 @@ def test_meta_is_preserved(tmp_dir, dvc): dump_stage_file(stage.path, data) # Loading and dumping to test that it works and meta is retained - new_stage = Stage.load(dvc, stage.path) - new_stage.dump() + dvcfile = Dvcfile(dvc, stage.path) + new_stage = dvcfile.load() + dvcfile.dump(new_stage) new_data = load_stage_file(stage.path) assert new_data["meta"] == data["meta"] diff --git a/tests/func/test_update.py b/tests/func/test_update.py index ca69a04309..a77fe9ea61 100644 --- a/tests/func/test_update.py +++ b/tests/func/test_update.py @@ -1,7 +1,7 @@ import pytest import os -from dvc.stage import Stage +from dvc.dvcfile import Dvcfile from dvc.compat import fspath, fspath_py35 @@ -28,7 +28,7 @@ def test_update_import(tmp_dir, dvc, erepo_dir, cached): dvc.update(stage.path) assert (tmp_dir / "version").read_text() == "updated" - stage = Stage.load(dvc, stage.path) + stage = Dvcfile(dvc, stage.path).load() assert stage.deps[0].def_repo["rev_lock"] == new_rev @@ -71,7 +71,7 @@ def test_update_import_after_remote_updates_to_dvc(tmp_dir, dvc, erepo_dir): assert imported.is_file() assert imported.read_text() == "updated" - stage = Stage.load(dvc, stage.path) + stage = Dvcfile(dvc, stage.path).load() assert stage.deps[0].def_repo == { "url": fspath(erepo_dir), "rev": "branch", diff --git a/tests/unit/test_stage.py b/tests/unit/test_stage.py index 298f3fd51e..3a0c3419e6 100644 --- a/tests/unit/test_stage.py +++ b/tests/unit/test_stage.py @@ -9,8 +9,7 @@ from dvc.dependency.repo import DependencyREPO from dvc.path_info import PathInfo from dvc.stage import Stage -from dvc.stage import StageUpdateError - +from dvc.stage.exceptions import StageUpdateError TEST_STAGE_DICT = { "md5": "123456", From ff12d71a09cc422593d008c1487d70b9226399a8 Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Sun, 12 Apr 2020 12:28:44 +0545 Subject: [PATCH 2/4] Split create_stage out of dvcfile to Stage::create() and overwrite --- dvc/command/run.py | 7 ------- dvc/dvcfile.py | 48 +++++++++---------------------------------- dvc/repo/add.py | 7 ++++++- dvc/repo/imp_url.py | 8 ++++++-- dvc/repo/run.py | 15 ++++++++++---- dvc/stage/__init__.py | 15 ++++++++++++++ 6 files changed, 48 insertions(+), 52 deletions(-) diff --git a/dvc/command/run.py b/dvc/command/run.py index 99c746b5be..ced17d7354 100644 --- a/dvc/command/run.py +++ b/dvc/command/run.py @@ -47,7 +47,6 @@ def run(self): no_exec=self.args.no_exec, overwrite=overwrite, ignore_build_cache=self.args.ignore_build_cache, - remove_outs=self.args.remove_outs, no_commit=self.args.no_commit, outs_persist=self.args.outs_persist, outs_persist_no_cache=self.args.outs_persist_no_cache, @@ -168,12 +167,6 @@ def add_parser(subparsers, parent_parser): help="Run this stage even if it has been already ran with the same " "command/dependencies/outputs/etc before.", ) - run_parser.add_argument( - "--remove-outs", - action="store_true", - default=False, - help="Deprecated, this is now the default behavior", - ) run_parser.add_argument( "--no-commit", action="store_true", diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py index b39deb9259..d760909404 100644 --- a/dvc/dvcfile.py +++ b/dvc/dvcfile.py @@ -160,43 +160,15 @@ def validate(d, fname=None): except MultipleInvalid as exc: raise StageFileFormatError(fname, exc) - @classmethod - def create_stage(cls, repo, accompany_outs=False, **kwargs): - from dvc.stage import Stage - - stage = Stage.create(repo, accompany_outs=accompany_outs, **kwargs) - - ignore_build_cache = kwargs.get("ignore_build_cache", False) - # NOTE: remove outs before we check build cache - if kwargs.get("remove_outs", False): - logger.warning( - "--remove-outs is deprecated." - " It is now the default behavior," - " so there's no need to use this option anymore." - ) - stage.remove_outs(ignore_remove=False) - logger.warning("Build cache is ignored when using --remove-outs.") - ignore_build_cache = True - - dvcfile = Dvcfile(stage.repo, stage.path) - if dvcfile.exists(): - if any(out.persist for out in stage.outs): - logger.warning( - "Build cache is ignored when persisting outputs." - ) - ignore_build_cache = True - - if not ignore_build_cache and stage.can_be_skipped: - logger.info("Stage is cached, skipping.") - return None - - msg = ( - "'{}' already exists. Do you wish to run the command and " - "overwrite it?".format(stage.relpath) - ) - if not (kwargs.get("overwrite", True) or prompt.confirm(msg)): - raise StageFileAlreadyExistsError(stage.relpath) + def overwrite_with_prompt(self, force=False): + if not self.exists(): + return - os.unlink(dvcfile.path) + msg = ( + "'{}' already exists. Do you wish to run the command and " + "overwrite it?".format(self.path) + ) + if not (force or prompt.confirm(msg)): + raise StageFileAlreadyExistsError(self.path) - return stage + os.unlink(self.path) diff --git a/dvc/repo/add.py b/dvc/repo/add.py index 30a582a977..1ee34da02c 100644 --- a/dvc/repo/add.py +++ b/dvc/repo/add.py @@ -115,6 +115,8 @@ def _find_all_targets(repo, target, recursive): def _create_stages(repo, targets, fname, pbar=None): + from dvc.stage import Stage + stages = [] for out in Tqdm( @@ -123,9 +125,12 @@ def _create_stages(repo, targets, fname, pbar=None): disable=len(targets) < LARGE_DIR_SIZE, unit="file", ): - stage = Dvcfile.create_stage( + stage = Stage.create( repo, accompany_outs=True, outs=[out], fname=fname ) + if stage: + Dvcfile(repo, stage.path).overwrite_with_prompt(force=True) + repo._reset() if not stage: diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index fd0deba807..02e52134ae 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -7,9 +7,10 @@ @scm_context def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): from dvc.dvcfile import Dvcfile + from dvc.stage import Stage out = resolve_output(url, out) - stage = Dvcfile.create_stage( + stage = Stage.create( self, cmd=None, deps=[url], @@ -22,12 +23,15 @@ def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): if stage is None: return None + dvcfile = Dvcfile(self, stage.path) + dvcfile.overwrite_with_prompt(force=True) + self.check_modified_graph([stage]) stage.run() stage.locked = locked - Dvcfile(self, stage.path).dump(stage) + dvcfile.dump(stage) return stage diff --git a/dvc/repo/run.py b/dvc/repo/run.py index 74b5af5f10..50ea12f34a 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -1,22 +1,29 @@ +import logging + from . import locked from .scm_context import scm_context +logger = logging.getLogger(__name__) + @locked @scm_context def run(self, no_exec=False, **kwargs): + from dvc.stage import Stage from dvc.dvcfile import Dvcfile - stage = Dvcfile.create_stage(self, **kwargs) - - if stage is None: + stage = Stage.create(self, **kwargs) + if not stage: return None + dvcfile = Dvcfile(self, stage.path) + dvcfile.overwrite_with_prompt(force=kwargs.get("overwrite", True)) + self.check_modified_graph([stage]) if not no_exec: stage.run(no_commit=kwargs.get("no_commit", False)) - Dvcfile(self, stage.path).dump(stage) + dvcfile.dump(stage) return stage diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 81aabfa4b9..18a73e3f21 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -381,6 +381,21 @@ def create(repo, accompany_outs=False, **kwargs): stage._check_and_set_wdir(wdir, is_wdir=kwargs.get("wdir", False)) stage._check_and_set_path(fname) + dvcfile = Dvcfile(stage.repo, stage.path) + if dvcfile.exists(): + has_persist_outs = any(out.persist for out in stage.outs) + ignore_build_cache = ( + kwargs.get("ignore_build_cache", False) or has_persist_outs + ) + if has_persist_outs: + logger.warning( + "Build cache is ignored when persisting outputs." + ) + + if not ignore_build_cache and stage.can_be_skipped: + logger.info("Stage is cached, skipping.") + return None + return stage def _fill_stage_outputs(self, **kwargs): From 15ae2d920a5cf403e938c5fb4db774d1dc3a9aaf Mon Sep 17 00:00:00 2001 From: Saugat Pachhai Date: Sun, 12 Apr 2020 12:39:58 +0545 Subject: [PATCH 3/4] Remove logging from Repo::run --- dvc/repo/run.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dvc/repo/run.py b/dvc/repo/run.py index 50ea12f34a..64075f3a51 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -1,10 +1,6 @@ -import logging - from . import locked from .scm_context import scm_context -logger = logging.getLogger(__name__) - @locked @scm_context @@ -13,7 +9,7 @@ def run(self, no_exec=False, **kwargs): from dvc.dvcfile import Dvcfile stage = Stage.create(self, **kwargs) - if not stage: + if stage is None: return None dvcfile = Dvcfile(self, stage.path) From 1eba747efeff50ae6c505d63465ed28d0c8ad14d Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Sun, 12 Apr 2020 14:46:52 +0300 Subject: [PATCH 4/4] tests: stop using --remove-outs --- tests/unit/command/test_run.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/unit/command/test_run.py b/tests/unit/command/test_run.py index a9b7e60a4f..02fcab48ea 100644 --- a/tests/unit/command/test_run.py +++ b/tests/unit/command/test_run.py @@ -24,7 +24,6 @@ def test_run(mocker, dvc): "--yes", "--overwrite-dvcfile", "--ignore-build-cache", - "--remove-outs", "--no-commit", "--outs-persist", "outs-persist", @@ -59,7 +58,6 @@ def test_run(mocker, dvc): no_exec=True, overwrite=True, ignore_build_cache=True, - remove_outs=True, no_commit=True, always_changed=True, cmd="command", @@ -85,7 +83,6 @@ def test_run_args_from_cli(mocker, dvc): no_exec=False, overwrite=False, ignore_build_cache=False, - remove_outs=False, no_commit=False, always_changed=False, cmd="echo foo", @@ -111,7 +108,6 @@ def test_run_args_with_spaces(mocker, dvc): no_exec=False, overwrite=False, ignore_build_cache=False, - remove_outs=False, no_commit=False, always_changed=False, cmd='echo "foo bar"',