Skip to content

Commit

Permalink
FIX-#6899: Avoid sending lazy categorical proxies to workers (#6900)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Jan 31, 2024
1 parent bfe77ed commit 7c4a665
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
17 changes: 16 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3852,7 +3852,22 @@ def apply_func(df): # pragma: no cover
# 2. The second one works slower, but only gathers light pandas.Index objects,
# so there should be less stress on the network.
if add_missing_cats or not IsRayCluster.get():
original_dtypes = self.dtypes if self.has_materialized_dtypes else None
if self.has_materialized_dtypes:
original_dtypes = pandas.Series(
{
# Lazy proxies hold a reference to another Modin DataFrame, which can be
# a problem during serialization. In this scenario we don't need the actual
# categorical values, so a "category" string is enough.
name: (
"category"
if isinstance(dtype, LazyProxyCategoricalDtype)
else dtype
)
for name, dtype in self.dtypes.items()
}
)
else:
original_dtypes = None

def compute_aligned_columns(*dfs, initial_columns=None):
"""Take row partitions, filter empty ones, and return joined columns for them."""
Expand Down
19 changes: 10 additions & 9 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -3164,14 +3164,6 @@ def test_groupby_agg_provided_callable_warning():
pandas_groupby.agg(func)


def _apply_transform(df):
if len(df) == 0:
df = df.copy()
df.loc[0] = 10
return df.squeeze()
return df.sum()


@pytest.mark.parametrize(
"modify_config", [{RangePartitioningGroupby: True}], indirect=True
)
Expand All @@ -3183,7 +3175,16 @@ def _apply_transform(df):
pytest.param(lambda grp: grp.sum(), id="sum"),
pytest.param(lambda grp: grp.size(), id="size"),
pytest.param(lambda grp: grp.apply(lambda df: df.sum()), id="apply_sum"),
pytest.param(lambda grp: grp.apply(_apply_transform), id="apply_transform"),
pytest.param(
lambda grp: grp.apply(
lambda df: (
df.sum()
if len(df) > 0
else pandas.Series([10] * len(df.columns), index=df.columns)
)
),
id="apply_transform",
),
],
)
@pytest.mark.parametrize(
Expand Down

0 comments on commit 7c4a665

Please sign in to comment.