Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 63 additions & 48 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from multiprocessing import cpu_count
from operator import itemgetter

from shortuuid import uuid

import dvc.prompt as prompt
from dvc.config import Config
from dvc.exceptions import ConfirmRemoveError
Expand Down Expand Up @@ -425,10 +427,16 @@ def _save_file(self, path_info, checksum, save_link=True):
cache_info = self.checksum_to_path_info(checksum)
if self.changed_cache(checksum):
self.move(path_info, cache_info)
self.link(cache_info, path_info)
elif self.iscopy(path_info) and self._cache_is_copy(path_info):
# Default relink procedure involves unneeded copy
if self.protected:
self.protect(path_info)
else:
self.unprotect(path_info)
else:
self.remove(path_info)

self.link(cache_info, path_info)
self.link(cache_info, path_info)

if save_link:
self.state.save_link(path_info)
Expand All @@ -439,6 +447,28 @@ def _save_file(self, path_info, checksum, save_link=True):
self.state.save(path_info, checksum)
self.state.save(cache_info, checksum)

def _cache_is_copy(self, path_info):
    """Checks whether cache uses copies.

    The answer is cached via ``self.cache_type_confirmed`` so the
    (potentially costly) filesystem probe below runs at most once.
    """
    if self.cache_type_confirmed:
        return self.cache_types[0] == "copy"

    # If "copy" is the only configured cache type, no probing is needed.
    if set(self.cache_types) <= {"copy"}:
        return True

    # Probe: link a throwaway cache file into the workspace.
    # NOTE(review): presumably self.link falls back through the
    # configured cache types and settles on one that works on this
    # filesystem, which is why cache_types[0] is trusted afterwards —
    # confirm against the remote's link() implementation.
    workspace_file = path_info.with_name("." + uuid())
    test_cache_file = self.path_info / ".cache_type_test_file"
    if not self.exists(test_cache_file):
        with self.open(test_cache_file, "wb") as fobj:
            fobj.write(bytes(1))  # bytes(1) == b"\x00": a single null byte
    try:
        self.link(test_cache_file, workspace_file)
    finally:
        # Always clean up both probe files, even if linking failed.
        self.remove(workspace_file)
        self.remove(test_cache_file)

    self.cache_type_confirmed = True
    return self.cache_types[0] == "copy"

def _save_dir(self, path_info, checksum):
cache_info = self.checksum_to_path_info(checksum)
dir_info = self.get_dir_cache(checksum)
Expand Down Expand Up @@ -467,6 +497,10 @@ def isdir(self, path_info):
"""
return False

def iscopy(self, path_info):
    """Tell whether the file at *path_info* is an independent copy.

    The generic remote has no way to inspect link types, so it
    conservatively reports that nothing is known to be a copy.
    """
    return False

def walk_files(self, path_info):
"""Return a generator with `PathInfo`s to all the files"""
raise NotImplementedError
Expand All @@ -483,10 +517,6 @@ def save(self, path_info, checksum_info):
)

checksum = checksum_info[self.PARAM_CHECKSUM]
if not self.changed_cache(checksum):
self._checkout(path_info, checksum)
return

self._save(path_info, checksum)

def _save(self, path_info, checksum):
Expand Down Expand Up @@ -758,45 +788,21 @@ def safe_remove(self, path_info, force=False):
self.remove(path_info)

def _checkout_file(
self,
path_info,
checksum,
force,
progress_callback=None,
save_link=True,
self, path_info, checksum, force, progress_callback=None
):
# NOTE: In case if path_info is already cached and path_info's
# link type matches cache link type, we would like to avoid
# relinking.
if self.changed(
path_info, {self.PARAM_CHECKSUM: checksum}
) or not self._link_matches(path_info):
"""The file is changed we need to checkout a new copy"""
cache_info = self.checksum_to_path_info(checksum)
if self.exists(path_info):
msg = "data '{}' exists. Removing before checkout."
logger.warning(msg.format(str(path_info)))
self.safe_remove(path_info, force=force)

cache_info = self.checksum_to_path_info(checksum)
self.link(cache_info, path_info)

if save_link:
self.state.save_link(path_info)

self.state.save(path_info, checksum)
else:
# NOTE: performing (un)protection costs us +/- the same as checking
# if path_info is protected. Instead of implementing logic,
# just (un)protect according to self.protected.
if self.protected:
self.protect(path_info)
else:
# NOTE dont allow copy, because we checked before that link
# type matches cache, and we don't want data duplication
self.unprotect(path_info, allow_copy=False)

self.link(cache_info, path_info)
self.state.save_link(path_info)
self.state.save(path_info, checksum)
if progress_callback:
progress_callback(str(path_info))

def _link_matches(self, path_info):
return True

def makedirs(self, path_info):
"""Optional: Implement only if the remote needs to create
directories before copying/linking/moving data
Expand All @@ -818,14 +824,17 @@ def _checkout_dir(
for entry in dir_info:
relative_path = entry[self.PARAM_RELPATH]
entry_checksum = entry[self.PARAM_CHECKSUM]
entry_cache_info = self.checksum_to_path_info(entry_checksum)
entry_info = path_info / relative_path
self._checkout_file(
entry_info,
entry_checksum,
force,
progress_callback,
save_link=False,
)

entry_checksum_info = {self.PARAM_CHECKSUM: entry_checksum}
if self.changed(entry_info, entry_checksum_info):
if self.exists(entry_info):
self.safe_remove(entry_info, force=force)
self.link(entry_cache_info, entry_info)
self.state.save(entry_info, entry_checksum)
if progress_callback:
progress_callback(str(entry_info))

self._remove_redundant_files(path_info, dir_info, force)

Expand All @@ -850,6 +859,7 @@ def checkout(

checksum = checksum_info.get(self.PARAM_CHECKSUM)
failed = None
skip = False
if not checksum:
logger.warning(
"No checksum info found for '{}'. "
Expand All @@ -858,13 +868,18 @@ def checkout(
self.safe_remove(path_info, force=force)
failed = path_info

elif not self.changed(path_info, checksum_info):
msg = "Data '{}' didn't change."
logger.debug(msg.format(str(path_info)))
skip = True

elif self.changed_cache(checksum):
msg = "Cache '{}' not found. File '{}' won't be created."
logger.warning(msg.format(checksum, str(path_info)))
self.safe_remove(path_info, force=force)
failed = path_info

if failed:
if failed or skip:
if progress_callback:
progress_callback(
str(path_info), self.get_files_number(checksum)
Expand Down Expand Up @@ -898,7 +913,7 @@ def get_files_number(self, checksum):
return 1

@staticmethod
def unprotect(path_info, allow_copy=True):
def unprotect(path_info):
pass

def _get_unpacked_dir_names(self, checksums):
Expand Down
75 changes: 22 additions & 53 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ def isfile(path_info):
def isdir(path_info):
return os.path.isdir(fspath_py35(path_info))

def iscopy(self, path_info):
    """A path is an independent copy when it is neither a symlink
    nor a hardlink."""
    if System.is_symlink(path_info):
        return False
    if System.is_hardlink(path_info):
        return False
    return True

@staticmethod
def getsize(path_info):
return os.path.getsize(fspath_py35(path_info))
Expand Down Expand Up @@ -394,22 +399,19 @@ def _log_missing_caches(checksum_info_dict):
logger.warning(msg)

@staticmethod
def _unprotect_file(path, allow_copy=True):
def _unprotect_file(path):
if System.is_symlink(path) or System.is_hardlink(path):
if allow_copy:
logger.debug("Unprotecting '{}'".format(path))
tmp = os.path.join(os.path.dirname(path), "." + str(uuid()))

# The operations order is important here - if some application
# would access the file during the process of copyfile then it
# would get only the part of file. So, at first, the file
# should be copied with the temporary name, and then
# original file should be replaced by new.
copyfile(
path, tmp, name="Unprotecting '{}'".format(relpath(path))
)
remove(path)
os.rename(tmp, path)
logger.debug("Unprotecting '{}'".format(path))
tmp = os.path.join(os.path.dirname(path), "." + str(uuid()))

# The operations order is important here - if some application
# would access the file during the process of copyfile then it
# would get only the part of file. So, at first, the file should be
# copied with the temporary name, and then original file should be
# replaced by new.
copyfile(path, tmp, name="Unprotecting '{}'".format(relpath(path)))
remove(path)
os.rename(tmp, path)

else:
logger.debug(
Expand All @@ -419,21 +421,21 @@ def _unprotect_file(path, allow_copy=True):

os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE)

def _unprotect_dir(self, path, allow_copy=True):
def _unprotect_dir(self, path):
for fname in walk_files(path, self.repo.dvcignore):
self._unprotect_file(fname, allow_copy)
RemoteLOCAL._unprotect_file(fname)

def unprotect(self, path_info, allow_copy=True):
def unprotect(self, path_info):
path = path_info.fspath
if not os.path.exists(path):
raise DvcException(
"can't unprotect non-existing data '{}'".format(path)
)

if os.path.isdir(path):
self._unprotect_dir(path, allow_copy)
self._unprotect_dir(path)
else:
self._unprotect_file(path, allow_copy)
RemoteLOCAL._unprotect_file(path)

@staticmethod
def protect(path_info):
Expand Down Expand Up @@ -507,36 +509,3 @@ def _get_unpacked_dir_names(self, checksums):
if self.is_dir_checksum(c):
unpacked.add(c + self.UNPACKED_DIR_SUFFIX)
return unpacked

def _get_cache_type(self, path_info):
    """Return the effective cache link type (``self.cache_types[0]``).

    The first call probes the filesystem by linking a throwaway cache
    file into the workspace; afterwards the confirmed answer is served
    directly from ``self.cache_types[0]``.
    """
    if self.cache_type_confirmed:
        return self.cache_types[0]

    # Probe: link a dummy cache file next to path_info.
    # NOTE(review): presumably self.link tries the configured cache
    # types in order and reorders/settles cache_types on success, which
    # is why cache_types[0] is trusted after the probe — confirm in the
    # base remote's link() implementation.
    workspace_file = path_info.with_name("." + uuid())
    test_cache_file = self.path_info / ".cache_type_test_file"
    if not self.exists(test_cache_file):
        with open(fspath_py35(test_cache_file), "wb") as fobj:
            fobj.write(bytes(1))  # bytes(1) == b"\x00": a single null byte
    try:
        self.link(test_cache_file, workspace_file)
    finally:
        # Always clean up both probe files, even if linking failed.
        self.remove(workspace_file)
        self.remove(test_cache_file)

    self.cache_type_confirmed = True
    return self.cache_types[0]

def _link_matches(self, path_info):
    """Check whether path_info's on-disk link type matches the
    effective cache link type."""
    symlink = System.is_symlink(path_info)
    hardlink = System.is_hardlink(path_info)

    cache_type = self._get_cache_type(path_info)

    if cache_type == "symlink":
        return symlink
    if cache_type == "hardlink":
        return hardlink
    # "copy" (or reflink): the file must not be a link of either kind.
    return not symlink and not hardlink
22 changes: 0 additions & 22 deletions tests/func/test_add.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,28 +598,6 @@ def test_should_relink_on_repeated_add(
assert link_test_func(repo_dir.FOO)


@pytest.mark.parametrize(
    "link, link_func",
    [("hardlink", System.hardlink), ("symlink", System.symlink)],
)
def test_should_relink_single_file_in_dir(link, link_func, dvc_repo, repo_dir):
    """Re-adding a tracked dir relinks only the file that was unprotected."""
    dvc_repo.cache.local.cache_types = [link]

    dvc_repo.add(repo_dir.DATA_DIR)

    # NOTE status triggers unpacked dir creation for hardlink case
    dvc_repo.status()

    dvc_repo.unprotect(repo_dir.DATA_SUB)

    # Spy on the link primitive during the second add: exactly one file
    # (the unprotected one) is expected to be re-linked.
    link_spy = spy(link_func)

    with patch.object(dvc_repo.cache.local, link, link_spy):
        dvc_repo.add(repo_dir.DATA_DIR)

    assert link_spy.mock.call_count == 1


@pytest.mark.parametrize("link", ["hardlink", "symlink", "copy"])
def test_should_protect_on_repeated_add(link, dvc_repo, repo_dir):
dvc_repo.cache.local.cache_types = [link]
Expand Down
4 changes: 4 additions & 0 deletions tests/func/test_checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ def test_checkout_no_checksum(repo_dir, dvc_repo):
assert not os.path.exists(repo_dir.FOO)


@pytest.mark.skip
@pytest.mark.parametrize(
"link, link_test_func",
[("hardlink", System.is_hardlink), ("symlink", System.is_symlink)],
Expand All @@ -497,6 +498,7 @@ def test_should_relink_on_checkout(link, link_test_func, repo_dir, dvc_repo):
assert link_test_func(repo_dir.DATA_SUB)


@pytest.mark.skip
@pytest.mark.parametrize("link", ["hardlink", "symlink", "copy"])
def test_should_protect_on_checkout(link, dvc_repo, repo_dir):
dvc_repo.cache.local.cache_types = [link]
Expand All @@ -510,6 +512,7 @@ def test_should_protect_on_checkout(link, dvc_repo, repo_dir):
assert not os.access(repo_dir.FOO, os.W_OK)


@pytest.mark.skip
def test_should_relink_only_one_file_in_dir(dvc_repo, repo_dir):
dvc_repo.cache.local.cache_types = ["symlink"]

Expand All @@ -523,6 +526,7 @@ def test_should_relink_only_one_file_in_dir(dvc_repo, repo_dir):
assert link_spy.mock.call_count == 1


@pytest.mark.skip
@pytest.mark.parametrize("link", ["hardlink", "symlink", "copy"])
def test_should_not_relink_on_unchanged_dependency(link, dvc_repo, repo_dir):
dvc_repo.cache.local.cache_types = [link]
Expand Down
Loading