diff --git a/dvc/cache/base.py b/dvc/cache/base.py
index 2fcbb3858f..ad4ebc8f98 100644
--- a/dvc/cache/base.py
+++ b/dvc/cache/base.py
@@ -249,33 +249,14 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
                 if not (
                     tree.isdvc(path_info, strict=False) and tree.fetch
                 ):
-                    self.tree.copy_fobj(fobj, cache_info)
+                    self.tree.upload_fobj(fobj, cache_info)
                 callback = kwargs.get("download_callback")
                 if callback:
                     callback(1)
 
         self.tree.state.save(cache_info, hash_info)
 
-    def _transfer_file_as_whole(self, from_tree, from_info):
-        from dvc.utils import tmp_fname
-
-        # When we can't use the chunked upload, we have to first download
-        # and then calculate the hash as if it were a local file and then
-        # upload it.
-        local_tree = self.repo.cache.local.tree
-        local_info = local_tree.path_info / tmp_fname()
-
-        from_tree.download(from_info, local_info)
-        hash_info = local_tree.get_file_hash(local_info)
-
-        self.tree.upload(
-            local_info,
-            self.tree.hash_to_path_info(hash_info.value),
-            name=from_info.name,
-        )
-        return hash_info
-
-    def _transfer_file_as_chunked(self, from_tree, from_info):
+    def _transfer_file(self, from_tree, from_info):
         from dvc.utils import tmp_fname
         from dvc.utils.stream import HashedStreamReader
 
@@ -298,14 +279,6 @@ def _transfer_file_as_chunked(self, from_tree, from_info):
         self.move(tmp_info, self.tree.hash_to_path_info(hash_info.value))
         return hash_info
 
-    def _transfer_file(self, from_tree, from_info):
-        try:
-            hash_info = self._transfer_file_as_chunked(from_tree, from_info)
-        except RemoteActionNotImplemented:
-            hash_info = self._transfer_file_as_whole(from_tree, from_info)
-
-        return hash_info
-
     def _transfer_directory_contents(self, from_tree, from_info, jobs, pbar):
         rel_path_infos = {}
         from_infos = from_tree.walk_files(from_info)
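Note: with the whole-file fallback removed, _transfer_file always streams
through HashedStreamReader (dvc/utils/stream.py), which hashes bytes as they
pass through, so a single pass over the data both uploads it and produces
its hash. A minimal sketch of that idea, assuming only hashlib (illustrative,
not the actual implementation):

    import hashlib

    class HashedReader:
        """Proxy a readable stream and md5-hash everything read from it."""

        def __init__(self, fobj):
            self.fobj = fobj
            self.md5 = hashlib.md5()

        def read(self, size=-1):
            chunk = self.fobj.read(size)
            self.md5.update(chunk)  # hash exactly the bytes the consumer saw
            return chunk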
RemoteActionNotImplemented("symlink", self.scheme) @@ -364,8 +363,14 @@ def upload( no_progress_bar=no_progress_bar, ) - def upload_fobj(self, fobj, to_info, no_progress_bar=False): - raise RemoteActionNotImplemented("upload_fobj", self.scheme) + def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args): + if not hasattr(self, "_upload_fobj"): + raise RemoteActionNotImplemented("upload_fobj", self.scheme) + + with Tqdm.wrapattr( + fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args + ) as wrapped: + self._upload_fobj(wrapped, to_info) # pylint: disable=no-member def download( self, diff --git a/dvc/tree/gdrive.py b/dvc/tree/gdrive.py index 2007f8d156..05c71b67ae 100644 --- a/dvc/tree/gdrive.py +++ b/dvc/tree/gdrive.py @@ -345,29 +345,12 @@ def _gdrive_shared_drive_id(self, item_id): return item.get("driveId", None) @_gdrive_retry - def _gdrive_upload_file( - self, - parent_id, - title, - no_progress_bar=False, - from_file="", - progress_name="", - ): + def _gdrive_upload_fobj(self, fobj, parent_id, title): item = self._drive.CreateFile( {"title": title, "parents": [{"id": parent_id}]} ) - - with open(from_file, "rb") as fobj: - total = os.path.getsize(from_file) - with Tqdm.wrapattr( - fobj, - "read", - desc=progress_name, - total=total, - disable=no_progress_bar, - ) as wrapped: - item.content = wrapped - item.Upload() + item.content = fobj + item.Upload() return item @_gdrive_retry @@ -570,16 +553,23 @@ def getsize(self, path_info): gdrive_file.FetchMetadata(fields="fileSize") return gdrive_file.get("fileSize") - def _upload( - self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs - ): + def _upload_fobj(self, fobj, to_info): dirname = to_info.parent assert dirname - parent_id = self._get_item_id(dirname, True) + parent_id = self._get_item_id(dirname, create=True) + self._gdrive_upload_fobj(fobj, parent_id, to_info.name) - self._gdrive_upload_file( - parent_id, to_info.name, no_progress_bar, from_file, name - ) + def _upload( + self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs + ): + with open(from_file, "rb") as fobj: + self.upload_fobj( + fobj, + to_info, + no_progress_bar=no_progress_bar, + desc=name or to_info.name, + total=os.path.getsize(from_file), + ) def _download(self, from_info, to_file, name=None, no_progress_bar=False): item_id = self._get_item_id(from_info) diff --git a/dvc/tree/gs.py b/dvc/tree/gs.py index 7141d7a33c..70f478c9f9 100644 --- a/dvc/tree/gs.py +++ b/dvc/tree/gs.py @@ -46,24 +46,9 @@ def wrapper(*args, **kwargs): @dynamic_chunk_size -def _upload_to_bucket( - bucket, - from_file, - to_info, - chunk_size=None, - name=None, - no_progress_bar=False, -): +def _upload_to_bucket(bucket, fobj, to_info, chunk_size=None): blob = bucket.blob(to_info.path, chunk_size=chunk_size) - with open(from_file, mode="rb") as fobj: - with Tqdm.wrapattr( - fobj, - "read", - desc=name or to_info.path, - total=os.path.getsize(from_file), - disable=no_progress_bar, - ) as wrapped: - blob.upload_from_file(wrapped) + blob.upload_from_file(fobj) class GSTree(BaseTree): @@ -205,17 +190,25 @@ def get_file_hash(self, path_info): size=blob.size, ) + def _upload_fobj(self, fobj, to_info): + bucket = self.gs.bucket(to_info.bucket) + # With other references being given in the @dynamic_chunk_size + # this function does not respect tree.CHUNK_SIZE, since it is + # too big for GS to handle. 
diff --git a/dvc/tree/gdrive.py b/dvc/tree/gdrive.py
index 2007f8d156..05c71b67ae 100644
--- a/dvc/tree/gdrive.py
+++ b/dvc/tree/gdrive.py
@@ -345,29 +345,12 @@ def _gdrive_shared_drive_id(self, item_id):
         return item.get("driveId", None)
 
     @_gdrive_retry
-    def _gdrive_upload_file(
-        self,
-        parent_id,
-        title,
-        no_progress_bar=False,
-        from_file="",
-        progress_name="",
-    ):
+    def _gdrive_upload_fobj(self, fobj, parent_id, title):
         item = self._drive.CreateFile(
             {"title": title, "parents": [{"id": parent_id}]}
         )
-
-        with open(from_file, "rb") as fobj:
-            total = os.path.getsize(from_file)
-            with Tqdm.wrapattr(
-                fobj,
-                "read",
-                desc=progress_name,
-                total=total,
-                disable=no_progress_bar,
-            ) as wrapped:
-                item.content = wrapped
-                item.Upload()
+        item.content = fobj
+        item.Upload()
         return item
 
     @_gdrive_retry
@@ -570,16 +553,23 @@ def getsize(self, path_info):
         gdrive_file.FetchMetadata(fields="fileSize")
         return gdrive_file.get("fileSize")
 
-    def _upload(
-        self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
-    ):
+    def _upload_fobj(self, fobj, to_info):
         dirname = to_info.parent
         assert dirname
-        parent_id = self._get_item_id(dirname, True)
+        parent_id = self._get_item_id(dirname, create=True)
+        self._gdrive_upload_fobj(fobj, parent_id, to_info.name)
 
-        self._gdrive_upload_file(
-            parent_id, to_info.name, no_progress_bar, from_file, name
-        )
+    def _upload(
+        self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
+    ):
+        with open(from_file, "rb") as fobj:
+            self.upload_fobj(
+                fobj,
+                to_info,
+                no_progress_bar=no_progress_bar,
+                desc=name or to_info.name,
+                total=os.path.getsize(from_file),
+            )
 
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         item_id = self._get_item_id(from_info)
diff --git a/dvc/tree/gs.py b/dvc/tree/gs.py
index 7141d7a33c..70f478c9f9 100644
--- a/dvc/tree/gs.py
+++ b/dvc/tree/gs.py
@@ -46,24 +46,9 @@ def wrapper(*args, **kwargs):
 
 
 @dynamic_chunk_size
-def _upload_to_bucket(
-    bucket,
-    from_file,
-    to_info,
-    chunk_size=None,
-    name=None,
-    no_progress_bar=False,
-):
+def _upload_to_bucket(bucket, fobj, to_info, chunk_size=None):
     blob = bucket.blob(to_info.path, chunk_size=chunk_size)
-    with open(from_file, mode="rb") as fobj:
-        with Tqdm.wrapattr(
-            fobj,
-            "read",
-            desc=name or to_info.path,
-            total=os.path.getsize(from_file),
-            disable=no_progress_bar,
-        ) as wrapped:
-            blob.upload_from_file(wrapped)
+    blob.upload_from_file(fobj)
 
 
 class GSTree(BaseTree):
@@ -205,17 +190,25 @@ def get_file_hash(self, path_info):
             size=blob.size,
         )
 
+    def _upload_fobj(self, fobj, to_info):
+        bucket = self.gs.bucket(to_info.bucket)
+        # Unlike the other implementations, this one does not respect
+        # tree.CHUNK_SIZE: the default is too big for GS to handle, so
+        # @dynamic_chunk_size dynamically calculates the best possible
+        # chunk size instead.
+        _upload_to_bucket(bucket, fobj, to_info)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
-        bucket = self.gs.bucket(to_info.bucket)
-        _upload_to_bucket(
-            bucket,
-            from_file,
-            to_info,
-            name=name,
-            no_progress_bar=no_progress_bar,
-        )
+        with open(from_file, mode="rb") as fobj:
+            self.upload_fobj(
+                fobj,
+                to_info,
+                desc=name or to_info.path,
+                total=os.path.getsize(from_file),
+                no_progress_bar=no_progress_bar,
+            )
 
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         bucket = self.gs.bucket(from_info.bucket)
diff --git a/dvc/tree/hdfs.py b/dvc/tree/hdfs.py
index 5f50add7ae..e115ac65de 100644
--- a/dvc/tree/hdfs.py
+++ b/dvc/tree/hdfs.py
@@ -235,6 +235,11 @@ def get_file_hash(self, path_info):
             size=self.getsize(path_info),
         )
 
+    def _upload_fobj(self, fobj, to_info):
+        with self.hdfs(to_info) as hdfs:
+            with hdfs.open_output_stream(to_info.path) as fdest:
+                shutil.copyfileobj(fobj, fdest)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/dvc/tree/http.py b/dvc/tree/http.py
index 7ffd785049..f3b762291e 100644
--- a/dvc/tree/http.py
+++ b/dvc/tree/http.py
@@ -74,6 +74,9 @@ def _auth_method(self, path_info=None):
                 self.headers.update({self.custom_auth_header: self.password})
         return None
 
+    def _generate_download_url(self, path_info):
+        return path_info.url
+
     @wrap_prop(threading.Lock())
     @cached_property
     def _session(self):
@@ -169,6 +172,18 @@ def get_file_hash(self, path_info):
 
         return HashInfo(self.PARAM_CHECKSUM, etag)
 
+    def _upload_fobj(self, fobj, to_info):
+        def chunks(fobj):
+            while True:
+                chunk = fobj.read(self.CHUNK_SIZE)
+                if not chunk:
+                    break
+                yield chunk
+
+        response = self.request(self.method, to_info.url, data=chunks(fobj))
+        if response.status_code not in (200, 201):
+            raise HTTPError(response.status_code, response.reason)
+
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         response = self.request("GET", from_info.url, stream=True)
         if response.status_code != 200:
@@ -190,27 +205,14 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
-        def chunks():
-            with open(from_file, "rb") as fd:
-                with Tqdm.wrapattr(
-                    fd,
-                    "read",
-                    total=None
-                    if no_progress_bar
-                    else os.path.getsize(from_file),
-                    leave=False,
-                    desc=to_info.url if name is None else name,
-                    disable=no_progress_bar,
-                ) as fd_wrapped:
-                    while True:
-                        chunk = fd_wrapped.read(self.CHUNK_SIZE)
-                        if not chunk:
-                            break
-                        yield chunk
-
-        response = self.request(self.method, to_info.url, data=chunks())
-        if response.status_code not in (200, 201):
-            raise HTTPError(response.status_code, response.reason)
+        with open(from_file, "rb") as fobj:
+            self.upload_fobj(
+                fobj,
+                to_info,
+                no_progress_bar=no_progress_bar,
+                desc=name or to_info.url,
+                total=None if no_progress_bar else os.path.getsize(from_file),
+            )
 
     @staticmethod
     def _content_length(response):
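Note: the HTTP implementation streams because requests sends a generator
body with Transfer-Encoding: chunked, so the payload never has to fit in
memory. The same pattern in isolation (the endpoint URL is hypothetical):

    import requests

    def chunks(fobj, chunk_size=64 * 1024 * 1024):
        # yield fixed-size pieces until the stream is exhausted
        while True:
            chunk = fobj.read(chunk_size)
            if not chunk:
                break
            yield chunk

    with open("data.bin", "rb") as fobj:
        # a generator passed as `data` switches requests to chunked encoding
        requests.post("https://example.com/upload", data=chunks(fobj))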
diff --git a/dvc/tree/local.py b/dvc/tree/local.py
index 71a51510eb..ded370b38d 100644
--- a/dvc/tree/local.py
+++ b/dvc/tree/local.py
@@ -166,11 +166,11 @@ def copy(self, from_info, to_info):
             self.remove(tmp_info)
             raise
 
-    def copy_fobj(self, fobj, to_info, chunk_size=None):
+    def _upload_fobj(self, fobj, to_info):
         self.makedirs(to_info.parent)
         tmp_info = to_info.parent / tmp_fname("")
         try:
-            copy_fobj_to_file(fobj, tmp_info, chunk_size=chunk_size)
+            copy_fobj_to_file(fobj, tmp_info)
             os.rename(tmp_info, to_info)
         except Exception:
             self.remove(tmp_info)
             raise
@@ -253,13 +253,6 @@ def _upload(
         )
         os.replace(tmp_file, to_info)
 
-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        from dvc.progress import Tqdm
-
-        with Tqdm(bytes=True, disable=no_progress_bar, **pbar_args) as pbar:
-            with pbar.wrapattr(fobj, "read") as fobj:
-                self.copy_fobj(fobj, to_info, chunk_size=self.CHUNK_SIZE)
-
     @staticmethod
     def _download(
        from_info, to_file, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/oss.py b/dvc/tree/oss.py
index ade5f24180..302e4447b0 100644
--- a/dvc/tree/oss.py
+++ b/dvc/tree/oss.py
@@ -116,6 +116,9 @@ def remove(self, path_info):
         logger.debug(f"Removing oss://{path_info}")
         self.oss_service.delete_object(path_info.path)
 
+    def _upload_fobj(self, fobj, to_info):
+        self.oss_service.put_object(to_info.path, fobj)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/dvc/tree/s3.py b/dvc/tree/s3.py
index 1aee7de6de..eb62786fd4 100644
--- a/dvc/tree/s3.py
+++ b/dvc/tree/s3.py
@@ -353,6 +353,10 @@ def get_file_hash(self, path_info):
             size=obj.content_length,
         )
 
+    def _upload_fobj(self, fobj, to_info):
+        with self._get_obj(to_info) as obj:
+            obj.upload_fileobj(fobj)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
@@ -365,27 +369,6 @@ def _upload(
                 from_file, Callback=pbar.update, ExtraArgs=self.extra_args,
             )
 
-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        from boto3.s3.transfer import TransferConfig
-
-        config = TransferConfig(
-            multipart_threshold=self.CHUNK_SIZE,
-            multipart_chunksize=self.CHUNK_SIZE,
-            max_concurrency=1,
-            use_threads=False,
-        )
-        with self._get_s3() as s3:
-            with Tqdm(
-                disable=no_progress_bar, bytes=True, **pbar_args
-            ) as pbar:
-                s3.meta.client.upload_fileobj(
-                    fobj,
-                    to_info.bucket,
-                    to_info.path,
-                    Config=config,
-                    Callback=pbar.update,
-                )
-
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         with self._get_obj(from_info) as obj:
             with Tqdm(
diff --git a/dvc/tree/ssh/__init__.py b/dvc/tree/ssh/__init__.py
index 9b45a6942f..8a2dee669b 100644
--- a/dvc/tree/ssh/__init__.py
+++ b/dvc/tree/ssh/__init__.py
@@ -251,6 +251,10 @@ def getsize(self, path_info):
         with self.ssh(path_info) as ssh:
             return ssh.getsize(path_info.path)
 
+    def _upload_fobj(self, fobj, to_info):
+        with self.open(to_info, mode="wb") as fdest:
+            shutil.copyfileobj(fobj, fdest)
+
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         with self.ssh(from_info) as ssh:
             ssh.download(
@@ -260,14 +264,6 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
                 no_progress_bar=no_progress_bar,
             )
 
-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        from dvc.progress import Tqdm
-
-        with Tqdm(bytes=True, disable=no_progress_bar, **pbar_args) as pbar:
-            with pbar.wrapattr(fobj, "read") as fobj:
-                with self.open(to_info, mode="wb") as fdest:
-                    shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)
-
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/dvc/tree/webdav.py b/dvc/tree/webdav.py
index b66d8c1be2..9828d52686 100644
--- a/dvc/tree/webdav.py
+++ b/dvc/tree/webdav.py
@@ -192,6 +192,12 @@ def move(self, from_info, to_info):
         # Webdav client move
         self._client.move(from_info.path, to_info.path)
 
+    def _upload_fobj(self, fobj, to_info):
+        # In contrast to other upload_fobj implementations, this one does
+        # not do a chunked upload but rather puts everything in one request.
+        self.makedirs(to_info.parent)
+        self._client.upload_to(buff=fobj, remote_path=to_info.path)
+
     # Downloads file from remote to file
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         # Progress from HTTPTree
diff --git a/dvc/tree/webhdfs.py b/dvc/tree/webhdfs.py
index bfed900d3a..cfb651309a 100644
--- a/dvc/tree/webhdfs.py
+++ b/dvc/tree/webhdfs.py
@@ -100,7 +100,7 @@ def open(self, path_info, mode="r", encoding=None, **kwargs):
         with self.hdfs_client.read(
             path_info.path, encoding=encoding
         ) as reader:
-            yield reader.read()
+            yield reader
 
     def walk_files(self, path_info, **kwargs):
         if not self.exists(path_info):
@@ -143,6 +143,10 @@ def move(self, from_info, to_info):
         self.hdfs_client.makedirs(to_info.parent.path)
         self.hdfs_client.rename(from_info.path, to_info.path)
 
+    def _upload_fobj(self, fobj, to_info):
+        with self.hdfs_client.write(to_info.path) as fdest:
+            shutil.copyfileobj(fobj, fdest)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
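Note: the hand-rolled TransferConfig is no longer needed on S3 because
upload_fileobj is a managed transfer: boto3 decides by itself when to split
the body into a multipart upload. The equivalent standalone call (bucket
and key names are hypothetical):

    import boto3

    s3 = boto3.resource("s3")
    obj = s3.Object("my-bucket", "path/to/key")

    with open("data.bin", "rb") as fobj:
        # managed transfer: multipart splitting happens inside boto3
        obj.upload_fileobj(fobj)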
diff --git a/dvc/utils/fs.py b/dvc/utils/fs.py
index 78ffb40de4..5f04b9436b 100644
--- a/dvc/utils/fs.py
+++ b/dvc/utils/fs.py
@@ -215,10 +215,10 @@ def copyfile(src, dest, no_progress_bar=False, name=None):
                     fdest_wrapped.write(buf)
 
 
-def copy_fobj_to_file(fsrc, dest, chunk_size=None):
+def copy_fobj_to_file(fsrc, dest):
    """Copy contents of open file object to destination path."""
    with open(dest, "wb+") as fdest:
-        shutil.copyfileobj(fsrc, fdest, length=chunk_size)
+        shutil.copyfileobj(fsrc, fdest)
 
 
 def walk_files(directory):
diff --git a/tests/func/test_tree.py b/tests/func/test_tree.py
index afe4d5e5d4..3df94ee9ef 100644
--- a/tests/func/test_tree.py
+++ b/tests/func/test_tree.py
@@ -295,3 +295,33 @@ def test_tree_getsize(dvc, cloud):
 
     assert tree.getsize(path_info / "baz") == 7
     assert tree.getsize(path_info / "data" / "foo") == 3
+
+
+@pytest.mark.parametrize(
+    "cloud",
+    [
+        pytest.lazy_fixture("azure"),
+        pytest.lazy_fixture("gs"),
+        pytest.lazy_fixture("gdrive"),
+        pytest.lazy_fixture("hdfs"),
+        pytest.lazy_fixture("http"),
+        pytest.lazy_fixture("local_cloud"),
+        pytest.lazy_fixture("oss"),
+        pytest.lazy_fixture("s3"),
+        pytest.lazy_fixture("ssh"),
+        pytest.lazy_fixture("webhdfs"),
+    ],
+)
+def test_tree_upload_fobj(dvc, tmp_dir, cloud):
+    tmp_dir.gen("foo", "foo")
+    tree = get_cloud_tree(dvc, **cloud.config)
+
+    from_info = tmp_dir / "foo"
+    to_info = tree.path_info / "foo"
+
+    with open(from_info, "rb") as stream:
+        tree.upload_fobj(stream, to_info)
+
+    assert tree.exists(to_info)
+    with tree.open(to_info, "rb") as stream:
+        assert stream.read() == b"foo"
diff --git a/tests/unit/remote/test_azure.py b/tests/unit/remote/test_azure.py
index 750fee0b43..da312d3bb6 100644
--- a/tests/unit/remote/test_azure.py
+++ b/tests/unit/remote/test_azure.py
@@ -42,13 +42,3 @@ def test_get_file_hash(tmp_dir, azure):
     assert hash_
     assert isinstance(hash_, str)
     assert hash_.strip("'").strip('"') == hash_
-
-
-def test_azure_dir_gen(tmp_dir, azure):
-    azure.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}}})
-    data_info = azure / "data"
-
-    assert (data_info / "foo").read_text() == "foo"
-
-    (data_info / "bar" / "baz").write_text("quux")
-    assert (data_info / "bar" / "baz").read_bytes() == b"quux"
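Usage, end to end: any readable binary stream can now be pushed to any tree,
which is exactly what the functional test above exercises. A sketch under
the assumption of an existing repo object and an s3 remote URL:

    from dvc.tree import get_cloud_tree

    tree = get_cloud_tree(repo, url="s3://bucket/prefix")
    to_info = tree.path_info / "foo"

    with open("foo", "rb") as fobj:
        # desc/total are optional pbar_args forwarded to the progress bar
        tree.upload_fobj(fobj, to_info, desc="foo", total=3)

    assert tree.exists(to_info)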