Skip to content

Commit

Permalink
Fix read_csv when parse_dates and index_col are the same (#548)
Browse files Browse the repository at this point in the history
* Resolves #537
* Now moves all `names` into the partition
  * This ensures that `parse_dates` and similar parameters have the
    correct columns names to find
  * Required that we also compute `index_col` inside the partitions
  (Previously, we were `pop`ing the parameter
* Adds tests to ensure that this use-case is supported
  • Loading branch information
devin-petersohn authored and williamma12 committed Apr 14, 2019
1 parent 9551e7a commit dd1a62e
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 3 deletions.
16 changes: 14 additions & 2 deletions modin/engines/ray/generic/io.py
Expand Up @@ -228,6 +228,19 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
Returns:
DataFrame or Series constructed from CSV file.
"""
names = kwargs.get("names", None)
index_col = kwargs.get("index_col", None)
if names is None:
# For the sake of the empty df, we assume no `index_col` to get the correct
# column names before we build the index. Because we pass `names` in, this
# step has to happen without removing the `index_col` otherwise it will not
# be assigned correctly
kwargs["index_col"] = None
names = pandas.read_csv(
open_file(filepath, "rb"), **dict(kwargs, nrows=0, skipfooter=0)
).columns
kwargs["index_col"] = index_col

empty_pd_df = pandas.read_csv(
open_file(filepath, "rb"), **dict(kwargs, nrows=0, skipfooter=0)
)
Expand All @@ -238,7 +251,7 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
partition_kwargs = dict(
kwargs,
header=None,
names=column_names,
names=names,
skipfooter=0,
skiprows=None,
parse_dates=parse_dates,
Expand Down Expand Up @@ -288,7 +301,6 @@ def _read_csv_from_file_pandas_on_ray(cls, filepath, kwargs={}):
)
index_ids.append(partition_id[-1])

index_col = kwargs.get("index_col", None)
if index_col is None:
new_index = pandas.RangeIndex(sum(ray.get(index_ids)))
else:
Expand Down
2 changes: 1 addition & 1 deletion modin/engines/ray/pandas_on_ray/io.py
Expand Up @@ -78,7 +78,7 @@ def _read_csv_with_offset_pandas_on_ray(
This is used to determine the total length of the DataFrame to build a
default Index.
"""
index_col = kwargs.pop("index_col", None)
index_col = kwargs.get("index_col", None)
bio = open_file(fname, "rb")
bio.seek(start)
to_read = header + bio.read(end - start)
Expand Down
4 changes: 4 additions & 0 deletions modin/pandas/test/data/test_time_parsing.csv
@@ -0,0 +1,4 @@
timestamp,symbol,high,low,open,close,spread,volume
2010-04-01 00:00:00,USD/JPY,93.52600,93.36100,93.51800,93.38200,0.00500,3049
2010-04-01 00:30:00,USD/JPY,93.47500,93.35200,93.38500,93.39100,0.00600,2251
2010-04-01 01:00:00,USD/JPY,93.42100,93.32600,93.39100,93.38400,0.00600,1577
112 changes: 112 additions & 0 deletions modin/pandas/test/test_io.py
Expand Up @@ -528,6 +528,118 @@ def test_from_csv(make_csv_file):
assert modin_df_equals_pandas(modin_df, pandas_df)


def test_parse_dates_read_csv():
pandas_df = pandas.read_csv("modin/pandas/test/data/test_time_parsing.csv")
modin_df = pd.read_csv("modin/pandas/test/data/test_time_parsing.csv")
modin_df_equals_pandas(modin_df, pandas_df)

pandas_df = pandas.read_csv(
"modin/pandas/test/data/test_time_parsing.csv",
names=[
"timestamp",
"symbol",
"high",
"low",
"open",
"close",
"spread",
"volume",
],
header=0,
index_col=0,
encoding="utf-8",
)
modin_df = pd.read_csv(
"modin/pandas/test/data/test_time_parsing.csv",
names=[
"timestamp",
"symbol",
"high",
"low",
"open",
"close",
"spread",
"volume",
],
header=0,
index_col=0,
encoding="utf-8",
)
modin_df_equals_pandas(modin_df, pandas_df)

pandas_df = pandas.read_csv(
"modin/pandas/test/data/test_time_parsing.csv",
names=[
"timestamp",
"symbol",
"high",
"low",
"open",
"close",
"spread",
"volume",
],
header=0,
index_col=0,
parse_dates=["timestamp"],
encoding="utf-8",
)
modin_df = pd.read_csv(
"modin/pandas/test/data/test_time_parsing.csv",
names=[
"timestamp",
"symbol",
"high",
"low",
"open",
"close",
"spread",
"volume",
],
header=0,
index_col=0,
parse_dates=["timestamp"],
encoding="utf-8",
)
modin_df_equals_pandas(modin_df, pandas_df)

pandas_df = pandas.read_csv(
"modin/pandas/test/data/test_time_parsing.csv",
names=[
"timestamp",
"symbol",
"high",
"low",
"open",
"close",
"spread",
"volume",
],
header=0,
index_col=2,
parse_dates=["timestamp"],
encoding="utf-8",
)
modin_df = pd.read_csv(
"modin/pandas/test/data/test_time_parsing.csv",
names=[
"timestamp",
"symbol",
"high",
"low",
"open",
"close",
"spread",
"volume",
],
header=0,
index_col=2,
parse_dates=["timestamp"],
encoding="utf-8",
)
modin_df_equals_pandas(modin_df, pandas_df)


def test_from_table(make_csv_file):
make_csv_file(delimiter="\t")

Expand Down

0 comments on commit dd1a62e

Please sign in to comment.