Skip to content

Commit

Permalink
FIX-#3376: fix handling of groupby-dict aggregation of 'by' cols (#3592)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Dec 1, 2021
1 parent 0947ee8 commit d42c070
Show file tree
Hide file tree
Showing 6 changed files with 585 additions and 106 deletions.
270 changes: 262 additions & 8 deletions modin/core/dataframe/algebra/default2pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .default import DefaultMethod

import pandas
from pandas.core.dtypes.common import is_list_like


# FIXME: there is no sense in keeping `GroupBy` and `GroupByDefault` logic in a different
Expand Down Expand Up @@ -207,7 +208,8 @@ def fn(
if numeric_only:
df = df.select_dtypes(include="number")

by = by.squeeze(axis=1)
if isinstance(by, pandas.DataFrame):
by = by.squeeze(axis=1)
if (
drop
and isinstance(by, pandas.Series)
Expand All @@ -230,13 +232,30 @@ def fn(
result = result.to_frame()

if not as_index:
if (
len(result.index.names) == 1 and result.index.names[0] is None
) or all([name in result.columns for name in result.index.names]):
drop = False
elif kwargs.get("method") == "size":
drop = True
result = result.reset_index(drop=not drop)
method = kwargs.get("method")
if isinstance(by, pandas.Series):
# 1. If `drop` is True then 'by' Series represents a column from the
# source frame and so the 'by' is internal.
# 2. If method is 'size' then any 'by' is considered to be internal.
# This is a hacky legacy from the ``groupby_size`` implementation:
# https://github.com/modin-project/modin/issues/3739
internal_by = (by.name,) if drop or method == "size" else tuple()
else:
internal_by = by

cls.handle_as_index_for_dataframe(
result,
internal_by,
by_cols_dtypes=(
df.index.dtypes.values
if isinstance(df.index, pandas.MultiIndex)
else (df.index.dtype,)
),
by_length=len(by),
drop=drop,
method=method,
inplace=True,
)

if result.index.name == "__reduced__":
result.index.name = None
Expand Down Expand Up @@ -270,6 +289,241 @@ def build_groupby(cls, func):
return cls.build_aggregate_method(func)
return cls.build_groupby_reduce_method(func)

@staticmethod
def handle_as_index_for_dataframe(
    result,
    internal_by_cols,
    by_cols_dtypes=None,
    by_length=None,
    selection=None,
    partition_idx=0,
    drop=True,
    method=None,
    inplace=False,
):
    """
    Apply the `as_index=False` semantics to an already computed GroupBy result.

    The hints describing what has to be done (which index levels and columns
    to drop, whether to reset the index) are obtained from
    ``GroupBy.handle_as_index`` and then applied to the frame.

    Parameters
    ----------
    result : DataFrame
        Frame containing GroupBy aggregation result computed with `as_index=True`
        parameter (group names are located at the frame's index).
    internal_by_cols : list-like
        Internal 'by' columns.
    by_cols_dtypes : list-like, optional
        Data types of the internal 'by' columns. Required to do special casing
        in case of categorical 'by'. If not specified, assume that there is no
        categorical data in 'by'.
    by_length : int, optional
        Amount of keys to group on (including frame columns and external objects
        like list, Series, etc.). If not specified, consider `by_length` to be
        equal ``len(internal_by_cols)``.
    selection : label or list of labels, optional
        Set of columns that were explicitly selected for aggregation (for example
        via dict-aggregation). If not specified assuming that aggregation was
        applied to all of the available columns.
    partition_idx : int, default: 0
        Positional index of the current partition.
    drop : bool, default: True
        Indicates whether or not any of the `by` data came from the same frame.
    method : str, optional
        Name of the groupby function. This is a hint to be able to do special casing.
        Note: this parameter is a legacy from the ``groupby_size`` implementation,
        it's a hacky one and probably will be removed in the future:
        https://github.com/modin-project/modin/issues/3739.
    inplace : bool, default: False
        Modify the DataFrame in place (do not create a new object).

    Returns
    -------
    DataFrame
        GroupBy aggregation result with the considered `as_index=False` parameter.
        This is the very same object as `result` when `inplace` is True.
    """
    # Work on a private copy unless the caller explicitly allowed mutation.
    frame = result if inplace else result.copy()

    hints = GroupBy.handle_as_index(
        result_cols=frame.columns,
        result_index_names=frame.index.names,
        internal_by_cols=internal_by_cols,
        by_cols_dtypes=by_cols_dtypes,
        by_length=by_length,
        selection=selection,
        partition_idx=partition_idx,
        drop=drop,
        method=method,
    )
    need_reset, discard_index, levels_to_remove, columns_to_remove = hints

    # The order matters: conflicting levels/columns have to be removed before
    # the remaining index levels are inserted into the frame as columns.
    if len(levels_to_remove) > 0:
        frame.index = frame.index.droplevel(levels_to_remove)
    if len(columns_to_remove) > 0:
        frame.drop(columns=columns_to_remove, inplace=True)
    if need_reset:
        frame.reset_index(drop=discard_index, inplace=True)

    return frame

@staticmethod
def handle_as_index(
    result_cols,
    result_index_names,
    internal_by_cols,
    by_cols_dtypes=None,
    by_length=None,
    selection=None,
    partition_idx=0,
    drop=True,
    method=None,
):
    """
    Compute hints to process ``as_index=False`` parameter for the GroupBy result.

    This function resolves naming conflicts of the index levels to insert and the column labels
    for the GroupBy result. The logic of this function assumes that the initial GroupBy result
    was computed as ``as_index=True``.

    Parameters
    ----------
    result_cols : pandas.Index
        Columns of the GroupBy result.
    result_index_names : list-like
        Index names of the GroupBy result.
    internal_by_cols : label or list-like
        Internal 'by' columns. A single (non list-like) label is treated as a
        one-element list.
    by_cols_dtypes : list-like, optional
        Data types of the internal 'by' columns. Required to do special casing
        in case of categorical 'by'. If not specified, assume that there is no
        categorical data in 'by'.
    by_length : int, optional
        Amount of keys to group on (including frame columns and external objects like list, Series, etc.)
        If not specified, consider `by_length` to be equal ``len(internal_by_cols)``.
    selection : label or list of labels, optional
        Set of columns that were explicitly selected for aggregation (for example
        via dict-aggregation). If not specified assuming that aggregation was
        applied to all of the available columns.
    partition_idx : int, default: 0
        Positional index of the current partition.
    drop : bool, default: True
        Indicates whether or not any of the `by` data came from the same frame.
    method : str, optional
        Name of the groupby function. This is a hint to be able to do special casing.
        Note: this parameter is a legacy from the ``groupby_size`` implementation,
        it's a hacky one and probably will be removed in the future:
        https://github.com/modin-project/modin/issues/3739.

    Returns
    -------
    reset_index : bool
        Indicates whether to reset index to the default one (0, 1, 2 ... n) at this partition.
    drop_index : bool
        If `reset_index` is True, indicates whether to drop all index levels (True) or insert them into the
        resulting columns (False).
    lvls_to_drop : list of ints
        Contains numeric indices of the levels of the result index to drop as intersected.
    cols_to_drop : list of labels
        Contains labels of the columns to drop from the result as intersected
        (unique, in the order they appear in `internal_by_cols`).

    Examples
    --------
    >>> groupby_result = compute_groupby_without_processing_as_index_parameter()
    >>> if not as_index:
    >>>     reset_index, drop, lvls_to_drop, cols_to_drop = handle_as_index(**extract_required_params(groupby_result))
    >>>     if len(lvls_to_drop) > 0:
    >>>         groupby_result.index = groupby_result.index.droplevel(lvls_to_drop)
    >>>     if len(cols_to_drop) > 0:
    >>>         groupby_result = groupby_result.drop(columns=cols_to_drop)
    >>>     if reset_index:
    >>>         groupby_result_with_processed_as_index_parameter = groupby_result.reset_index(drop=drop)
    >>>     else:
    >>>         groupby_result_with_processed_as_index_parameter = groupby_result
    """
    # Wrap a scalar label BEFORE measuring its length: ``len("key")`` would
    # otherwise count characters (and ``len(5)`` would raise) instead of
    # reporting a single 'by' column.
    if not isinstance(internal_by_cols, pandas.Index) and not is_list_like(
        internal_by_cols
    ):
        internal_by_cols = [internal_by_cols]

    if by_length is None:
        by_length = len(internal_by_cols)

    reset_index = by_length > 0 or selection is not None

    # If the method is "size" then the result contains only one unique named column
    # and we don't have to worry about any naming conflicts, so inserting all of
    # the "by" into the result (just a fast-path)
    if method == "size":
        return reset_index, False, [], []

    # Pandas logic of resolving naming conflicts is the following:
    #   1. If any categorical is in 'by' and 'by' is multi-column, then the categorical
    #      index is prioritized: drop intersected columns and insert all of the 'by' index
    #      levels to the frame as columns.
    #   2. Otherwise, aggregation result is prioritized: drop intersected index levels and
    #      insert the filtered ones to the frame as columns.
    keep_index_levels = (
        by_cols_dtypes is not None
        and by_length > 1
        and selection is None
        and any(isinstance(x, pandas.CategoricalDtype) for x in by_cols_dtypes)
    )

    # 1. We insert 'by'-columns to the result at the beginning of the frame and so only to the
    #    first partition, if partition_idx != 0 we just drop the index. If there are no columns
    #    that are required to drop (keep_index_levels is False) then we can exit here.
    # 2. We don't insert 'by'-columns to the result if 'by'-data came from a different
    #    frame (drop is False). The only exception, the "size" method, has already been
    #    handled by the fast-path above, so no extra check on `method` is needed here.
    if (not keep_index_levels and partition_idx != 0) or not drop:
        return reset_index, True, [], []

    if not isinstance(internal_by_cols, pandas.Index):
        internal_by_cols = pandas.Index(internal_by_cols)

    # Labels starting with "__reduced__" are never inserted into the result.
    # Only string-like indices expose the `.str` accessor.
    if hasattr(internal_by_cols, "str"):
        internal_by_cols = internal_by_cols[
            ~internal_by_cols.str.startswith("__reduced__", na=False)
        ]

    if selection is not None and not isinstance(selection, pandas.Index):
        # `selection` may be a single label; `pandas.Index` rejects scalars.
        if not is_list_like(selection):
            selection = [selection]
        selection = pandas.Index(selection)

    cols_to_drop = []
    if keep_index_levels:
        cols_to_insert = internal_by_cols
        # We want to drop such internal-by-cols that are presented
        # in the result in order to not create naming conflicts
        conflicting = frozenset(internal_by_cols) & frozenset(result_cols)
        cols_to_drop = list(
            dict.fromkeys(col for col in internal_by_cols if col in conflicting)
        )
    elif selection is None:
        # We want to insert only these internal-by-cols that are not presented
        # in the result in order to not create naming conflicts
        cols_to_insert = frozenset(internal_by_cols) - frozenset(result_cols)
    else:
        cols_to_insert = frozenset(
            # We have to use explicit 'not in' check and not just difference
            # of sets because of specific '__contains__' operator in case of
            # scalar 'col' and MultiIndex 'selection'.
            col
            for col in internal_by_cols
            if col not in selection
        )

    if partition_idx == 0:
        lvls_to_drop = [
            i
            for i, name in enumerate(result_index_names)
            if name not in cols_to_insert
        ]
    else:
        # Index levels are inserted only at the first partition, every other
        # partition gets rid of all of its levels.
        lvls_to_drop = list(range(len(result_index_names)))

    # Dropping every level is equivalent to a plain `reset_index(drop=True)`,
    # so report it via the flag rather than via an explicit list of levels.
    drop_index = len(lvls_to_drop) == len(result_index_names)
    if drop_index:
        lvls_to_drop = []

    return reset_index, drop_index, lvls_to_drop, cols_to_drop


class GroupByDefault(DefaultMethod):
"""Builder for default-to-pandas GroupBy aggregation functions."""
Expand Down
30 changes: 25 additions & 5 deletions modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ def map(
# right index and placing columns in the correct order.
groupby_args["as_index"] = True
groupby_args["observed"] = True
# We have to filter func-dict BEFORE inserting broadcasted 'by' columns
# to avoid multiple aggregation results for 'by' cols in case they're
# present in the func-dict:
apply_func = cls.try_filter_dict(map_func, df)
if other is not None:
# Other is a broadcasted partition that represents 'by' columns
# Concatenate it with 'df' to group on its columns names
Expand All @@ -136,7 +140,6 @@ def map(
else:
by_part = by

apply_func = cls.try_filter_dict(map_func, df)
result = apply_func(
df.groupby(by=by_part, axis=axis, **groupby_args), **map_args
)
Expand Down Expand Up @@ -194,8 +197,12 @@ def reduce(
# there is a bug in pandas with intersection that forces us to do so:
# https://github.com/pandas-dev/pandas/issues/39699
by_part = pandas.Index(df.index.names)
if drop and len(df.columns.intersection(by_part)) > 0:
df.drop(columns=by_part, errors="ignore", inplace=True)
if drop:
to_drop = df.columns.intersection(by_part)
if isinstance(reduce_func, dict):
to_drop = to_drop.difference(reduce_func.keys())
if len(to_drop) > 0:
df.drop(columns=by_part, errors="ignore", inplace=True)

groupby_args = groupby_args.copy()
as_index = groupby_args["as_index"]
Expand All @@ -211,8 +218,21 @@ def reduce(
result = apply_func(df.groupby(axis=axis, **groupby_args), **reduce_args)

if not as_index:
insert_levels = partition_idx == 0 and (drop or method == "size")
result.reset_index(drop=not insert_levels, inplace=True)
GroupBy.handle_as_index_for_dataframe(
result,
by_part,
by_cols_dtypes=(
df.index.dtypes.values
if isinstance(df.index, pandas.MultiIndex)
else (df.index.dtype,)
),
by_length=len(by_part),
selection=reduce_func.keys() if isinstance(reduce_func, dict) else None,
partition_idx=partition_idx,
drop=drop,
method=method,
inplace=True,
)
# Result could not always be a frame, so wrapping it into DataFrame
return pandas.DataFrame(result)

Expand Down

0 comments on commit d42c070

Please sign in to comment.