Skip to content

Commit

Permalink
FIX-#1900: Fix groupby for case by is None (#3579)
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Prutskov <alexey.prutskov@intel.com>
  • Loading branch information
prutskov committed Nov 25, 2021
1 parent 1a1edfd commit a04d7b7
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 48 deletions.
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2661,7 +2661,7 @@ def is_reduce_fn(fn, deep_level=0):
by = [by]
else:
if not isinstance(by, list):
by = [by]
by = [by] if by is not None else []
internal_by = [o for o in by if hashable(o) and o in self.columns]
internal_qc = (
[self.getitem_column_array(internal_by)] if len(internal_by) else []
Expand Down Expand Up @@ -2755,7 +2755,7 @@ def compute_groupby(df, drop=False, partition_idx=0):
result_cols = result.columns
result.drop(columns=missmatched_cols, inplace=True, errors="ignore")

if not as_index:
if not as_index and len(by) > 0:
keep_index_levels = len(by) > 1 and any(
isinstance(x, pandas.CategoricalDtype)
for x in df[internal_by_cols].dtypes
Expand Down
6 changes: 1 addition & 5 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,7 @@ def groupby(
elif hashable(by) and not isinstance(by, pandas.Grouper):
drop = by in self.columns
idx_name = by
if (
self._query_compiler.has_multiindex(axis=axis)
and by in self._query_compiler.get_index_names(axis)
and by is not None
):
if by is not None and by in self._query_compiler.get_index_names(axis):
# In this case we pass the string value of the name through to the
# partitions. This is more efficient than broadcasting the values.
level, by = by, None
Expand Down
111 changes: 78 additions & 33 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import numpy as np
import pandas
import pandas.core.groupby
from pandas.core.dtypes.common import is_list_like
from pandas.core.dtypes.common import is_list_like, is_numeric_dtype
from pandas.core.aggregation import reconstruct_func
from pandas._libs.lib import no_default
import pandas.core.common as com
Expand Down Expand Up @@ -128,7 +128,7 @@ def sem(self, ddof=1):
return self._default_to_pandas(lambda df: df.sem(ddof=ddof))

def mean(self, *args, **kwargs):
return self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
return self._apply_agg_function_check_index(lambda df: df.mean(*args, **kwargs))

def any(self, **kwargs):
return self._wrap_aggregation(
Expand Down Expand Up @@ -243,20 +243,18 @@ def _shift(data, periods, freq, axis, fill_value, is_set_nan_rows=True):
)
result = result.dropna(subset=self._by.columns).sort_index()
else:
result = self._apply_agg_function(
result = self._apply_agg_function_check_index_name(
lambda df: df.shift(periods, freq, axis, fill_value)
)
result._query_compiler.set_index_name(None)
return result

def nth(self, n, dropna=None):
return self._default_to_pandas(lambda df: df.nth(n, dropna=dropna))

def cumsum(self, axis=0, *args, **kwargs):
result = self._apply_agg_function(lambda df: df.cumsum(axis, *args, **kwargs))
# pandas does not name the index on cumsum
result._query_compiler.set_index_name(None)
return result
return self._apply_agg_function_check_index_name(
lambda df: df.cumsum(axis, *args, **kwargs)
)

@property
def indices(self):
Expand All @@ -271,15 +269,17 @@ def filter(self, func, dropna=True, *args, **kwargs):
)

def cummax(self, axis=0, **kwargs):
result = self._apply_agg_function(lambda df: df.cummax(axis, **kwargs))
# pandas does not name the index on cummax
result._query_compiler.set_index_name(None)
return result
return self._apply_agg_function_check_index_name(
lambda df: df.cummax(axis, **kwargs)
)

def apply(self, func, *args, **kwargs):
if not isinstance(func, BuiltinFunctionType):
func = wrap_udf_function(func)
return self._apply_agg_function(lambda df: df.apply(func, *args, **kwargs))

return self._apply_agg_function_check_index(
lambda df: df.apply(func, *args, **kwargs)
)

@property
def dtypes(self):
Expand Down Expand Up @@ -405,10 +405,9 @@ def __getitem__(self, key):
)

def cummin(self, axis=0, **kwargs):
result = self._apply_agg_function(lambda df: df.cummin(axis=axis, **kwargs))
# pandas does not name the index on cummin
result._query_compiler.set_index_name(None)
return result
return self._apply_agg_function_check_index_name(
lambda df: df.cummin(axis=axis, **kwargs)
)

def bfill(self, limit=None):
return self._default_to_pandas(lambda df: df.bfill(limit=limit))
Expand Down Expand Up @@ -466,7 +465,7 @@ def try_get_str_func(fn):
**kwargs,
)
elif callable(func):
return self._apply_agg_function(
return self._apply_agg_function_check_index(
lambda grp, *args, **kwargs: grp.aggregate(func, *args, **kwargs),
*args,
**kwargs,
Expand Down Expand Up @@ -631,22 +630,21 @@ def ngroup(self, ascending=True):
return self._default_to_pandas(lambda df: df.ngroup(ascending))

def nunique(self, dropna=True):
return self._apply_agg_function(lambda df: df.nunique(dropna))
return self._apply_agg_function_check_index(lambda df: df.nunique(dropna))

def resample(self, rule, *args, **kwargs):
return self._default_to_pandas(lambda df: df.resample(rule, *args, **kwargs))

def median(self, **kwargs):
return self._apply_agg_function(lambda df: df.median(**kwargs))
return self._apply_agg_function_check_index(lambda df: df.median(**kwargs))

def head(self, n=5):
return self._default_to_pandas(lambda df: df.head(n))

def cumprod(self, axis=0, *args, **kwargs):
result = self._apply_agg_function(lambda df: df.cumprod(axis, *args, **kwargs))
# pandas does not name the index on cumprod
result._query_compiler.set_index_name(None)
return result
return self._apply_agg_function_check_index_name(
lambda df: df.cumprod(axis, *args, **kwargs)
)

def __iter__(self):
return self._iter.__iter__()
Expand All @@ -655,12 +653,9 @@ def cov(self):
return self._default_to_pandas(lambda df: df.cov())

def transform(self, func, *args, **kwargs):
result = self._apply_agg_function(
return self._apply_agg_function_check_index_name(
lambda df: df.transform(func, *args, **kwargs)
)
# pandas does not name the index on transform
result._query_compiler.set_index_name(None)
return result

def corr(self, **kwargs):
return self._default_to_pandas(lambda df: df.corr(**kwargs))
Expand All @@ -677,10 +672,9 @@ def fillna(self, **kwargs):
squeeze=self._squeeze,
**new_groupby_kwargs,
)
result = work_object._apply_agg_function(lambda df: df.fillna(**kwargs))
# pandas does not name the index on fillna
result._query_compiler.set_index_name(None)
return result
return work_object._apply_agg_function_check_index_name(
lambda df: df.fillna(**kwargs)
)

def count(self, **kwargs):
result = self._wrap_aggregation(
Expand Down Expand Up @@ -721,7 +715,7 @@ def quantile(self, q=0.5, **kwargs):
if is_list_like(q):
return self._default_to_pandas(lambda df: df.quantile(q=q, **kwargs))

return self._apply_agg_function(lambda df: df.quantile(q, **kwargs))
return self._apply_agg_function_check_index(lambda df: df.quantile(q, **kwargs))

def diff(self):
return self._default_to_pandas(lambda df: df.diff())
Expand Down Expand Up @@ -914,6 +908,9 @@ def _wrap_aggregation(
else:
groupby_qc = self._query_compiler

if all(not is_numeric_dtype(dtype) for dtype in groupby_qc.dtypes):
numeric_only = False

result = type(self._df)(
query_compiler=qc_method(
groupby_qc,
Expand All @@ -930,6 +927,54 @@ def _wrap_aggregation(
return result.squeeze()
return result

def _apply_agg_function_check_index(self, *args, **kwargs):
    """
    Perform `self._apply_agg_function` with additional check.

    Check the result of `self._apply_agg_function` on the need of resetting index.

    Parameters
    ----------
    *args : list
        Positional arguments to pass to `self._apply_agg_function`.
    **kwargs : dict
        Keyword arguments to pass to `self._apply_agg_function`.

    Returns
    -------
    DataFrame
        The object returned by `self._apply_agg_function`. Its index is reset
        (and the old index dropped) when `self._by` is None and
        `self._as_index` is falsy.
    """
    result = self._apply_agg_function(*args, **kwargs)
    if self._by is None and not self._as_index:
        # This is a workaround to align behavior with pandas. In this case pandas
        # resets index, but Modin doesn't do that. More details are in
        # https://github.com/modin-project/modin/issues/3716.
        result.reset_index(drop=True, inplace=True)

    return result

def _apply_agg_function_check_index_name(self, *args, **kwargs):
    """
    Perform `self._apply_agg_function` with additional check.

    Check the result of `self._apply_agg_function` on the need of resetting index name.

    Parameters
    ----------
    *args : list
        Positional arguments to pass to `self._apply_agg_function`.
    **kwargs : dict
        Keyword arguments to pass to `self._apply_agg_function`.

    Returns
    -------
    DataFrame
        The object returned by `self._apply_agg_function`. Its index name is
        set to None through the query compiler when `self._by` is not None.
    """
    result = self._apply_agg_function(*args, **kwargs)
    if self._by is not None:
        # pandas does not name the index for this case, so drop the name here;
        # when `self._by` is None the result is left as it is.
        result._query_compiler.set_index_name(None)
    return result

def _apply_agg_function(self, f, *args, **kwargs):
"""
Perform aggregation and combine stages based on a given function.
Expand Down
39 changes: 31 additions & 8 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_mixed_dtypes_groupby(as_index):
]

for by in by_values:
if by_values[0] == "col3":
if isinstance(by[0], str) and by[0] == "col3":
modin_groupby = modin_df.set_index(by[0]).groupby(
by=by[0], as_index=as_index
)
Expand Down Expand Up @@ -127,8 +127,6 @@ def test_mixed_dtypes_groupby(as_index):

# TODO Add more apply functions
apply_functions = [lambda df: df.sum(), min]
# Workaround for Pandas bug #34656. Recreate groupby object for Pandas
pandas_groupby = pandas_df.groupby(by=by[-1], as_index=as_index)
for func in apply_functions:
eval_apply(modin_groupby, pandas_groupby, func)

Expand Down Expand Up @@ -1216,11 +1214,25 @@ def eval_shift(modin_groupby, pandas_groupby):
pandas_groupby,
lambda groupby: groupby.shift(periods=-3),
)
eval_general(
modin_groupby,
pandas_groupby,
lambda groupby: groupby.shift(axis=1, fill_value=777),
)

if isinstance(pandas_groupby, pandas.core.groupby.DataFrameGroupBy):
pandas_res = pandas_groupby.shift(axis=1, fill_value=777)
modin_res = modin_groupby.shift(axis=1, fill_value=777)
# Pandas produces unexpected index order (pandas GH 44269).
# Here we align the index of the Modin result with pandas so that the test passes.
import pandas.core.algorithms as algorithms

indexer, _ = modin_res.index.get_indexer_non_unique(modin_res.index._values)
indexer = algorithms.unique1d(indexer)
modin_res = modin_res.take(indexer)

df_equals(modin_res, pandas_res)
else:
eval_general(
modin_groupby,
pandas_groupby,
lambda groupby: groupby.shift(axis=1, fill_value=777),
)


def test_groupby_on_index_values_with_loop():
Expand Down Expand Up @@ -1717,3 +1729,14 @@ def test_not_str_by(by, as_index):
eval_general(md_grp, pd_grp, lambda grp: grp.agg(lambda df: df.mean()))
eval_general(md_grp, pd_grp, lambda grp: grp.dtypes)
eval_general(md_grp, pd_grp, lambda grp: grp.first())


def test_sum_with_level():
    """Compare Modin and pandas on `sum` after grouping by a name that is the index."""
    data = {
        "A": ["0.0", "1.0", "2.0", "3.0", "4.0"],
        "B": ["0.0", "1.0", "0.0", "1.0", "0.0"],
        "C": ["foo1", "foo2", "foo3", "foo4", "foo5"],
        "D": pandas.bdate_range("1/1/2009", periods=5),
    }
    modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data)
    # "C" is moved into the index first, so `groupby("C")` refers to an index
    # level name rather than to a column.
    eval_general(modin_df, pandas_df, lambda df: df.set_index("C").groupby("C").sum())

0 comments on commit a04d7b7

Please sign in to comment.