From 02f1c4a35e70768f5e980f0da391a678fa69e67a Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Thu, 21 Jan 2021 15:46:14 +0300
Subject: [PATCH 01/17] tree: Implement upload_fobj protocol for azure

---
 dvc/tree/azure.py               | 10 ++++++++++
 tests/unit/remote/test_azure.py | 10 ----------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/dvc/tree/azure.py b/dvc/tree/azure.py
index 57307cecce..eab8453a8a 100644
--- a/dvc/tree/azure.py
+++ b/dvc/tree/azure.py
@@ -184,6 +184,16 @@ def _upload(
         ) as wrapped:
             blob_client.upload_blob(wrapped, overwrite=True)

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        blob_client = self.blob_service.get_blob_client(
+            to_info.bucket, to_info.path
+        )
+        blob_client.max_block_size = self.CHUNK_SIZE
+        blob_client.min_large_block_upload_threshold = self.CHUNK_SIZE
+
+        with Tqdm.wrapattr(fobj, "read", bytes=True, **pbar_args) as wrapped:
+            blob_client.upload_blob(wrapped, overwrite=True, max_concurrency=1)
+
     def _download(
         self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/tests/unit/remote/test_azure.py b/tests/unit/remote/test_azure.py
index 750fee0b43..da312d3bb6 100644
--- a/tests/unit/remote/test_azure.py
+++ b/tests/unit/remote/test_azure.py
@@ -42,13 +42,3 @@ def test_get_file_hash(tmp_dir, azure):
     assert hash_
     assert isinstance(hash_, str)
     assert hash_.strip("'").strip('"') == hash_
-
-
-def test_azure_dir_gen(tmp_dir, azure):
-    azure.gen({"data": {"foo": "foo", "bar": {"baz": "baz"}}})
-    data_info = azure / "data"
-
-    assert (data_info / "foo").read_text() == "foo"
-
-    (data_info / "bar" / "baz").write_text("quux")
-    assert (data_info / "bar" / "baz").read_bytes() == b"quux"

From ee43ff60f110e699285f281c1949cf312dcc5eb0 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Thu, 21 Jan 2021 16:11:25 +0300
Subject: [PATCH 02/17] tree: Implement upload_fobj protocol for google storage

---
 dvc/tree/gs.py | 46 +++++++++++++++++++++-------------------------
 1 file changed, 21 insertions(+), 25 deletions(-)

diff --git a/dvc/tree/gs.py b/dvc/tree/gs.py
index 7141d7a33c..470bb13836 100644
--- a/dvc/tree/gs.py
+++ b/dvc/tree/gs.py
@@ -46,24 +46,9 @@ def wrapper(*args, **kwargs):


 @dynamic_chunk_size
-def _upload_to_bucket(
-    bucket,
-    from_file,
-    to_info,
-    chunk_size=None,
-    name=None,
-    no_progress_bar=False,
-):
+def _upload_to_bucket(bucket, fobj, to_info, chunk_size=None):
     blob = bucket.blob(to_info.path, chunk_size=chunk_size)
-    with open(from_file, mode="rb") as fobj:
-        with Tqdm.wrapattr(
-            fobj,
-            "read",
-            desc=name or to_info.path,
-            total=os.path.getsize(from_file),
-            disable=no_progress_bar,
-        ) as wrapped:
-            blob.upload_from_file(wrapped)
+    blob.upload_from_file(fobj)


 class GSTree(BaseTree):
@@ -205,17 +190,28 @@ def get_file_hash(self, path_info):
             size=blob.size,
         )

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        bucket = self.gs.bucket(to_info.bucket)
+        with Tqdm.wrapattr(
+            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
+        ):
+            # As noted in @dynamic_chunk_size, this function does not respect
+            # tree.CHUNK_SIZE, since that is too big for GS to handle. Rather,
+            # it dynamically tries to find the best chunk size and uploads
+            # with it.
+            _upload_to_bucket(bucket, fobj, to_info)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
-        bucket = self.gs.bucket(to_info.bucket)
-        _upload_to_bucket(
-            bucket,
-            from_file,
-            to_info,
-            name=name,
-            no_progress_bar=no_progress_bar,
-        )
+        with open(from_file, mode="rb") as fobj:
+            self.upload_fobj(
+                fobj,
+                to_info,
+                desc=name or to_info.path,
+                total=os.path.getsize(from_file),
+                no_progress_bar=no_progress_bar,
+            )

     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         bucket = self.gs.bucket(from_info.bucket)

From 893bf2f60348208b4b473d8c4fbe562aa3b17f1d Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Thu, 21 Jan 2021 17:06:58 +0300
Subject: [PATCH 03/17] tree: Implement upload_fobj protocol for google drive

---
 dvc/tree/gdrive.py | 48 ++++++++++++++++++++--------------------------
 dvc/tree/gs.py     |  4 ++--
 2 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/dvc/tree/gdrive.py b/dvc/tree/gdrive.py
index 2007f8d156..b5415f5df7 100644
--- a/dvc/tree/gdrive.py
+++ b/dvc/tree/gdrive.py
@@ -345,29 +345,12 @@ def _gdrive_shared_drive_id(self, item_id):
         return item.get("driveId", None)

     @_gdrive_retry
-    def _gdrive_upload_file(
-        self,
-        parent_id,
-        title,
-        no_progress_bar=False,
-        from_file="",
-        progress_name="",
-    ):
+    def _gdrive_upload_fobj(self, fobj, parent_id, title):
         item = self._drive.CreateFile(
             {"title": title, "parents": [{"id": parent_id}]}
         )
-
-        with open(from_file, "rb") as fobj:
-            total = os.path.getsize(from_file)
-            with Tqdm.wrapattr(
-                fobj,
-                "read",
-                desc=progress_name,
-                total=total,
-                disable=no_progress_bar,
-            ) as wrapped:
-                item.content = wrapped
-                item.Upload()
+        item.content = fobj
+        item.Upload()
         return item

     @_gdrive_retry
@@ -570,16 +553,27 @@ def getsize(self, path_info):
         gdrive_file.FetchMetadata(fields="fileSize")
         return gdrive_file.get("fileSize")

-    def _upload(
-        self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
-    ):
+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
         dirname = to_info.parent
         assert dirname
-        parent_id = self._get_item_id(dirname, True)
+        parent_id = self._get_item_id(dirname, create=True)

-        self._gdrive_upload_file(
-            parent_id, to_info.name, no_progress_bar, from_file, name
-        )
+        with Tqdm.wrapattr(
+            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
+        ) as wrapped:
+            self._gdrive_upload_fobj(wrapped, parent_id, to_info.name)
+
+    def _upload(
+        self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
+    ):
+        with open(from_file, "rb") as fobj:
+            self.upload_fobj(
+                fobj,
+                to_info,
+                no_progress_bar=no_progress_bar,
+                desc=name or to_info.name,
+                total=os.path.getsize(from_file),
+            )

     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         item_id = self._get_item_id(from_info)
diff --git a/dvc/tree/gs.py b/dvc/tree/gs.py
index 470bb13836..7ffb20e043 100644
--- a/dvc/tree/gs.py
+++ b/dvc/tree/gs.py
@@ -194,12 +194,12 @@ def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
         bucket = self.gs.bucket(to_info.bucket)
         with Tqdm.wrapattr(
             fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
-        ):
+        ) as wrapped:
             # As noted in @dynamic_chunk_size, this function does not respect
             # tree.CHUNK_SIZE, since that is too big for GS to handle. Rather,
             # it dynamically tries to find the best chunk size and uploads
             # with it.
-            _upload_to_bucket(bucket, fobj, to_info)
+            _upload_to_bucket(bucket, wrapped, to_info)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
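
Through patch 03 the protocol has the same shape in every backend that implements it: upload_fobj() accepts an already-open binary stream, wraps it in a Tqdm progress-bar proxy, and hands the wrapped stream to backend-specific upload code, while _upload() shrinks to an open()-then-delegate wrapper. Reduced to a sketch (SomeTree and _client_specific_upload are hypothetical placeholders, not code from these patches):

    class SomeTree(BaseTree):  # hypothetical backend
        def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
            with Tqdm.wrapattr(
                fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
            ) as wrapped:
                # hypothetical provider call; only this line differs per backend
                self._client_specific_upload(wrapped, to_info)

Patch 10 below lifts exactly this wrapping into BaseTree.upload_fobj so that each backend keeps only the provider call.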
From 34a6004011338d0fc583e5f340ca5ada3b040103 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Thu, 21 Jan 2021 18:50:51 +0300
Subject: [PATCH 04/17] tree: Implement upload_fobj protocol for hdfs

---
 dvc/tree/hdfs.py        |  8 ++++++++
 tests/func/test_tree.py | 25 +++++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/dvc/tree/hdfs.py b/dvc/tree/hdfs.py
index 5f50add7ae..d7b132380c 100644
--- a/dvc/tree/hdfs.py
+++ b/dvc/tree/hdfs.py
@@ -253,6 +253,14 @@ def _upload(
                     sobj.write(wrapped.read())
             hdfs.move(tmp_file, to_info.path)

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        with self.hdfs(to_info) as hdfs:
+            with Tqdm.wrapattr(
+                fobj, "read", disable=no_progress_bar, **pbar_args
+            ) as wrapped:
+                with hdfs.open_output_stream(to_info.path) as sobj:
+                    shutil.copyfileobj(wrapped, sobj, length=self.CHUNK_SIZE)
+
     def _download(
         self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/tests/func/test_tree.py b/tests/func/test_tree.py
index afe4d5e5d4..cf16239e36 100644
--- a/tests/func/test_tree.py
+++ b/tests/func/test_tree.py
@@ -295,3 +295,28 @@ def test_tree_getsize(dvc, cloud):

     assert tree.getsize(path_info / "baz") == 7
     assert tree.getsize(path_info / "data" / "foo") == 3
+
+
+@pytest.mark.parametrize(
+    "cloud",
+    [
+        pytest.lazy_fixture("azure"),
+        pytest.lazy_fixture("gs"),
+        pytest.lazy_fixture("gdrive"),
+        pytest.lazy_fixture("hdfs"),
+        pytest.lazy_fixture("local_cloud"),
+        pytest.lazy_fixture("oss"),
+        pytest.lazy_fixture("s3"),
+        pytest.lazy_fixture("ssh"),
+    ],
+)
+def test_tree_upload_fobj(dvc, tmp_dir, cloud):
+    tmp_dir.gen("foo", "foo")
+    tree = get_cloud_tree(dvc, **cloud.config)
+    path_info = tree.path_info
+
+    with open(tmp_dir / "foo", "rb") as stream:
+        tree.upload_fobj(stream, path_info / "foo")
+
+    with tree.open(path_info / "foo") as stream:
+        assert stream.read() == "foo"

From c96615fa89551f033e706cd1762d2d8cef4b0262 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Thu, 21 Jan 2021 20:33:58 +0300
Subject: [PATCH 05/17] tree: Implement upload_fobj protocol for oss (alibaba cloud)

---
 dvc/tree/oss.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/dvc/tree/oss.py b/dvc/tree/oss.py
index ade5f24180..343d51827f 100644
--- a/dvc/tree/oss.py
+++ b/dvc/tree/oss.py
@@ -116,6 +116,21 @@ def remove(self, path_info):
         logger.debug(f"Removing oss://{path_info}")
         self.oss_service.delete_object(path_info.path)

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        with Tqdm.wrapattr(
+            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
+        ) as wrapped:
+            cursor = 0
+            while True:
+                chunk = wrapped.read(self.CHUNK_SIZE)
+                if not chunk:
+                    break
+
+                result = self.oss_service.append_object(
+                    to_info.path, cursor, chunk
+                )
+                cursor = result.next_position
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):

From 60bf00f8712e5b4db9af1cd50caa27abd9da3b00 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Fri, 22 Jan 2021 11:14:36 +0300
Subject: [PATCH 06/17] tree: Implement upload_fobj protocol for webhdfs (also some cleanup)

---
 dvc/tree/azure.py       | 20 ++++++++++----------
 dvc/tree/hdfs.py        | 16 ++++++++--------
 dvc/tree/webhdfs.py     |  9 ++++++++-
 tests/func/test_tree.py |  5 +++--
 4 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/dvc/tree/azure.py b/dvc/tree/azure.py
index eab8453a8a..fdf5e4d258 100644
--- a/dvc/tree/azure.py
+++ b/dvc/tree/azure.py
@@ -170,6 +170,16 @@ def getsize(self, path_info):
     def get_file_hash(self, path_info):
         return HashInfo(self.PARAM_CHECKSUM, self.get_etag(path_info))

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        blob_client = self.blob_service.get_blob_client(
+            to_info.bucket, to_info.path
+        )
+        blob_client.max_block_size = self.CHUNK_SIZE
+        blob_client.min_large_block_upload_threshold = self.CHUNK_SIZE
+
+        with Tqdm.wrapattr(fobj, "read", bytes=True, **pbar_args) as wrapped:
+            blob_client.upload_blob(wrapped, overwrite=True, max_concurrency=1)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
@@ -184,16 +194,6 @@ def _upload(
         ) as wrapped:
             blob_client.upload_blob(wrapped, overwrite=True)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        blob_client = self.blob_service.get_blob_client(
-            to_info.bucket, to_info.path
-        )
-        blob_client.max_block_size = self.CHUNK_SIZE
-        blob_client.min_large_block_upload_threshold = self.CHUNK_SIZE
-
-        with Tqdm.wrapattr(fobj, "read", bytes=True, **pbar_args) as wrapped:
-            blob_client.upload_blob(wrapped, overwrite=True, max_concurrency=1)
-
     def _download(
         self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/dvc/tree/hdfs.py b/dvc/tree/hdfs.py
index d7b132380c..a137a95c11 100644
--- a/dvc/tree/hdfs.py
+++ b/dvc/tree/hdfs.py
@@ -235,6 +235,14 @@ def get_file_hash(self, path_info):
             size=self.getsize(path_info),
         )

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        with self.hdfs(to_info) as hdfs:
+            with Tqdm.wrapattr(
+                fobj, "read", disable=no_progress_bar, **pbar_args
+            ) as wrapped:
+                with hdfs.open_output_stream(to_info.path) as fdest:
+                    shutil.copyfileobj(wrapped, fdest, length=self.CHUNK_SIZE)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
@@ -253,14 +261,6 @@ def _upload(
                     sobj.write(wrapped.read())
             hdfs.move(tmp_file, to_info.path)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        with self.hdfs(to_info) as hdfs:
-            with Tqdm.wrapattr(
-                fobj, "read", disable=no_progress_bar, **pbar_args
-            ) as wrapped:
-                with hdfs.open_output_stream(to_info.path) as sobj:
-                    shutil.copyfileobj(wrapped, sobj, length=self.CHUNK_SIZE)
-
     def _download(
         self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/dvc/tree/webhdfs.py b/dvc/tree/webhdfs.py
index bfed900d3a..f07c94ebd4 100644
--- a/dvc/tree/webhdfs.py
+++ b/dvc/tree/webhdfs.py
@@ -100,7 +100,7 @@ def open(self, path_info, mode="r", encoding=None, **kwargs):
         with self.hdfs_client.read(
             path_info.path, encoding=encoding
         ) as reader:
-            yield reader.read()
+            yield reader

     def walk_files(self, path_info, **kwargs):
         if not self.exists(path_info):
@@ -143,6 +143,13 @@ def move(self, from_info, to_info):
         self.hdfs_client.makedirs(to_info.parent.path)
         self.hdfs_client.rename(from_info.path, to_info.path)

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        with Tqdm.wrapattr(
+            fobj, "read", disable=no_progress_bar, **pbar_args
+        ) as wrapped:
+            with self.hdfs_client.write(to_info.path) as fdest:
+                shutil.copyfileobj(wrapped, fdest, length=self.CHUNK_SIZE)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/tests/func/test_tree.py b/tests/func/test_tree.py
index cf16239e36..b152272136 100644
--- a/tests/func/test_tree.py
+++ b/tests/func/test_tree.py
@@ -308,6 +308,7 @@ def test_tree_getsize(dvc, cloud):
         pytest.lazy_fixture("oss"),
         pytest.lazy_fixture("s3"),
         pytest.lazy_fixture("ssh"),
+        pytest.lazy_fixture("webhdfs"),
     ],
 )
 def test_tree_upload_fobj(dvc, tmp_dir, cloud):
@@ -318,5 +319,5 @@ def test_tree_upload_fobj(dvc, tmp_dir, cloud):
     with open(tmp_dir / "foo", "rb") as stream:
         tree.upload_fobj(stream, path_info / "foo")

-    with tree.open(path_info / "foo") as stream:
-        assert stream.read() == "foo"
+    with tree.open(path_info / "foo", "rb") as stream:
+        assert stream.read() == b"foo"

From e8cb0730d1df14bb140513431fe87b4d7f55f72b Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Fri, 22 Jan 2021 11:35:50 +0300
Subject: [PATCH 07/17] tree: Implement upload_fobj protocol for http/https

---
 dvc/tree/http.py        | 49 +++++++++++++++++++++++------------------
 tests/func/test_tree.py | 15 ++++++++-----
 2 files changed, 38 insertions(+), 26 deletions(-)

diff --git a/dvc/tree/http.py b/dvc/tree/http.py
index 7ffd785049..6fd55d249e 100644
--- a/dvc/tree/http.py
+++ b/dvc/tree/http.py
@@ -169,6 +169,26 @@ def get_file_hash(self, path_info):

         return HashInfo(self.PARAM_CHECKSUM, etag)

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        def chunks(fobj):
+            with Tqdm.wrapattr(
+                fobj,
+                "read",
+                bytes=True,
+                leave=False,
+                disable=no_progress_bar,
+                **pbar_args,
+            ) as wrapped:
+                while True:
+                    chunk = wrapped.read(self.CHUNK_SIZE)
+                    if not chunk:
+                        break
+                    yield chunk
+
+        response = self.request(self.method, to_info.url, data=chunks(fobj))
+        if response.status_code not in (200, 201):
+            raise HTTPError(response.status_code, response.reason)
+
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         response = self.request("GET", from_info.url, stream=True)
         if response.status_code != 200:
@@ -190,27 +210,14 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
-        def chunks():
-            with open(from_file, "rb") as fd:
-                with Tqdm.wrapattr(
-                    fd,
-                    "read",
-                    total=None
-                    if no_progress_bar
-                    else os.path.getsize(from_file),
-                    leave=False,
-                    desc=to_info.url if name is None else name,
-                    disable=no_progress_bar,
-                ) as fd_wrapped:
-                    while True:
-                        chunk = fd_wrapped.read(self.CHUNK_SIZE)
-                        if not chunk:
-                            break
-                        yield chunk
-
-        response = self.request(self.method, to_info.url, data=chunks())
-        if response.status_code not in (200, 201):
-            raise HTTPError(response.status_code, response.reason)
+        with open(from_file, "rb") as fobj:
+            self.upload_fobj(
+                fobj,
+                to_info,
+                no_progress_bar=no_progress_bar,
+                desc=name or to_info.url,
+                total=os.path.getsize(from_file),
+            )

     @staticmethod
     def _content_length(response):
diff --git a/tests/func/test_tree.py b/tests/func/test_tree.py
index b152272136..26096d6553 100644
---
--- a/tests/func/test_tree.py
+++ b/tests/func/test_tree.py
@@ -304,6 +304,7 @@ def test_tree_getsize(dvc, cloud):
         pytest.lazy_fixture("gs"),
         pytest.lazy_fixture("gdrive"),
         pytest.lazy_fixture("hdfs"),
+        pytest.lazy_fixture("http"),
         pytest.lazy_fixture("local_cloud"),
         pytest.lazy_fixture("oss"),
         pytest.lazy_fixture("s3"),
@@ -314,10 +315,14 @@ def test_tree_upload_fobj(dvc, tmp_dir, cloud):
 def test_tree_upload_fobj(dvc, tmp_dir, cloud):
     tmp_dir.gen("foo", "foo")
     tree = get_cloud_tree(dvc, **cloud.config)
-    path_info = tree.path_info

-    with open(tmp_dir / "foo", "rb") as stream:
-        tree.upload_fobj(stream, path_info / "foo")
+    from_info = tmp_dir / "foo"
+    to_info = tree.path_info / "foo"
+
+    with open(from_info, "rb") as stream:
+        tree.upload_fobj(stream, to_info)

-    with tree.open(path_info / "foo", "rb") as stream:
-        assert stream.read() == b"foo"
+    assert tree.exists(to_info)
+    tree.download(to_info, from_info)
+    with open(from_info) as stream:
+        assert stream.read() == "foo"
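
The generator body used above relies on documented requests behavior: when data is an iterable of byte chunks rather than a sized object, requests sends the request with chunked Transfer-Encoding, so the whole file never has to sit in memory. A self-contained sketch of the same idea (plain requests; the URL and file name are placeholders):

    import requests

    def chunks(fobj, chunk_size=64 * 1024 * 1024):
        # yield fixed-size pieces until the stream is exhausted
        while True:
            chunk = fobj.read(chunk_size)
            if not chunk:
                break
            yield chunk

    with open("data.bin", "rb") as fobj:
        # requests streams a generator body using chunked Transfer-Encoding
        response = requests.post("https://example.invalid/upload", data=chunks(fobj))
        response.raise_for_status()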
From 38ad72f98cb31bdece14ecc440f8687a48603ca6 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Fri, 22 Jan 2021 12:01:02 +0300
Subject: [PATCH 08/17] Add _generate_download_url() to the HTTP tree (for tests)

---
 dvc/tree/http.py        | 3 +++
 tests/func/test_tree.py | 5 ++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/dvc/tree/http.py b/dvc/tree/http.py
index 6fd55d249e..a0428cbcbe 100644
--- a/dvc/tree/http.py
+++ b/dvc/tree/http.py
@@ -74,6 +74,9 @@ def _auth_method(self, path_info=None):
                 self.headers.update({self.custom_auth_header: self.password})
         return None

+    def _generate_download_url(self, path_info):
+        return path_info.url
+
     @wrap_prop(threading.Lock())
     @cached_property
     def _session(self):
diff --git a/tests/func/test_tree.py b/tests/func/test_tree.py
index 26096d6553..3df94ee9ef 100644
--- a/tests/func/test_tree.py
+++ b/tests/func/test_tree.py
@@ -323,6 +323,5 @@ def test_tree_upload_fobj(dvc, tmp_dir, cloud):
         tree.upload_fobj(stream, to_info)

     assert tree.exists(to_info)
-    tree.download(to_info, from_info)
-    with open(from_info) as stream:
-        assert stream.read() == "foo"
+    with tree.open(to_info, "rb") as stream:
+        assert stream.read() == b"foo"

From 200a1fb1bb8d02c23f40f3af30eac5be0120b108 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Fri, 22 Jan 2021 15:41:25 +0300
Subject: [PATCH 09/17] tree: Implement a temporary upload_fobj protocol for webdav/webdavs

---
 dvc/tree/webdav.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/dvc/tree/webdav.py b/dvc/tree/webdav.py
index b66d8c1be2..285abe22f1 100644
--- a/dvc/tree/webdav.py
+++ b/dvc/tree/webdav.py
@@ -192,6 +192,23 @@ def move(self, from_info, to_info):
         # Webdav client move
         self._client.move(from_info.path, to_info.path)

+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        # This is not exactly a chunked upload, since it does not quite work
+        # that way: internally it uses requests, so in theory we should be
+        # able to pass a generator that yields chunks, though that is not
+        # working for some reason.
+        self.makedirs(to_info.parent)
+
+        with Tqdm.wrapattr(
+            fobj,
+            "read",
+            bytes=True,
+            leave=False,
+            disable=no_progress_bar,
+            **pbar_args,
+        ) as wrapped:
+            self._client.upload_to(buff=wrapped, remote_path=to_info.path)
+
     # Downloads file from remote to file
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         # Progress from HTTPTree

From 4d455a74a41a0087f577710c97ceaf07e3aa2aa8 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Mon, 25 Jan 2021 17:17:24 +0300
Subject: [PATCH 10/17] Unify progress bars

---
 dvc/tree/azure.py        |  6 ++----
 dvc/tree/base.py         | 10 ++++++++--
 dvc/tree/gdrive.py       |  8 ++------
 dvc/tree/gs.py           | 15 ++++++----------
 dvc/tree/hdfs.py         |  9 +++------
 dvc/tree/http.py         | 20 ++++++--------------
 dvc/tree/local.py        | 10 +++-------
 dvc/tree/oss.py          | 25 +++++++++++--------------
 dvc/tree/s3.py           | 33 +++++++++++++--------------------
 dvc/tree/ssh/__init__.py | 12 ++++--------
 dvc/tree/webdav.py       | 19 ++++---------------
 dvc/tree/webhdfs.py      |  9 +++------
 12 files changed, 65 insertions(+), 111 deletions(-)

diff --git a/dvc/tree/azure.py b/dvc/tree/azure.py
index fdf5e4d258..410b3a6d88 100644
--- a/dvc/tree/azure.py
+++ b/dvc/tree/azure.py
@@ -170,15 +170,13 @@ def getsize(self, path_info):
     def get_file_hash(self, path_info):
         return HashInfo(self.PARAM_CHECKSUM, self.get_etag(path_info))

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+    def _upload_fobj(self, fobj, to_info):
         blob_client = self.blob_service.get_blob_client(
             to_info.bucket, to_info.path
         )
         blob_client.max_block_size = self.CHUNK_SIZE
         blob_client.min_large_block_upload_threshold = self.CHUNK_SIZE
-
-        with Tqdm.wrapattr(fobj, "read", bytes=True, **pbar_args) as wrapped:
-            blob_client.upload_blob(wrapped, overwrite=True, max_concurrency=1)
+        blob_client.upload_blob(fobj, overwrite=True, max_concurrency=1)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/base.py b/dvc/tree/base.py
index a95e320711..af78b8698b 100644
--- a/dvc/tree/base.py
+++ b/dvc/tree/base.py
@@ -364,8 +364,14 @@ def upload(
             no_progress_bar=no_progress_bar,
         )

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False):
-        raise RemoteActionNotImplemented("upload_fobj", self.scheme)
+    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+        if not hasattr(self, "_upload_fobj"):
+            raise RemoteActionNotImplemented("upload_fobj", self.scheme)
+
+        with Tqdm.wrapattr(
+            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
+        ) as wrapped:
+            self._upload_fobj(wrapped, to_info)  # pylint: disable=no-member

     def download(
         self,
diff --git a/dvc/tree/gdrive.py b/dvc/tree/gdrive.py
index b5415f5df7..05c71b67ae 100644
--- a/dvc/tree/gdrive.py
+++ b/dvc/tree/gdrive.py
@@ -553,15 +553,11 @@ def getsize(self, path_info):
         gdrive_file.FetchMetadata(fields="fileSize")
         return gdrive_file.get("fileSize")

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+    def _upload_fobj(self, fobj, to_info):
         dirname = to_info.parent
         assert dirname
         parent_id = self._get_item_id(dirname, create=True)
-
-        with Tqdm.wrapattr(
-            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
-        ) as wrapped:
-            self._gdrive_upload_fobj(wrapped, parent_id, to_info.name)
+        self._gdrive_upload_fobj(fobj, parent_id, to_info.name)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/gs.py b/dvc/tree/gs.py
index 7ffb20e043..70f478c9f9 100644
--- a/dvc/tree/gs.py
+++ b/dvc/tree/gs.py
@@ -190,16 +190,13 @@ def get_file_hash(self, path_info):
             size=blob.size,
         )

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+    def _upload_fobj(self, fobj, to_info):
         bucket = self.gs.bucket(to_info.bucket)
-        with Tqdm.wrapattr(
-            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
-        ) as wrapped:
-            # As noted in @dynamic_chunk_size, this function does not respect
-            # tree.CHUNK_SIZE, since that is too big for GS to handle. Rather,
-            # it dynamically tries to find the best chunk size and uploads
-            # with it.
-            _upload_to_bucket(bucket, wrapped, to_info)
+        # As noted in @dynamic_chunk_size, this function does not respect
+        # tree.CHUNK_SIZE, since that is too big for GS to handle. Rather,
+        # it dynamically calculates the best possible chunk size and
+        # uploads with it.
+        _upload_to_bucket(bucket, fobj, to_info)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/hdfs.py b/dvc/tree/hdfs.py
index a137a95c11..a8f493bf22 100644
--- a/dvc/tree/hdfs.py
+++ b/dvc/tree/hdfs.py
@@ -235,13 +235,10 @@ def get_file_hash(self, path_info):
             size=self.getsize(path_info),
         )

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+    def _upload_fobj(self, fobj, to_info):
         with self.hdfs(to_info) as hdfs:
-            with Tqdm.wrapattr(
-                fobj, "read", disable=no_progress_bar, **pbar_args
-            ) as wrapped:
-                with hdfs.open_output_stream(to_info.path) as fdest:
-                    shutil.copyfileobj(wrapped, fdest, length=self.CHUNK_SIZE)
+            with hdfs.open_output_stream(to_info.path) as fdest:
+                shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/http.py b/dvc/tree/http.py
index a0428cbcbe..faf67ff391 100644
--- a/dvc/tree/http.py
+++ b/dvc/tree/http.py
@@ -172,21 +172,13 @@ def get_file_hash(self, path_info):

         return HashInfo(self.PARAM_CHECKSUM, etag)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+    def _upload_fobj(self, fobj, to_info):
         def chunks(fobj):
-            with Tqdm.wrapattr(
-                fobj,
-                "read",
-                bytes=True,
-                leave=False,
-                disable=no_progress_bar,
-                **pbar_args,
-            ) as wrapped:
-                while True:
-                    chunk = wrapped.read(self.CHUNK_SIZE)
-                    if not chunk:
-                        break
-                    yield chunk
+            while True:
+                chunk = fobj.read(self.CHUNK_SIZE)
+                if not chunk:
+                    break
+                yield chunk

         response = self.request(self.method, to_info.url, data=chunks(fobj))
         if response.status_code not in (200, 201):
diff --git a/dvc/tree/local.py b/dvc/tree/local.py
index 71a51510eb..5cbd0d03c8 100644
--- a/dvc/tree/local.py
+++ b/dvc/tree/local.py
@@ -2,6 +2,7 @@
 import logging
 import os
 import stat
+from functools import partialmethod

 from funcy import cached_property

@@ -242,6 +243,8 @@ def get_file_hash(self, path_info):
     def getsize(path_info):
         return os.path.getsize(path_info)

+    _upload_fobj = partialmethod(copy_fobj, chunk_size=BaseTree.CHUNK_SIZE)
+
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs,
     ):
@@ -253,13 +256,6 @@ def _upload(
         )
         os.replace(tmp_file, to_info)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        from dvc.progress import Tqdm
-
-        with Tqdm(bytes=True, disable=no_progress_bar, **pbar_args) as pbar:
-            with pbar.wrapattr(fobj, "read") as fobj:
-                self.copy_fobj(fobj, to_info, chunk_size=self.CHUNK_SIZE)
-
     @staticmethod
     def _download(
         from_info, to_file, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/oss.py b/dvc/tree/oss.py
index 343d51827f..5dcd6b5184 100644
--- a/dvc/tree/oss.py
+++ b/dvc/tree/oss.py
@@ -116,20 +116,17 @@ def remove(self, path_info):
         logger.debug(f"Removing oss://{path_info}")
         self.oss_service.delete_object(path_info.path)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        with Tqdm.wrapattr(
-            fobj, "read", disable=no_progress_bar, bytes=True, **pbar_args
-        ) as wrapped:
-            cursor = 0
-            while True:
-                chunk = wrapped.read(self.CHUNK_SIZE)
-                if not chunk:
-                    break
-
-                result = self.oss_service.append_object(
-                    to_info.path, cursor, chunk
-                )
-                cursor = result.next_position
+    def _upload_fobj(self, fobj, to_info):
+        cursor = 0
+        while True:
+            chunk = fobj.read(self.CHUNK_SIZE)
+            if not chunk:
+                break
+
+            result = self.oss_service.append_object(
+                to_info.path, cursor, chunk
+            )
+            cursor = result.next_position

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/s3.py b/dvc/tree/s3.py
index 1aee7de6de..0f1469e121 100644
--- a/dvc/tree/s3.py
+++ b/dvc/tree/s3.py
@@ -353,19 +353,7 @@ def get_file_hash(self, path_info):
             size=obj.content_length,
         )

-    def _upload(
-        self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
-    ):
-        with self._get_obj(to_info) as obj:
-            total = os.path.getsize(from_file)
-            with Tqdm(
-                disable=no_progress_bar, total=total, bytes=True, desc=name
-            ) as pbar:
-                obj.upload_file(
-                    from_file, Callback=pbar.update, ExtraArgs=self.extra_args,
-                )
-
-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
+    def _upload_fobj(self, fobj, to_info):
         from boto3.s3.transfer import TransferConfig

         config = TransferConfig(
@@ -375,15 +363,20 @@ def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
             use_threads=False,
         )
         with self._get_s3() as s3:
+            s3.meta.client.upload_fileobj(
+                fobj, to_info.bucket, to_info.path, Config=config
+            )
+
+    def _upload(
+        self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
+    ):
+        with self._get_obj(to_info) as obj:
+            total = os.path.getsize(from_file)
             with Tqdm(
-                disable=no_progress_bar, bytes=True, **pbar_args
+                disable=no_progress_bar, total=total, bytes=True, desc=name
             ) as pbar:
-                s3.meta.client.upload_fileobj(
-                    fobj,
-                    to_info.bucket,
-                    to_info.path,
-                    Config=config,
-                    Callback=pbar.update,
+                obj.upload_file(
+                    from_file, Callback=pbar.update, ExtraArgs=self.extra_args,
                 )
diff --git a/dvc/tree/ssh/__init__.py b/dvc/tree/ssh/__init__.py
index 9b45a6942f..7a69b856b2 100644
--- a/dvc/tree/ssh/__init__.py
+++ b/dvc/tree/ssh/__init__.py
@@ -251,6 +251,10 @@ def getsize(self, path_info):
         with self.ssh(path_info) as ssh:
             return ssh.getsize(path_info.path)

+    def _upload_fobj(self, fobj, to_info):
+        with self.open(to_info, mode="wb") as fdest:
+            shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)
+
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         with self.ssh(from_info) as ssh:
             ssh.download(
@@ -260,14 +264,6 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=False):
                 no_progress_bar=no_progress_bar,
             )

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        from dvc.progress import Tqdm
-
-        with Tqdm(bytes=True, disable=no_progress_bar, **pbar_args) as pbar:
-            with pbar.wrapattr(fobj, "read") as fobj:
-                with self.open(to_info, mode="wb") as fdest:
-                    shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)
-
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
diff --git a/dvc/tree/webdav.py b/dvc/tree/webdav.py
index 285abe22f1..9828d52686 100644
--- a/dvc/tree/webdav.py
+++ b/dvc/tree/webdav.py
@@ -192,22 +192,11 @@ def move(self, from_info, to_info):
         # Webdav client move
         self._client.move(from_info.path, to_info.path)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        # This is not exactly a chunked upload, since it does not quite work
-        # that way: internally it uses requests, so in theory we should be
-        # able to pass a generator that yields chunks, though that is not
-        # working for some reason.
+    def _upload_fobj(self, fobj, to_info):
+        # In contrast to other upload_fobj implementations, this one does not
+        # exactly do a chunked upload but rather puts everything in one request.
         self.makedirs(to_info.parent)
-
-        with Tqdm.wrapattr(
-            fobj,
-            "read",
-            bytes=True,
-            leave=False,
-            disable=no_progress_bar,
-            **pbar_args,
-        ) as wrapped:
-            self._client.upload_to(buff=wrapped, remote_path=to_info.path)
+        self._client.upload_to(buff=fobj, remote_path=to_info.path)

     # Downloads file from remote to file
     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
diff --git a/dvc/tree/webhdfs.py b/dvc/tree/webhdfs.py
index f07c94ebd4..bee1bf84fb 100644
--- a/dvc/tree/webhdfs.py
+++ b/dvc/tree/webhdfs.py
@@ -143,12 +143,9 @@ def move(self, from_info, to_info):
         self.hdfs_client.makedirs(to_info.parent.path)
         self.hdfs_client.rename(from_info.path, to_info.path)

-    def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
-        with Tqdm.wrapattr(
-            fobj, "read", disable=no_progress_bar, **pbar_args
-        ) as wrapped:
-            with self.hdfs_client.write(to_info.path) as fdest:
-                shutil.copyfileobj(wrapped, fdest, length=self.CHUNK_SIZE)
+    def _upload_fobj(self, fobj, to_info):
+        with self.hdfs_client.write(to_info.path) as fdest:
+            shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
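
After this patch the split is a classic template method: BaseTree.upload_fobj owns capability detection and progress reporting, and a backend only supplies a raw _upload_fobj. A minimal self-contained sketch of the pattern (simplified stand-ins, not the real classes):

    import shutil

    class BaseTreeSketch:  # simplified stand-in for dvc.tree.base.BaseTree
        def upload_fobj(self, fobj, to_info, no_progress_bar=False, **pbar_args):
            if not hasattr(self, "_upload_fobj"):
                raise NotImplementedError("upload_fobj")
            # the real implementation wraps fobj with Tqdm.wrapattr here
            self._upload_fobj(fobj, to_info)

    class LocalTreeSketch(BaseTreeSketch):  # hypothetical subclass
        def _upload_fobj(self, fobj, to_info):
            with open(to_info, "wb") as fdest:
                shutil.copyfileobj(fobj, fdest)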
From 8a38cb9721ef8ceaf8eb476b827263e2a908a30b Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Mon, 25 Jan 2021 17:30:37 +0300
Subject: [PATCH 11/17] Lazily evaluate the file size for http uploads

---
 dvc/tree/http.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dvc/tree/http.py b/dvc/tree/http.py
index faf67ff391..f3b762291e 100644
--- a/dvc/tree/http.py
+++ b/dvc/tree/http.py
@@ -211,7 +211,7 @@ def _upload(
                 to_info,
                 no_progress_bar=no_progress_bar,
                 desc=name or to_info.url,
-                total=os.path.getsize(from_file),
+                total=None if no_progress_bar else os.path.getsize(from_file),
             )

     @staticmethod
From 80f29e1501c6742ff7f8db1571c34b1458a15340 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Mon, 25 Jan 2021 18:08:43 +0300
Subject: [PATCH 12/17] Remove copy_fobj protocol and _transfer_file_as_whole

---
 dvc/cache/base.py | 33 ++++-----------------------------
 dvc/tree/base.py  |  3 ---
 dvc/tree/local.py |  7 ++-----
 3 files changed, 6 insertions(+), 37 deletions(-)

diff --git a/dvc/cache/base.py b/dvc/cache/base.py
index 2fcbb3858f..694bfff4b0 100644
--- a/dvc/cache/base.py
+++ b/dvc/cache/base.py
@@ -249,33 +249,16 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
                 if not (
                     tree.isdvc(path_info, strict=False) and tree.fetch
                 ):
-                    self.tree.copy_fobj(fobj, cache_info)
+                    self.tree.upload_fobj(
+                        fobj, cache_info, no_progress_bar=True
+                    )
             callback = kwargs.get("download_callback")
             if callback:
                 callback(1)

         self.tree.state.save(cache_info, hash_info)

-    def _transfer_file_as_whole(self, from_tree, from_info):
-        from dvc.utils import tmp_fname
-
-        # When we can't use the chunked upload, we have to first download
-        # and then calculate the hash as if it were a local file and then
-        # upload it.
-        local_tree = self.repo.cache.local.tree
-        local_info = local_tree.path_info / tmp_fname()
-
-        from_tree.download(from_info, local_info)
-        hash_info = local_tree.get_file_hash(local_info)
-
-        self.tree.upload(
-            local_info,
-            self.tree.hash_to_path_info(hash_info.value),
-            name=from_info.name,
-        )
-        return hash_info
-
-    def _transfer_file_as_chunked(self, from_tree, from_info):
+    def _transfer_file(self, from_tree, from_info):
         from dvc.utils import tmp_fname
         from dvc.utils.stream import HashedStreamReader

@@ -298,14 +281,6 @@ def _transfer_file(self, from_tree, from_info):
         self.move(tmp_info, self.tree.hash_to_path_info(hash_info.value))
         return hash_info

-    def _transfer_file(self, from_tree, from_info):
-        try:
-            hash_info = self._transfer_file_as_chunked(from_tree, from_info)
-        except RemoteActionNotImplemented:
-            hash_info = self._transfer_file_as_whole(from_tree, from_info)
-
-        return hash_info
-
     def _transfer_directory_contents(self, from_tree, from_info, jobs, pbar):
         rel_path_infos = {}
         from_infos = from_tree.walk_files(from_info)
diff --git a/dvc/tree/base.py b/dvc/tree/base.py
index af78b8698b..37b68f96cb 100644
--- a/dvc/tree/base.py
+++ b/dvc/tree/base.py
@@ -226,9 +226,6 @@ def move(self, from_info, to_info):
     def copy(self, from_info, to_info):
         raise RemoteActionNotImplemented("copy", self.scheme)

-    def copy_fobj(self, fobj, to_info, chunk_size=None):
-        raise RemoteActionNotImplemented("copy_fobj", self.scheme)
-
     def symlink(self, from_info, to_info):
         raise RemoteActionNotImplemented("symlink", self.scheme)

diff --git a/dvc/tree/local.py b/dvc/tree/local.py
index 5cbd0d03c8..7b94d2eebe 100644
--- a/dvc/tree/local.py
+++ b/dvc/tree/local.py
@@ -2,7 +2,6 @@
 import logging
 import os
 import stat
-from functools import partialmethod

 from funcy import cached_property

@@ -167,11 +166,11 @@ def copy(self, from_info, to_info):
             self.remove(tmp_info)
             raise

-    def copy_fobj(self, fobj, to_info, chunk_size=None):
+    def _upload_fobj(self, fobj, to_info):
         self.makedirs(to_info.parent)
         tmp_info = to_info.parent / tmp_fname("")
         try:
-            copy_fobj_to_file(fobj, tmp_info, chunk_size=chunk_size)
+            copy_fobj_to_file(fobj, tmp_info, chunk_size=self.CHUNK_SIZE)
             os.rename(tmp_info, to_info)
         except Exception:
             self.remove(tmp_info)
@@ -243,8 +242,6 @@ def get_file_hash(self, path_info):
     def getsize(path_info):
         return os.path.getsize(path_info)

-    _upload_fobj = partialmethod(copy_fobj, chunk_size=BaseTree.CHUNK_SIZE)
-
     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs,
     ):
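
With the whole-file fallback gone, _transfer_file always streams: HashedStreamReader computes the hash of the bytes as upload_fobj pulls them through, so a remote-to-remote transfer no longer needs a complete local download first. The idea in a self-contained sketch (HashedReader is a simplified stand-in for dvc.utils.stream.HashedStreamReader):

    import hashlib
    import io

    class HashedReader:  # simplified stand-in
        def __init__(self, fobj):
            self.fobj = fobj
            self.md5 = hashlib.md5()

        def read(self, n=-1):
            chunk = self.fobj.read(n)
            self.md5.update(chunk)  # hash is updated as bytes stream through
            return chunk

    src = HashedReader(io.BytesIO(b"some data"))
    io.BytesIO().write(src.read())  # stand-in for tree.upload_fobj(src, tmp_info)
    print(src.md5.hexdigest())      # hash computed during the single pass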
From 5be166edb46b01e4c69103a372723e84d7fa477b Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Mon, 25 Jan 2021 18:41:26 +0300
Subject: [PATCH 13/17] Show progress bar on cache transfer

---
 dvc/cache/base.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/dvc/cache/base.py b/dvc/cache/base.py
index 694bfff4b0..ad4ebc8f98 100644
--- a/dvc/cache/base.py
+++ b/dvc/cache/base.py
@@ -249,9 +249,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
                 if not (
                     tree.isdvc(path_info, strict=False) and tree.fetch
                 ):
-                    self.tree.upload_fobj(
-                        fobj, cache_info, no_progress_bar=True
-                    )
+                    self.tree.upload_fobj(fobj, cache_info)
             callback = kwargs.get("download_callback")
             if callback:
                 callback(1)
From 8ae1efca0df95e49553962c83e0713d1304cbb1f Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Mon, 25 Jan 2021 22:27:35 +0300
Subject: [PATCH 14/17] Simplify s3 upload

---
 dvc/tree/s3.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/dvc/tree/s3.py b/dvc/tree/s3.py
index 0f1469e121..792d00f215 100644
--- a/dvc/tree/s3.py
+++ b/dvc/tree/s3.py
@@ -362,10 +362,8 @@ def _upload_fobj(self, fobj, to_info):
             max_concurrency=1,
             use_threads=False,
         )
-        with self._get_s3() as s3:
-            s3.meta.client.upload_fileobj(
-                fobj, to_info.bucket, to_info.path, Config=config
-            )
+        with self._get_obj(to_info) as obj:
+            obj.upload_fileobj(fobj, Config=config)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
From 1a43bfc29d49050b3e7f83d204d9105f9d615503 Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Tue, 26 Jan 2021 10:39:56 +0300
Subject: [PATCH 15/17] Remove CHUNK_SIZE (except for some special cases)

---
 dvc/tree/azure.py        | 17 +++++++----------
 dvc/tree/base.py         |  2 ++
 dvc/tree/hdfs.py         |  2 +-
 dvc/tree/local.py        |  2 +-
 dvc/tree/oss.py          |  1 +
 dvc/tree/s3.py           |  7 +------
 dvc/tree/ssh/__init__.py |  2 +-
 dvc/tree/webhdfs.py      |  2 +-
 dvc/utils/fs.py          |  4 ++--
 9 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/dvc/tree/azure.py b/dvc/tree/azure.py
index 410b3a6d88..c87dde0599 100644
--- a/dvc/tree/azure.py
+++ b/dvc/tree/azure.py
@@ -174,23 +174,20 @@ def _upload_fobj(self, fobj, to_info):
         blob_client = self.blob_service.get_blob_client(
             to_info.bucket, to_info.path
         )
-        blob_client.max_block_size = self.CHUNK_SIZE
-        blob_client.min_large_block_upload_threshold = self.CHUNK_SIZE
         blob_client.upload_blob(fobj, overwrite=True, max_concurrency=1)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
     ):
-
-        blob_client = self.blob_service.get_blob_client(
-            to_info.bucket, to_info.path
-        )
         total = os.path.getsize(from_file)
         with open(from_file, "rb") as fobj:
-            with Tqdm.wrapattr(
-                fobj, "read", desc=name, total=total, disable=no_progress_bar
-            ) as wrapped:
-                blob_client.upload_blob(wrapped, overwrite=True)
+            self.upload_fobj(
+                fobj,
+                to_info,
+                desc=name,
+                total=total,
+                no_progress_bar=no_progress_bar,
+            )

     def _download(
         self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/base.py b/dvc/tree/base.py
index 37b68f96cb..b2861117b6 100644
--- a/dvc/tree/base.py
+++ b/dvc/tree/base.py
@@ -59,6 +59,8 @@ class BaseTree:
     TRAVERSE_PREFIX_LEN = 3
     TRAVERSE_THRESHOLD_SIZE = 500000
     CAN_TRAVERSE = True
+
+    # Needed for some providers, and http open()
     CHUNK_SIZE = 64 * 1024 * 1024  # 64 MiB

     PARAM_CHECKSUM: ClassVar[Optional[str]] = None
diff --git a/dvc/tree/hdfs.py b/dvc/tree/hdfs.py
index a8f493bf22..e115ac65de 100644
--- a/dvc/tree/hdfs.py
+++ b/dvc/tree/hdfs.py
@@ -238,7 +238,7 @@ def get_file_hash(self, path_info):
     def _upload_fobj(self, fobj, to_info):
         with self.hdfs(to_info) as hdfs:
             with hdfs.open_output_stream(to_info.path) as fdest:
-                shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)
+                shutil.copyfileobj(fobj, fdest)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/local.py b/dvc/tree/local.py
index 7b94d2eebe..ded370b38d 100644
--- a/dvc/tree/local.py
+++ b/dvc/tree/local.py
@@ -170,7 +170,7 @@ def _upload_fobj(self, fobj, to_info):
         self.makedirs(to_info.parent)
         tmp_info = to_info.parent / tmp_fname("")
         try:
-            copy_fobj_to_file(fobj, tmp_info, chunk_size=self.CHUNK_SIZE)
+            copy_fobj_to_file(fobj, tmp_info)
            os.rename(tmp_info, to_info)
         except Exception:
             self.remove(tmp_info)
diff --git a/dvc/tree/oss.py b/dvc/tree/oss.py
index 5dcd6b5184..6fef7028f3 100644
--- a/dvc/tree/oss.py
+++ b/dvc/tree/oss.py
@@ -38,6 +38,7 @@ class OSSTree(BaseTree):  # pylint:disable=abstract-method
     PARAM_CHECKSUM = "etag"
     COPY_POLL_SECONDS = 5
     LIST_OBJECT_PAGE_SIZE = 100
+    CHUNK_SIZE = 64 * 1024 * 1024

     def __init__(self, repo, config):
         super().__init__(repo, config)
diff --git a/dvc/tree/s3.py b/dvc/tree/s3.py
index 792d00f215..5e29ac7cc6 100644
--- a/dvc/tree/s3.py
+++ b/dvc/tree/s3.py
@@ -356,12 +356,7 @@ def get_file_hash(self, path_info):
     def _upload_fobj(self, fobj, to_info):
         from boto3.s3.transfer import TransferConfig

-        config = TransferConfig(
-            multipart_threshold=self.CHUNK_SIZE,
-            multipart_chunksize=self.CHUNK_SIZE,
-            max_concurrency=1,
-            use_threads=False,
-        )
+        config = TransferConfig(max_concurrency=1, use_threads=False)
         with self._get_obj(to_info) as obj:
             obj.upload_fileobj(fobj, Config=config)
diff --git a/dvc/tree/ssh/__init__.py b/dvc/tree/ssh/__init__.py
index 7a69b856b2..8a2dee669b 100644
--- a/dvc/tree/ssh/__init__.py
+++ b/dvc/tree/ssh/__init__.py
@@ -253,7 +253,7 @@ def getsize(self, path_info):

     def _upload_fobj(self, fobj, to_info):
         with self.open(to_info, mode="wb") as fdest:
-            shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)
+            shutil.copyfileobj(fobj, fdest)

     def _download(self, from_info, to_file, name=None, no_progress_bar=False):
         with self.ssh(from_info) as ssh:
diff --git a/dvc/tree/webhdfs.py b/dvc/tree/webhdfs.py
index bee1bf84fb..cfb651309a 100644
--- a/dvc/tree/webhdfs.py
+++ b/dvc/tree/webhdfs.py
@@ -145,7 +145,7 @@ def move(self, from_info, to_info):

     def _upload_fobj(self, fobj, to_info):
         with self.hdfs_client.write(to_info.path) as fdest:
-            shutil.copyfileobj(fobj, fdest, length=self.CHUNK_SIZE)
+            shutil.copyfileobj(fobj, fdest)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/utils/fs.py b/dvc/utils/fs.py
index 78ffb40de4..5f04b9436b 100644
--- a/dvc/utils/fs.py
+++ b/dvc/utils/fs.py
@@ -215,10 +215,10 @@ def copyfile(src, dest, no_progress_bar=False, name=None):
                     fdest_wrapped.write(buf)


-def copy_fobj_to_file(fsrc, dest, chunk_size=None):
+def copy_fobj_to_file(fsrc, dest):
     """Copy contents of open file object to destination path."""
     with open(dest, "wb+") as fdest:
-        shutil.copyfileobj(fsrc, fdest, length=chunk_size)
+        shutil.copyfileobj(fsrc, fdest)


 def walk_files(directory):

From ffbc667031f11fc1515b95f7243c723252a43c0a Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Tue, 26 Jan 2021 13:31:57 +0300
Subject: [PATCH 16/17] Remove concurrency restrictions

---
 dvc/tree/azure.py |  2 +-
 dvc/tree/oss.py   | 11 +----------
 dvc/tree/s3.py    |  5 +----
 3 files changed, 3 insertions(+), 15 deletions(-)

diff --git a/dvc/tree/azure.py b/dvc/tree/azure.py
index c87dde0599..b80ad74c35 100644
--- a/dvc/tree/azure.py
+++ b/dvc/tree/azure.py
@@ -174,7 +174,7 @@ def _upload_fobj(self, fobj, to_info):
         blob_client = self.blob_service.get_blob_client(
             to_info.bucket, to_info.path
         )
-        blob_client.upload_blob(fobj, overwrite=True, max_concurrency=1)
+        blob_client.upload_blob(fobj, overwrite=True)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/oss.py b/dvc/tree/oss.py
index 6fef7028f3..135d63dca0 100644
--- a/dvc/tree/oss.py
+++ b/dvc/tree/oss.py
@@ -118,16 +118,7 @@ def remove(self, path_info):
         self.oss_service.delete_object(path_info.path)

     def _upload_fobj(self, fobj, to_info):
-        cursor = 0
-        while True:
-            chunk = fobj.read(self.CHUNK_SIZE)
-            if not chunk:
-                break
-
-            result = self.oss_service.append_object(
-                to_info.path, cursor, chunk
-            )
-            cursor = result.next_position
+        self.oss_service.put_object(to_info.path, fobj)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
diff --git a/dvc/tree/s3.py b/dvc/tree/s3.py
index 5e29ac7cc6..eb62786fd4 100644
--- a/dvc/tree/s3.py
+++ b/dvc/tree/s3.py
@@ -354,11 +354,8 @@ def get_file_hash(self, path_info):
         )

     def _upload_fobj(self, fobj, to_info):
-        from boto3.s3.transfer import TransferConfig
-
-        config = TransferConfig(max_concurrency=1, use_threads=False)
         with self._get_obj(to_info) as obj:
-            obj.upload_fileobj(fobj, Config=config)
+            obj.upload_fileobj(fobj)

     def _upload(
         self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs

From 2d178ff5b12f71a81923963f3ec110463186a17e Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Tue, 26 Jan 2021 13:55:55 +0300
Subject: [PATCH 17/17] Update dvc/tree/oss.py

Co-authored-by: Ruslan Kuprieiev
---
 dvc/tree/oss.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/dvc/tree/oss.py b/dvc/tree/oss.py
index 135d63dca0..302e4447b0 100644
--- a/dvc/tree/oss.py
+++ b/dvc/tree/oss.py
@@ -38,7 +38,6 @@ class OSSTree(BaseTree):  # pylint:disable=abstract-method
     PARAM_CHECKSUM = "etag"
     COPY_POLL_SECONDS = 5
     LIST_OBJECT_PAGE_SIZE = 100
-    CHUNK_SIZE = 64 * 1024 * 1024

     def __init__(self, repo, config):
         super().__init__(repo, config)
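
After the final cleanups, the contract for a backend is small: implement _upload_fobj(fobj, to_info) with whatever single streaming call the provider SDK offers, and leave chunking, concurrency and progress reporting to the SDK and to BaseTree.upload_fobj. A usage sketch of the finished API, modeled on tests/func/test_tree.py (the remote URL and file name are hypothetical):

    from dvc.tree import get_cloud_tree

    tree = get_cloud_tree(repo, url="s3://bucket/path")  # hypothetical remote
    to_info = tree.path_info / "foo"

    with open("foo", "rb") as fobj:
        tree.upload_fobj(fobj, to_info, desc="foo")

    with tree.open(to_info, "rb") as fobj:
        assert fobj.read() == b"foo"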