Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

oss: migrate to ossfs #6307

Merged
merged 23 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ jobs:
GDRIVE_CREDENTIALS_DATA: ${{ secrets.GDRIVE_CREDENTIALS_DATA }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
OSS_ACCESS_KEY_ID: ${{ secrets.OSS_ACCESS_KEY_ID}}
OSS_ACCESS_KEY_SECRET: ${{ secrets.OSS_ACCESS_KEY_SECRET}}
OSS_ENDPOINT: ${{ secrets.OSS_ENDPOINT}}
run: >-
python -m tests -n=4
--cov-report=xml --cov-report=term
Expand Down
135 changes: 42 additions & 93 deletions dvc/fs/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,124 +8,73 @@
from dvc.progress import Tqdm
from dvc.scheme import Schemes

from .base import BaseFileSystem
from .fsspec_wrapper import ObjectFSWrapper

logger = logging.getLogger(__name__)


class OSSFileSystem(BaseFileSystem): # pylint:disable=abstract-method
"""
oss2 document:
https://www.alibabacloud.com/help/doc-detail/32026.htm


Examples
----------
$ dvc remote add myremote oss://my-bucket/path
Set key id, key secret and endpoint using modify command
$ dvc remote modify myremote oss_key_id my-key-id
$ dvc remote modify myremote oss_key_secret my-key-secret
$ dvc remote modify myremote oss_endpoint endpoint
or environment variables
$ export OSS_ACCESS_KEY_ID="my-key-id"
$ export OSS_ACCESS_KEY_SECRET="my-key-secret"
$ export OSS_ENDPOINT="endpoint"
"""

# pylint:disable=abstract-method
class OSSFileSystem(ObjectFSWrapper):
scheme = Schemes.OSS
PATH_CLS = CloudURLInfo
REQUIRES = {"oss2": "oss2"}
REQUIRES = {"ossfs": "ossfs"}
PARAM_CHECKSUM = "etag"
COPY_POLL_SECONDS = 5
LIST_OBJECT_PAGE_SIZE = 100
DETAIL_FIELDS = frozenset(("etag", "size"))

def __init__(self, **config):
super().__init__(**config)

self.endpoint = config.get("oss_endpoint") or os.getenv("OSS_ENDPOINT")

self.key_id = (
config.get("oss_key_id")
or os.getenv("OSS_ACCESS_KEY_ID")
or "defaultId"
def _prepare_credentials(self, **config):
login_info = {}
login_info["key"] = config.get("oss_key_id") or os.getenv(
"OSS_ACCESS_KEY_ID"
)

self.key_secret = (
config.get("oss_key_secret")
or os.getenv("OSS_ACCESS_KEY_SECRET")
or "defaultSecret"
login_info["secret"] = config.get("oss_key_secret") or os.getenv(
"OSS_ACCESS_KEY_SECRET"
)
login_info["endpoint"] = config.get("oss_endpoint")
return login_info

@wrap_prop(threading.Lock())
@cached_property
def oss_service(self):
import oss2

logger.debug(f"key id: {self.key_id}")
logger.debug(f"key secret: {self.key_secret}")

return oss2.Auth(self.key_id, self.key_secret)

def _get_bucket(self, bucket):
import oss2

return oss2.Bucket(self.oss_service, self.endpoint, bucket)
def fs(self):
from ossfs import OSSFileSystem as _OSSFileSystem

def _generate_download_url(self, path_info, expires=3600):
return self._get_bucket(path_info.bucket).sign_url(
"GET", path_info.path, expires
)

def exists(self, path_info) -> bool:
paths = self._list_paths(path_info)
return any(path_info.path == path for path in paths)

def _list_paths(self, path_info):
import oss2

for blob in oss2.ObjectIterator(
self._get_bucket(path_info.bucket), prefix=path_info.path
):
yield blob.key

def walk_files(self, path_info, **kwargs):
if not kwargs.pop("prefix", False):
path_info = path_info / ""
for fname in self._list_paths(path_info):
if fname.endswith("/"):
continue

yield path_info.replace(path=fname)
return _OSSFileSystem(**self.fs_args)

def remove(self, path_info):
if path_info.scheme != self.scheme:
raise NotImplementedError

logger.debug(f"Removing oss://{path_info}")
self._get_bucket(path_info.bucket).delete_object(path_info.path)

def _upload_fobj(self, fobj, to_info, **kwargs):
self._get_bucket(to_info.bucket).put_object(to_info.path, fobj)
self.fs.rm_file(self._with_bucket(path_info))
isidentical marked this conversation as resolved.
Show resolved Hide resolved

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
self, from_file, to_info, name=None, no_progress_bar=False, **kwargs
):
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
bucket = self._get_bucket(to_info.bucket)
bucket.put_object_from_file(
to_info.path, from_file, progress_callback=pbar.update_to
total = os.path.getsize(from_file)
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc=name,
**kwargs,
) as pbar:
self.fs.put_file(
from_file,
self._with_bucket(to_info),
progress_callback=pbar.update_to,
)
self.fs.invalidate_cache(self._with_bucket(to_info.parent))

def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
self, from_info, to_file, name=None, no_progress_bar=False, **pbar_args
):
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
import oss2

bucket = self._get_bucket(from_info.bucket)
oss2.resumable_download(
bucket,
from_info.path,
total = self.fs.size(self._with_bucket(from_info))
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc=name,
**pbar_args,
) as pbar:
self.fs.get_file(
self._with_bucket(from_info),
to_file,
progress_callback=pbar.update_to,
)
4 changes: 4 additions & 0 deletions dvc/objects/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ def get_odb(fs, path_info, **config):
from .base import ObjectDB
from .gdrive import GDriveObjectDB
from .local import LocalObjectDB
from .oss import OSSObjectDB
from .ssh import SSHObjectDB

if fs.scheme == Schemes.LOCAL:
Expand All @@ -16,6 +17,9 @@ def get_odb(fs, path_info, **config):
if fs.scheme == Schemes.GDRIVE:
return GDriveObjectDB(fs, path_info, **config)

if fs.scheme == Schemes.OSS:
return OSSObjectDB(fs, path_info, **config)

return ObjectDB(fs, path_info, **config)


Expand Down
9 changes: 9 additions & 0 deletions dvc/objects/db/oss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .base import ObjectDB


class OSSObjectDB(ObjectDB):
"""
Temporary extra verification
"""

DEFAULT_VERIFY = True
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def run(self):
s3 = ["s3fs==2021.6.1", "aiobotocore[boto3]==1.3.0"]
azure = ["adlfs==0.7.1", "azure-identity>=1.4.0", "knack"]
# https://github.com/Legrandin/pycryptodome/issues/465
oss = ["oss2==2.6.1", "pycryptodome>=3.10"]
oss = ["ossfs==2021.7.3"]
ssh = ["paramiko[invoke]>=2.7.0"]

# Remove the env marker if/when pyarrow is available for Python3.9
Expand Down
8 changes: 1 addition & 7 deletions tests/func/test_data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,8 @@
"hdfs",
"webdav",
"webhdfs",
"oss",
]
] + [
pytest.param(
pytest.lazy_fixture("oss"),
marks=pytest.mark.xfail(
reason="https://github.com/iterative/dvc/issues/4633"
),
)
]

# Clouds that implement the general methods that can be tested
Expand Down
19 changes: 6 additions & 13 deletions tests/remotes/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .base import Base

TEST_OSS_REPO_BUCKET = "dvc-test"
TEST_OSS_REPO_BUCKET = "dvc-test-github"
EMULATOR_OSS_ENDPOINT = "127.0.0.1:{port}"
EMULATOR_OSS_ACCESS_KEY_ID = "AccessKeyID"
EMULATOR_OSS_ACCESS_KEY_SECRET = "AccessKeySecret"
Expand Down Expand Up @@ -63,22 +63,15 @@ def _check():


@pytest.fixture
def oss(oss_server):
def oss(real_oss):
import oss2

url = OSS.get_url()
ret = OSS(url)
ret.config = {
"url": url,
"oss_key_id": EMULATOR_OSS_ACCESS_KEY_ID,
"oss_key_secret": EMULATOR_OSS_ACCESS_KEY_SECRET,
"oss_endpoint": oss_server,
}
ret = real_oss

auth = oss2.Auth(
EMULATOR_OSS_ACCESS_KEY_ID, EMULATOR_OSS_ACCESS_KEY_SECRET
auth = oss2.Auth(ret.config["oss_key_id"], ret.config["oss_key_secret"])
bucket = oss2.Bucket(
auth, ret.config["oss_endpoint"], TEST_OSS_REPO_BUCKET
)
bucket = oss2.Bucket(auth, oss_server, TEST_OSS_REPO_BUCKET)
try:
bucket.get_bucket_info()
except oss2.exceptions.NoSuchBucket:
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/remote/test_oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ def test_init(dvc):
"oss_endpoint": endpoint,
}
fs = OSSFileSystem(**config)
assert fs.endpoint == endpoint
assert fs.key_id == key_id
assert fs.key_secret == key_secret
assert fs.fs._endpoint == endpoint
assert fs.fs._auth.id == key_id
assert fs.fs._auth.secret == key_secret