Skip to content

Commit

Permalink
oss: migrate to ossfs (#6307)
Browse files Browse the repository at this point in the history
* Migrate to ossfs

fix 6177

* Add verification to object stored on oss

* Change the oss remote's dependency.

pass all of the tests

* Performance optimaztion and ossfs version update

* Remove the argument to set the resumable get/put

1. Experiments shows resumable get/put has little influnence on performance.
2. Just use resumable ones as default

* Use a new api for performance optimization

* Without filesize progress bar version

* With progress bar version

* Bump and add comments

1. Bump ossfs from 2021.6.2 to 2021.7.0
2. add some comments to the verification trigger

* Change the name of kwrags following parent class.

* Bump to new ossfs and rewrite upload_fobj

Because some problem with oss-emulater
1. bump to ossfs 2021.7.1
2. rewrite upload_fobj

* Optimization on rm_files

* Some optimization follow the old version.

* Update dvc/fs/oss.py

Co-authored-by: Ruslan Kuprieiev <kupruser@gmail.com>

* Update dvc/objects/db/oss.py

Co-authored-by: Ruslan Kuprieiev <kupruser@gmail.com>

* bump into ossfs 2021.7.2

* Bump into ossfs 2021.7.3OC

* Switch to real_oss_server

* Switch to a real oss server

* Use the oss bucket located in US.

* Remove some emulator related code

* Add envs to ci

* remove some oss pytest XFAIL

Because we didn't use emulator, we can remove some pytest XFAILs

Co-authored-by: Ruslan Kuprieiev <kupruser@gmail.com>
  • Loading branch information
karajan1001 and efiop committed Jul 13, 2021
1 parent 58f0911 commit 93f36d9
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 117 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ jobs:
GDRIVE_CREDENTIALS_DATA: ${{ secrets.GDRIVE_CREDENTIALS_DATA }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
OSS_ACCESS_KEY_ID: ${{ secrets.OSS_ACCESS_KEY_ID}}
OSS_ACCESS_KEY_SECRET: ${{ secrets.OSS_ACCESS_KEY_SECRET}}
OSS_ENDPOINT: ${{ secrets.OSS_ENDPOINT}}
run: >-
python -m tests -n=4
--cov-report=xml --cov-report=term
Expand Down
135 changes: 42 additions & 93 deletions dvc/fs/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,124 +8,73 @@
from dvc.progress import Tqdm
from dvc.scheme import Schemes

from .base import BaseFileSystem
from .fsspec_wrapper import ObjectFSWrapper

logger = logging.getLogger(__name__)


class OSSFileSystem(BaseFileSystem): # pylint:disable=abstract-method
"""
oss2 document:
https://www.alibabacloud.com/help/doc-detail/32026.htm
Examples
----------
$ dvc remote add myremote oss://my-bucket/path
Set key id, key secret and endpoint using modify command
$ dvc remote modify myremote oss_key_id my-key-id
$ dvc remote modify myremote oss_key_secret my-key-secret
$ dvc remote modify myremote oss_endpoint endpoint
or environment variables
$ export OSS_ACCESS_KEY_ID="my-key-id"
$ export OSS_ACCESS_KEY_SECRET="my-key-secret"
$ export OSS_ENDPOINT="endpoint"
"""

# pylint:disable=abstract-method
class OSSFileSystem(ObjectFSWrapper):
scheme = Schemes.OSS
PATH_CLS = CloudURLInfo
REQUIRES = {"oss2": "oss2"}
REQUIRES = {"ossfs": "ossfs"}
PARAM_CHECKSUM = "etag"
COPY_POLL_SECONDS = 5
LIST_OBJECT_PAGE_SIZE = 100
DETAIL_FIELDS = frozenset(("etag", "size"))

def __init__(self, **config):
super().__init__(**config)

self.endpoint = config.get("oss_endpoint") or os.getenv("OSS_ENDPOINT")

self.key_id = (
config.get("oss_key_id")
or os.getenv("OSS_ACCESS_KEY_ID")
or "defaultId"
def _prepare_credentials(self, **config):
login_info = {}
login_info["key"] = config.get("oss_key_id") or os.getenv(
"OSS_ACCESS_KEY_ID"
)

self.key_secret = (
config.get("oss_key_secret")
or os.getenv("OSS_ACCESS_KEY_SECRET")
or "defaultSecret"
login_info["secret"] = config.get("oss_key_secret") or os.getenv(
"OSS_ACCESS_KEY_SECRET"
)
login_info["endpoint"] = config.get("oss_endpoint")
return login_info

@wrap_prop(threading.Lock())
@cached_property
def oss_service(self):
import oss2

logger.debug(f"key id: {self.key_id}")
logger.debug(f"key secret: {self.key_secret}")

return oss2.Auth(self.key_id, self.key_secret)

def _get_bucket(self, bucket):
import oss2

return oss2.Bucket(self.oss_service, self.endpoint, bucket)
def fs(self):
from ossfs import OSSFileSystem as _OSSFileSystem

def _generate_download_url(self, path_info, expires=3600):
return self._get_bucket(path_info.bucket).sign_url(
"GET", path_info.path, expires
)

def exists(self, path_info) -> bool:
paths = self._list_paths(path_info)
return any(path_info.path == path for path in paths)

def _list_paths(self, path_info):
import oss2

for blob in oss2.ObjectIterator(
self._get_bucket(path_info.bucket), prefix=path_info.path
):
yield blob.key

def walk_files(self, path_info, **kwargs):
if not kwargs.pop("prefix", False):
path_info = path_info / ""
for fname in self._list_paths(path_info):
if fname.endswith("/"):
continue

yield path_info.replace(path=fname)
return _OSSFileSystem(**self.fs_args)

def remove(self, path_info):
if path_info.scheme != self.scheme:
raise NotImplementedError

logger.debug(f"Removing oss://{path_info}")
self._get_bucket(path_info.bucket).delete_object(path_info.path)

def _upload_fobj(self, fobj, to_info, **kwargs):
self._get_bucket(to_info.bucket).put_object(to_info.path, fobj)
self.fs.rm_file(self._with_bucket(path_info))

def _upload(
self, from_file, to_info, name=None, no_progress_bar=False, **_kwargs
self, from_file, to_info, name=None, no_progress_bar=False, **kwargs
):
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
bucket = self._get_bucket(to_info.bucket)
bucket.put_object_from_file(
to_info.path, from_file, progress_callback=pbar.update_to
total = os.path.getsize(from_file)
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc=name,
**kwargs,
) as pbar:
self.fs.put_file(
from_file,
self._with_bucket(to_info),
progress_callback=pbar.update_to,
)
self.fs.invalidate_cache(self._with_bucket(to_info.parent))

def _download(
self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
self, from_info, to_file, name=None, no_progress_bar=False, **pbar_args
):
with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
import oss2

bucket = self._get_bucket(from_info.bucket)
oss2.resumable_download(
bucket,
from_info.path,
total = self.fs.size(self._with_bucket(from_info))
with Tqdm(
disable=no_progress_bar,
total=total,
bytes=True,
desc=name,
**pbar_args,
) as pbar:
self.fs.get_file(
self._with_bucket(from_info),
to_file,
progress_callback=pbar.update_to,
)
4 changes: 4 additions & 0 deletions dvc/objects/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ def get_odb(fs, path_info, **config):
from .base import ObjectDB
from .gdrive import GDriveObjectDB
from .local import LocalObjectDB
from .oss import OSSObjectDB
from .ssh import SSHObjectDB

if fs.scheme == Schemes.LOCAL:
Expand All @@ -16,6 +17,9 @@ def get_odb(fs, path_info, **config):
if fs.scheme == Schemes.GDRIVE:
return GDriveObjectDB(fs, path_info, **config)

if fs.scheme == Schemes.OSS:
return OSSObjectDB(fs, path_info, **config)

return ObjectDB(fs, path_info, **config)


Expand Down
9 changes: 9 additions & 0 deletions dvc/objects/db/oss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .base import ObjectDB


class OSSObjectDB(ObjectDB):
"""
Temporary extra verification
"""

DEFAULT_VERIFY = True
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def run(self):
s3 = ["s3fs==2021.6.1", "aiobotocore[boto3]==1.3.0"]
azure = ["adlfs==2021.7.0", "azure-identity>=1.4.0", "knack"]
# https://github.com/Legrandin/pycryptodome/issues/465
oss = ["oss2==2.6.1", "pycryptodome>=3.10"]
oss = ["ossfs==2021.7.3"]
ssh = ["paramiko[invoke]>=2.7.0"]

# Remove the env marker if/when pyarrow is available for Python3.9
Expand Down
8 changes: 1 addition & 7 deletions tests/func/test_data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,8 @@
"hdfs",
"webdav",
"webhdfs",
"oss",
]
] + [
pytest.param(
pytest.lazy_fixture("oss"),
marks=pytest.mark.xfail(
reason="https://github.com/iterative/dvc/issues/4633"
),
)
]

# Clouds that implement the general methods that can be tested
Expand Down
19 changes: 6 additions & 13 deletions tests/remotes/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .base import Base

TEST_OSS_REPO_BUCKET = "dvc-test"
TEST_OSS_REPO_BUCKET = "dvc-test-github"
EMULATOR_OSS_ENDPOINT = "127.0.0.1:{port}"
EMULATOR_OSS_ACCESS_KEY_ID = "AccessKeyID"
EMULATOR_OSS_ACCESS_KEY_SECRET = "AccessKeySecret"
Expand Down Expand Up @@ -63,22 +63,15 @@ def _check():


@pytest.fixture
def oss(oss_server):
def oss(real_oss):
import oss2

url = OSS.get_url()
ret = OSS(url)
ret.config = {
"url": url,
"oss_key_id": EMULATOR_OSS_ACCESS_KEY_ID,
"oss_key_secret": EMULATOR_OSS_ACCESS_KEY_SECRET,
"oss_endpoint": oss_server,
}
ret = real_oss

auth = oss2.Auth(
EMULATOR_OSS_ACCESS_KEY_ID, EMULATOR_OSS_ACCESS_KEY_SECRET
auth = oss2.Auth(ret.config["oss_key_id"], ret.config["oss_key_secret"])
bucket = oss2.Bucket(
auth, ret.config["oss_endpoint"], TEST_OSS_REPO_BUCKET
)
bucket = oss2.Bucket(auth, oss_server, TEST_OSS_REPO_BUCKET)
try:
bucket.get_bucket_info()
except oss2.exceptions.NoSuchBucket:
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/remote/test_oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ def test_init(dvc):
"oss_endpoint": endpoint,
}
fs = OSSFileSystem(**config)
assert fs.endpoint == endpoint
assert fs.key_id == key_id
assert fs.key_secret == key_secret
assert fs.fs._endpoint == endpoint
assert fs.fs._auth.id == key_id
assert fs.fs._auth.secret == key_secret

0 comments on commit 93f36d9

Please sign in to comment.