FIX-#6948: Fix groupby when Modin dataframe has several column partitions (#6951)

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
anmyachev committed Feb 22, 2024
1 parent c422c78 commit 6dfe13f
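
For context, a minimal sketch of the pattern this commit fixes, adapted from the test added below in modin/pandas/test/test_groupby.py; it is illustrative only and not the exact reproducer from #6948.

# Illustrative sketch adapted from the new test below; not taken from #6948 verbatim.
import numpy as np
import modin.pandas as pd

df = pd.DataFrame(
    np.random.randint(0, 100, size=(1000, 3)),
    columns=["l_returnflag", "l_quantity", "l_discount"],
)
# Adding a column after construction creates another column partition in Modin.
df["a"] = df.l_quantity * (1 - df.l_discount)

# Before this fix, named aggregation could mis-handle the result columns when
# the aggregated columns span several column partitions (see #6948).
result = df.groupby("l_returnflag").agg(
    sum_qty=("l_quantity", "sum"),
    sum_disc_price=("a", "sum"),
    avg_disc=("l_discount", "mean"),
)
print(result)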
Showing 2 changed files with 81 additions and 5 deletions.
32 changes: 27 additions & 5 deletions modin/pandas/groupby.py
@@ -22,6 +22,7 @@
 import pandas.core.common as com
 import pandas.core.groupby
 from pandas._libs import lib
+from pandas.api.types import is_scalar
 from pandas.core.apply import reconstruct_func
 from pandas.core.dtypes.common import (
     is_datetime64_any_dtype,
@@ -894,19 +895,40 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)
 
         do_relabel = None
         if isinstance(func, dict) or func is None:
-            relabeling_required, func_dict, new_columns, order = reconstruct_func(
+            # the order from `reconstruct_func` cannot be used correctly if there
+            # is more than one columnar partition, since for correct use all columns
+            # must be available within one partition.
+            old_kwargs = dict(kwargs)
+            relabeling_required, func_dict, new_columns, _ = reconstruct_func(
                 func, **kwargs
             )
 
             if relabeling_required:
 
                 def do_relabel(obj_to_relabel):  # noqa: F811
-                    new_order, new_columns_idx = order, pandas.Index(new_columns)
+                    # unwrap nested labels into one level tuple
+                    result_labels = [None] * len(old_kwargs)
+                    for idx, labels in enumerate(old_kwargs.values()):
+                        if is_scalar(labels) or callable(labels):
+                            result_labels[idx] = (
+                                labels if not callable(labels) else labels.__name__
+                            )
+                            continue
+                        new_elem = []
+                        for label in labels:
+                            if is_scalar(label) or callable(label):
+                                new_elem.append(
+                                    label if not callable(label) else label.__name__
+                                )
+                            else:
+                                new_elem.extend(label)
+                        result_labels[idx] = tuple(new_elem)
+
+                    new_order = obj_to_relabel.columns.get_indexer(result_labels)
+                    new_columns_idx = pandas.Index(new_columns)
                     if not self._as_index:
                         nby_cols = len(obj_to_relabel.columns) - len(new_columns_idx)
-                        new_order = np.concatenate(
-                            [np.arange(nby_cols), new_order + nby_cols]
-                        )
+                        new_order = np.concatenate([np.arange(nby_cols), new_order])
                         by_cols = obj_to_relabel.columns[:nby_cols]
                         if by_cols.nlevels != new_columns_idx.nlevels:
                             by_cols = by_cols.remove_unused_levels()
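
A side note on the change above: reconstruct_func is a pandas-internal helper, and the sketch below is not part of the commit; it only illustrates the four values that helper returns for a named-aggregation call and why the precomputed order is now discarded. The commented outputs are indicative and may differ across pandas versions.

# Rough illustration of the pandas internal used above; not from the commit.
from pandas.core.apply import reconstruct_func

relabeling_required, func_dict, new_columns, order = reconstruct_func(
    None, sum_qty=("l_quantity", "sum"), avg_disc=("l_discount", "mean")
)
# relabeling_required -> True for named aggregation
# func_dict           -> mapping of source columns to their aggregation functions
# new_columns         -> the user-facing labels, here ("sum_qty", "avg_disc")
# order               -> positions that assume all aggregated columns sit in one
#                        frame/partition; the fix above therefore recomputes the
#                        order via obj_to_relabel.columns.get_indexer(...) instead.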
54 changes: 54 additions & 0 deletions modin/pandas/test/test_groupby.py
@@ -3087,6 +3087,60 @@ def test_groupby_named_aggregation():
     )
 
 
+def test_groupby_several_column_partitions():
+    # see details in #6948
+    columns = [
+        "l_returnflag",
+        "l_linestatus",
+        "l_discount",
+        "l_extendedprice",
+        "l_quantity",
+    ]
+    modin_df, pandas_df = create_test_dfs(
+        np.random.randint(0, 100, size=(1000, len(columns))), columns=columns
+    )
+
+    pandas_df["a"] = (pandas_df.l_extendedprice) * (1 - (pandas_df.l_discount))
+    # to create another column partition
+    modin_df["a"] = (modin_df.l_extendedprice) * (1 - (modin_df.l_discount))
+
+    eval_general(
+        modin_df,
+        pandas_df,
+        lambda df: df.groupby(["l_returnflag", "l_linestatus"])
+        .agg(
+            sum_qty=("l_quantity", "sum"),
+            sum_base_price=("l_extendedprice", "sum"),
+            sum_disc_price=("a", "sum"),
+            # sum_charge=("b", "sum"),
+            avg_qty=("l_quantity", "mean"),
+            avg_price=("l_extendedprice", "mean"),
+            avg_disc=("l_discount", "mean"),
+            count_order=("l_returnflag", "count"),
+        )
+        .reset_index(),
+    )
+
+
+def test_groupby_named_agg():
+    # from pandas docs
+
+    data = {
+        "A": [1, 1, 2, 2],
+        "B": [1, 2, 3, 4],
+        "C": [0.362838, 0.227877, 1.267767, -0.562860],
+    }
+    modin_df, pandas_df = create_test_dfs(data)
+    eval_general(
+        modin_df,
+        pandas_df,
+        lambda df: df.groupby("A").agg(
+            b_min=pd.NamedAgg(column="B", aggfunc="min"),
+            c_sum=pd.NamedAgg(column="C", aggfunc="sum"),
+        ),
+    )
+
+
 ### TEST GROUPBY WARNINGS ###
 
 
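
The second new test covers pd.NamedAgg specs. Below is a standalone sketch, not from the commit, of the label unwrapping that the new do_relabel code performs on the aggregation kwargs; the kwargs dict here is hypothetical.

# Standalone illustration of the label unwrapping in do_relabel above;
# the kwargs dict is hypothetical.
import pandas as pd
from pandas.api.types import is_scalar

kwargs = {
    "b_min": pd.NamedAgg(column="B", aggfunc="min"),  # namedtuple spec
    "c_sum": ("C", "sum"),                            # plain tuple spec
    "total": "sum",                                   # scalar spec
}

result_labels = []
for labels in kwargs.values():
    if is_scalar(labels) or callable(labels):
        result_labels.append(labels if not callable(labels) else labels.__name__)
        continue
    flat = []
    for label in labels:
        if is_scalar(label) or callable(label):
            flat.append(label if not callable(label) else label.__name__)
        else:
            flat.extend(label)
    result_labels.append(tuple(flat))

print(result_labels)  # [('B', 'min'), ('C', 'sum'), 'sum']

These flattened labels are what the fix passes to obj_to_relabel.columns.get_indexer, so the lookup no longer depends on all aggregated columns being available within a single partition.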
