Fix read_csv when parse_dates and index_col are the same #548

Merged 2 commits on Apr 14, 2019

16 changes: 14 additions & 2 deletions modin/engines/ray/generic/io.py
@@ -228,6 +228,19 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
        Returns:
Collaborator:

Should we use this opportunity to move `_read_csv_from_file_pandas_on_ray` out of generic/io.py and into the pandas-on-Ray-specific io file?

Collaborator Author:

It can be renamed, but this method defines the general logic for how the arguments are handled and how the index is built. I think it should stay here, because #502 also uses it for the Arrow backend.

Collaborator:

Ah, ok. Let's rename the function in #502, then?

            DataFrame or Series constructed from CSV file.
        """
+        names = kwargs.get("names", None)
+        index_col = kwargs.get("index_col", None)
+        if names is None:
+            # For the sake of the empty df, we assume no `index_col` to get the
+            # correct column names before we build the index. Because we pass
+            # `names` in, this step has to happen without removing the
+            # `index_col`; otherwise it will not be assigned correctly.
+            kwargs["index_col"] = None
+            names = pandas.read_csv(
+                open_file(filepath, "rb"), **dict(kwargs, nrows=0, skipfooter=0)
+            ).columns
+            kwargs["index_col"] = index_col
+
         empty_pd_df = pandas.read_csv(
             open_file(filepath, "rb"), **dict(kwargs, nrows=0, skipfooter=0)
         )
@@ -238,7 +251,7 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
         partition_kwargs = dict(
             kwargs,
             header=None,
-            names=column_names,
+            names=names,
             skipfooter=0,
             skiprows=None,
             parse_dates=parse_dates,
@@ -288,7 +301,6 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
             )
             index_ids.append(partition_id[-1])

-        index_col = kwargs.get("index_col", None)
         if index_col is None:
             new_index = pandas.RangeIndex(sum(ray.get(index_ids)))
         else:
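The inline comment in the first hunk is the crux of the change: when `index_col` is set, pandas consumes that column into the index, so a header-only read would recover one column name too few. A minimal sketch of that pandas behavior (illustrative only, not part of the patch):

import io

import pandas

CSV = "timestamp,symbol,high\n2010-04-01 00:00:00,USD/JPY,93.526\n"

# With index_col set, the index column is consumed into the index and
# does not appear in .columns, so the recovered names are incomplete.
with_index = pandas.read_csv(io.StringIO(CSV), nrows=0, index_col=0)
print(list(with_index.columns))  # ['symbol', 'high']

# Temporarily clearing index_col, as the patch does, keeps every column
# name available before the index is built.
without_index = pandas.read_csv(io.StringIO(CSV), nrows=0, index_col=None)
print(list(without_index.columns))  # ['timestamp', 'symbol', 'high']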
2 changes: 1 addition & 1 deletion modin/engines/ray/pandas_on_ray/io.py
@@ -78,7 +78,7 @@ def _read_csv_with_offset_pandas_on_ray(
     This is used to determine the total length of the DataFrame to build a
     default Index.
     """
-    index_col = kwargs.pop("index_col", None)
+    index_col = kwargs.get("index_col", None)
     bio = open_file(fname, "rb")
     bio.seek(start)
     to_read = header + bio.read(end - start)
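This one-line change swaps a destructive lookup for a non-destructive one. A small sketch of the difference, using a hypothetical kwargs dict:

# Hypothetical kwargs as a partition reader might receive them.
kwargs = {"index_col": "timestamp", "parse_dates": ["timestamp"]}

# pop() removes the key: a later pandas.read_csv(**kwargs) in the same
# function would no longer see index_col and could not set the index.
index_col = kwargs.pop("index_col", None)
assert "index_col" not in kwargs

# get() reads the value but leaves kwargs intact, so index_col and
# parse_dates can still refer to the same column downstream.
kwargs = {"index_col": "timestamp", "parse_dates": ["timestamp"]}
index_col = kwargs.get("index_col", None)
assert kwargs["index_col"] == "timestamp"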
4 changes: 4 additions & 0 deletions modin/pandas/test/data/test_time_parsing.csv
@@ -0,0 +1,4 @@
timestamp,symbol,high,low,open,close,spread,volume
2010-04-01 00:00:00,USD/JPY,93.52600,93.36100,93.51800,93.38200,0.00500,3049
2010-04-01 00:30:00,USD/JPY,93.47500,93.35200,93.38500,93.39100,0.00600,2251
2010-04-01 01:00:00,USD/JPY,93.42100,93.32600,93.39100,93.38400,0.00600,1577
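This three-row fixture is enough to exercise the case in the PR title. A sketch of the call shape that previously misbehaved (using the string form of index_col, equivalent to the positional index_col=0 used in the full test matrix of test_parse_dates_read_csv below):

import modin.pandas as pd

# index_col and parse_dates name the same column -- the combination
# this PR fixes.
df = pd.read_csv(
    "modin/pandas/test/data/test_time_parsing.csv",
    index_col="timestamp",
    parse_dates=["timestamp"],
)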
112 changes: 112 additions & 0 deletions modin/pandas/test/test_io.py
@@ -528,6 +528,118 @@ def test_from_csv(make_csv_file):
    assert modin_df_equals_pandas(modin_df, pandas_df)


def test_parse_dates_read_csv():
    pandas_df = pandas.read_csv("modin/pandas/test/data/test_time_parsing.csv")
    modin_df = pd.read_csv("modin/pandas/test/data/test_time_parsing.csv")
    assert modin_df_equals_pandas(modin_df, pandas_df)

    pandas_df = pandas.read_csv(
        "modin/pandas/test/data/test_time_parsing.csv",
        names=[
            "timestamp",
            "symbol",
            "high",
            "low",
            "open",
            "close",
            "spread",
            "volume",
        ],
        header=0,
        index_col=0,
        encoding="utf-8",
    )
    modin_df = pd.read_csv(
        "modin/pandas/test/data/test_time_parsing.csv",
        names=[
            "timestamp",
            "symbol",
            "high",
            "low",
            "open",
            "close",
            "spread",
            "volume",
        ],
        header=0,
        index_col=0,
        encoding="utf-8",
    )
    assert modin_df_equals_pandas(modin_df, pandas_df)

    pandas_df = pandas.read_csv(
        "modin/pandas/test/data/test_time_parsing.csv",
        names=[
            "timestamp",
            "symbol",
            "high",
            "low",
            "open",
            "close",
            "spread",
            "volume",
        ],
        header=0,
        index_col=0,
        parse_dates=["timestamp"],
        encoding="utf-8",
    )
    modin_df = pd.read_csv(
        "modin/pandas/test/data/test_time_parsing.csv",
        names=[
            "timestamp",
            "symbol",
            "high",
            "low",
            "open",
            "close",
            "spread",
            "volume",
        ],
        header=0,
        index_col=0,
        parse_dates=["timestamp"],
        encoding="utf-8",
    )
    assert modin_df_equals_pandas(modin_df, pandas_df)

    pandas_df = pandas.read_csv(
        "modin/pandas/test/data/test_time_parsing.csv",
        names=[
            "timestamp",
            "symbol",
            "high",
            "low",
            "open",
            "close",
            "spread",
            "volume",
        ],
        header=0,
        index_col=2,
        parse_dates=["timestamp"],
        encoding="utf-8",
    )
    modin_df = pd.read_csv(
        "modin/pandas/test/data/test_time_parsing.csv",
        names=[
            "timestamp",
            "symbol",
            "high",
            "low",
            "open",
            "close",
            "spread",
            "volume",
        ],
        header=0,
        index_col=2,
        parse_dates=["timestamp"],
        encoding="utf-8",
    )
    assert modin_df_equals_pandas(modin_df, pandas_df)
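The four blocks above differ only in their read_csv keyword arguments. An equivalent, untested compaction (illustrative only; it reuses this module's pandas, pd, and modin_df_equals_pandas names) could iterate over the argument sets:

def test_parse_dates_read_csv_compact():
    path = "modin/pandas/test/data/test_time_parsing.csv"
    names = ["timestamp", "symbol", "high", "low", "open", "close", "spread", "volume"]
    # One entry per kwargs combination exercised above.
    cases = [
        {},
        {"names": names, "header": 0, "index_col": 0, "encoding": "utf-8"},
        {"names": names, "header": 0, "index_col": 0,
         "parse_dates": ["timestamp"], "encoding": "utf-8"},
        {"names": names, "header": 0, "index_col": 2,
         "parse_dates": ["timestamp"], "encoding": "utf-8"},
    ]
    for kwargs in cases:
        pandas_df = pandas.read_csv(path, **kwargs)
        modin_df = pd.read_csv(path, **kwargs)
        assert modin_df_equals_pandas(modin_df, pandas_df)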


class FakeS3FS:
    def exists(self, path):
        return "s3://bucket/path.csv" == path