TEST-#6830: Use local s3 server instead of public s3 buckets #6863

Merged: 7 commits, Jan 18, 2024
18 changes: 17 additions & 1 deletion modin/conftest.py
@@ -695,10 +695,26 @@ def s3_resource(s3_base):
raise RuntimeError("Could not create bucket")

s3fs.S3FileSystem.clear_instance_cache()
yield conn

s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})

test_s3_files = [
("modin-bugs/multiple_csv/", "modin/pandas/test/data/multiple_csv/"),
(
"modin-bugs/test_data_dir.parquet/",
"modin/pandas/test/data/test_data_dir.parquet/",
),
("modin-bugs/test_data.parquet", "modin/pandas/test/data/test_data.parquet"),
("modin-bugs/test_data.json", "modin/pandas/test/data/test_data.json"),
("modin-bugs/test_data.fwf", "modin/pandas/test/data/test_data.fwf"),
("modin-bugs/test_data.feather", "modin/pandas/test/data/test_data.feather"),
("modin-bugs/issue5159.parquet/", "modin/pandas/test/data/issue5159.parquet/"),
]
for s3_key, file_name in test_s3_files:
s3.put(file_name, f"{bucket}/{s3_key}", recursive=s3_key.endswith("/"))

yield conn

s3.rm(bucket, recursive=True)
for _ in range(20):
# We want to wait until the deletion finishes.
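Note: the updated tests below also rely on an s3_storage_options fixture that is consumed together with s3_resource. Its definition is not shown in this diff; a minimal sketch of what it could look like, assuming s3_base is the URL of the local S3 server started for the test session, is:

import pytest


@pytest.fixture
def s3_storage_options(s3_base):
    # Hypothetical sketch (not part of this diff): point fsspec/s3fs clients
    # at the local S3 endpoint instead of the real AWS endpoint.
    return {"client_kwargs": {"endpoint_url": s3_base}}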
6 changes: 5 additions & 1 deletion modin/core/io/column_stores/feather_dispatcher.py
@@ -75,5 +75,9 @@ def _read(cls, path, columns=None, **kwargs):
# Filtering out the columns that describe the frame's index
columns = [col for col in reader.schema.names if col not in index_cols]
return cls.build_query_compiler(
path, columns, use_threads=False, dtype_backend=kwargs["dtype_backend"]
path,
columns,
use_threads=False,
storage_options=kwargs["storage_options"],
Review comment (Collaborator Author): Use of the storage_options parameter was missed.

dtype_backend=kwargs["dtype_backend"],
)
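With storage_options forwarded here, a feather read against the local bucket carries the endpoint all the way down to fsspec. A hedged usage example (the endpoint URL below is an assumption; the bucket and key come from this PR's test data):

import modin.pandas as pd

df = pd.read_feather(
    "s3://modin-test/modin-bugs/test_data.feather",
    # Without forwarding storage_options, this endpoint would be ignored and
    # the read would target the default AWS endpoint.
    storage_options={"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555/"}},
)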
17 changes: 14 additions & 3 deletions modin/core/io/text/json_dispatcher.py
@@ -47,7 +47,9 @@ def _read(cls, path_or_buf, **kwargs):
path_or_buf = stringify_path(path_or_buf)
path_or_buf = cls.get_path_or_buffer(path_or_buf)
if isinstance(path_or_buf, str):
if not cls.file_exists(path_or_buf):
if not cls.file_exists(
path_or_buf, storage_options=kwargs.get("storage_options")
Review comment (Collaborator Author): Use of the storage_options parameter was missed.

):
return cls.single_worker_read(
path_or_buf, reason=cls._file_not_found_msg(path_or_buf), **kwargs
)
@@ -60,12 +62,21 @@ def _read(cls, path_or_buf, **kwargs):
return cls.single_worker_read(
path_or_buf, reason="`lines` argument not supported", **kwargs
)
with OpenFile(path_or_buf, "rb") as f:
with OpenFile(
path_or_buf,
"rb",
**(kwargs.get("storage_options", None) or {}),
Review comment (Collaborator Author): Use of the storage_options parameter was missed.

) as f:
columns = pandas.read_json(BytesIO(b"" + f.readline()), lines=True).columns
kwargs["columns"] = columns
empty_pd_df = pandas.DataFrame(columns=columns)

with OpenFile(path_or_buf, "rb", kwargs.get("compression", "infer")) as f:
with OpenFile(
path_or_buf,
"rb",
kwargs.get("compression", "infer"),
**(kwargs.get("storage_options", None) or {}),
Review comment (Collaborator Author): Use of the storage_options parameter was missed.

) as f:
column_widths, num_splits = cls._define_metadata(empty_pd_df, columns)
args = {"fname": path_or_buf, "num_splits": num_splits, **kwargs}
splits, _ = cls.partitioned_file(
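The spelling **(kwargs.get("storage_options", None) or {}) matters because callers may pass storage_options=None explicitly, and unpacking None with ** raises a TypeError. A standalone illustration of the guard (hypothetical helper, not Modin code):

def forward_storage_options(**kwargs):
    storage_options = kwargs.get("storage_options", None)
    # `or {}` turns an explicit None into an empty dict, so ** unpacking
    # always succeeds no matter how the caller passed storage_options.
    return {**(storage_options or {})}


assert forward_storage_options(storage_options=None) == {}
assert forward_storage_options(storage_options={"anon": True}) == {"anon": True}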
18 changes: 14 additions & 4 deletions modin/experimental/core/io/text/csv_glob_dispatcher.py
@@ -68,7 +68,9 @@ def _read(cls, filepath_or_buffer, **kwargs):
reason=cls._file_not_found_msg(filepath_or_buffer),
**kwargs,
)
filepath_or_buffer = cls.get_path(filepath_or_buffer)
filepath_or_buffer = cls.get_path(
filepath_or_buffer, kwargs.get("storage_options")
Review comment (Collaborator Author): Use of the storage_options parameter was missed.

)
elif not cls.pathlib_or_pypath(filepath_or_buffer):
return cls.single_worker_read(
filepath_or_buffer,
@@ -314,14 +316,16 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool:
return exists or len(fs.glob(file_path)) > 0

@classmethod
def get_path(cls, file_path: str) -> list:
def get_path(cls, file_path: str, storage_options=None) -> list:
"""
Return the path of the file(s).

Parameters
----------
file_path : str
String representing a path.
storage_options : dict, optional
Keyword from `read_*` functions.

Returns
-------
@@ -363,11 +367,17 @@ def get_file_path(fs_handle) -> List[str]:
fs_addresses = [fs_handle.unstrip_protocol(path) for path in file_paths]
return fs_addresses

fs, _ = fsspec.core.url_to_fs(file_path)
if storage_options is not None:
new_storage_options = dict(storage_options)
new_storage_options.pop("anon", None)
else:
new_storage_options = {}

fs, _ = fsspec.core.url_to_fs(file_path, **new_storage_options)
Review comment on lines +370 to +376 (Collaborator Author): Made by analogy with other functions.

try:
return get_file_path(fs)
except credential_error_type:
fs, _ = fsspec.core.url_to_fs(file_path, anon=True)
fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options)
return get_file_path(fs)

@classmethod
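As the review note says, the get_path change mirrors the pattern used by the other dispatcher functions: build a filesystem from the user-supplied options with anon stripped out, and retry anonymously on a credential error. A standalone sketch of that fallback, assuming the credential error is botocore's NoCredentialsError (the real code catches its own credential_error_type):

import fsspec
from botocore.exceptions import NoCredentialsError


def url_to_fs_with_anon_fallback(file_path, storage_options=None):
    options = dict(storage_options or {})
    options.pop("anon", None)  # let the fallback control anonymity explicitly
    try:
        fs, _ = fsspec.core.url_to_fs(file_path, **options)
        fs.ls(file_path)  # force a call that actually needs credentials
    except NoCredentialsError:
        fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **options)
    return fs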
28 changes: 17 additions & 11 deletions modin/experimental/pandas/test/test_io_exp.py
@@ -180,11 +180,15 @@ def test_read_single_csv_with_parse_dates(self, parse_dates):
@pytest.mark.parametrize(
"path",
[
"s3://modin-datasets/testing/multiple_csv/test_data*.csv",
"s3://modin-test/modin-bugs/multiple_csv/test_data*.csv",
"gs://modin-testing/testing/multiple_csv/test_data*.csv",
],
)
def test_read_multiple_csv_cloud_store(path):
def test_read_multiple_csv_cloud_store(path, s3_resource, s3_storage_options):
storage_options_new = {"anon": True}
if path.startswith("s3"):
storage_options_new = s3_storage_options

def _pandas_read_csv_glob(path, storage_options):
pandas_dfs = [
pandas.read_csv(
@@ -200,7 +204,7 @@ def _pandas_read_csv_glob(path, storage_options):
lambda module, **kwargs: pd.read_csv_glob(path, **kwargs).reset_index(drop=True)
if hasattr(module, "read_csv_glob")
else _pandas_read_csv_glob(path, **kwargs),
storage_options={"anon": True},
storage_options=storage_options_new,
)


@@ -212,17 +216,19 @@ def _pandas_read_csv_glob(path, storage_options):
reason=f"{Engine.get()} does not have experimental API",
)
@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
Review comment (Collaborator Author): We can't use storage_options=None (because of our S3 server), but the test coverage hasn't decreased, so I guess there's no problem.

Reply (Collaborator): What if we want to add a test case for storage_options=None in the future? What are we supposed to do?

Reply (anmyachev, Collaborator Author, Jan 18, 2024): At least, do not use our own S3 server for that case, since with it we always need to specify the endpoint explicitly.

"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_multiple_csv_s3_storage_opts(storage_options):
path = "s3://modin-datasets/testing/multiple_csv/"
def test_read_multiple_csv_s3_storage_opts(
s3_resource, s3_storage_options, storage_options_extra
):
s3_path = "s3://modin-test/modin-bugs/multiple_csv/"

def _pandas_read_csv_glob(path, storage_options):
pandas_df = pandas.concat(
[
pandas.read_csv(
f"{path}test_data{i}.csv",
f"{s3_path}test_data{i}.csv",
storage_options=storage_options,
)
for i in range(2)
Expand All @@ -233,10 +239,10 @@ def _pandas_read_csv_glob(path, storage_options):
eval_general(
pd,
pandas,
lambda module, **kwargs: pd.read_csv_glob(path, **kwargs)
lambda module, **kwargs: pd.read_csv_glob(s3_path, **kwargs)
if hasattr(module, "read_csv_glob")
else _pandas_read_csv_glob(path, **kwargs),
storage_options=storage_options,
else _pandas_read_csv_glob(s3_path, **kwargs),
storage_options=s3_storage_options | storage_options_extra,
)


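The practical consequence of the exchange above: without storage_options, s3fs is constructed against the default AWS endpoint, so the bucket served by the local test server is unreachable; the endpoint has to be passed on every call. A small sketch of the difference (the local URL is an assumption):

import fsspec

# Default construction targets the real AWS endpoint, so "modin-test" would
# not resolve to the bucket served by the local test server.
fs_default, _ = fsspec.core.url_to_fs("s3://modin-test/modin-bugs/")

# Reaching the local server requires an explicit endpoint, which is what the
# s3_storage_options fixture supplies in these tests.
fs_local, _ = fsspec.core.url_to_fs(
    "s3://modin-test/modin-bugs/",
    client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"},
)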
2 binary files (not shown)
5 changes: 5 additions & 0 deletions modin/pandas/test/data/multiple_csv/test_data0.csv
@@ -0,0 +1,5 @@
a,b,c
0,True,x
1,False,y
2,True,z
3,False,w
5 changes: 5 additions & 0 deletions modin/pandas/test/data/multiple_csv/test_data1.csv
@@ -0,0 +1,5 @@
a,b,c
4,True,m
5,False,n
6,True,t
7,True,l
Binary file added modin/pandas/test/data/test_data.feather
Binary file not shown.
6 changes: 6 additions & 0 deletions modin/pandas/test/data/test_data.json
@@ -0,0 +1,6 @@
{"Duration":60,"Pulse":110,"Maxpulse":130,"Calories":409}
{"Duration":60,"Pulse":117,"Maxpulse":145,"Calories":479}
{"Duration":60,"Pulse":103,"Maxpulse":135,"Calories":340}
{"Duration":45,"Pulse":109,"Maxpulse":175,"Calories":282}
{"Duration":45,"Pulse":117,"Maxpulse":148,"Calories":406}
{"Duration":60,"Pulse":102,"Maxpulse":127,"Calories":300}
Binary file added modin/pandas/test/data/test_data.parquet
Binary file not shown.
16 additional binary files (not shown)
65 changes: 37 additions & 28 deletions modin/pandas/test/test_io.py
@@ -1939,26 +1939,29 @@ def test_read_parquet_hdfs(self, engine):
"path_type",
["object", "directory", "url"],
)
def test_read_parquet_s3(self, path_type, engine):
dataset_url = "s3://modin-datasets/testing/test_data.parquet"
def test_read_parquet_s3(self, s3_resource, path_type, engine, s3_storage_options):
s3_path = "s3://modin-test/modin-bugs/test_data.parquet"
if path_type == "object":
import s3fs

fs = s3fs.S3FileSystem(anon=True)
with fs.open(dataset_url, "rb") as file_obj:
fs = s3fs.S3FileSystem(
endpoint_url=s3_storage_options["client_kwargs"]["endpoint_url"]
)
with fs.open(s3_path, "rb") as file_obj:
eval_io("read_parquet", path=file_obj, engine=engine)
elif path_type == "directory":
s3_path = "s3://modin-test/modin-bugs/test_data_dir.parquet"
eval_io(
"read_parquet",
path="s3://modin-datasets/test_data_dir.parquet",
storage_options={"anon": True},
path=s3_path,
storage_options=s3_storage_options,
engine=engine,
)
else:
eval_io(
"read_parquet",
path=dataset_url,
storage_options={"anon": True},
path=s3_path,
storage_options=s3_storage_options,
engine=engine,
)

@@ -2078,15 +2081,16 @@ def test_read_parquet_5767(self, tmp_path, engine):
# both Modin and pandas read column "b" as a category
df_equals(test_df, read_df.astype("int64"))

def test_read_parquet_s3_with_column_partitioning(self, engine):
# This test case comes from
def test_read_parquet_s3_with_column_partitioning(
self, s3_resource, engine, s3_storage_options
):
# https://github.com/modin-project/modin/issues/4636
dataset_url = "s3://modin-datasets/modin-bugs/modin_bug_5159_parquet/df.parquet"
s3_path = "s3://modin-test/modin-bugs/issue5159.parquet"
eval_io(
fn_name="read_parquet",
path=dataset_url,
path=s3_path,
engine=engine,
storage_options={"anon": True},
storage_options=s3_storage_options,
)


@@ -2134,16 +2138,17 @@ def comparator(df1, df2):
)

@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_json_s3(self, storage_options):
def test_read_json_s3(self, s3_resource, s3_storage_options, storage_options_extra):
s3_path = "s3://modin-test/modin-bugs/test_data.json"
eval_io(
fn_name="read_json",
path_or_buf="s3://modin-datasets/testing/test_data.json",
path_or_buf=s3_path,
lines=True,
orient="records",
storage_options=storage_options,
storage_options=s3_storage_options | storage_options_extra,
)

def test_read_json_categories(self):
@@ -2929,14 +2934,15 @@ def test_read_fwf_empty_frame(self, make_fwf_file):
df_equals(modin_df, pandas_df)

@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_fwf_s3(self, storage_options):
def test_read_fwf_s3(self, s3_resource, s3_storage_options, storage_options_extra):
s3_path = "s3://modin-test/modin-bugs/test_data.fwf"
eval_io(
fn_name="read_fwf",
filepath_or_buffer="s3://modin-datasets/testing/test_data.fwf",
storage_options=storage_options,
filepath_or_buffer=s3_path,
storage_options=s3_storage_options | storage_options_extra,
)


@@ -3027,14 +3033,17 @@ def comparator(df1, df2):
)

@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_feather_s3(self, storage_options):
def test_read_feather_s3(
self, s3_resource, s3_storage_options, storage_options_extra
):
s3_path = "s3://modin-test/modin-bugs/test_data.feather"
eval_io(
fn_name="read_feather",
path="s3://modin-datasets/testing/test_data.feather",
storage_options=storage_options,
path=s3_path,
storage_options=s3_storage_options | storage_options_extra,
)

def test_read_feather_path_object(self, make_feather_file):
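The tests above layer per-case extras on top of the fixture options with the dict union operator (Python 3.9+), where keys from the right operand win on conflict. A quick illustration (the endpoint value is an assumption):

s3_storage_options = {"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555/"}}
storage_options_extra = {"anon": True}

# Right-hand keys override left-hand ones, so the endpoint is preserved while
# the parametrized credentials/flags are added on top.
merged = s3_storage_options | storage_options_extra
assert merged == {
    "client_kwargs": {"endpoint_url": "http://127.0.0.1:5555/"},
    "anon": True,
}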
2 changes: 2 additions & 0 deletions modin/test/test_headers.py
@@ -44,6 +44,8 @@ def test_line_endings():
if any(i in subdir for i in [".git", ".idea", "__pycache__"]):
continue
for file in files:
if file.endswith(".parquet"):
Review comment (Collaborator Author): In binary formats there is no need to check for these characters (\r\n).

Reply (Collaborator): Why?

Reply (Collaborator Author): This sequence of characters may have a different meaning in binary formats. If, for example, you try to replace \r\n with \n, Arrow will report that the parquet file is corrupted.

continue
filepath = os.path.join(subdir, file)
with open(filepath, "rb+") as f:
file_contents = f.read()
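A tiny check of the reviewer's point: the byte pair \r\n occurs routinely inside arbitrary binary content, so normalizing line endings would change the payload and corrupt binary formats such as Parquet:

import os

payload = os.urandom(1 << 20)  # 1 MiB of pseudo-random "binary" content
if b"\r\n" in payload:  # true with overwhelming probability at this size
    normalized = payload.replace(b"\r\n", b"\n")
    # The bytes differ and the file shrinks, which is exactly the kind of
    # silent corruption that skipping .parquet files in test_line_endings avoids.
    assert normalized != payload and len(normalized) < len(payload)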