Fix MultiIndex handling in read_parquet when engine='pyarrow' (rapidsai#141)

Fixes: rapidsai/cudf#14352

This PR switches the `engine='pyarrow'` call flow in `cudf.read_parquet` to go through `pd.read_parquet`, because pandas performs additional `MultiIndex` handling that pyarrow's reader does not.
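
A minimal sketch of the round trip this fixes, adapted from the new test added below (the in-memory buffer and sample data are illustrative):

```python
from io import BytesIO

import pandas as pd

import cudf

# Write a frame with a MultiIndex using pyarrow.
pdf = pd.DataFrame(
    {"A": [1, 2, 3]},
    index=pd.MultiIndex.from_tuples([("a", 1), ("a", 2), ("b", 1)]),
)
buf = BytesIO()
pdf.to_parquet(buf, engine="pyarrow")

# With this change, the pyarrow engine routes through pd.read_parquet,
# which rebuilds the MultiIndex from the pandas metadata before the
# result is converted with cudf.DataFrame.from_pandas.
gdf = cudf.read_parquet(buf, engine="pyarrow")
assert isinstance(gdf.index, cudf.MultiIndex)
```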

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)

URL: rapidsai/cudf-private#141
galipremsagar committed Nov 1, 2023
1 parent e1cd273 commit 7feefc0
Showing 2 changed files with 35 additions and 10 deletions.
21 changes: 17 additions & 4 deletions python/cudf/cudf/io/parquet.py
@@ -447,7 +447,10 @@ def read_parquet(
     **kwargs,
 ):
     """{docstring}"""
-
+    if engine not in {"cudf", "pyarrow"}:
+        raise ValueError(
+            f"Only supported engines are {{'cudf', 'pyarrow'}}, got {engine=}"
+        )
     # Do not allow the user to set file-opening options
     # when `use_python_file_object=False` is specified
     if use_python_file_object is False:
@@ -825,9 +828,19 @@ def _read_parquet(
             use_pandas_metadata=use_pandas_metadata,
         )
     else:
-        return cudf.DataFrame.from_arrow(
-            pq.ParquetDataset(filepaths_or_buffers).read_pandas(
-                columns=columns, *args, **kwargs
+        if (
+            isinstance(filepaths_or_buffers, list)
+            and len(filepaths_or_buffers) == 1
+        ):
+            filepaths_or_buffers = filepaths_or_buffers[0]
+
+        return cudf.DataFrame.from_pandas(
+            pd.read_parquet(
+                filepaths_or_buffers,
+                columns=columns,
+                engine=engine,
+                *args,
+                **kwargs,
             )
         )
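
With the eager engine validation added in the first hunk, an unsupported engine now fails fast; a short sketch mirroring the new error test added below:

```python
from io import BytesIO

import cudf

# Any engine outside {"cudf", "pyarrow"} is rejected before any I/O happens:
cudf.read_parquet(BytesIO(), engine="abc")
# ValueError: Only supported engines are {'cudf', 'pyarrow'}, got engine='abc'
```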

24 changes: 18 additions & 6 deletions python/cudf/cudf/tests/test_parquet.py
@@ -265,11 +265,6 @@ def large_int64_gdf():
 def test_parquet_reader_basic(parquet_file, columns, engine):
     expect = pd.read_parquet(parquet_file, columns=columns)
     got = cudf.read_parquet(parquet_file, engine=engine, columns=columns)
-    if len(expect) == 0:
-        expect = expect.reset_index(drop=True)
-        got = got.reset_index(drop=True)
-    if "col_category" in expect.columns:
-        expect["col_category"] = expect["col_category"].astype("category")
 
     # PANDAS returns category objects whereas cuDF returns hashes
     if engine == "cudf":
@@ -278,7 +273,7 @@ def test_parquet_reader_basic(parquet_file, columns, engine):
         if "col_category" in got.columns:
             got = got.drop(columns=["col_category"])
 
-    assert_eq(expect, got, check_categorical=False)
+    assert_eq(expect, got)
 
 
 @pytest.mark.filterwarnings("ignore:Using CPU")
@@ -2868,3 +2863,20 @@ def test_parquet_read_filter_and_project():
     # Check result
     expected = df[(df.a == 5) & (df.c > 20)][columns].reset_index(drop=True)
     assert_eq(got, expected)
+
+
+def test_parquet_reader_multiindex():
+    expected = pd.DataFrame(
+        {"A": [1, 2, 3]},
+        index=pd.MultiIndex.from_tuples([("a", 1), ("a", 2), ("b", 1)]),
+    )
+    file_obj = BytesIO()
+    expected.to_parquet(file_obj, engine="pyarrow")
+    with pytest.warns(UserWarning):
+        actual = cudf.read_parquet(file_obj, engine="pyarrow")
+    assert_eq(actual, expected)
+
+
+def test_parquet_reader_engine_error():
+    with pytest.raises(ValueError):
+        cudf.read_parquet(BytesIO(), engine="abc")
