Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dvc run --deterministic option. #1400

Merged
merged 21 commits into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion dvc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,28 @@ def parse_args(argv=None):
'--yes',
action='store_true',
default=False,
help="Automatic 'yes' answer to all prompts. E.g. "
help="(OBSOLETED, use --overwrite-dvcfile instead) "
"Automatic 'yes' answer to all prompts. E.g. "
"when '.dvc' file exists and dvc asks if you "
"want to overwrite it.")
run_parser.add_argument(
'--overwrite-dvcfile',
action='store_true',
default=False,
help="Overwrite existing dvc file without asking "
"for confirmation.")
# Flag that forces the stage to be executed again even when an identical
# run has already been recorded (skips the "stage is cached" shortcut).
run_parser.add_argument(
    '--ignore-build-cache',
    action='store_true',
    default=False,
    help="Run this stage even if it has already been "
         "run with the same command/dependencies/outputs/etc "
         "before.")
run_parser.add_argument(
'--remove-outs',
action='store_true',
default=False,
help="Remove outputs before running the command.")
run_parser.add_argument(
'command',
nargs=argparse.REMAINDER,
Expand Down
9 changes: 5 additions & 4 deletions dvc/command/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ def _joined_cmd(self):
return cmd

def run(self):
overwrite = (self.args.yes or self.args.overwrite_dvcfile)
try:
if self.args.yes:
self.project.prompt.default = True

self.project.run(cmd=self._joined_cmd(),
outs=self.args.outs,
outs_no_cache=self.args.outs_no_cache,
metrics_no_cache=self.args.metrics_no_cache,
deps=self.args.deps,
fname=self.args.file,
cwd=self.args.cwd,
no_exec=self.args.no_exec)
no_exec=self.args.no_exec,
overwrite=overwrite,
ignore_build_cache=self.args.ignore_build_cache,
remove_outs=self.args.remove_outs)
except DvcException as ex:
self.project.logger.error('Failed to run command', ex)
return 1
Expand Down
43 changes: 28 additions & 15 deletions dvc/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,21 @@ def move(self, from_path, to_path):
def _unprotect_file(self, path):
import stat
import uuid
from dvc.system import System
from dvc.utils import copyfile, move, remove

self.logger.debug("Unprotecting '{}'".format(path))
if System.is_symlink(path) or System.is_hardlink(path):
self.logger.debug("Unprotecting '{}'".format(path))

tmp = os.path.join(os.path.dirname(path), '.' + str(uuid.uuid4()))
move(path, tmp)
tmp = os.path.join(os.path.dirname(path), '.' + str(uuid.uuid4()))
move(path, tmp)

copyfile(tmp, path)
copyfile(tmp, path)

remove(tmp)
remove(tmp)
else:
self.logger.debug("Skipping copying for '{}', since it is not "
"a symlink or a hardlink.".format(path))

os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE)

Expand Down Expand Up @@ -363,18 +368,26 @@ def run(self,
fname=None,
cwd=os.curdir,
no_exec=False,
overwrite=False):
overwrite=False,
ignore_build_cache=False,
remove_outs=False):
from dvc.stage import Stage

efiop marked this conversation as resolved.
Show resolved Hide resolved
stage = Stage.loads(project=self,
fname=fname,
cmd=cmd,
cwd=cwd,
outs=outs,
outs_no_cache=outs_no_cache,
metrics_no_cache=metrics_no_cache,
deps=deps,
overwrite=overwrite)
with self.state:
stage = Stage.loads(project=self,
fname=fname,
cmd=cmd,
cwd=cwd,
outs=outs,
outs_no_cache=outs_no_cache,
metrics_no_cache=metrics_no_cache,
deps=deps,
overwrite=overwrite,
ignore_build_cache=ignore_build_cache,
remove_outs=remove_outs)

if stage is None:
return None

all_stages = self.stages()

Expand Down
120 changes: 89 additions & 31 deletions dvc/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ def __init__(self, fname):
super(StageFileDoesNotExistError, self).__init__(msg)


class StageFileAlreadyExistsError(DvcException):
    """Error reporting that a stage file already exists.

    ``relpath`` is interpolated into the error message.
    """

    def __init__(self, relpath):
        super(StageFileAlreadyExistsError, self).__init__(
            "Stage '{}' already exists".format(relpath))


class StageFileIsNotDvcFileError(DvcException):
def __init__(self, fname):
msg = "'{}' is not a dvc file".format(fname)
Expand Down Expand Up @@ -149,7 +155,10 @@ def is_import(self):
len(self.deps) == 1 and \
len(self.outs) == 1

def _changed_deps(self, print_info, log):
def _changed_deps(self, log):
if self.locked:
return False

if self.is_callback:
msg = "Dvc file '{}' is a 'callback' stage (has a command and " \
"no dependencies) and thus always considered as changed."
Expand All @@ -159,37 +168,35 @@ def _changed_deps(self, print_info, log):
for dep in self.deps:
if not dep.changed():
continue
if print_info:
msg = "Dependency '{}' of '{}' changed."
log(msg.format(dep, self.relpath))
log("Dependency '{}' of '{}' changed.".format(dep, self.relpath))
return True

return False

def changed(self, print_info=False):
ret = False
def _changed_outs(self, log):
    """Tell whether any output of this stage has changed.

    Outputs are inspected in order; the first one whose ``changed()`` is
    truthy is reported through ``log`` and the scan stops there.

    Returns True if such an output was found, False otherwise.
    """
    changed_out = next((out for out in self.outs if out.changed()), None)
    if changed_out is None:
        return False

    log("Output '{}' of '{}' changed.".format(changed_out, self.relpath))
    return True

def _changed_md5(self, log):
    """Tell whether ``changed_md5()`` reports a change for this stage.

    When it does, a message naming the dvc file is passed to ``log``.

    Returns True if a change was reported, False otherwise.
    """
    if not self.changed_md5():
        return False

    log("Dvc file '{}' changed.".format(self.relpath))
    return True

def changed(self, print_info=False):
if print_info:
log = self.project.logger.info
else:
log = self.project.logger.debug

if not self.locked:
ret = self._changed_deps(print_info, log)

for out in self.outs:
if not out.changed():
continue
if print_info:
msg = "Output '{}' of '{}' changed."
log(msg.format(out, self.relpath))
ret = True

if self.changed_md5():
if print_info:
msg = "Dvc file '{}' changed."
log(msg.format(self.relpath))
ret = True
ret = any([self._changed_deps(log),
self._changed_outs(log),
self._changed_md5(log)])

if ret:
msg = "Stage '{}' changed.".format(self.relpath)
Expand All @@ -206,6 +213,12 @@ def remove_outs(self, ignore_remove=False):
for out in self.outs:
out.remove(ignore_remove=ignore_remove)

def unprotect_outs(self):
    """Unprotect every existing output that uses the 'local' scheme.

    Outputs with any other scheme, and local outputs that do not exist,
    are left untouched.
    """
    for out in self.outs:
        is_local = out.path_info['scheme'] == 'local'
        # ``exists`` is only consulted for local outputs.
        if is_local and out.exists:
            self.project.unprotect(out.path)

def remove(self):
self.remove_outs(ignore_remove=True)
os.unlink(self.path)
Expand Down Expand Up @@ -290,6 +303,37 @@ def _check_inside_project(project, cwd):
if not os.path.realpath(cwd).startswith(proj_dir):
raise StageBadCwdError(cwd)

def is_cached(self):
    """
    Check whether this stage has already been run and saved to the same
    dvc file.

    The stage stored at ``self.path`` is loaded and compared against this
    one. Returns False as soon as the stored stage reports changed
    outputs; otherwise returns whether the two dumped descriptions are
    equal once checksums are stripped.
    """
    from dvc.remote.local import RemoteLOCAL
    from dvc.remote.s3 import RemoteS3

    previous = Stage.load(self.project, self.path)
    if previous._changed_outs(log=self.project.logger.debug):
        return False

    # NOTE: dependency checksums must be saved first so that they can be
    # compared with what is written in the previous stage.
    for dep in self.deps:
        dep.save()

    previous_dump = previous.dumpd()
    current_dump = self.dumpd()

    # NOTE: the new stage has no checksums yet, so strip them from the
    # previous dump (stage-level md5 and per-output checksums) before
    # comparing the two dicts.
    for dump in (previous_dump, current_dump):
        dump.pop(self.PARAM_MD5, None)

    checksum_keys = (RemoteLOCAL.PARAM_MD5, RemoteS3.PARAM_ETAG)
    for out in previous_dump.get(self.PARAM_OUTS, []):
        for key in checksum_keys:
            out.pop(key, None)

    return previous_dump == current_dump

@staticmethod
def loads(project=None,
cmd=None,
Expand All @@ -301,7 +345,10 @@ def loads(project=None,
cwd=os.curdir,
locked=False,
add=False,
overwrite=True):
overwrite=True,
ignore_build_cache=False,
remove_outs=False):

stage = Stage(project=project,
cwd=cwd,
cmd=cmd,
Expand All @@ -325,17 +372,28 @@ def loads(project=None,
cwd = os.path.abspath(cwd)
path = os.path.join(cwd, fname)

if os.path.exists(path):
relpath = os.path.relpath(path)
msg = "'{}' already exists. " \
"Do you wish to run the command and overwrite it?"
if not overwrite \
and not project.prompt.prompt(msg.format(relpath), False):
raise DvcException("'{}' already exists".format(relpath))

stage.cwd = cwd
stage.path = path

# NOTE: remove outs before we check build cache
if remove_outs:
stage.remove_outs(ignore_remove=False)
project.logger.warn("Build cache is ignored when using "
"--remove-outs.")
ignore_build_cache = True
else:
stage.unprotect_outs()

if os.path.exists(path):
if not ignore_build_cache and stage.is_cached():
Logger.info('Stage is cached, skipping.')
return None

msg = "'{}' already exists. Do you wish to run the command and " \
"overwrite it?".format(stage.relpath)
if not overwrite and not project.prompt.prompt(msg, False):
raise StageFileAlreadyExistsError(stage.relpath)

return stage

@staticmethod
Expand Down
26 changes: 25 additions & 1 deletion dvc/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ def getdirinfo(path):
from ctypes import c_void_p, c_wchar_p, Structure, WinError, POINTER
from ctypes.wintypes import DWORD, HANDLE, BOOL

# NOTE: use this flag to open symlink itself and not the target
# See https://docs.microsoft.com/en-us/windows/desktop/api/
# fileapi/nf-fileapi-createfilew#symbolic-link-behavior
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000

FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_SHARE_READ = 0x00000001
OPEN_EXISTING = 3
Expand All @@ -147,7 +152,7 @@ class BY_HANDLE_FILE_INFORMATION(Structure):
("nFileIndexHigh", DWORD),
("nFileIndexLow", DWORD)]

flags = FILE_FLAG_BACKUP_SEMANTICS
flags = FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT

func = ctypes.windll.kernel32.CreateFileW
func.argtypes = [c_wchar_p,
Expand Down Expand Up @@ -240,3 +245,22 @@ def wait_for_input(timeout):
return System._wait_for_input_posix(timeout)
else:
return System._wait_for_input_windows(timeout)

@staticmethod
def is_symlink(path):
    """Return True if ``path`` is a symbolic link, False otherwise.

    On Unix this defers to ``os.path.islink``. On other platforms the
    file attributes reported by ``System.getdirinfo`` are tested for the
    reparse-point bit.
    """
    if System.is_unix():
        return os.path.islink(path)

    # https://docs.microsoft.com/en-us/windows/desktop/fileio/
    # file-attribute-constants
    FILE_ATTRIBUTE_REPARSE_POINT = 0x400
    info = System.getdirinfo(path)
    # NOTE(review): reparse points presumably also cover junctions and
    # mount points -- confirm that treating them as symlinks is intended.
    # Coerce the masked bit field to a real bool so that both branches
    # return the same type (the raw expression yields 0 or 0x400).
    return bool(info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT)

@staticmethod
def is_hardlink(path):
    """Return True if more than one link refers to the file at ``path``.

    On Unix the link count comes from ``os.stat``; on other platforms it
    is read from the record returned by ``System.getdirinfo``.
    """
    if not System.is_unix():
        return System.getdirinfo(path).nNumberOfLinks > 1

    link_count = os.stat(path).st_nlink
    return link_count > 1
Loading