diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 93e22cdc2e..90fc167e16 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -1,4 +1,5 @@ import errno +import hashlib import itertools import json import logging @@ -6,7 +7,7 @@ from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor from copy import copy -from functools import partial +from functools import partial, wraps from multiprocessing import cpu_count from operator import itemgetter @@ -23,6 +24,7 @@ from dvc.ignore import DvcIgnore from dvc.path_info import PathInfo, URLInfo, WindowsPathInfo from dvc.progress import Tqdm +from dvc.remote.index import RemoteIndex, RemoteIndexNoop from dvc.remote.slow_link_detection import slow_link_guard from dvc.state import StateNoop from dvc.utils import tmp_fname @@ -70,11 +72,24 @@ def __init__(self, checksum): ) +def index_locked(f): + @wraps(f) + def wrapper(remote_obj, *args, **kwargs): + remote = kwargs.get("remote") + if remote: + with remote.index: + return f(remote_obj, *args, **kwargs) + return f(remote_obj, *args, **kwargs) + + return wrapper + + class RemoteBASE(object): scheme = "base" path_cls = URLInfo REQUIRES = {} JOBS = 4 * cpu_count() + INDEX_CLS = RemoteIndex PARAM_RELPATH = "relpath" CHECKSUM_DIR_SUFFIX = ".dir" @@ -111,6 +126,15 @@ def __init__(self, repo, config): self.cache_types = config.get("type") or copy(self.DEFAULT_CACHE_TYPES) self.cache_type_confirmed = False + url = config.get("url") + if url: + index_name = hashlib.sha256(url.encode("utf-8")).hexdigest() + self.index = self.INDEX_CLS( + self.repo, index_name, dir_suffix=self.CHECKSUM_DIR_SUFFIX + ) + else: + self.index = RemoteIndexNoop() + @classmethod def get_missing_deps(cls): import importlib @@ -734,6 +758,7 @@ def all(self, jobs=None, name=None): remote_size, remote_checksums, jobs, name ) + @index_locked def gc(self, named_cache, jobs=None): used = self.extract_used_local_checksums(named_cache) @@ -754,6 +779,8 @@ def gc(self, 
named_cache, jobs=None): self._remove_unpacked_dir(checksum) self.remove(path_info) removed = True + if removed: + self.index.clear() return removed def is_protected(self, path_info): @@ -872,10 +899,18 @@ def cache_exists(self, checksums, jobs=None, name=None): # cache_exists() (see ssh, local) assert self.TRAVERSE_PREFIX_LEN >= 2 - if len(checksums) == 1 or not self.CAN_TRAVERSE: - return self._cache_object_exists(checksums, jobs, name) + checksums = set(checksums) + indexed_checksums = set(self.index.intersection(checksums)) + checksums -= indexed_checksums + logger.debug( + "Matched '{}' indexed checksums".format(len(indexed_checksums)) + ) + if not checksums: + return indexed_checksums - checksums = frozenset(checksums) + if len(checksums) == 1 or not self.CAN_TRAVERSE: + remote_checksums = self._cache_object_exists(checksums, jobs, name) + return list(indexed_checksums) + remote_checksums # Max remote size allowed for us to use traverse method remote_size, remote_checksums = self._estimate_cache_size( @@ -898,19 +933,25 @@ def cache_exists(self, checksums, jobs=None, name=None): len(checksums), traverse_weight ) ) - return list( - checksums & remote_checksums - ) + self._cache_object_exists( - checksums - remote_checksums, jobs, name + return ( + list(indexed_checksums) + + list(checksums & remote_checksums) + + self._cache_object_exists( + checksums - remote_checksums, jobs, name + ) ) logger.debug( - "Querying {} checksums via traverse".format(len(checksums)) + "Querying '{}' checksums via traverse".format(len(checksums)) ) - remote_checksums = self._cache_checksums_traverse( - remote_size, remote_checksums, jobs, name + remote_checksums = set( + self._cache_checksums_traverse( + remote_size, remote_checksums, jobs, name + ) + ) + return list(indexed_checksums) + list( + checksums & set(remote_checksums) ) - return list(checksums & set(remote_checksums)) def _checksums_with_limit( self, limit, prefix=None, progress_callback=None diff --git 
import itertools
import logging
import os
import sqlite3
import threading

from dvc.state import _connect_sqlite

logger = logging.getLogger(__name__)


def _chunks(iterable, size):
    """Yield successive lists of at most ``size`` items from ``iterable``."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, size))
        if not chunk:
            return
        yield chunk


class RemoteIndexNoop:
    """No-op class for remotes which are not indexed (i.e. local)."""

    def __init__(self, *args, **kwargs):
        pass

    def __enter__(self):
        # Return self so ``with index as idx:`` works for the no-op too.
        return self

    def __exit__(self, typ, value, tbck):
        pass

    def __iter__(self):
        return iter([])

    def __contains__(self, checksum):
        return False

    @staticmethod
    def checksums():
        return []

    @staticmethod
    def dir_checksums():
        return []

    def load(self):
        pass

    def dump(self):
        pass

    def clear(self):
        pass

    def update(self, *args):
        pass

    @staticmethod
    def intersection(*args):
        return []


class RemoteIndex:
    """Class for indexing remote checksums in a sqlite3 database.

    Args:
        repo: repo for this remote index.
        name: name for this index. Index db will be loaded from and saved to
            ``.dvc/tmp/index/{name}.idx``.
        dir_suffix: suffix used for naming directory checksums.
    """

    INDEX_SUFFIX = ".idx"
    VERSION = 1
    INDEX_TABLE = "remote_index"
    INDEX_TABLE_LAYOUT = "checksum TEXT PRIMARY KEY, dir INTEGER NOT NULL"

    # sqlite has a compile time limit of 999 host parameters per query, see:
    # https://www.sqlite.org/c3ref/c_limit_attached.html#sqlitelimitvariablenumber
    VARIABLE_LIMIT = 999

    def __init__(self, repo, name, dir_suffix=".dir"):
        self.path = os.path.join(
            repo.index_dir, "{}{}".format(name, self.INDEX_SUFFIX)
        )

        self.dir_suffix = dir_suffix
        # database/cursor are only valid between load() and dump()
        self.database = None
        self.cursor = None
        self.modified = False
        self.lock = threading.Lock()

    def __iter__(self):
        cmd = "SELECT checksum FROM {}".format(self.INDEX_TABLE)
        for (checksum,) in self._execute(cmd):
            yield checksum

    def __enter__(self):
        self.lock.acquire()
        self.load()
        return self

    def __exit__(self, typ, value, tbck):
        self.dump()
        self.lock.release()

    def __contains__(self, checksum):
        cmd = "SELECT checksum FROM {} WHERE checksum = (?)".format(
            self.INDEX_TABLE
        )
        self._execute(cmd, (checksum,))
        return self.cursor.fetchone() is not None

    def checksums(self):
        """Iterate over checksums stored in the index."""
        return iter(self)

    def dir_checksums(self):
        """Iterate over .dir checksums stored in the index."""
        cmd = "SELECT checksum FROM {} WHERE dir = 1".format(self.INDEX_TABLE)
        for (checksum,) in self._execute(cmd):
            yield checksum

    def is_dir_checksum(self, checksum):
        """Return True if ``checksum`` names a directory (.dir) checksum."""
        return checksum.endswith(self.dir_suffix)

    def _execute(self, cmd, parameters=()):
        return self.cursor.execute(cmd, parameters)

    def _executemany(self, cmd, seq_of_parameters):
        return self.cursor.executemany(cmd, seq_of_parameters)

    def _prepare_db(self, empty=False):
        # Create the index table (and reformat it on version mismatch).
        if not empty:
            cmd = "PRAGMA user_version;"
            self._execute(cmd)
            ret = self.cursor.fetchall()
            assert len(ret) == 1
            assert len(ret[0]) == 1
            assert isinstance(ret[0][0], int)
            version = ret[0][0]

            if version != self.VERSION:
                logger.error(
                    "Index file version '{}' will be reformatted "
                    "to the current version '{}'.".format(
                        version, self.VERSION,
                    )
                )
                cmd = "DROP TABLE IF EXISTS {};"
                self._execute(cmd.format(self.INDEX_TABLE))

        cmd = "CREATE TABLE IF NOT EXISTS {} ({})"
        self._execute(cmd.format(self.INDEX_TABLE, self.INDEX_TABLE_LAYOUT))

        cmd = "PRAGMA user_version = {};"
        self._execute(cmd.format(self.VERSION))

    def load(self):
        """(Re)load this index database."""
        retries = 1
        while True:
            assert self.database is None
            assert self.cursor is None

            empty = not os.path.isfile(self.path)
            self.database = _connect_sqlite(self.path, {"nolock": 1})
            self.cursor = self.database.cursor()

            try:
                self._prepare_db(empty=empty)
                return
            except sqlite3.DatabaseError:
                self.cursor.close()
                self.database.close()
                self.database = None
                self.cursor = None
                if retries > 0:
                    # corrupted index file: delete it once and start fresh
                    os.unlink(self.path)
                    retries -= 1
                else:
                    raise

    def dump(self):
        """Save this index database."""
        assert self.database is not None

        self.database.commit()
        self.cursor.close()
        self.database.close()
        self.database = None
        self.cursor = None

    def clear(self):
        """Clear this index (to force re-indexing later).

        Changes to the index will not be committed until dump() is called.
        """
        cmd = "DELETE FROM {}".format(self.INDEX_TABLE)
        self._execute(cmd)

    def update(self, dir_checksums, file_checksums):
        """Update this index, adding the specified checksums.

        Changes to the index will not be committed until dump() is called.
        """
        cmd = "INSERT OR IGNORE INTO {} (checksum, dir) VALUES (?, ?)".format(
            self.INDEX_TABLE
        )
        self._executemany(
            cmd, ((checksum, True) for checksum in dir_checksums)
        )
        self._executemany(
            cmd, ((checksum, False) for checksum in file_checksums)
        )

    def intersection(self, checksums):
        """Iterate over values from `checksums` which exist in the index."""
        for chunk in _chunks(checksums, self.VARIABLE_LIMIT):
            cmd = "SELECT checksum FROM {} WHERE checksum IN ({})".format(
                self.INDEX_TABLE, ",".join("?" for _ in chunk)
            )
            for (checksum,) in self._execute(cmd, chunk):
                yield checksum
also exists on the remote - for dir_checksum in remote._cache_object_exists(dir_md5s): - file_checksums = list( - named_cache.child_keys(self.scheme, dir_checksum) - ) - logger.debug( - "'{}' exists on remote, " - "assuming '{}' files also exist".format( - dir_checksum, len(file_checksums) - ) - ) - md5s.remove(dir_checksum) - remote_exists.add(dir_checksum) - md5s.difference_update(file_checksums) - remote_exists.update(file_checksums) + remote_exists.update( + self._indexed_dir_checksums(named_cache, remote, dir_md5s) + ) + md5s.difference_update(remote_exists) if md5s: remote_exists.update( remote.cache_exists( md5s, jobs=jobs, name=str(remote.path_info) ) ) + return self._make_status( + named_cache, remote, show_checksums, local_exists, remote_exists + ) + def _make_status( + self, named_cache, remote, show_checksums, local_exists, remote_exists + ): def make_names(checksum, names): return {"name": checksum if show_checksums else " ".join(names)} @@ -354,6 +357,43 @@ def make_names(checksum, names): return dir_status, file_status, dir_paths + def _indexed_dir_checksums(self, named_cache, remote, dir_md5s): + # Validate our index by verifying all indexed .dir checksums + # still exist on the remote + indexed_dirs = set(remote.index.dir_checksums()) + indexed_dir_exists = set() + if indexed_dirs: + indexed_dir_exists.update( + remote._cache_object_exists(indexed_dirs) + ) + missing_dirs = indexed_dirs.difference(indexed_dir_exists) + if missing_dirs: + logger.debug( + "Remote cache missing indexed .dir checksums '{}', " + "clearing remote index".format(", ".join(missing_dirs)) + ) + remote.index.clear() + + # Check if non-indexed (new) dir checksums exist on remote + dir_exists = dir_md5s.intersection(indexed_dir_exists) + dir_exists.update(remote._cache_object_exists(dir_md5s - dir_exists)) + + # If .dir checksum exists on the remote, assume directory contents + # still exists on the remote + for dir_checksum in dir_exists: + file_checksums = list( + 
named_cache.child_keys(self.scheme, dir_checksum) + ) + if dir_checksum not in remote.index: + logger.debug( + "Indexing new .dir '{}' with '{}' nested files".format( + dir_checksum, len(file_checksums) + ) + ) + remote.index.update([dir_checksum], file_checksums) + yield dir_checksum + yield from file_checksums + @staticmethod def _fill_statuses(checksum_info_dir, local_exists, remote_exists): # Using sets because they are way faster for lookups @@ -464,9 +504,26 @@ def _process( if fails: if download: + remote.index.clear() raise DownloadError(fails) raise UploadError(fails) + if not download: + # index successfully pushed dirs + for to_info, future in dir_futures.items(): + if future.result() == 0: + dir_checksum = remote.path_to_checksum(str(to_info)) + file_checksums = list( + named_cache.child_keys(self.scheme, dir_checksum) + ) + logger.debug( + "Indexing pushed dir '{}' with " + "'{}' nested files".format( + dir_checksum, len(file_checksums) + ) + ) + remote.index.update([dir_checksum], file_checksums) + return len(dir_plans[0]) + len(file_plans[0]) @staticmethod @@ -485,6 +542,7 @@ def _dir_upload(func, futures, from_info, to_info, name): return 1 return func(from_info, to_info, name) + @index_locked def push(self, named_cache, remote, jobs=None, show_checksums=False): return self._process( named_cache, @@ -494,6 +552,7 @@ def push(self, named_cache, remote, jobs=None, show_checksums=False): download=False, ) + @index_locked def pull(self, named_cache, remote, jobs=None, show_checksums=False): return self._process( named_cache, diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 66e33d2e7d..c8d6631863 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -85,7 +85,8 @@ def __init__(self, root_dir=None): self.tree = WorkingTree(self.root_dir) self.tmp_dir = os.path.join(self.dvc_dir, "tmp") - makedirs(self.tmp_dir, exist_ok=True) + self.index_dir = os.path.join(self.tmp_dir, "index") + makedirs(self.index_dir, exist_ok=True) 
import pytest

from dvc.compat import fspath
from dvc.exceptions import DownloadError, UploadError
from dvc.remote.base import RemoteBASE
from dvc.remote.index import RemoteIndex
from dvc.remote.local import RemoteLOCAL
from dvc.utils.fs import remove


@pytest.fixture(scope="function")
def remote(tmp_dir, dvc, tmp_path_factory, mocker):
    # Point the repo at a fresh local "upstream" remote.
    upstream_url = fspath(tmp_path_factory.mktemp("upstream"))
    dvc.config["remote"]["upstream"] = {"url": upstream_url}
    dvc.config["core"]["remote"] = "upstream"

    # RemoteLOCAL normally overrides RemoteBASE.cache_exists; force the base
    # implementation so the indexed code path is exercised.
    def cache_exists(self, *args, **kwargs):
        return RemoteBASE.cache_exists(self, *args, **kwargs)

    mocker.patch.object(RemoteLOCAL, "cache_exists", cache_exists)

    # RemoteLOCAL normally uses the no-op index class; force a real index.
    mocker.patch.object(RemoteLOCAL, "INDEX_CLS", RemoteIndex)

    return dvc.cloud.get_remote("upstream")


def test_indexed_on_status(tmp_dir, dvc, tmp_path_factory, remote):
    foo_out = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0]
    dir_out = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0]
    nested = dir_out.dir_cache[0]
    dvc.push()
    with remote.index:
        remote.index.clear()

    dvc.status(cloud=True)

    # status should have re-indexed the pushed dir and its nested file,
    # but not the standalone file.
    with remote.index:
        assert set(remote.index.checksums()) == {
            dir_out.checksum,
            nested["md5"],
        }
        assert list(remote.index.dir_checksums()) == [dir_out.checksum]
        assert foo_out.checksum not in remote.index.checksums()


def test_indexed_on_push(tmp_dir, dvc, tmp_path_factory, remote):
    foo_out = tmp_dir.dvc_gen({"foo": "foo content"})[0].outs[0]
    dir_out = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0]
    nested = dir_out.dir_cache[0]

    dvc.push()

    # push should index the dir and its contents, not the standalone file.
    with remote.index:
        assert set(remote.index.checksums()) == {
            dir_out.checksum,
            nested["md5"],
        }
        assert list(remote.index.dir_checksums()) == [dir_out.checksum]
        assert foo_out.checksum not in remote.index.checksums()


def test_indexed_dir_missing(tmp_dir, dvc, tmp_path_factory, remote):
    dir_out = tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})[0].outs[0]
    # Pretend the dir was indexed even though it was never pushed.
    with remote.index:
        remote.index.update([dir_out.checksum], [])
    dvc.status(cloud=True)
    # The stale entry must have been flushed from the index.
    with remote.index:
        assert not list(remote.index.checksums())


def test_clear_on_gc(tmp_dir, dvc, tmp_path_factory, remote, mocker):
    (stage,) = tmp_dir.dvc_gen({"foo": "foo content"})
    dvc.push()
    dvc.remove(stage.relpath)

    mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear")
    dvc.gc(workspace=True, cloud=True)
    mocked_clear.assert_called_with()


def test_clear_on_download_err(tmp_dir, dvc, tmp_path_factory, remote, mocker):
    tmp_dir.dvc_gen({"foo": "foo content"})
    dvc.push()
    remove(dvc.cache.local.cache_dir)

    mocked_clear = mocker.patch.object(remote.INDEX_CLS, "clear")
    mocker.patch.object(RemoteLOCAL, "_download", side_effect=Exception)

    with pytest.raises(DownloadError):
        dvc.pull()
    mocked_clear.assert_called_once_with()


def test_partial_upload(tmp_dir, dvc, tmp_path_factory, remote, mocker):
    tmp_dir.dvc_gen({"foo": "foo content"})
    tmp_dir.dvc_gen({"bar": {"baz": "baz content"}})

    real_upload = RemoteLOCAL._upload

    def unreliable_upload(self, from_file, to_info, name=None, **kwargs):
        # Fail only the nested file so the .dir object is never indexed.
        if "baz" in name:
            raise Exception("stop baz")
        return real_upload(self, from_file, to_info, name, **kwargs)

    mocker.patch.object(RemoteLOCAL, "_upload", unreliable_upload)
    with pytest.raises(UploadError):
        dvc.push()
    # Nothing should have been indexed for the failed push.
    with remote.index:
        assert not list(remote.index.checksums())
import os

import pytest
from funcy import first

from dvc.remote.index import RemoteIndex


@pytest.fixture(scope="function")
def index(dvc):
    # Fresh index per test; dump + delete the db file on teardown.
    idx = RemoteIndex(dvc, "foo")
    idx.load()
    yield idx
    idx.dump()
    os.unlink(idx.path)


def test_init(dvc, index):
    assert str(index.path) == os.path.join(dvc.index_dir, "foo.idx")


def test_is_dir_checksum(dvc, index):
    assert index.is_dir_checksum("foo.dir")
    assert not index.is_dir_checksum("foo")


def test_roundtrip(dvc, index):
    dirs = {"1234.dir"}
    files = {"5678"}
    index.update(dirs, files)
    # Persist and re-open to exercise the save/load cycle.
    index.dump()
    index.load()
    assert set(index.dir_checksums()) == dirs
    assert set(index.checksums()) == dirs | files


def test_clear(dvc, index):
    index.update(["1234.dir"], ["5678"])
    index.clear()
    assert first(index.checksums()) is None


def test_update(dvc, index):
    dirs = {"1234.dir"}
    files = {"5678"}
    index.update(dirs, files)
    assert set(index.dir_checksums()) == dirs
    assert set(index.checksums()) == dirs | files


def test_intersection(dvc, index):
    # More checksums than sqlite's 999-variable limit, queried in chunks.
    expected = {str(i) for i in range(1000)}
    index.update([], map(str, range(2000)))
    assert set(index.intersection(expected)) == expected