diff --git a/dvc/cache/__init__.py b/dvc/cache/__init__.py
index 888bb2ddd4..0e4435559d 100644
--- a/dvc/cache/__init__.py
+++ b/dvc/cache/__init__.py
@@ -81,6 +81,8 @@ def __init__(self, repo):
     ssh = _make_remote_property("ssh")
     hdfs = _make_remote_property("hdfs")
     azure = _make_remote_property("azure")
+    # TODO: don't yet understand this piece of code, do we need this for IPFS?
+    # ipfs = _make_remote_property("ipfs")
 
 
 class NamedCacheItem:
diff --git a/dvc/cache/base.py b/dvc/cache/base.py
index 29ad0e9db8..932a9e505d 100644
--- a/dvc/cache/base.py
+++ b/dvc/cache/base.py
@@ -730,3 +730,11 @@ def set_dir_info(self, hash_info):
 
         hash_info.dir_info = self._to_dict(self.get_dir_cache(hash_info))
         hash_info.nfiles = len(hash_info.dir_info)
+
+    # stub before_ and after_ methods because this class is used where
+    # remotes are used, but it does not inherit from remote.base.Remote
+    def before_transfer(self, *args, **kwargs):
+        pass
+
+    def after_transfer(self, *args, **kwargs):
+        pass
diff --git a/dvc/cache/local.py b/dvc/cache/local.py
index 547c890895..e8f1b85065 100644
--- a/dvc/cache/local.py
+++ b/dvc/cache/local.py
@@ -316,6 +316,8 @@ def _process(
             status = STATUS_NEW
             desc = "Uploading"
 
+        remote.before_transfer(download=download, upload=not download)
+
         if jobs is None:
             jobs = remote.tree.JOBS
 
@@ -405,6 +407,8 @@ def _process(
                     "'{}' nested files".format(dir_hash, len(file_hashes))
                 )
                 remote.index.update([dir_hash], file_hashes)
+        # notify remote of a successful upload / download
+        remote.after_transfer(download=download, upload=not download)
 
         return len(dir_plans[0]) + len(file_plans[0])
 
diff --git a/dvc/config.py b/dvc/config.py
index 090d065cd9..771c8c332e 100644
--- a/dvc/config.py
+++ b/dvc/config.py
@@ -216,6 +216,9 @@ class RelPath(str):
                 "https": {**HTTP_COMMON, **REMOTE_COMMON},
                 "webdav": {**WEBDAV_COMMON, **REMOTE_COMMON},
                 "webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON},
+                # TODO: Update documentation:
+                # https://dvc.org/doc/command-reference/remote/modify
+                "ipfs": {"mfs_path": str, **REMOTE_COMMON},
                 "remote": {str: object},  # Any of the above options are valid
             }
         )
diff --git a/dvc/remote/__init__.py b/dvc/remote/__init__.py
index 0c622d4b40..78005bd793 100644
--- a/dvc/remote/__init__.py
+++ b/dvc/remote/__init__.py
@@ -1,5 +1,6 @@
 from ..tree import get_cloud_tree
 from .base import Remote
+from .ipfs import IPFSRemote
 from .local import LocalRemote
 from .ssh import SSHRemote
 
@@ -10,4 +11,6 @@ def get_remote(repo, **kwargs):
         return LocalRemote(tree)
     if tree.scheme == "ssh":
         return SSHRemote(tree)
+    if tree.scheme == "ipfs":
+        return IPFSRemote(tree)
     return Remote(tree)
diff --git a/dvc/remote/base.py b/dvc/remote/base.py
index 8b2bd90943..3bc943c483 100644
--- a/dvc/remote/base.py
+++ b/dvc/remote/base.py
@@ -138,6 +138,27 @@ def hashes_exist(self, hashes, jobs=None, name=None):
         )
         return list(indexed_hashes) + list(hashes & set(remote_hashes))
 
+    def before_transfer(self, download=False, upload=False, gc=False):
+        """Hook before work is done
+
+        Will be called before an upload, download or garbage collect
+        operation is attempted. Originally implemented to allow the
+        content-addressed filesystem IPFS to copy the desired content
+        into its MFS (mutable filesystem).
+        """
+
+    def after_transfer(self, download=False, upload=False, gc=False):
+        """Hook after work has been done
+
+        Will be called after all upload, download or garbage collect
+        operations have completed successfully.
+
+        Some remotes may need to do additional work after all uploads were
+        processed. This method was originally implemented to allow the final
+        hash calculation of the content-addressed filesystem IPFS, but may
+        also be useful to trigger further post-upload actions.
+        """
+
     @classmethod
     @index_locked
     def gc(cls, named_cache, remote, jobs=None):
@@ -147,6 +168,7 @@ def gc(cls, named_cache, remote, jobs=None):
         if tree.scheme != "":
             used.update(named_cache.scheme_keys(tree.scheme))
 
+        remote.before_transfer(gc=True)
         removed = False
         # hashes must be sorted to ensure we always remove .dir files first
         for hash_ in sorted(
@@ -166,4 +188,6 @@ def gc(cls, named_cache, remote, jobs=None):
         if removed and hasattr(remote, "index"):
             remote.index.clear()
 
+        if removed:
+            remote.after_transfer(gc=True)
         return removed
diff --git a/dvc/remote/ipfs.py b/dvc/remote/ipfs.py
new file mode 100644
index 0000000000..cf46d8f7e8
--- /dev/null
+++ b/dvc/remote/ipfs.py
@@ -0,0 +1,77 @@
+import logging
+
+from ..config import Config
+from ..exceptions import DvcException
+from ..progress import Tqdm
+from .base import Remote
+
+logger = logging.getLogger(__name__)
+
+
+class IPFSRemote(Remote):
+    def before_transfer(self, download=False, upload=False, gc=False):
+        """Make sure that the MFS is in the desired state"""
+        self._update_mfs_to_latest_cid()
+
+    def after_transfer(self, download=False, upload=False, gc=False):
+        """Calculate the final CID after a successful upload
+
+        Changing files in our local MFS means that the content ID gets changed.
+        After doing any modifications, we therefore need to update .dvc/config
+        so it will always point to the latest content.
+        Though we get a new CID, other users won't need to download everything
+        again, since the existing files and subdirectories will keep their CIDs.
+        """
+        if not (upload or gc):
+            return
+        path_info = self.tree.path_info
+        old_cid = path_info.cid
+        new_cid = self.tree.ipfs_client.files.stat(path_info.mfs_path)["Hash"]
+        logger.debug(f"Saving new CID ipfs://{new_cid}")
+        with Config().edit("repo") as repo_config:
+            section = None
+            for v in repo_config["remote"].values():
+                url = v.get("url")
+                if url == "ipfs://":
+                    url = f"ipfs://{path_info.CID_EMPTY_DIRECTORY}"
+                if url == "ipfs://" + old_cid:
+                    section = v
+                    break
+            if not section:
+                raise DvcException("Could not find ipfs config in .dvc/config")
+            section["url"] = "ipfs://" + new_cid
+        path_info.cid = new_cid
+
+    def _update_mfs_to_latest_cid(self):
+        """
+        This method makes sure that the hash of our working directory in the
+        MFS matches the desired CID, which is defined in .dvc/config
+
+        It should be called before executing download operations to make sure
+        the content is available, and it should be called before uploading
+        new files to make sure no unwanted files get published.
+        """
+        mfs = self.tree.path_info.mfs_path
+        files_api = self.tree.ipfs_client.files
+        cid = self.tree.path_info.cid
+
+        # doesn't change anything if the path already exists, but creates an
+        # empty directory if the path didn't exist, saving us from exceptions
+        files_api.mkdir(mfs, parents=True)
+
+        current_hash = files_api.stat(mfs)["Hash"]
+        if current_hash != cid:
+            logger.debug(f"Updating IPFS MFS path {mfs} to CID {cid}")
+            with Tqdm(
+                desc=f"Updating IPFS MFS at {mfs}. This may take a while. "
" + f"See progress at http://127.0.0.1:5001/webui" + ): + # "cp" does not like overwriting files, so delete everything + # beforehand + # don't worry - IPFS still keeps a cache, so we won't actually + # downloading everything again + files_api.rm(mfs, recursive=True) + # this single command makes IPFS download everything. + # any chance to provide a useful progress bar? + # https://docs.ipfs.io/reference/http/api/#api-v0-files-cp + files_api.cp(f"/ipfs/{cid}", mfs) diff --git a/dvc/repo/gc.py b/dvc/repo/gc.py index 44a64e29b5..912bd60620 100644 --- a/dvc/repo/gc.py +++ b/dvc/repo/gc.py @@ -94,5 +94,9 @@ def gc( if self.cache.azure: _do_gc("azure", self.cache.azure, used, jobs) + # TODO: don't yet understand this piece of code, do we need this for IPFS? + # if self.cache.ipfs: + # _do_gc("ipfs", self.cache.ipfs, used, jobs) + if cloud: _do_gc("remote", self.cloud.get_remote(remote, "gc -c"), used, jobs) diff --git a/dvc/scheme.py b/dvc/scheme.py index 76c6d7a497..6cdec835ba 100644 --- a/dvc/scheme.py +++ b/dvc/scheme.py @@ -11,3 +11,4 @@ class Schemes: OSS = "oss" WEBDAV = "webdav" WEBDAVS = "webdavs" + IPFS = "ipfs" diff --git a/dvc/tree/__init__.py b/dvc/tree/__init__.py index cbd1595a69..1052813bfd 100644 --- a/dvc/tree/__init__.py +++ b/dvc/tree/__init__.py @@ -1,3 +1,4 @@ +import logging import posixpath from urllib.parse import urlparse @@ -7,6 +8,7 @@ from .hdfs import HDFSTree from .http import HTTPTree from .https import HTTPSTree +from .ipfs import IPFSTree from .local import LocalTree from .oss import OSSTree from .s3 import S3Tree @@ -14,6 +16,8 @@ from .webdav import WebDAVTree from .webdavs import WebDAVSTree +logger = logging.getLogger(__name__) + TREES = [ AzureTree, GDriveTree, @@ -26,6 +30,7 @@ OSSTree, WebDAVTree, WebDAVSTree, + IPFSTree, # NOTE: LocalTree is the default ] diff --git a/dvc/tree/ipfs.py b/dvc/tree/ipfs.py new file mode 100644 index 0000000000..0e492efca5 --- /dev/null +++ b/dvc/tree/ipfs.py @@ -0,0 +1,173 @@ +import logging +import os +from collections import deque +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse + +import ipfshttpclient +from funcy import cached_property + +from ..config import Config +from ..exceptions import DvcException +from ..path_info import _BasePath +from ..progress import Tqdm +from ..scheme import Schemes +from .base import BaseTree + +logger = logging.getLogger(__name__) + + +class IPFSPathInfo(_BasePath): + + # this is the content id of an empty directory + # it will be used when the user doesn't provide a CID + CID_EMPTY_DIRECTORY = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn" + + def __init__(self, url, mfs_path=None): + p = urlparse(url) + self.cid = p.netloc or IPFSPathInfo.CID_EMPTY_DIRECTORY + self.path = p.path.rstrip("/") + # Get the name of the project directory to provide a sane default + project_dir_name = Path(Config().dvc_dir).parent.name + self.mfs_path = ( + mfs_path.rstrip("/") if mfs_path else f"/dvc/{project_dir_name}" + ) + if not self.mfs_path: + # if mfs_path was a /, it was removed by .rstrip(). + # it will also clutter root directory if it would actually be used, + # so just disallow it + raise DvcException( + "You may not use / as your IPFS MFS path. 
" + "Choose another with " + "`dvc remote modify mfs_path `" + ) + self.scheme = p.scheme + + @cached_property + def url(self): + return f"{self.scheme}://{self.cid}{self.path}" + + @property + def mfs_file_path(self): + """Full filepath inside the mfs""" + return self.mfs_path + self.path + + def __div__(self, other): + url = f"{self.scheme}://{self.cid}{self.path}/{other}" + return IPFSPathInfo(url, self.mfs_path) + + def __str__(self): + return self.mfs_path + self.path + + __truediv__ = __div__ + + +class IPFSTree(BaseTree): + scheme = Schemes.IPFS + PATH_CLS = IPFSPathInfo + REQUIRES = {"ipfshttpclient": "ipfshttpclient"} + + def __init__(self, repo, config): + super().__init__(repo, config) + self.ipfs_client: Optional[ipfshttpclient.Client] = None + self.path_info = IPFSTree.PATH_CLS( + config["url"], config.get("mfs_path") + ) + try: + # TODO: support remote IPFS daemons with credentials + self.ipfs_client = ipfshttpclient.connect(session=True) + except ipfshttpclient.exceptions.VersionMismatch as e: + raise DvcException(f"Unsupported IPFS daemon ({e})") from e + except ipfshttpclient.exceptions.ConnectionError: + raise DvcException( + "Could not connect to ipfs daemon. " + "Install ipfs on your machine and run `ipfs daemon`" + ) + + def __del__(self): + if self.ipfs_client is not None: + self.ipfs_client.close() + + def exists(self, path_info: PATH_CLS, use_dvcignore=True): + self.ipfs_client.files.mkdir(path_info.mfs_path, parents=True) + try: + self.ipfs_client.files.stat(path_info.mfs_path + path_info.path) + except ipfshttpclient.exceptions.ErrorResponse as e: + if e.args[0] != "file does not exist": + raise e + return False + return True + + def get_file_hash(self, path_info: PATH_CLS): + # TODO + pass + + def remove(self, path_info: PATH_CLS): + logger.debug(f"Removing {path_info} from MFS") + self.ipfs_client.files.rm(path_info.mfs_file_path, recursive=True) + + def walk_files(self, path_info: PATH_CLS, **kwargs): + dirs = deque([path_info]) + + while dirs: + dir_path = dirs.pop() + try: + stat = self.ipfs_client.files.ls(dir_path.mfs_file_path) + entries = stat["Entries"] or [] + except ipfshttpclient.exceptions.ErrorResponse as e: + if e.args[0] != "file does not exist": + raise e + continue + for entry in entries: + entry_path_info = dir_path / entry["Name"] + type_ = self.ipfs_client.files.stat( + entry_path_info.mfs_file_path + )["Type"] + if type_ == "directory": + dirs.append(entry_path_info) + elif type_ == "file": + yield entry_path_info + else: + raise DvcException( + f"Unexpected file type ({type_}) " + f"in IPFS at {entry_path_info.mfs_file_path}" + ) + + def path_to_hash(self, path): + return path.replace("/", "") + + def _upload(self, from_file, to_info, name=None, no_progress_bar=False): + mfs_path = f"{self.path_info.mfs_path}/{to_info.path}" + with open(from_file, "rb") as f: + with Tqdm.wrapattr( + f, + "read", + desc=name, + total=os.path.getsize(from_file), + disable=no_progress_bar, + ) as wrapped_f: + # "parents" might get a kwarg in future versions of + # py-ipfs-http-client? If so, change the opts param here + self.ipfs_client.files.write( + mfs_path, wrapped_f, create=True, opts={"parents": True} + ) + + def _download( + self, + from_info: PATH_CLS, + to_file: str, + name=None, + no_progress_bar=False, + ): + from_file = from_info.mfs_file_path + with open(to_file, "wb") as f: + # there is "Size" and "CumulativeSize". 
+            # file size after download; not sure which one is more accurate
+            # https://docs.ipfs.io/reference/http/api/#api-v0-files-stat
+            size = self.ipfs_client.files.stat(from_file)["CumulativeSize"]
+            s = self.ipfs_client.files.read(from_file, stream=True)
+            with Tqdm(s, desc=name, total=size, disable=no_progress_bar) as t:
+                for chunk in s:
+                    f.write(chunk)
+                    t.update(len(chunk))
diff --git a/setup.py b/setup.py
index c57ecb93fb..2bc2f4cbd8 100644
--- a/setup.py
+++ b/setup.py
@@ -98,12 +98,14 @@ def run(self):
 # Remove the env marker if/when pyarrow is available for Python3.9
 hdfs = ["pyarrow>=2.0.0; python_version < '3.9'"]
 webdav = ["webdavclient3>=3.14.5"]
+# TODO: update ipfshttpclient to stable when 0.7.0 has been released
+ipfs = ["ipfshttpclient==0.7.0a1"]
 # gssapi should not be included in all_remotes, because it doesn't have wheels
 # for linux and mac, so it will fail to compile if user doesn't have all the
 # requirements, including kerberos itself. Once all the wheels are available,
 # we can start shipping it by default.
 ssh_gssapi = ["paramiko[invoke,gssapi]>=2.7.0"]
-all_remotes = gs + s3 + azure + ssh + oss + gdrive + hdfs + webdav
+all_remotes = gs + s3 + azure + ssh + oss + gdrive + hdfs + webdav + ipfs
 
 # Extra dependecies to run tests
 tests_requirements = [
@@ -166,6 +168,7 @@ def run(self):
         "ssh_gssapi": ssh_gssapi,
         "hdfs": hdfs,
         "webdav": webdav,
+        "ipfs": ipfs,
         "tests": tests_requirements,
     },
     keywords="data-science data-version-control machine-learning git"
diff --git a/tests/unit/tree/test_ipfs.py b/tests/unit/tree/test_ipfs.py
new file mode 100644
index 0000000000..c7a056d850
--- /dev/null
+++ b/tests/unit/tree/test_ipfs.py
@@ -0,0 +1,9 @@
+from dvc.tree import IPFSTree
+
+
+def test_init(dvc):
+    url = "ipfs://TODO"
+    config = {"url": url}
+    tree = IPFSTree(dvc, config)
+
+    assert tree.path_info == url
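
Not part of the patch: a minimal sketch of how the IPFSPathInfo class added in dvc/tree/ipfs.py resolves CIDs and MFS paths. It assumes the snippet runs inside a DVC repository (the default MFS path is derived from the project directory via Config().dvc_dir), and the mfs_path value "/dvc/example" is made up for illustration.

    from dvc.tree.ipfs import IPFSPathInfo

    # a URL without a CID falls back to the empty-directory CID
    info = IPFSPathInfo("ipfs://", mfs_path="/dvc/example")
    assert info.cid == IPFSPathInfo.CID_EMPTY_DIRECTORY
    assert info.url == f"ipfs://{IPFSPathInfo.CID_EMPTY_DIRECTORY}"

    # joining a path keeps the CID and prefixes the MFS staging path
    child = info / "data.csv"
    assert child.mfs_file_path == "/dvc/example/data.csv"

Since IPFSTree reads and writes through the MFS path rather than the CID directly, the MFS acts as a local staging area; the CID recorded in .dvc/config only changes when IPFSRemote.after_transfer rewrites it after a successful upload or gc.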