Skip to content

Commit

Permalink
Add S3 support (#3258)
Browse files Browse the repository at this point in the history
Co-authored-by: 刘宝 <po.lb@antgroup.com>
  • Loading branch information
fyrestone and 刘宝 committed Sep 19, 2022
1 parent 010c758 commit c69eea9
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 2 deletions.
4 changes: 2 additions & 2 deletions mars/dataframe/datasource/read_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def _tile_compressed(cls, op):
df = op.outputs[0]
chunk_op = op.copy().reset_key()
chunk_op.offset = 0
chunk_op.size = file_size(op.path)
chunk_op.size = file_size(op.path, storage_options=op.storage_options)
shape = df.shape
new_chunk = chunk_op.new_chunk(
None,
Expand Down Expand Up @@ -188,7 +188,7 @@ def _tile(cls, op: "DataFrameReadCSV"):
index_num = 0
for path in paths:
path = path_prefix + path
total_bytes = file_size(path)
total_bytes = file_size(path, storage_options=op.storage_options)
offset = 0
for _ in range(int(np.ceil(total_bytes * 1.0 / chunk_bytes))):
chunk_op = op.copy().reset_key()
Expand Down
1 change: 1 addition & 0 deletions mars/lib/filesystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
# noinspection PyUnresolvedReferences
from .hdfs import HadoopFileSystem
from .azure import AzureBlobFileSystem
from .s3 import S3FileSystem
1 change: 1 addition & 0 deletions mars/lib/filesystem/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"hdfs": ["pyarrow"],
"az": ["fsspec", "adlfs"],
"abfs": ["fsspec", "adlfs"],
"s3": ["fsspec", "s3fs"],
}


Expand Down
67 changes: 67 additions & 0 deletions mars/lib/filesystem/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

"""
An example to read csv from s3
------------------------------
>>> import mars
>>> import mars.dataframe as md
>>>
>>> mars.new_session()
>>> # Pass endpoint_url / aws_access_key_id / aws_secret_access_key to read_csv.
>>> mdf = md.read_csv("s3://bucket/example.csv", index_col=0, storage_options={
>>> "client_kwargs": {
>>> "endpoint_url": "http://192.168.1.12:9000",
>>> "aws_access_key_id": "RwDeqMoctbLG3yly",
>>> "aws_secret_access_key": "uwinWm1hTAGJ6Wnipa4tbE4SwO3Mx6Ek",
>>> }})
>>> # Export environment vars AWS_ENDPOINT_URL / AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY.
>>> mdf = md.read_csv("s3://bucket/example.csv", index_col=0)
>>> r = mdf.head(1000).execute()
>>> print(r)
"""

try: # pragma: no cover
# make sure s3fs is installed
from s3fs import S3FileSystem as _S3FileSystem

# make sure fsspec is installed
from .fsspec_adapter import FsSpecAdapter

del _S3FileSystem
except ImportError:
FsSpecAdapter = None

if FsSpecAdapter is not None: # pragma: no cover
from .core import register_filesystem

class S3FileSystem(FsSpecAdapter):
def __init__(self, **kwargs):
super().__init__("s3", **kwargs)

@staticmethod
def parse_from_path(uri: str):
client_kwargs = {
"endpoint_url": os.environ.get("AWS_ENDPOINT_URL"),
"aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": os.environ.get("AWS_SESSION_TOKEN"),
}
client_kwargs = {k: v for k, v in client_kwargs.items() if v is not None}
return {"client_kwargs": client_kwargs}

register_filesystem("s3", S3FileSystem)
else:
S3FileSystem = None

0 comments on commit c69eea9

Please sign in to comment.