Skip to content
Permalink
Browse files

Added read_csv support for S3 (#505)

* Added read_csv support for S3, no test cases yet

* Posix path and buffers should work again

* Formatting

* Test cases for reading from s3

* Comment to resolve

* Fixed the regex for s3

* Skipping s3 test on python backend

* Fixed merge conflicts

* formatting

* Update io.py

* Update test_io.py
  • Loading branch information
wuisawesome authored and devin-petersohn committed Mar 29, 2019
1 parent b98bbe9 commit 9ff79d3bece18b7aab4b981a1ff50bbd86ab922a
Showing with 62 additions and 7 deletions.
  1. +30 −5 modin/engines/ray/generic/io.py
  2. +2 −2 modin/engines/ray/pandas_on_ray/io.py
  3. +29 −0 modin/pandas/test/test_io.py
  4. +1 −0 requirements.txt
@@ -5,6 +5,7 @@
import pandas
from pandas.io.common import _infer_compression

import s3fs as S3FS
import inspect
import os
import py
@@ -17,6 +18,24 @@
from modin.engines.base.io import BaseIO

PQ_INDEX_REGEX = re.compile("__index_level_\d+__") # noqa W605
S3_ADDRESS_REGEX = re.compile("s3://(.*?)/(.*)")
s3fs = S3FS.S3FileSystem(anon=False)


def file_exists(file_path):
if isinstance(file_path, str):
match = S3_ADDRESS_REGEX.search(file_path)
if match:
s3fs.exists(file_path)
return os.path.exists(file_path)


def open_file(file_path, mode="rb"):
if isinstance(file_path, str):
match = S3_ADDRESS_REGEX.search(file_path)
if match:
return s3fs.open(file_path, mode=mode)
return open(file_path, mode=mode)


@ray.remote
@@ -209,7 +228,9 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
Returns:
DataFrame or Series constructed from CSV file.
"""
empty_pd_df = pandas.read_csv(filepath, **dict(kwargs, nrows=0, skipfooter=0))
empty_pd_df = pandas.read_csv(
open_file(filepath, "rb"), **dict(kwargs, nrows=0, skipfooter=0)
)
column_names = empty_pd_df.columns
skipfooter = kwargs.get("skipfooter", None)
skiprows = kwargs.pop("skiprows", None)
@@ -222,7 +243,7 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
skiprows=None,
parse_dates=parse_dates,
)
with open(filepath, "rb") as f:
with open_file(filepath, "rb") as f:
# Get the BOM if necessary
prefix = b""
if kwargs.get("encoding", None) is not None:
@@ -303,7 +324,11 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):

@classmethod
def _read_csv_from_pandas(cls, filepath_or_buffer, kwargs):
pd_obj = pandas.read_csv(filepath_or_buffer, **kwargs)
# TODO: Should we try to be smart about how we load files here, or naively default to pandas?
if isinstance(filepath_or_buffer, str):
pd_obj = pandas.read_csv(open_file(filepath_or_buffer, "rb"), **kwargs)
else:
pd_obj = pandas.read_csv(filepath_or_buffer, **kwargs)
if isinstance(pd_obj, pandas.DataFrame):
return cls.from_pandas(pd_obj)
elif isinstance(pd_obj, pandas.io.parsers.TextFileReader):
@@ -453,8 +478,8 @@ def _read(cls, filepath_or_buffer, **kwargs):
filtered_kwargs = kwargs

if isinstance(filepath_or_buffer, str):
if not os.path.exists(filepath_or_buffer):
ErrorMessage.default_to_pandas("File not found on disk")
if not file_exists(filepath_or_buffer):
ErrorMessage.default_to_pandas("File path could not be resolved")
return cls._read_csv_from_pandas(filepath_or_buffer, filtered_kwargs)
elif not isinstance(filepath_or_buffer, py.path.local):
read_from_pandas = True
@@ -8,7 +8,7 @@
import ray
from modin.data_management.utils import split_result_of_axis_func_pandas
from modin.backends.pandas.query_compiler import PandasQueryCompiler
from modin.engines.ray.generic.io import RayIO
from modin.engines.ray.generic.io import RayIO, open_file
from modin.engines.ray.pandas_on_ray.frame.partition_manager import (
PandasOnRayFrameManager,
)
@@ -79,7 +79,7 @@ def _read_csv_with_offset_pandas_on_ray(
default Index.
"""
index_col = kwargs.pop("index_col", None)
bio = open(fname, "rb")
bio = open_file(fname, "rb")
bio.seek(start)
to_read = header + bio.read(end - start)
bio.close()
@@ -13,6 +13,8 @@

from .utils import df_equals

from modin import __execution_engine__

if os.environ.get("MODIN_BACKEND", "Pandas").lower() == "pandas":
import modin.pandas as pd
else:
@@ -508,6 +510,33 @@ def test_from_csv(make_csv_file):
assert modin_df_equals_pandas(modin_df, pandas_df)


class FakeS3FS:
def exists(self, path):
return "s3://bucket/path.csv" == path

def open(self, path, mode="rb"):
if "s3://bucket/path.csv" == path:
return open(TEST_CSV_FILENAME, mode=mode)
else:
raise Exception("You shouldn't access that! (%s)" % path)


@pytest.mark.skipif(
__execution_engine__.lower() == "python", reason="Using pandas implementation"
)
def test_from_csv_s3(make_csv_file):
from modin.engines.ray.generic import io

io.s3fs = FakeS3FS()

make_csv_file()

pandas_df = pandas.read_csv(TEST_CSV_FILENAME)
modin_df = pd.read_csv("s3://bucket/path.csv")

assert modin_df_equals_pandas(modin_df, pandas_df)


def test_from_csv_chunksize(make_csv_file):
make_csv_file()

@@ -11,6 +11,7 @@ Jinja2
pathlib
tables==3.4.4
scipy
s3fs
pytest==3.9.3
coverage
pytest-cov

0 comments on commit 9ff79d3

Please sign in to comment.
You can’t perform that action at this time.