Skip to content

Commit

Permalink
index: fetch: check for changed files
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Jan 25, 2024
1 parent c5a89dd commit f398036
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 31 deletions.
46 changes: 42 additions & 4 deletions src/dvc_data/index/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from dvc_objects.fs.callbacks import DEFAULT_CALLBACK, TqdmCallback

from dvc_data.hashfile.db import get_index
from dvc_data.hashfile.meta import Meta
from dvc_data.hashfile.transfer import transfer

from .build import build
from .checkout import apply, compare
from .collect import collect # noqa: F401
from .index import ObjectStorage
from .index import DataIndex, ObjectStorage
from .save import md5, save

if TYPE_CHECKING:
Expand Down Expand Up @@ -45,6 +46,42 @@ def _onerror(data, cache, failed_keys, src_path, dest_path, exc):
)


def _filter_changed(index):
    """Build a new index keeping only entries whose underlying data still
    appears unchanged in its data storage (plus entries that cannot or
    need not be verified).

    The returned index shares *index*'s storage map; entries whose data
    is missing, or whose storage checksum no longer matches, are dropped.
    """
    unchanged = DataIndex()
    unchanged.storage_map = index.storage_map

    for _, entry in index.items():
        meta = entry.meta

        # Directory entries carry no file content to verify.
        if meta and meta.isdir:
            unchanged.add(entry)
            continue

        # Nothing recorded to compare against, or the entry is pinned to
        # an explicit object version: keep it as-is.
        if not meta or meta.version_id:
            unchanged.add(entry)
            continue

        try:
            data_fs, data_path = index.storage_map.get_data(entry)
        except ValueError:
            # No data storage configured for this entry.
            continue

        try:
            info = data_fs.info(data_path)
        except FileNotFoundError:
            # Data is gone from storage; treat as changed and drop it.
            continue

        # Immutable storages cannot change in place once written.
        if getattr(data_fs, "immutable", None):
            unchanged.add(entry)
            continue

        # Compare the filesystem-specific checksum (etag/md5/...) recorded
        # on the entry against the current one reported by storage.
        current_meta = Meta.from_info(info)
        recorded_sum = getattr(meta, data_fs.PARAM_CHECKSUM, None)
        current_sum = getattr(current_meta, data_fs.PARAM_CHECKSUM, None)
        if recorded_sum and current_sum and recorded_sum == current_sum:
            unchanged.add(entry)

    return unchanged


def fetch(
idxs,
callback: "Callback" = DEFAULT_CALLBACK,
Expand Down Expand Up @@ -94,7 +131,7 @@ def fetch(
fetched += len(result.transferred)
failed += len(result.failed)
elif isinstance(cache, ObjectStorage):
md5(fs_index, check_meta=False)
updated = md5(fs_index)

def _on_error(failed, oid, exc):
if isinstance(exc, FileNotFoundError):
Expand All @@ -107,14 +144,15 @@ def _on_error(failed, oid, exc):
)

fetched += save(
fs_index,
updated,
jobs=jobs,
callback=cb,
on_error=partial(_on_error, failed),
)
else:
old = build(cache.path, cache.fs)
diff = compare(old, fs_index)
filtered = _filter_changed(fs_index)
diff = compare(old, filtered)
cache.fs.makedirs(cache.fs.parent(cache.path), exist_ok=True)

failed_keys: Set["DataIndexKey"] = set()
Expand Down
67 changes: 43 additions & 24 deletions src/dvc_data/index/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,75 @@
from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.state import StateBase

from .index import BaseDataIndex, DataIndexKey
from .index import BaseDataIndex, DataIndex, DataIndexKey


def _meta_matches(fs, path, old_meta):
try:
info = fs.info(path)
except FileNotFoundError:
return False

if getattr(fs, "immutable", False):
return True

new_meta = Meta.from_info(info, fs.protocol)
old = getattr(old_meta, fs.PARAM_CHECKSUM, None) if old_meta else None
new = getattr(new_meta, fs.PARAM_CHECKSUM, None)
if not old or not new:
return None

return old == new


def md5(
    index: "BaseDataIndex",
    state: Optional["StateBase"] = None,
    storage: str = "data",
    name: str = DEFAULT_ALGORITHM,
) -> "DataIndex":
    """Compute content hashes for *index* and return them as a new index.

    NOTE(review): the scraped diff interleaved the removed (pre-commit)
    and added (post-commit) lines of this function, which is not valid
    Python; this is the reconstructed post-commit version (``check_meta``
    removed, result returned as a fresh ``DataIndex`` instead of mutating
    *index* in place).

    Args:
        index: source index whose file entries should be hashed.
        state: optional state database passed through to ``hash_file``
            for hash caching.
        storage: which storage ("data", "cache", ...) to read files from.
        name: hash algorithm name (defaults to the project algorithm).

    Returns:
        A new ``DataIndex`` sharing *index*'s storage map. Directory
        entries are kept as-is; file entries get freshly computed hash
        info. Entries whose files are missing, or whose recomputed hash
        conflicts with an already-recorded md5, are dropped/kept as-is
        respectively.
    """
    from .index import DataIndex, DataIndexEntry

    ret = DataIndex()

    for _, entry in index.iteritems():
        # Directory entries carry no file content to hash.
        if entry.meta and entry.meta.isdir:
            ret.add(entry)
            continue

        # Remember an already-computed md5 so a change between the meta
        # check and hashing can be detected below.
        hash_info = None
        if entry.hash_info and entry.hash_info.name in ("md5", "md5-dos2unix"):
            hash_info = entry.hash_info

        try:
            fs, path = index.storage_map.get_storage(entry, storage)
        except ValueError:
            # No storage configured for this entry.
            continue

        matches = _meta_matches(fs, path, entry.meta)
        if matches:
            # Meta says the file is unchanged; keep the entry as-is.
            # NOTE(review): per the diff there is no ``continue`` here, so
            # execution falls through and re-hashes the file, replacing
            # this entry unless the hashes conflict — confirm the
            # fall-through is intended.
            ret.add(entry)
        elif matches is not None:
            # File is missing or demonstrably changed: drop the entry.
            continue

        try:
            _, hi = hash_file(path, fs, name, state=state)
        except FileNotFoundError:
            # File disappeared between the meta check and hashing.
            continue

        # The content changed under us relative to the recorded md5: keep
        # whatever was (or wasn't) added above instead of the fresh hash.
        if hash_info and hi != hash_info:
            continue

        ret.add(
            DataIndexEntry(
                key=entry.key,
                meta=entry.meta,
                hash_info=hi,
            )
        )

    ret.storage_map = index.storage_map
    return ret


def build_tree(
Expand Down
6 changes: 3 additions & 3 deletions tests/index/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def test_md5(tmp_upath, odb, as_filesystem):

fs = as_filesystem(tmp_upath.fs)
index = build(str(tmp_upath), fs)
md5(index)
index = md5(index)
assert index[("foo",)].hash_info == HashInfo(
"md5",
"d3b07384d113edec49eaa6238ad5ff00",
Expand All @@ -56,7 +56,7 @@ def test_save(tmp_upath, odb, as_filesystem):

fs = as_filesystem(tmp_upath.fs)
index = build(str(tmp_upath), fs)
md5(index)
index = md5(index)
save(index, odb=odb)
assert odb.exists("d3b07384d113edec49eaa6238ad5ff00")
assert odb.exists("1f69c66028c35037e8bf67e5bc4ceb6a.dir")
Expand Down Expand Up @@ -397,7 +397,7 @@ def test_update(tmp_upath, odb, as_filesystem):

fs = as_filesystem(tmp_upath.fs)
old = build(str(tmp_upath), fs)
md5(old)
old = md5(old)

index = build(str(tmp_upath), fs)
update(index, old)
Expand Down

0 comments on commit f398036

Please sign in to comment.