Skip to content

Commit

Permalink
FIX-#5158: Synchronize metadata before to_parquet (#5161)
Browse files Browse the repository at this point in the history
Signed-off-by: Karthik Velayutham <vkarthik@ponder.io>
  • Loading branch information
Karthik Velayutham committed Oct 26, 2022
1 parent 3cc33a2 commit 87c8f70
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ def func(df, **kw):
df.to_parquet(**kwargs)
return pandas.DataFrame()

# Ensure that the metadata is synchronized
qc._modin_frame._propagate_index_objs(axis=None)
result = qc._modin_frame._partition_mgr_cls.map_axis_partitions(
axis=1,
partitions=qc._modin_frame._partitions,
Expand Down
48 changes: 29 additions & 19 deletions modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,13 @@ def parquet_eval_to_file(modin_obj, pandas_obj, fn, extension, **fn_kwargs):
extension=extension, data_dir=dirname
)

engine = fn_kwargs.get("engine", "auto")

getattr(modin_obj, fn)(unique_filename_modin, **fn_kwargs)
getattr(pandas_obj, fn)(unique_filename_pandas, **fn_kwargs)

pandas_df = pandas.read_parquet(unique_filename_pandas)
modin_df = pd.read_parquet(unique_filename_modin)
pandas_df = pandas.read_parquet(unique_filename_pandas, engine=engine)
modin_df = pd.read_parquet(unique_filename_modin, engine=engine)
df_equals(pandas_df, modin_df)


Expand Down Expand Up @@ -1386,17 +1388,17 @@ def test_read_table_empty_frame(self, make_csv_file):
)


@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
class TestParquet:
@pytest.mark.parametrize("columns", [None, ["col1"]])
@pytest.mark.parametrize("row_group_size", [None, 100, 1000, 10_000])
@pytest.mark.parametrize("engine", ["auto", "pyarrow", "fastparquet"])
@pytest.mark.parametrize("path_type", [Path, str])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
)
def test_read_parquet(
self, make_parquet_file, columns, row_group_size, engine, path_type
self, engine, make_parquet_file, columns, row_group_size, path_type
):
with ensure_clean(".parquet") as unique_filename:
unique_filename = path_type(unique_filename)
Expand All @@ -1414,7 +1416,7 @@ def test_read_parquet(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
)
def test_read_parquet_indexing_by_column(self, make_parquet_file):
def test_read_parquet_indexing_by_column(self, engine, make_parquet_file):
# Test indexing into a column of Modin with various parquet file row lengths.
# Specifically, tests for https://github.com/modin-project/modin/issues/3527
# which fails when min_partition_size < nrows < min_partition_size * (num_partitions - 1)
Expand All @@ -1426,7 +1428,7 @@ def test_read_parquet_indexing_by_column(self, make_parquet_file):
unique_filename = get_unique_filename(extension="parquet", data_dir=dirname)
make_parquet_file(filename=unique_filename, nrows=nrows)

parquet_df = pd.read_parquet(unique_filename)
parquet_df = pd.read_parquet(unique_filename, engine=engine)
for col in parquet_df.columns:
parquet_df[col]

Expand All @@ -1435,13 +1437,12 @@ def test_read_parquet_indexing_by_column(self, make_parquet_file):
@pytest.mark.parametrize(
"rows_per_file", [[1000] * 40, [0, 0, 40_000], [10_000, 10_000] + [100] * 200]
)
@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
)
def test_read_parquet_directory(
self, make_parquet_dir, columns, row_group_size, rows_per_file, engine
self, engine, make_parquet_dir, columns, row_group_size, rows_per_file
):
num_cols = DATASET_SIZE_DICT.get(
TestDatasetSize.get(), DATASET_SIZE_DICT["Small"]
Expand Down Expand Up @@ -1471,7 +1472,6 @@ def test_read_parquet_directory(
columns=columns,
)

@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.parametrize("columns", [None, ["col1"]])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
Expand All @@ -1492,7 +1492,6 @@ def test_read_parquet_partitioned_directory(
columns=columns,
)

@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
Expand Down Expand Up @@ -1551,7 +1550,6 @@ def test_read_parquet_pandas_index(self, engine):
engine=engine,
)

@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
Expand All @@ -1577,7 +1575,6 @@ def test_read_parquet_pandas_index_partitioned(self, engine):
engine=engine,
)

@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
Expand All @@ -1604,7 +1601,6 @@ def test_read_parquet_hdfs(self, engine):
),
],
)
@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
Expand Down Expand Up @@ -1648,7 +1644,6 @@ def test_read_parquet_s3(self, path_type, engine):
engine=engine,
)

@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
Expand Down Expand Up @@ -1681,7 +1676,6 @@ def test_read_parquet_without_metadata(self, engine):
engine=engine,
)

@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])
def test_read_empty_parquet_file(self, engine):
test_df = pandas.DataFrame()
with tempfile.TemporaryDirectory() as directory:
Expand All @@ -1694,27 +1688,43 @@ def test_read_empty_parquet_file(self, engine):
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
)
def test_to_parquet(self):
def test_to_parquet(self, engine):
    """Round-trip TEST_DATA through ``to_parquet`` and compare Modin vs. pandas.

    Parameters
    ----------
    engine : str
        Parquet engine under test; supplied by the class-level
        ``@pytest.mark.parametrize("engine", ["pyarrow", "fastparquet"])``.
    """
    # create_test_dfs builds a Modin frame and a pandas frame from the same
    # data — presumably identical contents; verify against its definition.
    modin_df, pandas_df = create_test_dfs(TEST_DATA)
    # parquet_eval_to_file writes both objects via `fn` and, per its diff
    # above, re-reads both files with the same `engine` and asserts equality.
    parquet_eval_to_file(
        modin_obj=modin_df,
        pandas_obj=pandas_df,
        fn="to_parquet",
        extension="parquet",
        engine=engine,
    )

def test_to_parquet_keep_index(self, engine):
    """Check that a named index survives a ``to_parquet`` round trip.

    Regression test added alongside the FIX-#5158 metadata-synchronization
    change: writes with ``index=True`` and compares Modin's output file
    against pandas' for the given parquet `engine`.

    Parameters
    ----------
    engine : str
        Parquet engine under test (class-level parametrize:
        "pyarrow" or "fastparquet").
    """
    # 2000-row frame — large enough to span multiple partitions in Modin;
    # NOTE(review): exact partitioning depends on runtime config — confirm.
    data = {"c0": [0, 1] * 1000, "c1": [2, 3] * 1000}
    modin_df, pandas_df = create_test_dfs(data)
    # Name the index on both frames so index preservation is observable
    # when the files are re-read and compared.
    modin_df.index.name = "foo"
    pandas_df.index.name = "foo"

    # index=True forwards to to_parquet so the named index is written out;
    # parquet_eval_to_file re-reads both files and asserts equality.
    parquet_eval_to_file(
        modin_obj=modin_df,
        pandas_obj=pandas_df,
        fn="to_parquet",
        extension="parquet",
        index=True,
        engine=engine,
    )

@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
)
def test_read_parquet_2462(self):
def test_read_parquet_2462(self, engine):
test_df = pandas.DataFrame({"col1": [["ad_1", "ad_2"], ["ad_3"]]})

with tempfile.TemporaryDirectory() as directory:
path = f"{directory}/data"
os.makedirs(path)
test_df.to_parquet(path + "/part-00000.parquet")
read_df = pd.read_parquet(path)
test_df.to_parquet(path + "/part-00000.parquet", engine=engine)
read_df = pd.read_parquet(path, engine=engine)

df_equals(test_df, read_df)

Expand Down

0 comments on commit 87c8f70

Please sign in to comment.