Skip to content

Commit

Permalink
import-url/update: add --no-download flag
Browse files Browse the repository at this point in the history
add --no-download flag to dvc import-url/dvc update to only create/update .dvc
files without downloading the associated data.

Created .dvc files can be fetched using `dvc pull` or can be
updated using `dvc update --no-download`.
  • Loading branch information
dtrifiro committed Aug 8, 2022
1 parent 7bcc0fa commit 3e35588
Show file tree
Hide file tree
Showing 14 changed files with 263 additions and 27 deletions.
13 changes: 11 additions & 2 deletions dvc/commands/imp_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def run(self):
out=self.args.out,
fname=self.args.file,
no_exec=self.args.no_exec,
no_download=self.args.no_download,
remote=self.args.remote,
to_remote=self.args.to_remote,
desc=self.args.desc,
Expand Down Expand Up @@ -66,11 +67,19 @@ def add_parser(subparsers, parent_parser):
help="Specify name of the .dvc file this command will generate.",
metavar="<filename>",
).complete = completion.DIR
import_parser.add_argument(
no_download_exec_group = import_parser.add_mutually_exclusive_group()
no_download_exec_group.add_argument(
"--no-exec",
action="store_true",
default=False,
help="Only create .dvc file without actually downloading it.",
help="Only create .dvc file without actually importing target data.",
)
no_download_exec_group.add_argument(
"--no-download",
action="store_true",
default=False,
help="Create .dvc file with remote checksums without actually "
"downloading the associated data.",
)
import_parser.add_argument(
"--to-remote",
Expand Down
7 changes: 7 additions & 0 deletions dvc/commands/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def run(self):
rev=self.args.rev,
recursive=self.args.recursive,
to_remote=self.args.to_remote,
no_download=self.args.no_download,
remote=self.args.remote,
jobs=self.args.jobs,
)
Expand Down Expand Up @@ -55,6 +56,12 @@ def add_parser(subparsers, parent_parser):
default=False,
help="Update all stages in the specified directory.",
)
update_parser.add_argument(
"--no-download",
action="store_true",
default=False,
help="Update .dvc file checksums without downloading the latest data.",
)
update_parser.add_argument(
"--to-remote",
action="store_true",
Expand Down
41 changes: 40 additions & 1 deletion dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
from typing import TYPE_CHECKING, Callable, Optional
from typing import TYPE_CHECKING, Callable, List, Optional

from funcy import cached_property

Expand All @@ -18,6 +18,7 @@
from dvc.fs import FileSystem
from dvc.repo.scm_context import SCMContext
from dvc.scm import Base
from dvc.stage import Stage

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -444,6 +445,44 @@ def used_objs(

return used

def partial_imports(
self,
targets=None,
all_branches=False,
all_tags=False,
all_commits=False,
all_experiments=False,
commit_date: Optional[str] = None,
recursive=False,
revs=None,
num=1,
) -> List["Stage"]:
"""Get the stages related to the given target and collect dependencies
which are missing outputs.
This is useful to retrieve files which have been imported to the repo
using --no-download.
Returns:
A list of partially imported stages
"""
from itertools import chain

partial_imports = chain.from_iterable(
self.index.partial_imports(targets, recursive=recursive)
for _ in self.brancher(
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
all_experiments=all_experiments,
commit_date=commit_date,
num=num,
)
)

return list(partial_imports)

@property
def stages(self): # obsolete, only for backward-compatibility
return self.index.stages
Expand Down
32 changes: 32 additions & 0 deletions dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ def fetch(
downloaded += d
failed += f

d, f = _fetch_partial_imports(
self,
targets,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
recursive=recursive,
revs=revs,
)
downloaded += d
failed += f

if failed:
raise DownloadError(failed)

Expand All @@ -107,3 +119,23 @@ def _fetch(repo, obj_ids, **kwargs):
except FileTransferError as exc:
failed += exc.amount
return downloaded, failed


def _fetch_partial_imports(repo, targets, **kwargs):
from dvc.stage.exceptions import DataSourceChanged

downloaded = 0
failed = 0
for stage in repo.partial_imports(targets, **kwargs):
stage.frozen = False
try:
stage.run(check_changed=True)
except DataSourceChanged as exc:
logger.warning(f"{exc}")
failed += 1
continue
stage.frozen = True
stage.dump()

downloaded += 1
return downloaded, failed
7 changes: 4 additions & 3 deletions dvc/repo/imp_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def imp_url(
fname=None,
erepo=None,
frozen=True,
no_download=False,
no_exec=False,
remote=None,
to_remote=False,
Expand All @@ -31,9 +32,9 @@ def imp_url(
self, out, always_local=to_remote and not out
)

if to_remote and no_exec:
if to_remote and (no_exec or no_download):
raise InvalidArgumentError(
"--no-exec can't be combined with --to-remote"
"--no-exec/--no-download cannot be combined with --to-remote"
)

if not to_remote and remote:
Expand Down Expand Up @@ -80,7 +81,7 @@ def imp_url(
stage.save_deps()
stage.md5 = stage.compute_md5()
else:
stage.run(jobs=jobs)
stage.run(jobs=jobs, no_download=no_download)

stage.frozen = frozen

Expand Down
22 changes: 22 additions & 0 deletions dvc/repo/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,28 @@ def used_objs(
used[odb].update(objs)
return used

def partial_imports(
self,
targets: "TargetType" = None,
recursive: bool = False,
) -> List["Stage"]:
from itertools import chain

from dvc.utils.collections import ensure_list

collect_targets: Sequence[Optional[str]] = (None,)
if targets:
collect_targets = ensure_list(targets)

pairs = chain.from_iterable(
self.stage_collector.collect_granular(
target, recursive=recursive, with_deps=True
)
for target in collect_targets
)

return [stage for stage, _ in pairs if stage.is_partial_import]

# Following methods help us treat the collection as a set-like structure
# and provides faux-immutability.
# These methods do not preserve stages order.
Expand Down
14 changes: 13 additions & 1 deletion dvc/repo/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def update(
rev=None,
recursive=False,
to_remote=False,
no_download=False,
remote=None,
jobs=None,
):
Expand All @@ -21,6 +22,11 @@ def update(
if isinstance(targets, str):
targets = [targets]

if to_remote and no_download:
raise InvalidArgumentError(
"--to-remote can't be used with --no-download"
)

if not to_remote and remote:
raise InvalidArgumentError(
"--remote can't be used without --to-remote"
Expand All @@ -31,7 +37,13 @@ def update(
stages.update(self.stage.collect(target, recursive=recursive))

for stage in stages:
stage.update(rev, to_remote=to_remote, remote=remote, jobs=jobs)
stage.update(
rev,
to_remote=to_remote,
remote=remote,
no_download=no_download,
jobs=jobs,
)
dvcfile = Dvcfile(self, stage.path)
dvcfile.dump(stage)

Expand Down
55 changes: 45 additions & 10 deletions dvc/stage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def create_stage(cls, repo, path, external=False, **kwargs):

wdir = os.path.abspath(kwargs.get("wdir", None) or os.curdir)
path = os.path.abspath(path)

check_dvcfile_path(repo, path)
check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir"))
check_stage_path(repo, os.path.dirname(path))
Expand Down Expand Up @@ -242,6 +243,14 @@ def is_import(self):
"""Whether the DVC file was created with `dvc import`."""
return not self.cmd and len(self.deps) == 1 and len(self.outs) == 1

@property
def is_partial_import(self) -> bool:
"""
Whether the DVC file was created using `dvc import --no-download`
or `dvc import-url --no-download`.
"""
return self.is_import and (self.frozen and not self.outs[0].hash_info)

@property
def is_repo_import(self):
if not self.is_import:
Expand Down Expand Up @@ -433,11 +442,23 @@ def reproduce(self, interactive=False, **kwargs):

return self

def update(self, rev=None, to_remote=False, remote=None, jobs=None):
def update(
self,
rev=None,
to_remote=False,
remote=None,
no_download=None,
jobs=None,
):
if not (self.is_repo_import or self.is_import):
raise StageUpdateError(self.relpath)
update_import(
self, rev=rev, to_remote=to_remote, remote=remote, jobs=jobs
self,
rev=rev,
to_remote=to_remote,
remote=remote,
no_download=no_download,
jobs=jobs,
)

def reload(self):
Expand All @@ -455,10 +476,12 @@ def compute_md5(self):
logger.debug("Computed %s md5: '%s'", self, m)
return m

def save(self, allow_missing=False):
def save(self, allow_missing=False, no_download=False):
self.save_deps(allow_missing=allow_missing)
self.save_outs(allow_missing=allow_missing)
self.md5 = self.compute_md5()

self.save_outs(allow_missing=allow_missing or no_download)
if not no_download:
self.md5 = self.compute_md5()

self.repo.stage_cache.save(self)

Expand Down Expand Up @@ -525,14 +548,17 @@ def run(
no_commit=False,
force=False,
allow_missing=False,
no_download=False,
**kwargs,
):
if (self.cmd or self.is_import) and not self.frozen and not dry:
self.remove_outs(ignore_remove=False, force=False)

if not self.frozen and self.is_import:
if (not self.frozen and self.is_import) or self.is_partial_import:
jobs = kwargs.get("jobs", None)
self._sync_import(dry, force, jobs)
self._sync_import(
dry, force, jobs, no_download, check_changed=self.frozen
)
elif not self.frozen and self.cmd:
self._run_stage(dry, force, **kwargs)
else:
Expand All @@ -546,7 +572,9 @@ def run(
if not dry:
if kwargs.get("checkpoint_func", None):
allow_missing = True
self.save(allow_missing=allow_missing)
if no_download:
allow_missing = True
self.save(allow_missing=allow_missing, no_download=no_download)
if not no_commit:
self.commit(allow_missing=allow_missing)

Expand All @@ -555,8 +583,10 @@ def _run_stage(self, dry, force, **kwargs):
return run_stage(self, dry, force, **kwargs)

@rwlocked(read=["deps"], write=["outs"])
def _sync_import(self, dry, force, jobs):
sync_import(self, dry, force, jobs)
def _sync_import(
self, dry, force, jobs, no_download, check_changed: bool = False
):
sync_import(self, dry, force, jobs, no_download, check_changed)

@rwlocked(read=["outs"])
def _check_missing_outputs(self):
Expand All @@ -570,6 +600,8 @@ def _func(o):

@rwlocked(write=["outs"])
def checkout(self, allow_missing=False, **kwargs):
if self.is_partial_import:
return {}
stats = defaultdict(list)
for out in self.filter_outs(kwargs.get("filter_info")):
key, outs = self._checkout(
Expand Down Expand Up @@ -658,6 +690,9 @@ def get_used_objs(
self, *args, **kwargs
) -> Dict[Optional["ObjectDB"], Set["HashInfo"]]:
"""Return set of object IDs used by this stage."""
if self.is_partial_import:
return {}

used_objs = defaultdict(set)
for out in self.filter_outs(kwargs.get("filter_info")):
for odb, objs in out.get_used_objs(*args, **kwargs).items():
Expand Down
5 changes: 5 additions & 0 deletions dvc/stage/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ def __init__(self, missing_files):
super().__init__(msg)


class DataSourceChanged(DvcException):
def __init__(self, path: str):
super().__init__(f"data source changed: {path}")


class StageNotFound(DvcException, KeyError):
def __init__(self, file, name):
self.file = file.relpath
Expand Down

0 comments on commit 3e35588

Please sign in to comment.