Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions dvc/command/imp_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ class CmdImportUrl(CmdBase):
def run(self):
try:
self.repo.imp_url(
self.args.url,
out=self.args.out,
resume=self.args.resume,
fname=self.args.file,
self.args.url, out=self.args.out, fname=self.args.file
)
except DvcException:
logger.exception(
Expand Down Expand Up @@ -54,12 +51,6 @@ def add_parser(subparsers, parent_parser):
"ssh://example.com:/path/to/file\n"
"remote://myremote/path/to/file (see `dvc remote`)",
)
import_parser.add_argument(
"--resume",
action="store_true",
default=False,
help="Resume previously started download.",
)
import_parser.add_argument(
"out", nargs="?", help="Destination path to put files to."
)
Expand Down
4 changes: 2 additions & 2 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ def verify_metric(self):
"verify metric is not supported for {}".format(self.scheme)
)

def download(self, to, resume=False):
self.remote.download([self.path_info], [to.path_info], resume=resume)
def download(self, to):
self.remote.download(self.path_info, to.path_info)

def checkout(self, force=False, progress_callback=None, tag=None):
if not self.use_cache:
Expand Down
161 changes: 57 additions & 104 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import logging
import tempfile
import itertools
from contextlib import contextmanager
from operator import itemgetter
from multiprocessing import cpu_count
from concurrent.futures import as_completed, ThreadPoolExecutor
Expand Down Expand Up @@ -211,7 +210,7 @@ def _get_dir_info_checksum(self, dir_info):

from_info = PathInfo(tmp)
to_info = self.cache.path_info / tmp_fname("")
self.cache.upload([from_info], [to_info], no_progress_bar=True)
self.cache.upload(from_info, to_info, no_progress_bar=True)

checksum = self.get_file_checksum(to_info) + self.CHECKSUM_DIR_SUFFIX
return checksum, to_info
Expand All @@ -233,7 +232,7 @@ def load_dir_cache(self, checksum):
fobj = tempfile.NamedTemporaryFile(delete=False)
path = fobj.name
to_info = PathInfo(path)
self.cache.download([path_info], [to_info], no_progress_bar=True)
self.cache.download(path_info, to_info, no_progress_bar=True)

try:
with open(path, "r") as fobj:
Expand Down Expand Up @@ -417,113 +416,81 @@ def _save(self, path_info, checksum):
return
self._save_file(path_info, checksum)

@contextmanager
def transfer_context(self):
yield None

def upload(self, from_infos, to_infos, names=None, no_progress_bar=False):
def upload(self, from_info, to_info, name=None, no_progress_bar=False):
if not hasattr(self, "_upload"):
raise RemoteActionNotImplemented("upload", self.scheme)
names = self._verify_path_args(to_infos, from_infos, names)
fails = 0

with self.transfer_context() as ctx:
for from_info, to_info, name in zip(from_infos, to_infos, names):
if to_info.scheme != self.scheme:
raise NotImplementedError
if to_info.scheme != self.scheme:
raise NotImplementedError

if from_info.scheme != "local":
raise NotImplementedError
if from_info.scheme != "local":
raise NotImplementedError

msg = "Uploading '{}' to '{}'"
logger.debug(msg.format(from_info, to_info))
logger.debug("Uploading '{}' to '{}'".format(from_info, to_info))

if not name:
name = from_info.name
name = name or from_info.name

if not no_progress_bar:
progress.update_target(name, 0, None)
if not no_progress_bar:
progress.update_target(name, 0, None)

try:
self._upload(
from_info.fspath,
to_info,
name=name,
ctx=ctx,
no_progress_bar=no_progress_bar,
)
except Exception:
fails += 1
msg = "failed to upload '{}' to '{}'"
logger.exception(msg.format(from_info, to_info))
continue

if not no_progress_bar:
progress.finish_target(name)

return fails

def download(
self,
from_infos,
to_infos,
names=None,
no_progress_bar=False,
resume=False,
):
try:
self._upload(
from_info.fspath,
to_info,
name=name,
no_progress_bar=no_progress_bar,
)
except Exception:
msg = "failed to upload '{}' to '{}'"
logger.exception(msg.format(from_info, to_info))
return 1 # 1 fail

if not no_progress_bar:
progress.finish_target(name)

return 0

def download(self, from_info, to_info, name=None, no_progress_bar=False):
if not hasattr(self, "_download"):
raise RemoteActionNotImplemented("download", self.scheme)

names = self._verify_path_args(from_infos, to_infos, names)
fails = 0

with self.transfer_context() as ctx:
for to_info, from_info, name in zip(to_infos, from_infos, names):
if from_info.scheme != self.scheme:
raise NotImplementedError
if from_info.scheme != self.scheme:
raise NotImplementedError

if to_info.scheme == self.scheme != "local":
self.copy(from_info, to_info, ctx=ctx)
continue
if to_info.scheme == self.scheme != "local":
self.copy(from_info, to_info)
return 0

if to_info.scheme != "local":
raise NotImplementedError
if to_info.scheme != "local":
raise NotImplementedError

msg = "Downloading '{}' to '{}'".format(from_info, to_info)
logger.debug(msg)
logger.debug("Downloading '{}' to '{}'".format(from_info, to_info))

tmp_file = tmp_fname(to_info)
if not name:
name = to_info.name
name = name or to_info.name

if not no_progress_bar:
# real progress is not always available,
# lets at least show start and finish
progress.update_target(name, 0, None)
if not no_progress_bar:
# real progress is not always available,
# lets at least show start and finish
progress.update_target(name, 0, None)

makedirs(fspath_py35(to_info.parent), exist_ok=True)
makedirs(fspath_py35(to_info.parent), exist_ok=True)
tmp_file = tmp_fname(to_info)

try:
self._download(
from_info,
tmp_file,
name=name,
ctx=ctx,
resume=resume,
no_progress_bar=no_progress_bar,
)
except Exception:
fails += 1
msg = "failed to download '{}' to '{}'"
logger.exception(msg.format(from_info, to_info))
continue
try:
self._download(
from_info, tmp_file, name=name, no_progress_bar=no_progress_bar
)
except Exception:
msg = "failed to download '{}' to '{}'"
logger.exception(msg.format(from_info, to_info))
return 1 # 1 fail

move(tmp_file, fspath_py35(to_info))
move(tmp_file, fspath_py35(to_info))

if not no_progress_bar:
progress.finish_target(name)
if not no_progress_bar:
progress.finish_target(name)

return fails
return 0

def remove(self, path_info):
raise RemoteActionNotImplemented("remove", self.scheme)
Expand All @@ -532,26 +499,12 @@ def move(self, from_info, to_info):
self.copy(from_info, to_info)
self.remove(from_info)

def copy(self, from_info, to_info, ctx=None):
def copy(self, from_info, to_info):
raise RemoteActionNotImplemented("copy", self.scheme)

def exists(self, path_info):
raise NotImplementedError

@classmethod
def _verify_path_args(cls, from_infos, to_infos, names=None):
assert isinstance(from_infos, list)
assert isinstance(to_infos, list)
assert len(from_infos) == len(to_infos)

if not names:
names = len(to_infos) * [None]
else:
assert isinstance(names, list)
assert len(names) == len(to_infos)

return names

def path_to_checksum(self, path):
return "".join(self.path_cls(path).parts[-2:])

Expand Down
35 changes: 12 additions & 23 deletions dvc/remote/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
import itertools
from contextlib import contextmanager
from funcy import cached_property

try:
from google.cloud import storage
Expand Down Expand Up @@ -42,7 +42,7 @@ def compat_config(config):
ret[Config.SECTION_REMOTE_URL] = url
return ret

@property
@cached_property
def gs(self):
return (
storage.Client.from_service_account_json(self.credentialpath)
Expand All @@ -64,16 +64,14 @@ def get_file_checksum(self, path_info):
md5 = base64.b64decode(b64_md5)
return codecs.getencoder("hex")(md5)[0].decode("utf-8")

def copy(self, from_info, to_info, ctx=None):
gs = ctx or self.gs

from_bucket = gs.bucket(from_info.bucket)
def copy(self, from_info, to_info):
from_bucket = self.gs.bucket(from_info.bucket)
blob = from_bucket.get_blob(from_info.path)
if not blob:
msg = "'{}' doesn't exist in the cloud".format(from_info.path)
raise DvcException(msg)

to_bucket = gs.bucket(to_info.bucket)
to_bucket = self.gs.bucket(to_info.bucket)
from_bucket.copy_blob(blob, to_bucket, new_name=to_info.path)

def remove(self, path_info):
Expand All @@ -87,10 +85,8 @@ def remove(self, path_info):

blob.delete()

def _list_paths(self, bucket, prefix, gs=None):
gs = gs or self.gs

for blob in gs.bucket(bucket).list_blobs(prefix=prefix):
def _list_paths(self, bucket, prefix):
for blob in self.gs.bucket(bucket).list_blobs(prefix=prefix):
yield blob.name

def list_cache_paths(self):
Expand All @@ -102,28 +98,21 @@ def exists(self, path_info):

def batch_exists(self, path_infos, callback):
paths = []
gs = self.gs

for path_info in path_infos:
paths.append(
self._list_paths(path_info.bucket, path_info.path, gs)
)
paths.append(self._list_paths(path_info.bucket, path_info.path))
callback.update(str(path_info))

paths = set(itertools.chain.from_iterable(paths))

return [path_info.path in paths for path_info in path_infos]

@contextmanager
def transfer_context(self):
yield self.gs

def _upload(self, from_file, to_info, ctx=None, **_kwargs):
bucket = ctx.bucket(to_info.bucket)
def _upload(self, from_file, to_info, **_kwargs):
bucket = self.gs.bucket(to_info.bucket)
blob = bucket.blob(to_info.path)
blob.upload_from_filename(from_file)

def _download(self, from_info, to_file, ctx=None, **_kwargs):
bucket = ctx.bucket(from_info.bucket)
def _download(self, from_info, to_file, **_kwargs):
bucket = self.gs.bucket(from_info.bucket)
blob = bucket.get_blob(from_info.path)
blob.download_to_filename(to_file)
Loading