2 changes: 2 additions & 0 deletions dvc/cache/__init__.py
@@ -81,6 +81,8 @@ def __init__(self, repo):
ssh = _make_remote_property("ssh")
hdfs = _make_remote_property("hdfs")
azure = _make_remote_property("azure")
# TODO: don't yet understand this piece of code, do we need this for IPFS?
# ipfs = _make_remote_property("ipfs")


class NamedCacheItem:
8 changes: 8 additions & 0 deletions dvc/cache/base.py
@@ -730,3 +730,11 @@ def set_dir_info(self, hash_info):

hash_info.dir_info = self._to_dict(self.get_dir_cache(hash_info))
hash_info.nfiles = len(hash_info.dir_info)

# stub before_ and after_ methods because this class is used where
# remotes are used, but it does not inherit from remote.base.Remote
def before_transfer(self, *args, **kwargs):
pass

def after_transfer(self, *args, **kwargs):
pass
4 changes: 4 additions & 0 deletions dvc/cache/local.py
@@ -316,6 +316,8 @@ def _process(
status = STATUS_NEW
desc = "Uploading"

remote.before_transfer(download=download, upload=not download)

if jobs is None:
jobs = remote.tree.JOBS

@@ -405,6 +407,8 @@ def _process(
"'{}' nested files".format(dir_hash, len(file_hashes))
)
remote.index.update([dir_hash], file_hashes)
# notify the remote of the successful upload / download
remote.after_transfer(download=download, upload=not download)

return len(dir_plans[0]) + len(file_plans[0])

3 changes: 3 additions & 0 deletions dvc/config.py
@@ -216,6 +216,9 @@ class RelPath(str):
"https": {**HTTP_COMMON, **REMOTE_COMMON},
"webdav": {**WEBDAV_COMMON, **REMOTE_COMMON},
"webdavs": {**WEBDAV_COMMON, **REMOTE_COMMON},
# TODO: Update documentation:
# https://dvc.org/doc/command-reference/remote/modify
"ipfs": {"mfs_path": str, **REMOTE_COMMON},
"remote": {str: object}, # Any of the above options are valid
}
)
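For reference, the new "ipfs" schema entry could be exercised like this (a hedged sketch; the remote name and the mfs_path value are illustrative, while the bare ipfs:// URL is what IPFSRemote later treats as the empty-directory CID):

dvc remote add ipfsremote ipfs://
dvc remote modify ipfsremote mfs_path /dvc-storage

which would leave a .dvc/config section roughly like:

['remote "ipfsremote"']
    url = ipfs://
    mfs_path = /dvc-storage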
3 changes: 3 additions & 0 deletions dvc/remote/__init__.py
@@ -1,5 +1,6 @@
from ..tree import get_cloud_tree
from .base import Remote
from .ipfs import IPFSRemote
from .local import LocalRemote
from .ssh import SSHRemote

@@ -10,4 +11,6 @@ def get_remote(repo, **kwargs):
return LocalRemote(tree)
if tree.scheme == "ssh":
return SSHRemote(tree)
if tree.scheme == "ipfs":
return IPFSRemote(tree)
return Remote(tree)
24 changes: 24 additions & 0 deletions dvc/remote/base.py
@@ -138,6 +138,27 @@ def hashes_exist(self, hashes, jobs=None, name=None):
)
return list(indexed_hashes) + list(hashes & set(remote_hashes))

def before_transfer(self, download=False, upload=False, gc=False):
"""Hook before work is done

Will be called before an upload, download or garbage collect
operation is attempted. Originally implemented to allow the
content-addressed filesystem IPFS to copy the desired content
into its MFS (mutable filesystem).
"""

def after_transfer(self, download=False, upload=False, gc=False):
"""Hook after work has been done

Will be called after all uploads, downloads or garbage collect
operations were successful.

Some remotes may need to do additional work after all uploads were
processed. This method was originally implemented to allow the final
hash calculation of the content-addressed filesystem IPFS, but may
also be useful to trigger further post-upload actions.
"""

@classmethod
@index_locked
def gc(cls, named_cache, remote, jobs=None):
@@ -147,6 +168,7 @@ def gc(cls, named_cache, remote, jobs=None):
if tree.scheme != "":
used.update(named_cache.scheme_keys(tree.scheme))

remote.before_transfer(gc=True)
removed = False
# hashes must be sorted to ensure we always remove .dir files first
for hash_ in sorted(
@@ -166,4 +188,6 @@

if removed and hasattr(remote, "index"):
remote.index.clear()
if removed:
remote.after_transfer(gc=True)
return removed
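To illustrate the hook contract documented above, a remote could override these methods roughly as follows (a minimal sketch; the AuditingRemote class and its logging body are illustrative and not part of this change):

import logging

from dvc.remote.base import Remote

logger = logging.getLogger(__name__)


class AuditingRemote(Remote):
    def before_transfer(self, download=False, upload=False, gc=False):
        # runs once before any file of the planned operation is transferred
        logger.debug(
            "transfer starting: download=%s upload=%s gc=%s",
            download, upload, gc,
        )

    def after_transfer(self, download=False, upload=False, gc=False):
        # runs once, and only after every planned transfer has succeeded
        if upload or gc:
            logger.debug("remote contents changed; do post-processing here")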
77 changes: 77 additions & 0 deletions dvc/remote/ipfs.py
@@ -0,0 +1,77 @@
import logging

from ..config import Config
from ..exceptions import DvcException
from ..progress import Tqdm
from .base import Remote

logger = logging.getLogger(__name__)


class IPFSRemote(Remote):
def before_transfer(self, download=False, upload=False, gc=False):
"""Make sure that the MFS is in the desired state"""
self._update_mfs_to_latest_cid()

def after_transfer(self, download=False, upload=False, gc=False):
"""Calculate the final CID after a successful upload

Changing files in our local MFS changes its root content ID (CID).
After any modification we therefore need to update .dvc/config
so it always points to the latest content.
Although we get a new root CID, other users won't need to download
everything again, since unchanged files and subdirectories keep their CIDs.
"""
if not (upload or gc):
return
path_info = self.tree.path_info
old_cid = path_info.cid
new_cid = self.tree.ipfs_client.files.stat(path_info.mfs_path)["Hash"]
logger.debug(f"Saving new CID ipfs://{new_cid}")
with Config().edit("repo") as repo_config:
section = None
for v in repo_config["remote"].values():
url = v.get("url")
if url == "ipfs://":
url = f"ipfs://{path_info.CID_EMPTY_DIRECTORY}"
if url == "ipfs://" + old_cid:
section = v
break
if not section:
raise DvcException("Could not find ipfs config in .dvc/config")
section["url"] = "ipfs://" + new_cid
path_info.cid = new_cid

def _update_mfs_to_latest_cid(self):
"""
This method makes sure that the hash of our working directory in the
MFS matches the desired CID defined in .dvc/config.

It should be called before executing download operations to make sure
the content is available, and it should be called before uploading
new files to make sure no unwanted files get published.
"""
mfs = self.tree.path_info.mfs_path
files_api = self.tree.ipfs_client.files
cid = self.tree.path_info.cid

# doesn't change anything if the path already exists, but creates an
# empty directory if the path didn't exist, saving us from exceptions
files_api.mkdir(mfs, parents=True)

current_hash = files_api.stat(mfs)["Hash"]
if current_hash != cid:
logger.debug(f"Updating IPFS MFS path {mfs} to CID {cid}")
with Tqdm(
desc=f"Updating IPFS MFS at {mfs}. This may take a while. "
f"See progress at http://127.0.0.1:5001/webui"
):
# "cp" does not like overwriting files, so delete everything
# beforehand
# don't worry - IPFS still keeps a cache, so we won't actually
# download everything again
files_api.rm(mfs, recursive=True)
# this single command makes IPFS download everything.
# any chance to provide a useful progress bar?
# https://docs.ipfs.io/reference/http/api/#api-v0-files-cp
files_api.cp(f"/ipfs/{cid}", mfs)
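For context, the MFS round-trip that _update_mfs_to_latest_cid relies on can be tried directly against a local IPFS daemon via the ipfshttpclient package (a hedged sketch; the MFS path is illustrative and the CID is assumed to be the well-known empty-directory hash):

import ipfshttpclient

MFS_PATH = "/dvc-example"  # hypothetical MFS working directory
# assumption: the well-known CID of an empty UnixFS directory
CID = "QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn"

client = ipfshttpclient.connect()  # default daemon at /dns/localhost/tcp/5001/http
client.files.mkdir(MFS_PATH, parents=True)  # no-op if the path already exists
if client.files.stat(MFS_PATH)["Hash"] != CID:
    # "cp" refuses to overwrite, so drop the old tree first; the blocks stay
    # in the local block store, so nothing is re-downloaded
    client.files.rm(MFS_PATH, recursive=True)
    client.files.cp(f"/ipfs/{CID}", MFS_PATH)
print(client.files.stat(MFS_PATH)["Hash"])  # now equals CID
client.close()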
4 changes: 4 additions & 0 deletions dvc/repo/gc.py
@@ -94,5 +94,9 @@ def gc(
if self.cache.azure:
_do_gc("azure", self.cache.azure, used, jobs)

# TODO: don't yet understand this piece of code, do we need this for IPFS?
# if self.cache.ipfs:
# _do_gc("ipfs", self.cache.ipfs, used, jobs)

if cloud:
_do_gc("remote", self.cloud.get_remote(remote, "gc -c"), used, jobs)
1 change: 1 addition & 0 deletions dvc/scheme.py
@@ -11,3 +11,4 @@ class Schemes:
OSS = "oss"
WEBDAV = "webdav"
WEBDAVS = "webdavs"
IPFS = "ipfs"
5 changes: 5 additions & 0 deletions dvc/tree/__init__.py
@@ -1,3 +1,4 @@
import logging
import posixpath
from urllib.parse import urlparse

@@ -7,13 +8,16 @@
from .hdfs import HDFSTree
from .http import HTTPTree
from .https import HTTPSTree
from .ipfs import IPFSTree
from .local import LocalTree
from .oss import OSSTree
from .s3 import S3Tree
from .ssh import SSHTree
from .webdav import WebDAVTree
from .webdavs import WebDAVSTree

logger = logging.getLogger(__name__)

TREES = [
AzureTree,
GDriveTree,
@@ -26,6 +30,7 @@
OSSTree,
WebDAVTree,
WebDAVSTree,
IPFSTree,
# NOTE: LocalTree is the default
]
