From 8639b694437adfdd2160f203a1433e84d29f97ba Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Thu, 15 Oct 2020 03:57:24 +0200 Subject: [PATCH 01/12] ipfs: add stub for IPFSTree implementation Related to #930 --- dvc/config.py | 1 + dvc/scheme.py | 1 + dvc/tree/__init__.py | 6 +++++ dvc/tree/ipfs.py | 43 ++++++++++++++++++++++++++++++++++++ tests/unit/tree/test_ipfs.py | 9 ++++++++ 5 files changed, 60 insertions(+) create mode 100644 dvc/tree/ipfs.py create mode 100644 tests/unit/tree/test_ipfs.py diff --git a/dvc/config.py b/dvc/config.py index 090d065cd9..4698073164 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -216,6 +216,7 @@ class RelPath(str): "https": {**HTTP_COMMON, **REMOTE_COMMON}, "webdav": {**WEBDAV_COMMON, **REMOTE_COMMON}, "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON}, + "ipfs": {"ipfs_daemon_ip": str, **REMOTE_COMMON}, "remote": {str: object}, # Any of the above options are valid } ) diff --git a/dvc/scheme.py b/dvc/scheme.py index 76c6d7a497..6cdec835ba 100644 --- a/dvc/scheme.py +++ b/dvc/scheme.py @@ -11,3 +11,4 @@ class Schemes: OSS = "oss" WEBDAV = "webdav" WEBDAVS = "webdavs" + IPFS = "ipfs" diff --git a/dvc/tree/__init__.py b/dvc/tree/__init__.py index cbd1595a69..79d31b3193 100644 --- a/dvc/tree/__init__.py +++ b/dvc/tree/__init__.py @@ -1,3 +1,4 @@ +import logging import posixpath from urllib.parse import urlparse @@ -13,6 +14,9 @@ from .ssh import SSHTree from .webdav import WebDAVTree from .webdavs import WebDAVSTree +from .ipfs import IPFSTree + +logger = logging.getLogger(__name__) TREES = [ AzureTree, @@ -26,6 +30,7 @@ OSSTree, WebDAVTree, WebDAVSTree, + IPFSTree, # NOTE: LocalTree is the default ] @@ -34,6 +39,7 @@ def _get_tree(remote_conf): for tree_cls in TREES: if tree_cls.supported(remote_conf): return tree_cls + logger.debug(f"Falling back to LocalTree for remote config {remote_conf}") return LocalTree diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py new file mode 100644 index 0000000000..28eb1aeb06 --- /dev/null +++ 
b/dvc/tree/ipfs.py @@ -0,0 +1,43 @@ +import logging + +from .base import BaseTree +from ..path_info import CloudURLInfo +from ..scheme import Schemes + +logger = logging.getLogger(__name__) + + +class IPFSTree(BaseTree): + scheme = Schemes.IPFS + PATH_CLS = CloudURLInfo + + def __init__(self, repo, config): + super().__init__(repo, config) + logger.debug(config["url"]) + self.path_info = IPFSTree.PATH_CLS(config["url"]) + logger.debug(self.path_info) + + def exists(self, path_info, use_dvcignore=True): + logger.debug(f"Checking if {path_info} exists on IPFS") + return False + + def walk_files(self, path_info, **kwargs): + """Return a generator with `PathInfo`s to all the files. + + Optional kwargs: + prefix (bool): If true `path_info` will be treated as a prefix + rather than directory path. + """ + logger.debug(f"Walking files in {path_info} (kwargs={kwargs})") + for file_name in self._list_paths(path_info, **kwargs): + if file_name.endswith("/"): + continue + + yield path_info.replace(path=file_name) + + def _upload(self, from_file, to_info, name=None, no_progress_bar=False): + logger.debug(f"Uploading {from_file} (to_info={to_info}, name={name})") + return + + def _list_paths(self, path_info, **kwargs): + return ["foo/", "foo/bar", "foo/batz"] diff --git a/tests/unit/tree/test_ipfs.py b/tests/unit/tree/test_ipfs.py new file mode 100644 index 0000000000..c7a056d850 --- /dev/null +++ b/tests/unit/tree/test_ipfs.py @@ -0,0 +1,9 @@ +from dvc.tree import IPFSTree + + +def test_init(dvc): + url = "ipfs://TODO" + config = {"url": url} + tree = IPFSTree(dvc, config) + + assert tree.path_info == url From abdfd1ec2641962c5d2c4a374e0f772f10b25148 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Sat, 17 Oct 2020 17:47:29 +0200 Subject: [PATCH 02/12] ipfs: basic implementation allowing pulling and pushing of test files --- dvc/tree/ipfs.py | 98 +++++++++++++++++++++++++++++++++++++----------- setup.py | 5 ++- 2 files changed, 81 insertions(+), 22 deletions(-) diff --git 
a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index 28eb1aeb06..72d4b43867 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -1,43 +1,99 @@ import logging +from pathlib import Path +from typing import Optional + +import ipfshttpclient from .base import BaseTree -from ..path_info import CloudURLInfo +from ..exceptions import DvcException +from ..path_info import URLInfo from ..scheme import Schemes logger = logging.getLogger(__name__) +# TODO: As long as we don't get the IPFS CID, we need to fake to be able to implement the rest +TMP_IPFS_CID_MAP = { + "/6c/9fb857427b459ebf0a363c9319d259": "QmNg6VqLGsAcgZ1bTMniA5CamqE1bCXYMHRkao4USHnqzv", + "/75/ee30f52010c1149d1e950b33d3adf5": "QmSgm31h9vfAn6ZKXCpVuysk45eucuG2fMoTn1dBEeRUzn", + "/51/93c4f0e82207a00e6596f679cbdb74": "Qmaz5yXazz6mjFY5575jhb9s9RVCzPY2AHyCYY2WgPmw3V", + "/ec/b3e4644128e3b3cf72e139ba2365c1.dir": "QmP7aqxACrAnbfFimjuciAf2pEYbHe5UQjWaZkJ6qo5j8c", +} + class IPFSTree(BaseTree): scheme = Schemes.IPFS - PATH_CLS = CloudURLInfo + PATH_CLS = URLInfo + REQUIRES = {"ipfshttpclient": "ipfshttpclient"} def __init__(self, repo, config): super().__init__(repo, config) logger.debug(config["url"]) self.path_info = IPFSTree.PATH_CLS(config["url"]) - logger.debug(self.path_info) + self._ipfs_client: Optional[ipfshttpclient.Client] = None + try: + self._ipfs_client = ipfshttpclient.connect(session=True) + except ipfshttpclient.exceptions.VersionMismatch as e: + raise DvcException(f"Unsupported IPFS daemon ({e})") from e + except ipfshttpclient.exceptions.ConnectionError as e: + raise DvcException( + "Could not connect to ipfs daemon. Install ipfs on your machine and run `ipfs daemon`" + ) - def exists(self, path_info, use_dvcignore=True): - logger.debug(f"Checking if {path_info} exists on IPFS") - return False + def __del__(self): + if self._ipfs_client is not None: + self._ipfs_client.close() - def walk_files(self, path_info, **kwargs): - """Return a generator with `PathInfo`s to all the files. 
+ def exists(self, path_info: PATH_CLS, use_dvcignore=True): + logger.debug(f"Checking if {path_info} exists") + # TODO: we need more information than the md5 path, since IPFS is only addressable via + # the sha256 hash of the desired file + # Dig deeper into https://docs.ipfs.io/concepts/content-addressing/#identifier-formats + # (uses sha-256, but there is some additional processing for the final Content Identifier (CID)) + ipfs_cid = TMP_IPFS_CID_MAP[path_info.path] - Optional kwargs: - prefix (bool): If true `path_info` will be treated as a prefix - rather than directory path. - """ - logger.debug(f"Walking files in {path_info} (kwargs={kwargs})") - for file_name in self._list_paths(path_info, **kwargs): - if file_name.endswith("/"): - continue + # Is there a method that checks directly the existence of a pin? + try: + self._ipfs_client.pin.ls(ipfs_cid) + except ipfshttpclient.exceptions.ErrorResponse: + return False + else: + return True - yield path_info.replace(path=file_name) + def walk_files(self, path_info, **kwargs): + logger.debug(f"Walking files in {path_info} (kwargs={kwargs})") + # TODO: walking a file path is not possible in IPFS. We could generate a directory listing with all content + # of our project. For example, this is a list of all xkcd comics until Comic #1862: + # https://ipfs.io/ipfs/QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm + # This would be possible to walk, but any change on any file generates a new CID. Therefore, we need to + # generate a new directory listing on every update and save that CID somewhere in our project. Not sure if + # this is still in scope of DVC. + # + # Therefore, we return an empty tree for now + return iter(()) def _upload(self, from_file, to_info, name=None, no_progress_bar=False): - logger.debug(f"Uploading {from_file} (to_info={to_info}, name={name})") - return + # TODO: find a way to get notified about upload process for progress bar + # https://github.com/encode/httpx seems to be used in the background. 
+ # Maybe httpx is configurable via kwarg "params" + ipfs_cid = self._ipfs_client.add(from_file)["Hash"] + logger.debug(f"Stored {from_file} at ipfs://{ipfs_cid}") + # TODO: the ipfs_cid needs to be returned and persisted by DVC + + def _download( + self, + from_info: PATH_CLS, + to_file: str, + name=None, + no_progress_bar=False, + ): + logger.debug(f"Download {from_info} to {to_file}") + # TODO: fake mapping from path to ipfs CID + ipfs_cid = TMP_IPFS_CID_MAP[from_info.path] - def _list_paths(self, path_info, **kwargs): - return ["foo/", "foo/bar", "foo/batz"] + # ipfs client downloads the file to the given directory and the filename is always the CID + # https://github.com/ipfs-shipyard/py-ipfs-http-client/issues/48 + # Workaround by saving it to the parent directory and renaming if afterwards to the DVC expected name + to_directory = Path(to_file).parent + # TODO: find a way to get notified about download process for progress bar + self._ipfs_client.get(ipfs_cid, to_directory) + (to_directory / ipfs_cid).rename(to_file) diff --git a/setup.py b/setup.py index c57ecb93fb..2bc2f4cbd8 100644 --- a/setup.py +++ b/setup.py @@ -98,12 +98,14 @@ def run(self): # Remove the env marker if/when pyarrow is available for Python3.9 hdfs = ["pyarrow>=2.0.0; python_version < '3.9'"] webdav = ["webdavclient3>=3.14.5"] +# TODO: update ipfshttpclient to stable when 0.7.0 has been released +ipfs = ["ipfshttpclient==0.7.0a1"] # gssapi should not be included in all_remotes, because it doesn't have wheels # for linux and mac, so it will fail to compile if user doesn't have all the # requirements, including kerberos itself. Once all the wheels are available, # we can start shipping it by default. 
ssh_gssapi = ["paramiko[invoke,gssapi]>=2.7.0"] -all_remotes = gs + s3 + azure + ssh + oss + gdrive + hdfs + webdav +all_remotes = gs + s3 + azure + ssh + oss + gdrive + hdfs + webdav + ipfs # Extra dependecies to run tests tests_requirements = [ @@ -166,6 +168,7 @@ def run(self): "ssh_gssapi": ssh_gssapi, "hdfs": hdfs, "webdav": webdav, + "ipfs": ipfs, "tests": tests_requirements, }, keywords="data-science data-version-control machine-learning git" From f26aab1f2476346da00454ec040810e92aaa819a Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Sat, 17 Oct 2020 17:53:39 +0200 Subject: [PATCH 03/12] ipfs: add hint about ipfs_daemon_ip setting --- dvc/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dvc/config.py b/dvc/config.py index 4698073164..53a4ceee2f 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -216,6 +216,8 @@ class RelPath(str): "https": {**HTTP_COMMON, **REMOTE_COMMON}, "webdav": {**WEBDAV_COMMON, **REMOTE_COMMON}, "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON}, + # TODO: ipfs_daemon_ip is not yet supported but may be useful. 
Add more options for authentication + # update this as well: https://dvc.org/doc/command-reference/remote/modify "ipfs": {"ipfs_daemon_ip": str, **REMOTE_COMMON}, "remote": {str: object}, # Any of the above options are valid } From 5dbc44e322bda36d73c175568a6317a37d955503 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Sat, 17 Oct 2020 18:55:02 +0200 Subject: [PATCH 04/12] ipfs: remove logger.debug statement which caused tests to fail --- dvc/tree/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dvc/tree/__init__.py b/dvc/tree/__init__.py index 79d31b3193..c13e56cf30 100644 --- a/dvc/tree/__init__.py +++ b/dvc/tree/__init__.py @@ -39,7 +39,6 @@ def _get_tree(remote_conf): for tree_cls in TREES: if tree_cls.supported(remote_conf): return tree_cls - logger.debug(f"Falling back to LocalTree for remote config {remote_conf}") return LocalTree From 17b2055384345b2e1e6924d1bd0b92539cc87488 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Tue, 10 Nov 2020 04:04:57 +0100 Subject: [PATCH 05/12] ipfs: wip: make use of mfs as it allows working with ipfs like a regular filesystem --- dvc/config.py | 6 ++++- dvc/tree/ipfs.py | 63 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/dvc/config.py b/dvc/config.py index 53a4ceee2f..742894f858 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -218,7 +218,11 @@ class RelPath(str): "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON}, # TODO: ipfs_daemon_ip is not yet supported but may be useful. 
Add more options for authentication # update this as well: https://dvc.org/doc/command-reference/remote/modify - "ipfs": {"ipfs_daemon_ip": str, **REMOTE_COMMON}, + "ipfs": { + "ipfs_daemon_ip": str, + "mfs_path": str, + **REMOTE_COMMON, + }, "remote": {str: object}, # Any of the above options are valid } ) diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index 72d4b43867..dda1c73fb2 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -1,12 +1,15 @@ import logging +from funcy import cached_property from pathlib import Path from typing import Optional +from urllib.parse import urlparse import ipfshttpclient from .base import BaseTree +from ..config import Config from ..exceptions import DvcException -from ..path_info import URLInfo +from ..path_info import _BasePath from ..scheme import Schemes logger = logging.getLogger(__name__) @@ -20,15 +23,37 @@ } +class IPFSPathInfo(_BasePath): + def __init__(self, url): + p = urlparse(url) + self.cid = p.netloc + self.path = p.path.rstrip("/") + self.scheme = p.scheme + + @cached_property + def url(self): + return f"{self.scheme}://{self.cid}{self.path}" + + def __div__(self, other): + url = f"{self.scheme}://{self.cid}{self.path}/{other}" + return IPFSPathInfo(url) + + def __str__(self): + return self.url + + __truediv__ = __div__ + + class IPFSTree(BaseTree): scheme = Schemes.IPFS - PATH_CLS = URLInfo + PATH_CLS = IPFSPathInfo REQUIRES = {"ipfshttpclient": "ipfshttpclient"} def __init__(self, repo, config): super().__init__(repo, config) logger.debug(config["url"]) self.path_info = IPFSTree.PATH_CLS(config["url"]) + self.config = Config() self._ipfs_client: Optional[ipfshttpclient.Client] = None try: self._ipfs_client = ipfshttpclient.connect(session=True) @@ -75,8 +100,17 @@ def _upload(self, from_file, to_info, name=None, no_progress_bar=False): # TODO: find a way to get notified about upload process for progress bar # https://github.com/encode/httpx seems to be used in the background. 
# Maybe httpx is configurable via kwarg "params" - ipfs_cid = self._ipfs_client.add(from_file)["Hash"] - logger.debug(f"Stored {from_file} at ipfs://{ipfs_cid}") + + # TODO: mfs_path should probably go into IPFSPathInfo + mfs_project_dir = "/TODO" + mfs_path = f"{mfs_project_dir}{to_info.path}" + with open(from_file, "rb") as f: + # "parents" might get a kwarg in future versions of py-ipfs-http-client? If so, change the opts param here + self._ipfs_client.files.write( + mfs_path, f, create=True, opts={"parents": True} + ) + # we changed the content of the MFS, the CID will now be different + self._update_dvc_config() # TODO: the ipfs_cid needs to be returned and persisted by DVC def _download( @@ -97,3 +131,24 @@ def _download( # TODO: find a way to get notified about download process for progress bar self._ipfs_client.get(ipfs_cid, to_directory) (to_directory / ipfs_cid).rename(to_file) + + def _update_dvc_config(self): + """ + Changing content in IPFS means that the CID gets changed. After doing any modifications, we need to + update .dvc/config so it will always point to the latest content for every user. + + Returns: + + """ + old_cid = self.path_info.cid + new_cid = "TODO_NEW_CID" + with self.config.edit("repo") as repo_config: + section = None + for v in repo_config["remote"].values(): + if v.get("url") == "ipfs://" + old_cid: + section = v + break + if not section: + raise DvcException("Could not find ipfs config in .dvc/config") + section["url"] = "ipfs://" + new_cid + self.path_info.cid = new_cid From 604ae6e16c176d3c4643cc1689670a2337e6cef0 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Wed, 11 Nov 2020 02:41:09 +0100 Subject: [PATCH 06/12] ipfs: add a remote for ipfs The remote allows for updating .dvc/config after a successful IPFS upload. 
IPFS always needs the latest CID, so a new method for the `Remote` class was introduced to allow doing work after the upload --- dvc/cache/local.py | 2 ++ dvc/remote/__init__.py | 3 ++ dvc/remote/base.py | 9 +++++ dvc/remote/ipfs.py | 35 +++++++++++++++++++ dvc/tree/ipfs.py | 77 ++++++++++++++++++------------------------ 5 files changed, 81 insertions(+), 45 deletions(-) create mode 100644 dvc/remote/ipfs.py diff --git a/dvc/cache/local.py b/dvc/cache/local.py index 547c890895..9a8cd5ea5b 100644 --- a/dvc/cache/local.py +++ b/dvc/cache/local.py @@ -405,6 +405,8 @@ def _process( "'{}' nested files".format(dir_hash, len(file_hashes)) ) remote.index.update([dir_hash], file_hashes) + # notify remote of successful upload + remote.after_upload() return len(dir_plans[0]) + len(file_plans[0]) diff --git a/dvc/remote/__init__.py b/dvc/remote/__init__.py index 0c622d4b40..680b6309d1 100644 --- a/dvc/remote/__init__.py +++ b/dvc/remote/__init__.py @@ -1,3 +1,4 @@ +from .ipfs import IPFSRemote from ..tree import get_cloud_tree from .base import Remote from .local import LocalRemote @@ -10,4 +11,6 @@ def get_remote(repo, **kwargs): return LocalRemote(tree) if tree.scheme == "ssh": return SSHRemote(tree) + if tree.scheme == "ipfs": + return IPFSRemote(tree) return Remote(tree) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 8b2bd90943..dac0562a60 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -138,6 +138,15 @@ def hashes_exist(self, hashes, jobs=None, name=None): ) return list(indexed_hashes) + list(hashes & set(remote_hashes)) + def after_upload(self): + """Will be called after all uploads were successful + + Some remotes may need to do additional work after all uploads were processed. This method was originally + implemented to allow the final hash calculation of the content addressed filesystem IPFS, but may also be + useful to trigger further post-upload actions. 
+ """ + pass + @classmethod @index_locked def gc(cls, named_cache, remote, jobs=None): diff --git a/dvc/remote/ipfs.py b/dvc/remote/ipfs.py new file mode 100644 index 0000000000..6fa7b1e988 --- /dev/null +++ b/dvc/remote/ipfs.py @@ -0,0 +1,35 @@ +import logging + +from .base import Remote +from ..config import Config +from ..exceptions import DvcException + +logger = logging.getLogger(__name__) + + +class IPFSRemote(Remote): + def after_upload(self): + """Calculate the final CID after a successful upload + + Changing files in our local MFS means that the content ID gets changed. After doing any modifications, + we therefore need to update .dvc/config so it will always point to the latest content. + Though we get a new CID, other users won't need to download everything again, since the existing files + and subdirectories will keep their CID. + """ + path_info = self.tree.path_info + old_cid = path_info.cid + new_cid = self.tree.ipfs_client.files.stat(path_info.mfs_path)["Hash"] + logger.debug(f"Saving new CID ipfs://{new_cid}") + with Config().edit("repo") as repo_config: + section = None + for v in repo_config["remote"].values(): + url = v.get("url") + if url == "ipfs://": + url = f"ipfs://{path_info.CID_EMPTY_DIRECTORY}" + if url == "ipfs://" + old_cid: + section = v + break + if not section: + raise DvcException("Could not find ipfs config in .dvc/config") + section["url"] = "ipfs://" + new_cid + path_info.cid = new_cid diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index dda1c73fb2..14d76349cc 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -24,10 +24,26 @@ class IPFSPathInfo(_BasePath): - def __init__(self, url): + + # This is the content id of an empty directory. 
It will be used when the user doesn't provide a CID + CID_EMPTY_DIRECTORY = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" + + def __init__(self, url, mfs_path=None): p = urlparse(url) - self.cid = p.netloc + self.cid = p.netloc or IPFSPathInfo.CID_EMPTY_DIRECTORY self.path = p.path.rstrip("/") + # Get the name of the project directory to provide a sane default + project_dir_name = Path(Config().dvc_dir).parent.name + self.mfs_path = ( + mfs_path.rstrip("/") if mfs_path else f"/dvc/{project_dir_name}" + ) + if not self.mfs_path: + # if mfs_path was a /, it was removed by .rstrip(). It will also clutter / if it would actually be used, + # so just disallow it + raise DvcException( + "You may not use / as your IPFS MFS path. " + "Choose another with `dvc remote modify mfs_path `" + ) self.scheme = p.scheme @cached_property @@ -36,10 +52,10 @@ def url(self): def __div__(self, other): url = f"{self.scheme}://{self.cid}{self.path}/{other}" - return IPFSPathInfo(url) + return IPFSPathInfo(url, self.mfs_path) def __str__(self): - return self.url + return self.mfs_path + self.path __truediv__ = __div__ @@ -51,12 +67,13 @@ class IPFSTree(BaseTree): def __init__(self, repo, config): super().__init__(repo, config) - logger.debug(config["url"]) - self.path_info = IPFSTree.PATH_CLS(config["url"]) - self.config = Config() - self._ipfs_client: Optional[ipfshttpclient.Client] = None + self.ipfs_client: Optional[ipfshttpclient.Client] = None + self.path_info = IPFSTree.PATH_CLS( + config["url"], config.get("mfs_path") + ) try: - self._ipfs_client = ipfshttpclient.connect(session=True) + # TODO: support remote IPFS daemons with credentials + self.ipfs_client = ipfshttpclient.connect(session=True) except ipfshttpclient.exceptions.VersionMismatch as e: raise DvcException(f"Unsupported IPFS daemon ({e})") from e except ipfshttpclient.exceptions.ConnectionError as e: @@ -65,8 +82,8 @@ def __init__(self, repo, config): ) def __del__(self): - if self._ipfs_client is not None: - 
self._ipfs_client.close() + if self.ipfs_client is not None: + self.ipfs_client.close() def exists(self, path_info: PATH_CLS, use_dvcignore=True): logger.debug(f"Checking if {path_info} exists") @@ -78,7 +95,7 @@ def exists(self, path_info: PATH_CLS, use_dvcignore=True): # Is there a method that checks directly the existence of a pin? try: - self._ipfs_client.pin.ls(ipfs_cid) + self.ipfs_client.pin.ls(ipfs_cid) except ipfshttpclient.exceptions.ErrorResponse: return False else: @@ -97,21 +114,12 @@ def walk_files(self, path_info, **kwargs): return iter(()) def _upload(self, from_file, to_info, name=None, no_progress_bar=False): - # TODO: find a way to get notified about upload process for progress bar - # https://github.com/encode/httpx seems to be used in the background. - # Maybe httpx is configurable via kwarg "params" - - # TODO: mfs_path should probably go into IPFSPathInfo - mfs_project_dir = "/TODO" - mfs_path = f"{mfs_project_dir}{to_info.path}" + mfs_path = f"{self.path_info.mfs_path}/{to_info.path}" with open(from_file, "rb") as f: # "parents" might get a kwarg in future versions of py-ipfs-http-client? If so, change the opts param here - self._ipfs_client.files.write( + self.ipfs_client.files.write( mfs_path, f, create=True, opts={"parents": True} ) - # we changed the content of the MFS, the CID will now be different - self._update_dvc_config() - # TODO: the ipfs_cid needs to be returned and persisted by DVC def _download( self, @@ -129,26 +137,5 @@ def _download( # Workaround by saving it to the parent directory and renaming if afterwards to the DVC expected name to_directory = Path(to_file).parent # TODO: find a way to get notified about download process for progress bar - self._ipfs_client.get(ipfs_cid, to_directory) + self.ipfs_client.get(ipfs_cid, to_directory) (to_directory / ipfs_cid).rename(to_file) - - def _update_dvc_config(self): - """ - Changing content in IPFS means that the CID gets changed. 
After doing any modifications, we need to - update .dvc/config so it will always point to the latest content for every user. - - Returns: - - """ - old_cid = self.path_info.cid - new_cid = "TODO_NEW_CID" - with self.config.edit("repo") as repo_config: - section = None - for v in repo_config["remote"].values(): - if v.get("url") == "ipfs://" + old_cid: - section = v - break - if not section: - raise DvcException("Could not find ipfs config in .dvc/config") - section["url"] = "ipfs://" + new_cid - self.path_info.cid = new_cid From 300420c408a18ebcfc3c215f1343afb6bb6a5668 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Thu, 12 Nov 2020 00:49:18 +0100 Subject: [PATCH 07/12] ipfs: remove ipfs_daemon_ip option After giving it some more thought, I don't think it would be desirable to allow talking to a remote IPFS daemon. It will inevitably happen that multiple users are working on the same MFS. Scenario: User A will push a file. User B will push some other file. User B does not know about the new content from User A. User B will check in a CID to the DVC repo which contains unnecessary data. --- dvc/config.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dvc/config.py b/dvc/config.py index 742894f858..6b09ca043a 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -216,10 +216,8 @@ class RelPath(str): "https": {**HTTP_COMMON, **REMOTE_COMMON}, "webdav": {**WEBDAV_COMMON, **REMOTE_COMMON}, "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON}, - # TODO: ipfs_daemon_ip is not yet supported but may be useful. 
Add more options for authentication - # update this as well: https://dvc.org/doc/command-reference/remote/modify + # TODO: Update this: https://dvc.org/doc/command-reference/remote/modify "ipfs": { - "ipfs_daemon_ip": str, "mfs_path": str, **REMOTE_COMMON, }, From c89e5923b76d4ca20f8d06661c45e3490b3f15fb Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Thu, 12 Nov 2020 03:23:27 +0100 Subject: [PATCH 08/12] ipfs: implement download and exists --- dvc/cache/local.py | 6 ++++-- dvc/config.py | 5 +---- dvc/remote/base.py | 8 ++++++-- dvc/remote/ipfs.py | 41 +++++++++++++++++++++++++++++++++++++++-- dvc/tree/ipfs.py | 46 +++++++++++++++------------------------------- 5 files changed, 65 insertions(+), 41 deletions(-) diff --git a/dvc/cache/local.py b/dvc/cache/local.py index 9a8cd5ea5b..87254b3d07 100644 --- a/dvc/cache/local.py +++ b/dvc/cache/local.py @@ -316,6 +316,8 @@ def _process( status = STATUS_NEW desc = "Uploading" + remote.before_process(download) + if jobs is None: jobs = remote.tree.JOBS @@ -405,8 +407,8 @@ def _process( "'{}' nested files".format(dir_hash, len(file_hashes)) ) remote.index.update([dir_hash], file_hashes) - # notify remote of successful upload - remote.after_upload() + # notify remote of successful up / download + remote.after_process(download) return len(dir_plans[0]) + len(file_plans[0]) diff --git a/dvc/config.py b/dvc/config.py index 6b09ca043a..d1a3d125ec 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -217,10 +217,7 @@ class RelPath(str): "webdav": {**WEBDAV_COMMON, **REMOTE_COMMON}, "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON}, # TODO: Update this: https://dvc.org/doc/command-reference/remote/modify - "ipfs": { - "mfs_path": str, - **REMOTE_COMMON, - }, + "ipfs": {"mfs_path": str, **REMOTE_COMMON}, "remote": {str: object}, # Any of the above options are valid } ) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index dac0562a60..21012eb5cb 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -138,8 +138,12 @@ def 
hashes_exist(self, hashes, jobs=None, name=None): ) return list(indexed_hashes) + list(hashes & set(remote_hashes)) - def after_upload(self): - """Will be called after all uploads were successful + def before_process(self, download=False): + """Will be called before an upload or download is attempted.""" + pass + + def after_process(self, download=False): + """Will be called after all uploads or downloads were successful Some remotes may need to do additional work after all uploads were processed. This method was originally implemented to allow the final hash calculation of the content addressed filesystem IPFS, but may also be diff --git a/dvc/remote/ipfs.py b/dvc/remote/ipfs.py index 6fa7b1e988..b188a8b196 100644 --- a/dvc/remote/ipfs.py +++ b/dvc/remote/ipfs.py @@ -3,19 +3,26 @@ from .base import Remote from ..config import Config from ..exceptions import DvcException +from ..progress import Tqdm logger = logging.getLogger(__name__) class IPFSRemote(Remote): - def after_upload(self): + def before_process(self, download=False): + """Make sure that the MFS is in the desired state""" + self._update_mfs_to_latest_cid() + + def after_process(self, download=False): """Calculate the final CID after a successful upload Changing files in our local MFS means that the content ID gets changed. After doing any modifications, we therefore need to update .dvc/config so it will always point to the latest content. Though we get a new CID, other users won't need to download everything again, since the existing files and subdirectories will keep their CID. 
- """ + """ + if download: + return path_info = self.tree.path_info old_cid = path_info.cid new_cid = self.tree.ipfs_client.files.stat(path_info.mfs_path)["Hash"] @@ -33,3 +40,33 @@ def after_upload(self): raise DvcException("Could not find ipfs config in .dvc/config") section["url"] = "ipfs://" + new_cid path_info.cid = new_cid + + def _update_mfs_to_latest_cid(self): + """ + This method makes sure that the hash of our working directory in the MFS matches the desired CID which + is defined in .dvc/config + + It should be called before executing download operations to make sure the content is available, and it + could be called before uploading new files to make sure no unwanted files get published. + """ + # doesn't change anything if the path already exists, but creates an empty directory if the path didn't exists + mfs = self.tree.path_info.mfs_path + files_api = self.tree.ipfs_client.files + cid = self.tree.path_info.cid + + files_api.mkdir(mfs, parents=True) + current_hash = files_api.stat(mfs)["Hash"] + if current_hash != cid: + logger.debug( + f"Updating IPFS MFS path {mfs} to CID {cid}" + ) + with Tqdm( + desc=f"Updating IPFS MFS at {mfs}. This may take a while. See progress at http://127.0.0.1:5001/webui" + ): + # "cp" does not like overwriting files, so delete everything beforehand + # Don't worry - IPFS still keeps a cache, so we won't actually downloading everything again + # Still should investigate when the IPFS cache gets cleared + files_api.rm(mfs, recursive=True) + # This single command makes IPFS download everything. Any chance to provide a useful progress bar? 
+ # https://docs.ipfs.io/reference/http/api/#api-v0-files-cp + files_api.cp(f"/ipfs/{cid}", mfs) diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index 14d76349cc..63061188ca 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -14,14 +14,6 @@ logger = logging.getLogger(__name__) -# TODO: As long as we don't get the IPFS CID, we need to fake to be able to implement the rest -TMP_IPFS_CID_MAP = { - "/6c/9fb857427b459ebf0a363c9319d259": "QmNg6VqLGsAcgZ1bTMniA5CamqE1bCXYMHRkao4USHnqzv", - "/75/ee30f52010c1149d1e950b33d3adf5": "QmSgm31h9vfAn6ZKXCpVuysk45eucuG2fMoTn1dBEeRUzn", - "/51/93c4f0e82207a00e6596f679cbdb74": "Qmaz5yXazz6mjFY5575jhb9s9RVCzPY2AHyCYY2WgPmw3V", - "/ec/b3e4644128e3b3cf72e139ba2365c1.dir": "QmP7aqxACrAnbfFimjuciAf2pEYbHe5UQjWaZkJ6qo5j8c", -} - class IPFSPathInfo(_BasePath): @@ -86,20 +78,16 @@ def __del__(self): self.ipfs_client.close() def exists(self, path_info: PATH_CLS, use_dvcignore=True): - logger.debug(f"Checking if {path_info} exists") - # TODO: we need more information than the md5 path, since IPFS is only addressable via - # the sha256 hash of the desired file - # Dig deeper into https://docs.ipfs.io/concepts/content-addressing/#identifier-formats - # (uses sha-256, but there is some additional processing for the final Content Identifier (CID)) - ipfs_cid = TMP_IPFS_CID_MAP[path_info.path] - - # Is there a method that checks directly the existence of a pin? 
+ self.ipfs_client.files.mkdir(path_info.mfs_path, parents=True) try: - self.ipfs_client.pin.ls(ipfs_cid) - except ipfshttpclient.exceptions.ErrorResponse: + self.ipfs_client.files.stat( + path_info.mfs_path + path_info.path + ) + except ipfshttpclient.exceptions.ErrorResponse as e: + if e.args[0] != "file does not exist": + raise e return False - else: - return True + return True def walk_files(self, path_info, **kwargs): logger.debug(f"Walking files in {path_info} (kwargs={kwargs})") @@ -128,14 +116,10 @@ def _download( name=None, no_progress_bar=False, ): - logger.debug(f"Download {from_info} to {to_file}") - # TODO: fake mapping from path to ipfs CID - ipfs_cid = TMP_IPFS_CID_MAP[from_info.path] - - # ipfs client downloads the file to the given directory and the filename is always the CID - # https://github.com/ipfs-shipyard/py-ipfs-http-client/issues/48 - # Workaround by saving it to the parent directory and renaming if afterwards to the DVC expected name - to_directory = Path(to_file).parent - # TODO: find a way to get notified about download process for progress bar - self.ipfs_client.get(ipfs_cid, to_directory) - (to_directory / ipfs_cid).rename(to_file) + logger.debug(f"Downloading {from_info} to {to_file}") + with open(to_file, "wb") as f: + f.write( + self.ipfs_client.files.read( + f"{from_info.mfs_path}/{from_info.path}" + ) + ) From 9af50879afc81a20de9332f9a5e191dc7199af86 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Thu, 12 Nov 2020 05:12:26 +0100 Subject: [PATCH 09/12] ipfs: implement walk_files --- dvc/cache/__init__.py | 2 ++ dvc/cache/base.py | 8 ++++++++ dvc/cache/local.py | 4 ++-- dvc/remote/base.py | 11 +++++++---- dvc/remote/ipfs.py | 6 +++--- dvc/repo/gc.py | 4 ++++ dvc/tree/ipfs.py | 45 +++++++++++++++++++++++++++++-------------- 7 files changed, 57 insertions(+), 23 deletions(-) diff --git a/dvc/cache/__init__.py b/dvc/cache/__init__.py index 888bb2ddd4..0e4435559d 100644 --- a/dvc/cache/__init__.py +++ b/dvc/cache/__init__.py @@ 
-81,6 +81,8 @@ def __init__(self, repo): ssh = _make_remote_property("ssh") hdfs = _make_remote_property("hdfs") azure = _make_remote_property("azure") + # TODO: don't yet understand this piece of code, do we need this for IPFS? + # ipfs = _make_remote_property("ipfs") class NamedCacheItem: diff --git a/dvc/cache/base.py b/dvc/cache/base.py index 29ad0e9db8..8257d0f245 100644 --- a/dvc/cache/base.py +++ b/dvc/cache/base.py @@ -730,3 +730,11 @@ def set_dir_info(self, hash_info): hash_info.dir_info = self._to_dict(self.get_dir_cache(hash_info)) hash_info.nfiles = len(hash_info.dir_info) + + # stub before_ and after_ methods because this class is used where remotes are used, + # but it is not inherited from a remote.base.Remote + def before_transfer(self, *args, **kwargs): + pass + + def after_transfer(self, *args, **kwargs): + pass diff --git a/dvc/cache/local.py b/dvc/cache/local.py index 87254b3d07..e8f1b85065 100644 --- a/dvc/cache/local.py +++ b/dvc/cache/local.py @@ -316,7 +316,7 @@ def _process( status = STATUS_NEW desc = "Uploading" - remote.before_process(download) + remote.before_transfer(download=download, upload=not download) if jobs is None: jobs = remote.tree.JOBS @@ -408,7 +408,7 @@ def _process( ) remote.index.update([dir_hash], file_hashes) # notify remote of successful up / download - remote.after_process(download) + remote.after_transfer(download=download, upload=not download) return len(dir_plans[0]) + len(file_plans[0]) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 21012eb5cb..36bc1724f9 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -138,12 +138,12 @@ def hashes_exist(self, hashes, jobs=None, name=None): ) return list(indexed_hashes) + list(hashes & set(remote_hashes)) - def before_process(self, download=False): - """Will be called before an upload or download is attempted.""" + def before_transfer(self, download=False, upload=False, gc=False): + """Will be called before an upload, download or garbage collect is 
attempted.""" pass - def after_process(self, download=False): - """Will be called after all uploads or downloads were successful + def after_transfer(self, download=False, upload=False, gc=False): + """Will be called after all uploads, downloads or garbage collect operations were successful Some remotes may need to do additional work after all uploads were processed. This method was originally implemented to allow the final hash calculation of the content addressed filesystem IPFS, but may also be @@ -160,6 +160,7 @@ def gc(cls, named_cache, remote, jobs=None): if tree.scheme != "": used.update(named_cache.scheme_keys(tree.scheme)) + remote.before_transfer(gc=True) removed = False # hashes must be sorted to ensure we always remove .dir files first for hash_ in sorted( @@ -179,4 +180,6 @@ def gc(cls, named_cache, remote, jobs=None): if removed and hasattr(remote, "index"): remote.index.clear() + if removed: + remote.after_transfer(gc=True) return removed diff --git a/dvc/remote/ipfs.py b/dvc/remote/ipfs.py index b188a8b196..195c9ba056 100644 --- a/dvc/remote/ipfs.py +++ b/dvc/remote/ipfs.py @@ -9,11 +9,11 @@ class IPFSRemote(Remote): - def before_process(self, download=False): + def before_transfer(self, download=False, upload=False, gc=False): """Make sure that the MFS is in the desired state""" self._update_mfs_to_latest_cid() - def after_process(self, download=False): + def after_transfer(self, download=False, upload=False, gc=False): """Calculate the final CID after a successful upload Changing files in our local MFS means that the content ID gets changed. After doing any modifications, @@ -21,7 +21,7 @@ def after_process(self, download=False): Though we get a new CID, other users won't need to download everything again, since the existing files and subdirectories will keep their CID. 
""" - if download: + if not (upload or gc): return path_info = self.tree.path_info old_cid = path_info.cid diff --git a/dvc/repo/gc.py b/dvc/repo/gc.py index 44a64e29b5..912bd60620 100644 --- a/dvc/repo/gc.py +++ b/dvc/repo/gc.py @@ -94,5 +94,9 @@ def gc( if self.cache.azure: _do_gc("azure", self.cache.azure, used, jobs) + # TODO: don't yet understand this piece of code, do we need this for IPFS? + # if self.cache.ipfs: + # _do_gc("ipfs", self.cache.ipfs, used, jobs) + if cloud: _do_gc("remote", self.cloud.get_remote(remote, "gc -c"), used, jobs) diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index 63061188ca..cc208c9a83 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -1,4 +1,6 @@ import logging +from collections import deque + from funcy import cached_property from pathlib import Path from typing import Optional @@ -42,6 +44,10 @@ def __init__(self, url, mfs_path=None): def url(self): return f"{self.scheme}://{self.cid}{self.path}" + @property + def mfs_file_path(self): + return self.mfs_path + self.path + def __div__(self, other): url = f"{self.scheme}://{self.cid}{self.path}/{other}" return IPFSPathInfo(url, self.mfs_path) @@ -89,17 +95,30 @@ def exists(self, path_info: PATH_CLS, use_dvcignore=True): return False return True - def walk_files(self, path_info, **kwargs): - logger.debug(f"Walking files in {path_info} (kwargs={kwargs})") - # TODO: walking a file path is not possible in IPFS. We could generate a directory listing with all content - # of our project. For example, this is a list of all xkcd comics until Comic #1862: - # https://ipfs.io/ipfs/QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm - # This would be possible to walk, but any change on any file generates a new CID. Therefore, we need to - # generate a new directory listing on every update and save that CID somewhere in our project. Not sure if - # this is still in scope of DVC. 
- # - # Therefore, we return an empty tree for now - return iter(()) + def walk_files(self, path_info: PATH_CLS, **kwargs): + dirs = deque([path_info]) + + while dirs: + dir_path = dirs.pop() + try: + entries = self.ipfs_client.files.ls(dir_path.mfs_file_path)['Entries'] + except ipfshttpclient.exceptions.ErrorResponse as e: + if e.args[0] != 'file does not exist': + raise e + continue + for entry in entries: + entry_path_info = dir_path / entry['Name'] + type_ = self.ipfs_client.files.stat(entry_path_info.mfs_file_path)['Type'] + if type_ == 'directory': + dirs.append(entry_path_info) + elif type_ == 'file': + logger.debug(entry_path_info.mfs_file_path) + yield entry_path_info + else: + raise DvcException(f"Unexpected file type ({type_}) in IPFS at {entry_path_info.mfs_file_path}") + + def path_to_hash(self, path): + return path.replace('/', '') def _upload(self, from_file, to_info, name=None, no_progress_bar=False): mfs_path = f"{self.path_info.mfs_path}/{to_info.path}" @@ -119,7 +138,5 @@ def _download( logger.debug(f"Downloading {from_info} to {to_file}") with open(to_file, "wb") as f: f.write( - self.ipfs_client.files.read( - f"{from_info.mfs_path}/{from_info.path}" - ) + self.ipfs_client.files.read(from_info.mfs_file_path) ) From 00ffe843dcba62a9f92f168119449c394bdb01b7 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Fri, 13 Nov 2020 01:00:51 +0100 Subject: [PATCH 10/12] ipfs: implement removing files, allows using dvc gc --- dvc/tree/ipfs.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index cc208c9a83..2f079c4a6e 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -46,6 +46,7 @@ def url(self): @property def mfs_file_path(self): + """Full filepath inside the mfs""" return self.mfs_path + self.path def __div__(self, other): @@ -95,6 +96,10 @@ def exists(self, path_info: PATH_CLS, use_dvcignore=True): return False return True + def remove(self, path_info: PATH_CLS): + logger.debug(f"Removing 
{path_info} from MFS") + self.ipfs_client.files.rm(path_info.mfs_file_path, recursive=True) + def walk_files(self, path_info: PATH_CLS, **kwargs): dirs = deque([path_info]) @@ -112,7 +117,6 @@ def walk_files(self, path_info: PATH_CLS, **kwargs): if type_ == 'directory': dirs.append(entry_path_info) elif type_ == 'file': - logger.debug(entry_path_info.mfs_file_path) yield entry_path_info else: raise DvcException(f"Unexpected file type ({type_}) in IPFS at {entry_path_info.mfs_file_path}") From 6680e7cfed2099884f5e9fb4fd31f367fc6e24c7 Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Fri, 13 Nov 2020 01:34:55 +0100 Subject: [PATCH 11/12] ipfs: stylefixes --- dvc/cache/base.py | 4 ++-- dvc/config.py | 3 ++- dvc/remote/__init__.py | 2 +- dvc/remote/base.py | 20 ++++++++++++---- dvc/remote/ipfs.py | 41 ++++++++++++++++++-------------- dvc/tree/__init__.py | 2 +- dvc/tree/ipfs.py | 53 ++++++++++++++++++++++++------------------ 7 files changed, 74 insertions(+), 51 deletions(-) diff --git a/dvc/cache/base.py b/dvc/cache/base.py index 8257d0f245..932a9e505d 100644 --- a/dvc/cache/base.py +++ b/dvc/cache/base.py @@ -731,8 +731,8 @@ def set_dir_info(self, hash_info): hash_info.dir_info = self._to_dict(self.get_dir_cache(hash_info)) hash_info.nfiles = len(hash_info.dir_info) - # stub before_ and after_ methods because this class is used where remotes are used, - # but it is not inherited from a remote.base.Remote + # stub before_ and after_ methods because this class is used where + # remotes are used, but it is not inherited from a remote.base.Remote def before_transfer(self, *args, **kwargs): pass diff --git a/dvc/config.py b/dvc/config.py index d1a3d125ec..771c8c332e 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -216,7 +216,8 @@ class RelPath(str): "https": {**HTTP_COMMON, **REMOTE_COMMON}, "webdav": {**WEBDAV_COMMON, **REMOTE_COMMON}, "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON}, - # TODO: Update this: https://dvc.org/doc/command-reference/remote/modify + # TODO: 
Update documentation: + # https://dvc.org/doc/command-reference/remote/modify "ipfs": {"mfs_path": str, **REMOTE_COMMON}, "remote": {str: object}, # Any of the above options are valid } diff --git a/dvc/remote/__init__.py b/dvc/remote/__init__.py index 680b6309d1..78005bd793 100644 --- a/dvc/remote/__init__.py +++ b/dvc/remote/__init__.py @@ -1,6 +1,6 @@ -from .ipfs import IPFSRemote from ..tree import get_cloud_tree from .base import Remote +from .ipfs import IPFSRemote from .local import LocalRemote from .ssh import SSHRemote diff --git a/dvc/remote/base.py b/dvc/remote/base.py index 36bc1724f9..b4965ba299 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -139,15 +139,25 @@ def hashes_exist(self, hashes, jobs=None, name=None): return list(indexed_hashes) + list(hashes & set(remote_hashes)) def before_transfer(self, download=False, upload=False, gc=False): - """Will be called before an upload, download or garbage collect is attempted.""" + """Hook before work is done + + Will be called before an upload, download or garbage collect + operation is attempted. Originally implemented to allow the + content addressed filesystem IPFS to copy the desired content + in it's MFS (mutable filesystem). + """ pass def after_transfer(self, download=False, upload=False, gc=False): - """Will be called after all uploads, downloads or garbage collect operations were successful + """Hook after work has been done + + Will be called after all uploads, downloads or garbage collect + operations were successful. - Some remotes may need to do additional work after all uploads were processed. This method was originally - implemented to allow the final hash calculation of the content addressed filesystem IPFS, but may also be - useful to trigger further post-upload actions. + Some remotes may need to do additional work after all uploads were + processed. 
This method was originally implemented to allow the final + hash calculation of the content addressed filesystem IPFS, but may + also be useful to trigger further post-upload actions. """ pass diff --git a/dvc/remote/ipfs.py b/dvc/remote/ipfs.py index 195c9ba056..cf46d8f7e8 100644 --- a/dvc/remote/ipfs.py +++ b/dvc/remote/ipfs.py @@ -1,9 +1,9 @@ import logging -from .base import Remote from ..config import Config from ..exceptions import DvcException from ..progress import Tqdm +from .base import Remote logger = logging.getLogger(__name__) @@ -16,10 +16,11 @@ def before_transfer(self, download=False, upload=False, gc=False): def after_transfer(self, download=False, upload=False, gc=False): """Calculate the final CID after a successful upload - Changing files in our local MFS means that the content ID gets changed. After doing any modifications, - we therefore need to update .dvc/config so it will always point to the latest content. - Though we get a new CID, other users won't need to download everything again, since the existing files - and subdirectories will keep their CID. + Changing files in our local MFS means that the content ID gets changed. + After doing any modifications, we therefore need to update .dvc/config + so it will always point to the latest content. + Though we get a new CID, other users won't need to download everything + again, since the existing files and subdirectories will keep their CID. 
""" if not (upload or gc): return @@ -43,30 +44,34 @@ def after_transfer(self, download=False, upload=False, gc=False): def _update_mfs_to_latest_cid(self): """ - This method makes sure that the hash of our working directory in the MFS matches the desired CID which - is defined in .dvc/config + This method makes sure that the hash of our working directory in the + MFS matches the desired CID which is defined in .dvc/config - It should be called before executing download operations to make sure the content is available, and it - could be called before uploading new files to make sure no unwanted files get published. + It should be called before executing download operations to make sure + the content is available, and it should be called before uploading + new files to make sure no unwanted files get published. """ - # doesn't change anything if the path already exists, but creates an empty directory if the path didn't exists mfs = self.tree.path_info.mfs_path files_api = self.tree.ipfs_client.files cid = self.tree.path_info.cid + # doesn't change anything if the path already exists, but creates an + # empty directory if the path didn't exists, saving us from exceptions files_api.mkdir(mfs, parents=True) + current_hash = files_api.stat(mfs)["Hash"] if current_hash != cid: - logger.debug( - f"Updating IPFS MFS path {mfs} to CID {cid}" - ) + logger.debug(f"Updating IPFS MFS path {mfs} to CID {cid}") with Tqdm( - desc=f"Updating IPFS MFS at {mfs}. This may take a while. See progress at http://127.0.0.1:5001/webui" + desc=f"Updating IPFS MFS at {mfs}. This may take a while. 
" + f"See progress at http://127.0.0.1:5001/webui" ): - # "cp" does not like overwriting files, so delete everything beforehand - # Don't worry - IPFS still keeps a cache, so we won't actually downloading everything again - # Still should investigate when the IPFS cache gets cleared + # "cp" does not like overwriting files, so delete everything + # beforehand + # don't worry - IPFS still keeps a cache, so we won't actually + # downloading everything again files_api.rm(mfs, recursive=True) - # This single command makes IPFS download everything. Any chance to provide a useful progress bar? + # this single command makes IPFS download everything. + # any chance to provide a useful progress bar? # https://docs.ipfs.io/reference/http/api/#api-v0-files-cp files_api.cp(f"/ipfs/{cid}", mfs) diff --git a/dvc/tree/__init__.py b/dvc/tree/__init__.py index c13e56cf30..1052813bfd 100644 --- a/dvc/tree/__init__.py +++ b/dvc/tree/__init__.py @@ -8,13 +8,13 @@ from .hdfs import HDFSTree from .http import HTTPTree from .https import HTTPSTree +from .ipfs import IPFSTree from .local import LocalTree from .oss import OSSTree from .s3 import S3Tree from .ssh import SSHTree from .webdav import WebDAVTree from .webdavs import WebDAVSTree -from .ipfs import IPFSTree logger = logging.getLogger(__name__) diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index 2f079c4a6e..9f04d328ac 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -1,25 +1,25 @@ import logging from collections import deque - -from funcy import cached_property from pathlib import Path from typing import Optional from urllib.parse import urlparse import ipfshttpclient +from funcy import cached_property -from .base import BaseTree from ..config import Config from ..exceptions import DvcException from ..path_info import _BasePath from ..scheme import Schemes +from .base import BaseTree logger = logging.getLogger(__name__) class IPFSPathInfo(_BasePath): - # This is the content id of an empty directory. 
It will be used when the user doesn't provide a CID + # this is the content id of an empty directory + # it will be used when the user doesn't provide a CID CID_EMPTY_DIRECTORY = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" def __init__(self, url, mfs_path=None): @@ -32,11 +32,13 @@ def __init__(self, url, mfs_path=None): mfs_path.rstrip("/") if mfs_path else f"/dvc/{project_dir_name}" ) if not self.mfs_path: - # if mfs_path was a /, it was removed by .rstrip(). It will also clutter / if it would actually be used, + # if mfs_path was a /, it was removed by .rstrip(). + # it will also clutter root directory if it would actually be used, # so just disallow it raise DvcException( "You may not use / as your IPFS MFS path. " - "Choose another with `dvc remote modify mfs_path `" + "Choose another with " + "`dvc remote modify mfs_path `" ) self.scheme = p.scheme @@ -75,9 +77,10 @@ def __init__(self, repo, config): self.ipfs_client = ipfshttpclient.connect(session=True) except ipfshttpclient.exceptions.VersionMismatch as e: raise DvcException(f"Unsupported IPFS daemon ({e})") from e - except ipfshttpclient.exceptions.ConnectionError as e: + except ipfshttpclient.exceptions.ConnectionError: raise DvcException( - "Could not connect to ipfs daemon. Install ipfs on your machine and run `ipfs daemon`" + "Could not connect to ipfs daemon. 
" + "Install ipfs on your machine and run `ipfs daemon`" ) def __del__(self): @@ -87,9 +90,7 @@ def __del__(self): def exists(self, path_info: PATH_CLS, use_dvcignore=True): self.ipfs_client.files.mkdir(path_info.mfs_path, parents=True) try: - self.ipfs_client.files.stat( - path_info.mfs_path + path_info.path - ) + self.ipfs_client.files.stat(path_info.mfs_path + path_info.path) except ipfshttpclient.exceptions.ErrorResponse as e: if e.args[0] != "file does not exist": raise e @@ -106,28 +107,36 @@ def walk_files(self, path_info: PATH_CLS, **kwargs): while dirs: dir_path = dirs.pop() try: - entries = self.ipfs_client.files.ls(dir_path.mfs_file_path)['Entries'] + entries = self.ipfs_client.files.ls(dir_path.mfs_file_path)[ + "Entries" + ] except ipfshttpclient.exceptions.ErrorResponse as e: - if e.args[0] != 'file does not exist': + if e.args[0] != "file does not exist": raise e continue for entry in entries: - entry_path_info = dir_path / entry['Name'] - type_ = self.ipfs_client.files.stat(entry_path_info.mfs_file_path)['Type'] - if type_ == 'directory': + entry_path_info = dir_path / entry["Name"] + type_ = self.ipfs_client.files.stat( + entry_path_info.mfs_file_path + )["Type"] + if type_ == "directory": dirs.append(entry_path_info) - elif type_ == 'file': + elif type_ == "file": yield entry_path_info else: - raise DvcException(f"Unexpected file type ({type_}) in IPFS at {entry_path_info.mfs_file_path}") + raise DvcException( + f"Unexpected file type ({type_}) " + f"in IPFS at {entry_path_info.mfs_file_path}" + ) def path_to_hash(self, path): - return path.replace('/', '') + return path.replace("/", "") def _upload(self, from_file, to_info, name=None, no_progress_bar=False): mfs_path = f"{self.path_info.mfs_path}/{to_info.path}" with open(from_file, "rb") as f: - # "parents" might get a kwarg in future versions of py-ipfs-http-client? If so, change the opts param here + # "parents" might get a kwarg in future versions of + # py-ipfs-http-client? 
If so, change the opts param here self.ipfs_client.files.write( mfs_path, f, create=True, opts={"parents": True} ) @@ -141,6 +150,4 @@ def _download( ): logger.debug(f"Downloading {from_info} to {to_file}") with open(to_file, "wb") as f: - f.write( - self.ipfs_client.files.read(from_info.mfs_file_path) - ) + f.write(self.ipfs_client.files.read(from_info.mfs_file_path)) From e9442b830a2aec9d89f2ac80eda7ef7156c0dc8f Mon Sep 17 00:00:00 2001 From: Soeren Wegener Date: Fri, 13 Nov 2020 02:49:14 +0100 Subject: [PATCH 12/12] ipfs: implement progress bar for up- and download --- dvc/remote/base.py | 2 -- dvc/tree/ipfs.py | 40 ++++++++++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/dvc/remote/base.py b/dvc/remote/base.py index b4965ba299..3bc943c483 100644 --- a/dvc/remote/base.py +++ b/dvc/remote/base.py @@ -146,7 +146,6 @@ def before_transfer(self, download=False, upload=False, gc=False): content addressed filesystem IPFS to copy the desired content in it's MFS (mutable filesystem). """ - pass def after_transfer(self, download=False, upload=False, gc=False): """Hook after work has been done @@ -159,7 +158,6 @@ def after_transfer(self, download=False, upload=False, gc=False): hash calculation of the content addressed filesystem IPFS, but may also be useful to trigger further post-upload actions. 
""" - pass @classmethod @index_locked diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py index 9f04d328ac..0e492efca5 100644 --- a/dvc/tree/ipfs.py +++ b/dvc/tree/ipfs.py @@ -1,4 +1,5 @@ import logging +import os from collections import deque from pathlib import Path from typing import Optional @@ -10,6 +11,7 @@ from ..config import Config from ..exceptions import DvcException from ..path_info import _BasePath +from ..progress import Tqdm from ..scheme import Schemes from .base import BaseTree @@ -97,6 +99,10 @@ def exists(self, path_info: PATH_CLS, use_dvcignore=True): return False return True + def get_file_hash(self, path_info: PATH_CLS): + # TODO + pass + def remove(self, path_info: PATH_CLS): logger.debug(f"Removing {path_info} from MFS") self.ipfs_client.files.rm(path_info.mfs_file_path, recursive=True) @@ -107,9 +113,8 @@ def walk_files(self, path_info: PATH_CLS, **kwargs): while dirs: dir_path = dirs.pop() try: - entries = self.ipfs_client.files.ls(dir_path.mfs_file_path)[ - "Entries" - ] + stat = self.ipfs_client.files.ls(dir_path.mfs_file_path) + entries = stat["Entries"] or [] except ipfshttpclient.exceptions.ErrorResponse as e: if e.args[0] != "file does not exist": raise e @@ -135,11 +140,18 @@ def path_to_hash(self, path): def _upload(self, from_file, to_info, name=None, no_progress_bar=False): mfs_path = f"{self.path_info.mfs_path}/{to_info.path}" with open(from_file, "rb") as f: - # "parents" might get a kwarg in future versions of - # py-ipfs-http-client? If so, change the opts param here - self.ipfs_client.files.write( - mfs_path, f, create=True, opts={"parents": True} - ) + with Tqdm.wrapattr( + f, + "read", + desc=name, + total=os.path.getsize(from_file), + disable=no_progress_bar, + ) as wrapped_f: + # "parents" might get a kwarg in future versions of + # py-ipfs-http-client? 
If so, change the opts param here + self.ipfs_client.files.write( + mfs_path, wrapped_f, create=True, opts={"parents": True} + ) def _download( self, @@ -148,6 +160,14 @@ def _download( name=None, no_progress_bar=False, ): - logger.debug(f"Downloading {from_info} to {to_file}") + from_file = from_info.mfs_file_path with open(to_file, "wb") as f: - f.write(self.ipfs_client.files.read(from_info.mfs_file_path)) + # there is "Size" and "CumulativeSize". Both do not reflect + # filesize after download, not sure which one is more accurate + # https://docs.ipfs.io/reference/http/api/#api-v0-files-stat + size = self.ipfs_client.files.stat(from_file)["CumulativeSize"] + s = self.ipfs_client.files.read(from_file, stream=True) + with Tqdm(s, desc=name, total=size, disable=no_progress_bar,) as t: + for chunk in s: + f.write(chunk) + t.update(len(chunk))