31 changes: 2 additions & 29 deletions dvc/cache/base.py
@@ -249,33 +249,14 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
if not (
tree.isdvc(path_info, strict=False) and tree.fetch
):
self.tree.copy_fobj(fobj, cache_info)
self.tree.upload_fobj(fobj, cache_info)
callback = kwargs.get("download_callback")
if callback:
callback(1)

self.tree.state.save(cache_info, hash_info)

def _transfer_file_as_whole(self, from_tree, from_info):
from dvc.utils import tmp_fname

# When we can't use the chunked upload, we have to first download
# and then calculate the hash as if it were a local file and then
# upload it.
local_tree = self.repo.cache.local.tree
local_info = local_tree.path_info / tmp_fname()

from_tree.download(from_info, local_info)
hash_info = local_tree.get_file_hash(local_info)

self.tree.upload(
local_info,
self.tree.hash_to_path_info(hash_info.value),
name=from_info.name,
)
return hash_info

def _transfer_file_as_chunked(self, from_tree, from_info):
def _transfer_file(self, from_tree, from_info):
from dvc.utils import tmp_fname
from dvc.utils.stream import HashedStreamReader

@@ -298,14 +279,6 @@ def _transfer_file_as_chunked(self, from_tree, from_info):
self.move(tmp_info, self.tree.hash_to_path_info(hash_info.value))
return hash_info

def _transfer_file(self, from_tree, from_info):
try:
hash_info = self._transfer_file_as_chunked(from_tree, from_info)
except RemoteActionNotImplemented:
hash_info = self._transfer_file_as_whole(from_tree, from_info)

return hash_info

def _transfer_directory_contents(self, from_tree, from_info, jobs, pbar):
rel_path_infos = {}
from_infos = from_tree.walk_files(from_info)
21 changes: 13 additions & 8 deletions dvc/tree/azure.py
@@ -170,19 +170,24 @@ def getsize(self, path_info):
def get_file_hash(self, path_info):
return HashInfo(self.PARAM_CHECKSUM, self.get_etag(path_info))

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):

def _upload_fobj(self, fobj, to_info):
blob_client = self.blob_service.get_blob_client(
to_info.bucket, to_info.path
)
blob_client.upload_blob(fobj, overwrite=True)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
total = os.path.getsize(from_file)
with open(from_file, "rb") as fobj:
with Tqdm.wrapattr(
fobj, "read", desc=name, total=total, disable=no_progress_bar
) as wrapped:
blob_client.upload_blob(wrapped, overwrite=True)
self.upload_fobj(
fobj,
to_info,
desc=name,
total=total,
no_progress_bar=no_progress_bar,
)

def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
15 changes: 10 additions & 5 deletions dvc/tree/base.py
@@ -59,6 +59,8 @@ class BaseTree:
TRAVERSE_PREFIX_LEN = 3
TRAVERSE_THRESHOLD_SIZE = 500000
CAN_TRAVERSE = True

# Needed for some providers, and http open()
CHUNK_SIZE = 64 * 1024 * 1024 # 64 MiB
Comment on lines +62 to 64

Contributor:

Looks like it is now only used in http, so I guess no need to have it in the base class.

Suggested change (remove these lines):
- # Needed for some providers, and http open()
- CHUNK_SIZE = 64 * 1024 * 1024 # 64 MiB

Contributor Author:

It's used in the open() call of _transfer_file().
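For context, a rough sketch of the collapsed body of _transfer_file in dvc/cache/base.py, showing where CHUNK_SIZE reaches the tree's open() call. This is reconstructed from the visible hunk boundaries, so the exact keyword arguments and helper names are assumptions rather than the merged code:

    def _transfer_file(self, from_tree, from_info):
        from dvc.utils import tmp_fname
        from dvc.utils.stream import HashedStreamReader

        tmp_info = self.tree.path_info / tmp_fname()
        with from_tree.open(
            from_info, mode="rb", chunk_size=from_tree.CHUNK_SIZE
        ) as stream:
            # Hash the bytes as they stream through, so no second pass is needed.
            stream_reader = HashedStreamReader(stream)
            self.tree.upload_fobj(
                stream_reader,
                tmp_info,
                total=from_tree.getsize(from_info),
                desc=from_info.name,
            )

        hash_info = stream_reader.hash_info
        self.move(tmp_info, self.tree.hash_to_path_info(hash_info.value))
        return hash_info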


PARAM_CHECKSUM: ClassVar[Optional[str]] = None
@@ -226,9 +228,6 @@ def move(self, from_info, to_info):
def copy(self, from_info, to_info):
raise RemoteActionNotImplemented("copy", self.scheme)

def copy_fobj(self, fobj, to_info, chunk_size=None):
raise RemoteActionNotImplemented("copy_fobj", self.scheme)

def symlink(self, from_info, to_info):
raise RemoteActionNotImplemented("symlink", self.scheme)

@@ -364,8 +363,14 @@ def upload(
no_progress_bar=no_progress_bar,
)

def upload_fobj(self, fobj, to_info, no_progress_bar=False):
raise RemoteActionNotImplemented("upload_fobj", self.scheme)
def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
if not hasattr(self, "_upload_fobj"):
raise RemoteActionNotImplemented("upload_fobj", self.scheme)

with Tqdm.wrapattr(
fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
) as wrapped:
self._upload_fobj(wrapped, to_info) # pylint: disable=no-member

def download(
self,
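To make the new upload_fobj contract concrete, here is a minimal sketch of a hypothetical subclass (MyTree is not part of this PR, and the to_info handling is illustrative): a tree only implements the raw _upload_fobj hook, while BaseTree.upload_fobj adds the Tqdm progress wrapping and raises RemoteActionNotImplemented when the hook is missing.

    import shutil

    from dvc.tree.base import BaseTree


    class MyTree(BaseTree):
        scheme = "my"

        def _upload_fobj(self, fobj, to_info):
            # `fobj` arrives already wrapped by Tqdm.wrapattr in upload_fobj(),
            # so every read() from it advances the progress bar.
            with open(to_info.path, "wb") as fdest:
                shutil.copyfileobj(fobj, fdest)


    # Callers always go through the public wrapper:
    # with open("data.bin", "rb") as fobj:
    #     tree.upload_fobj(fobj, to_info, desc="data.bin", total=size)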
44 changes: 17 additions & 27 deletions dvc/tree/gdrive.py
44 changes: 17 additions & 27 deletions dvc/tree/gdrive.py
@@ -345,29 +345,12 @@ def _gdrive_shared_drive_id(self, item_id):
return item.get("driveId", None)

@_gdrive_retry
def _gdrive_upload_file(
self,
parent_id,
title,
no_progress_bar=False,
from_file="",
progress_name="",
):
def _gdrive_upload_fobj(self, fobj, parent_id, title):
Contributor:

Yes, the information is confusing; probably they forgot to update the comment, or it should be interpreted in some other way. I see, for example, that rclone uses 8 MB (https://forum.rclone.org/t/google-drive-and-optimal-drive-chunk-size/1186/11), and people mention 256 MB when dealing with very large files. I think it would be a good option to have, and we might indeed consider making it smaller by default to reduce memory pressure.

Contributor Author:

We use 64 MiB on all other providers, though I think there is no way (as of now) to pass the chunk_size into MediaIoBaseUpload. Maybe we should consider adding it as a feature to pydrive2?
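For illustration only, here is roughly what a chunked upload with an explicit chunk size looks like against the raw Drive v3 client that pydrive2 wraps (google-api-python-client). The `service` object (an authorized Drive v3 client) and the 8 MiB default are assumptions, and this is not what _gdrive_upload_fobj does in this PR:

    from googleapiclient.http import MediaIoBaseUpload


    def upload_fobj_chunked(service, fobj, parent_id, title, chunk_size=8 * 1024 * 1024):
        # Resumable upload: the client buffers and sends chunk_size bytes per
        # request instead of reading the whole stream into memory at once.
        media = MediaIoBaseUpload(
            fobj,
            mimetype="application/octet-stream",
            chunksize=chunk_size,
            resumable=True,
        )
        request = service.files().create(
            body={"name": title, "parents": [parent_id]},
            media_body=media,
            fields="id",
        )
        response = None
        while response is None:
            # Each next_chunk() call uploads one chunk; status reports progress.
            status, response = request.next_chunk()
        return response["id"]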

item = self._drive.CreateFile(
{"title": title, "parents": [{"id": parent_id}]}
)

with open(from_file, "rb") as fobj:
total = os.path.getsize(from_file)
with Tqdm.wrapattr(
fobj,
"read",
desc=progress_name,
total=total,
disable=no_progress_bar,
) as wrapped:
item.content = wrapped
item.Upload()
item.content = fobj
item.Upload()
return item

@_gdrive_retry
@@ -570,16 +553,23 @@ def getsize(self, path_info):
gdrive_file.FetchMetadata(fields="fileSize")
return gdrive_file.get("fileSize")

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
def _upload_fobj(self, fobj, to_info):
dirname = to_info.parent
assert dirname
parent_id = self._get_item_id(dirname, True)
parent_id = self._get_item_id(dirname, create=True)
self._gdrive_upload_fobj(fobj, parent_id, to_info.name)

self._gdrive_upload_file(
parent_id, to_info.name, no_progress_bar, from_file, name
)
def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
with open(from_file, "rb") as fobj:
self.upload_fobj(
fobj,
to_info,
no_progress_bar=no_progress_bar,
desc=name or to_info.name,
total=os.path.getsize(from_file),
)

def _download(self, from_info, to_file, name=None, no_progress_bar=False):
item_id = self._get_item_id(from_info)
43 changes: 18 additions & 25 deletions dvc/tree/gs.py
@@ -46,24 +46,9 @@ def wrapper(*args, **kwargs):


@dynamic_chunk_size
def _upload_to_bucket(
bucket,
from_file,
to_info,
chunk_size=None,
name=None,
no_progress_bar=False,
):
def _upload_to_bucket(bucket, fobj, to_info, chunk_size=None):
blob = bucket.blob(to_info.path, chunk_size=chunk_size)
with open(from_file, mode="rb") as fobj:
with Tqdm.wrapattr(
fobj,
"read",
desc=name or to_info.path,
total=os.path.getsize(from_file),
disable=no_progress_bar,
) as wrapped:
blob.upload_from_file(wrapped)
blob.upload_from_file(fobj)


class GSTree(BaseTree):
@@ -205,17 +190,25 @@ def get_file_hash(self, path_info):
size=blob.size,
)

def _upload_fobj(self, fobj, to_info):
bucket = self.gs.bucket(to_info.bucket)
# As with the other callers of @dynamic_chunk_size, this function does
# not respect tree.CHUNK_SIZE, since that is too big for GS to handle;
# the decorator dynamically calculates the best possible chunk size instead.
_upload_to_bucket(bucket, fobj, to_info)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
bucket = self.gs.bucket(to_info.bucket)
_upload_to_bucket(
bucket,
from_file,
to_info,
name=name,
no_progress_bar=no_progress_bar,
)
with open(from_file, mode="rb") as fobj:
self.upload_fobj(
fobj,
to_info,
desc=name or to_info.path,
total=os.path.getsize(from_file),
no_progress_bar=no_progress_bar,
)

def _download(self, from_info, to_file, name=None, no_progress_bar=False):
bucket = self.gs.bucket(from_info.bucket)
5 changes: 5 additions & 0 deletions dvc/tree/hdfs.py
@@ -235,6 +235,11 @@ def get_file_hash(self, path_info):
size=self.getsize(path_info),
)

def _upload_fobj(self, fobj, to_info):
with self.hdfs(to_info) as hdfs:
with hdfs.open_output_stream(to_info.path) as fdest:
shutil.copyfileobj(fobj, fdest)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
44 changes: 23 additions & 21 deletions dvc/tree/http.py
@@ -74,6 +74,9 @@ def _auth_method(self, path_info=None):
self.headers.update({self.custom_auth_header: self.password})
return None

def _generate_download_url(self, path_info):
return path_info.url

@wrap_prop(threading.Lock())
@cached_property
def _session(self):
@@ -169,6 +172,18 @@ def get_file_hash(self, path_info):

return HashInfo(self.PARAM_CHECKSUM, etag)

def _upload_fobj(self, fobj, to_info):
def chunks(fobj):
while True:
chunk = fobj.read(self.CHUNK_SIZE)
if not chunk:
break
yield chunk

response = self.request(self.method, to_info.url, data=chunks(fobj))
if response.status_code not in (200, 201):
raise HTTPError(response.status_code, response.reason)

def _download(self, from_info, to_file, name=None, no_progress_bar=False):
response = self.request("GET", from_info.url, stream=True)
if response.status_code != 200:
@@ -190,27 +205,14 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
def chunks():
with open(from_file, "rb") as fd:
with Tqdm.wrapattr(
fd,
"read",
total=None
if no_progress_bar
else os.path.getsize(from_file),
leave=False,
desc=to_info.url if name is None else name,
disable=no_progress_bar,
) as fd_wrapped:
while True:
chunk = fd_wrapped.read(self.CHUNK_SIZE)
if not chunk:
break
yield chunk

response = self.request(self.method, to_info.url, data=chunks())
if response.status_code not in (200, 201):
raise HTTPError(response.status_code, response.reason)
with open(from_file, "rb") as fobj:
self.upload_fobj(
fobj,
to_info,
no_progress_bar=no_progress_bar,
desc=name or to_info.url,
total=None if no_progress_bar else os.path.getsize(from_file),
)

@staticmethod
def _content_length(response):
11 changes: 2 additions & 9 deletions dvc/tree/local.py
@@ -166,11 +166,11 @@ def copy(self, from_info, to_info):
self.remove(tmp_info)
raise

def copy_fobj(self, fobj, to_info, chunk_size=None):
def _upload_fobj(self, fobj, to_info):
self.makedirs(to_info.parent)
tmp_info = to_info.parent / tmp_fname("")
try:
copy_fobj_to_file(fobj, tmp_info, chunk_size=chunk_size)
copy_fobj_to_file(fobj, tmp_info)
os.rename(tmp_info, to_info)
except Exception:
self.remove(tmp_info)
@@ -253,13 +253,6 @@ def _upload(
)
os.replace(tmp_file, to_info)

def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
from dvc.progress import Tqdm

with Tqdm(bytes=True, disable=no_progress_bar, **pbar_args) as pbar:
with pbar.wrapattr(fobj, "read") as fobj:
self.copy_fobj(fobj, to_info, chunk_size=self.CHUNK_SIZE)

@staticmethod
def _download(
from_info, to_file, name=None, no_progress_bar=False, **_kwargs
3 changes: 3 additions & 0 deletions dvc/tree/oss.py
@@ -116,6 +116,9 @@ def remove(self, path_info):
logger.debug(f"Removing oss://{path_info}")
self.oss_service.delete_object(path_info.path)

def _upload_fobj(self, fobj, to_info):
self.oss_service.put_object(to_info.path, fobj)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
25 changes: 4 additions & 21 deletions dvc/tree/s3.py
@@ -353,6 +353,10 @@ def get_file_hash(self, path_info):
size=obj.content_length,
)

def _upload_fobj(self, fobj, to_info):
with self._get_obj(to_info) as obj:
obj.upload_fileobj(fobj)

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
):
@@ -365,27 +369,6 @@ def _upload(
from_file, Callback=pbar.update, ExtraArgs=self.extra_args,
)

def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
from boto3.s3.transfer import TransferConfig

config = TransferConfig(
multipart_threshold=self.CHUNK_SIZE,
multipart_chunksize=self.CHUNK_SIZE,
max_concurrency=1,
use_threads=False,
)
with self._get_s3() as s3:
with Tqdm(
disable=no_progress_bar, bytes=True, **pbar_args
) as pbar:
s3.meta.client.upload_fileobj(
fobj,
to_info.bucket,
to_info.path,
Config=config,
Callback=pbar.update,
)

def _download(self, from_info, to_file, name=None, no_progress_bar=False):
with self._get_obj(from_info) as obj:
with Tqdm(