Skip to content

Commit

Permalink
fsspec integration (#3253)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng committed Sep 15, 2022
1 parent 5ab5cad commit 0b20242
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 20 deletions.
2 changes: 2 additions & 0 deletions mars/dataframe/datastore/to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def to_parquet(
compression="snappy",
index=None,
partition_cols=None,
storage_options: dict = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -277,6 +278,7 @@ def to_parquet(
compression=compression,
index=index,
partition_cols=partition_cols,
storage_options=storage_options,
additional_kwargs=kwargs,
)
return op(df)
3 changes: 2 additions & 1 deletion mars/lib/filesystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
# limitations under the License.

from .base import FileSystem
from .core import file_size, glob, get_fs, open_file
from .core import file_size, glob, get_fs, open_file, register_filesystem
from .fsmap import FSMap
from .local import LocalFileSystem

# noinspection PyUnresolvedReferences
from .hdfs import HadoopFileSystem
from .azure import AzureBlobFileSystem
12 changes: 7 additions & 5 deletions mars/lib/filesystem/_glob.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ def _iglob(self, pathname, recursive, dironly):
glob_in_dir = self._glob0
for dirname in dirs:
for name in glob_in_dir(dirname, basename, dironly):
yield self._fs.path_join(dirname, name)
if dirname:
yield self._fs.path_join(dirname, name)
else:
yield name

# These 2 helper functions non-recursively glob inside a literal directory.
# They return a list of basenames. _glob1 accepts a pattern while _glob0
Expand Down Expand Up @@ -153,10 +156,9 @@ def _glob2(self, dirname, pattern, dironly): # pragma: no cover
# If dironly is true, yields only directory names.
def _iterdir(self, dirname, dironly):
if not dirname: # pragma: no cover
if isinstance(dirname, bytes):
dirname = bytes(os.curdir, "ASCII")
else:
dirname = os.curdir
dirname = ""
if not self._fs.isdir(dirname):
return iter(())
for entry in self._fs.ls(dirname):
if not dironly or self._fs.isdir(entry):
yield self._fs.path_split(entry)[-1]
Expand Down
36 changes: 36 additions & 0 deletions mars/lib/filesystem/azure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2022 XProbe Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

try:  # pragma: no cover
    # Probe for adlfs; the imported class is discarded again right below
    from adlfs import AzureBlobFileSystem as _AzureBlobFileSystem

    # Probe for fsspec through the adapter module that depends on it
    from .fsspec_adapter import FsSpecAdapter

    del _AzureBlobFileSystem
except ImportError:
    # Either dependency is missing: signal it by leaving the adapter unset
    FsSpecAdapter = None

if FsSpecAdapter is None:
    # Keep the name importable even when Azure support is unavailable
    AzureBlobFileSystem = None
else:  # pragma: no cover
    from .core import register_filesystem

    class AzureBlobFileSystem(FsSpecAdapter):
        """Azure Blob Storage file system backed by the fsspec ``az`` protocol."""

        def __init__(self, **kwargs):
            super().__init__("az", **kwargs)

    # Both URL schemes resolve to the same adapter class
    register_filesystem("az", AzureBlobFileSystem)
    register_filesystem("abfs", AzureBlobFileSystem)
19 changes: 18 additions & 1 deletion mars/lib/filesystem/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,24 @@ def path_join(self, *args):
return self.pathsep.join(args)

def path_split(self, path):
    """
    Split a pathname into ``(head, tail)``.

    ``tail`` is everything after the final path separator and ``head`` is
    everything before it. Either part may be empty; when ``path`` contains
    no separator at all, ``head`` is the empty string.

    Parameters
    ----------
    path : string
        Can be a file path or directory

    Returns
    -------
    parts : tuple
        A 2-tuple ``(head, tail)``.
    """
    # rpartition yields ("", "", path) when the separator is absent, so both
    # cases produce a tuple of the same shape without branching.
    head, _, tail = path.rpartition(self.pathsep)
    return head, tail

@abstractmethod
def stat(self, path: path_type) -> Dict:
Expand Down
31 changes: 18 additions & 13 deletions mars/lib/filesystem/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@


# File system classes keyed by URL scheme; get_fs looks a path's scheme up in
# this mapping to pick the implementation.
_filesystems = {"file": LocalFileSystem, "oss": OSSFileSystem}
# Packages needed by optional schemes. When a scheme listed here is missing
# from _filesystems, get_fs raises an ImportError naming these packages
# instead of reporting the scheme as unknown.
_scheme_to_dependencies = {
"hdfs": ["pyarrow"],
"az": ["fsspec", "adlfs"],
"abfs": ["fsspec", "adlfs"],
}


def register_filesystem(name: str, fs):
Expand All @@ -42,24 +47,24 @@ def get_fs(path: path_type, storage_options: Dict = None) -> FileSystem:
if scheme == "" or len(scheme) == 1: # len == 1 for windows
scheme = "file"

try:
if scheme in _filesystems:
file_system_type = _filesystems[scheme]
except KeyError: # pragma: no cover
if scheme == "hdfs":
raise ImportError("Need to install `pyarrow` to connect to HDFS.")
if scheme == "file" or scheme == "oss":
# local file systems are singletons.
return file_system_type.get_instance()
else:
options = file_system_type.parse_from_path(path)
storage_options.update(options)
return file_system_type(**storage_options)
elif scheme in _scheme_to_dependencies: # pragma: no cover
dependencies = ", ".join(_scheme_to_dependencies[scheme])
raise ImportError(f"Need to install {dependencies} to access {scheme}.")
else:
raise ValueError(
f"Unknown file system type: {scheme}, "
f'available include: {", ".join(_filesystems)}'
f'available include: {", ".join(_scheme_to_dependencies.keys())}'
)

if scheme == "file" or scheme == "oss":
# local file system use an singleton.
return file_system_type.get_instance()
else:
options = file_system_type.parse_from_path(path)
storage_options.update(options)
return file_system_type(**storage_options)


def glob(path: path_type, storage_options: Dict = None) -> List[path_type]:
if "*" in path:
Expand Down
89 changes: 89 additions & 0 deletions mars/lib/filesystem/fsspec_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2022 XProbe Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Iterator, Tuple, Union, BinaryIO, TextIO, Dict

from fsspec import filesystem
from fsspec.core import stringify_path

from ...utils import implements
from .core import FileSystem
from .core import path_type


class FsSpecAdapter(FileSystem):
    """
    Expose an fsspec file system through the ``FileSystem`` interface.

    The wrapped fsspec instance is created from a protocol name plus keyword
    arguments. Every path is normalised with ``stringify_path`` before it is
    handed to fsspec. Operations that are not wired up raise
    ``NotImplementedError``.
    """

    def __init__(self, scheme: str, **kwargs):
        # ``scheme`` selects the fsspec implementation, ``kwargs`` configure it
        self._fs = filesystem(scheme, **kwargs)

    @implements(FileSystem.cat)
    def cat(self, path: path_type) -> bytes:
        target = stringify_path(path)
        return self._fs.cat_file(target)

    @implements(FileSystem.ls)
    def ls(self, path: path_type) -> List[path_type]:
        # Listing is requested without details, yet dict entries are still
        # accepted and reduced to their "name" field.
        names = []
        listing = self._fs.ls(stringify_path(path), detail=False)
        for entry in listing:
            if isinstance(entry, str):
                names.append(entry)
            elif isinstance(entry, dict):
                names.append(entry.get("name"))
            else:  # pragma: no cover
                raise TypeError(f"Expect str or dict, but got {type(entry)}")
        return names

    @implements(FileSystem.delete)
    def delete(self, path: path_type, recursive: bool = False):
        raise NotImplementedError

    @implements(FileSystem.stat)
    def stat(self, path: path_type) -> Dict:
        target = stringify_path(path)
        return self._fs.info(target)

    @implements(FileSystem.rename)
    def rename(self, path: path_type, new_path: path_type):
        raise NotImplementedError

    @implements(FileSystem.mkdir)
    def mkdir(self, path: path_type, create_parents: bool = True):
        raise NotImplementedError

    @implements(FileSystem.exists)
    def exists(self, path: path_type):
        target = stringify_path(path)
        return self._fs.exists(target)

    @implements(FileSystem.isdir)
    def isdir(self, path: path_type) -> bool:
        target = stringify_path(path)
        return self._fs.isdir(target)

    @implements(FileSystem.isfile)
    def isfile(self, path: path_type) -> bool:
        target = stringify_path(path)
        return self._fs.isfile(target)

    @implements(FileSystem._isfilestore)
    def _isfilestore(self) -> bool:
        raise NotImplementedError

    @implements(FileSystem.open)
    def open(self, path: path_type, mode: str = "rb") -> Union[BinaryIO, TextIO]:
        target = stringify_path(path)
        return self._fs.open(target, mode=mode)

    @implements(FileSystem.walk)
    def walk(self, path: path_type) -> Iterator[Tuple[str, List[str], List[str]]]:
        raise NotImplementedError

    @implements(FileSystem.glob)
    def glob(self, path: path_type, recursive: bool = False) -> List[path_type]:
        # Imported here rather than at module level, as in the original code
        from ._glob import FileSystemGlob

        matcher = FileSystemGlob(self)
        return matcher.glob(stringify_path(path), recursive=recursive)
23 changes: 23 additions & 0 deletions mars/lib/filesystem/tests/test_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
pa = None

from ....tests.core import require_hadoop
from ....utils import lazy_import
from .. import glob, FileSystem, LocalFileSystem, FSMap

# Arrow-backed file systems can only be imported when pyarrow is available
if pa is None:  # pragma: no cover
    ArrowBasedLocalFileSystem = None
else:
    from ..arrow import ArrowBasedLocalFileSystem, HadoopFileSystem

# Flag used to skip fsspec-dependent tests when fsspec cannot be imported
fsspec_installed = lazy_import("fsspec") is not None


def test_path_parser():
path = "hdfs://user:password@localhost:8080/test"
Expand Down Expand Up @@ -198,3 +201,23 @@ def test_fsmap():
# create root
fs_map = FSMap(root + "/path2", fs, create=True)
assert len(fs_map) == 0


@pytest.mark.skipif(not fsspec_installed, reason="fsspec not installed")
def test_get_fs():
    """Check that get_fs resolves built-in, registered and unknown schemes."""
    from .. import get_fs, register_filesystem
    from ..fsspec_adapter import FsSpecAdapter

    class InMemoryFileSystemAdapter(FsSpecAdapter):
        def __init__(self, **kwargs):
            super().__init__("memory", **kwargs)

    register_filesystem("memory", InMemoryFileSystemAdapter)

    assert isinstance(get_fs("file://"), LocalFileSystem)
    assert isinstance(get_fs("memory://"), InMemoryFileSystemAdapter)

    # pytest.raises fails the test when no exception is raised; a bare
    # try/except would pass silently in that case.
    with pytest.raises(ValueError, match="Unknown file system type"):
        get_fs("unknown://")

0 comments on commit 0b20242

Please sign in to comment.