Add a fix to Groupby for aggregations by a column from the DataFrame (#413)

* Add a fix to Groupby for aggregations by a column from the DataFrame

* Resolves #409
* Removes a grouped column from the result to match pandas (see the sketch below)
* Changes the way we compute `size` to match pandas
* Adds consistency between the DataFrame being computed on and the result

* Additional updates for `groupby` + agg:

* Computing columns more directly now. We reset the index or columns and
  use those indices to compute the actual index externally. This is
  more correct (and was actually being computed previously, but
  incorrectly).
* Adding **kwargs to `modin.pandas.groupby.DataFrameGroupBy.rank`
* Adding tests for string + integer inter-operations
* Cleaning up and making some code more consistent

* Fix lint and version issue

* Fix lint

* Making sort correct for python2

* Fix lint

* Python2 and Python3 compat fix
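
A minimal pandas sketch (toy data, not from the repo) of the two behaviors this commit matches: the grouped column moves into the result's index rather than staying in the columns, and `size` reports raw group lengths:

import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5]})

# The grouped column becomes the result's index and leaves the columns,
# which is the pandas behavior Modin now mirrors.
print(df.groupby("a").sum().columns.tolist())  # ['b']

# `size` counts rows per group, regardless of column dtypes or NaNs.
print(df.groupby("a").size().tolist())  # [2, 1]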
devin-petersohn committed Jan 20, 2019
1 parent 2eb8171 commit 5b2bb3b
Showing 4 changed files with 137 additions and 53 deletions.
21 changes: 16 additions & 5 deletions modin/data_management/query_compiler/pandas_query_compiler.py
@@ -2368,12 +2368,12 @@ def _post_process_apply(self, result_data, axis, try_scale=True):
             else:
                 columns = self.columns
         else:
-            # See above explaination for checking the lengths of columns
-            columns = internal_columns
+            # See above explanation for checking the lengths of columns
             if len(internal_index) != len(self.index):
                 index = internal_index
             else:
                 index = self.index
+            columns = internal_columns
# `apply` and `aggregate` can return a Series or a DataFrame object,
# and since we need to handle each of those differently, we have to add
# this logic here.
@@ -2383,7 +2383,6 @@ def _post_process_apply(self, result_data, axis, try_scale=True):
                 index = self.columns
             elif axis and len(series_result) == len(self.index):
                 index = self.index
-
             series_result.index = index
             return series_result
         return self.__constructor__(result_data, index, columns)
@@ -2492,20 +2491,32 @@ def groupby_agg(self, by, axis, agg_func, groupby_args, agg_args):
         def groupby_agg_builder(df):
             if not axis:
                 df.index = remote_index
+                df.columns = pandas.RangeIndex(len(df.columns))
             else:
                 df.columns = remote_index
+                df.index = pandas.RangeIndex(len(df.index))
             grouped_df = df.groupby(by=by, axis=axis, **groupby_args)
             try:
                 return agg_func(grouped_df, **agg_args)
             # This happens when the partition is filled with non-numeric data and a
             # numeric operation is done. We need to build the index here to avoid issues
             # with extracting the index.
             except DataError:
-                return pandas.DataFrame(index=grouped_df.count().index)
+                return pandas.DataFrame(index=grouped_df.size().index)
 
         func_prepared = self._prepare_method(lambda df: groupby_agg_builder(df))
         result_data = self.map_across_full_axis(axis, func_prepared)
-        return self._post_process_apply(result_data, axis, try_scale=False)
+        if axis == 0:
+            index = self.compute_index(0, result_data, False)
+            columns = self.compute_index(1, result_data, True)
+        else:
+            index = self.compute_index(0, result_data, True)
+            columns = self.compute_index(1, result_data, False)
+        # If the result is a Series, this is how `compute_index` returns the columns.
+        if len(columns) == 0:
+            return self._post_process_apply(result_data, axis, try_scale=True)
+        else:
+            return self.__constructor__(result_data, index, columns)
 
     # END Manual Partitioning methods
 
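For context, a pandas-only sketch (toy frame, not from the repo) of why the `DataError` fallback now builds its index from `size()` instead of `count()`: `count()` is a per-column non-NA count, while `size()` is the raw group length and stays well-defined whatever the columns hold:

import pandas as pd

df = pd.DataFrame({"key": ["x", "x", "y"], "val": [1.0, None, 2.0]})
grouped = df.groupby("key")

print(grouped.count())  # per-column non-NA counts: val -> x: 1, y: 1
print(grouped.size())   # raw group lengths, NaN rows included: x: 2, y: 1

# Either result exposes the group labels via `.index`, which is all the
# fallback above needs to rebuild an empty frame with the right index.
print(grouped.size().index.tolist())  # ['x', 'y']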
3 changes: 2 additions & 1 deletion modin/pandas/dataframe.py
@@ -356,14 +356,15 @@ def groupby(
             A new DataFrame resulting from the groupby.
         """
         axis = pandas.DataFrame()._get_axis_number(axis)
-        idx_name = ""
+        idx_name = None
         if callable(by):
             by = by(self.index)
         elif isinstance(by, string_types):
             idx_name = by
             by = self.__getitem__(by).values.tolist()
         elif is_list_like(by):
             if isinstance(by, pandas.Series):
+                idx_name = by.name
                 by = by.values.tolist()
 
         mismatch = (
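A quick pandas illustration (toy data) of why `groupby` now records `idx_name` for both a string key and a named Series: in pandas, either form ends up naming the result's index, and Modin needs that name later to drop or label the grouped column consistently:

import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})

# Grouping by a column label: the label names the result's index.
print(df.groupby("a").sum().index.name)  # 'a'

# Grouping by an external named Series: its `name` plays the same role.
key = pd.Series([1, 1, 2], name="a")
print(df.groupby(key).sum().index.name)  # 'a'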
80 changes: 36 additions & 44 deletions modin/pandas/groupby.py
@@ -6,7 +6,7 @@
 import pandas.core.groupby
 from pandas.core.dtypes.common import is_list_like
 import pandas.core.common as com
-import numpy as np
+import sys
 
 from modin.error_message import ErrorMessage
 from .utils import _inherit_docstrings
@@ -33,23 +33,31 @@ def __init__(
         idx_name,
         **kwargs
     ):
         self._axis = axis
+        self._idx_name = idx_name
         self._df = df
-        self._query_compiler = df._query_compiler
+        self._query_compiler = self._df._query_compiler
         self._index = self._query_compiler.index
         self._columns = self._query_compiler.columns
         self._by = by
         # This tells us whether or not there are multiple columns/rows in the groupby
         self._is_multi_by = all(obj in self._df for obj in self._by)
         self._level = level
-        self._idx_name = idx_name
         self._kwargs = {
             "sort": sort,
             "as_index": as_index,
             "group_keys": group_keys,
             "squeeze": squeeze,
         }
         self._kwargs.update(kwargs)
 
+    @property
+    def _sort(self):
+        return self._kwargs.get("sort")
+
+    @property
+    def _as_index(self):
+        return self._kwargs.get("as_index")
+
     def __getattr__(self, key):
         """Afer regular attribute access, looks up the name in the columns
@@ -102,20 +110,14 @@ def _index_grouped(self):
             self._index_grouped_cache = self._columns.groupby(self._by)
         return self._index_grouped_cache
 
-    _keys_and_values_cache = None
-
-    @property
-    def _keys_and_values(self):
-        if self._keys_and_values_cache is None:
-            self._keys_and_values_cache = list(self._index_grouped.items())
-        if self._sort:
-            self._keys_and_values_cache.sort()
-        return self._keys_and_values_cache
-
     @property
     def _iter(self):
         from .dataframe import DataFrame
 
+        if sys.version_info[0] == 2:
+            group_ids = self._index_grouped.iterkeys()
+        elif sys.version_info[0] == 3:
+            group_ids = self._index_grouped.keys()
         if self._axis == 0:
             return (
                 (
@@ -126,7 +128,7 @@ def _iter(self):
                         )
                     ),
                 )
-                for k, _ in self._keys_and_values
+                for k in (sorted(group_ids) if self._sort else group_ids)
             )
         else:
             return (
@@ -138,7 +140,7 @@ def _iter(self):
                         )
                     ),
                 )
-                for k, _ in self._keys_and_values
+                for k in (sorted(group_ids) if self._sort else group_ids)
             )
 
     @property
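As an aside, a self-contained sketch (hypothetical dict standing in for `self._index_grouped`) of the compatibility pattern `_iter` uses above: Python 2 exposes lazy key iteration as `iterkeys()`, Python 3's `keys()` is already a view, and sorting applies only when the groupby asked for it:

import sys

groups = {"b": [1], "a": [0, 2]}  # stand-in for self._index_grouped
sort = True                       # stand-in for self._sort

# Python 2 needs iterkeys() for lazy iteration; Python 3 keys() is a view.
if sys.version_info[0] == 2:
    group_ids = groups.iterkeys()
else:
    group_ids = groups.keys()

for k in (sorted(group_ids) if sort else group_ids):
    print(k, groups[k])  # a [0, 2] then b [1]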
@@ -155,9 +157,7 @@ def sem(self, ddof=1):
         return self._default_to_pandas(lambda df: df.sem(ddof=ddof))
 
     def mean(self, *args, **kwargs):
-        return self._apply_agg_function(
-            lambda df: df.mean(*args, **kwargs), numeric=True
-        )
+        return self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
 
     def any(self):
         return self._apply_agg_function(lambda df: df.any())
@@ -203,7 +203,7 @@ def cumsum(self, axis=0, *args, **kwargs):

     @property
     def indices(self):
-        return dict(self._keys_and_values)
+        return self._index_grouped
 
     def pct_change(self):
         return self._default_to_pandas(lambda df: df.pct_change())
@@ -268,8 +268,8 @@ def last(self, **kwargs):
     def mad(self):
         return self._default_to_pandas(lambda df: df.mad())
 
-    def rank(self):
-        return self._apply_agg_function(lambda df: df.rank())
+    def rank(self, **kwargs):
+        return self._apply_agg_function(lambda df: df.rank(**kwargs))
 
     @property
     def corrwith(self):
@@ -294,10 +294,10 @@ def all(self, **kwargs):
         return self._apply_agg_function(lambda df: df.all(**kwargs))
 
     def size(self):
-        return self._apply_agg_function(lambda df: df.size())
+        return pandas.Series({k: len(v) for k, v in self._index_grouped.items()})
 
     def sum(self, **kwargs):
-        return self._apply_agg_function(lambda df: df.sum(**kwargs), numeric=True)
+        return self._apply_agg_function(lambda df: df.sum(**kwargs))
 
     def __unicode__(self):
         return self._default_to_pandas(lambda df: df.__unicode__())
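A standalone sketch (hypothetical mapping standing in for `self._index_grouped`) of the new `size` construction: group sizes fall directly out of the label-to-rows mapping, with no aggregation pass over the data:

import pandas as pd

# Stand-in for self._index_grouped: group label -> grouped row labels.
index_grouped = {"x": ["r0", "r1"], "y": ["r2"]}

sizes = pd.Series({k: len(v) for k, v in index_grouped.items()})
print(sizes)  # x: 2, y: 1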
@@ -342,13 +342,13 @@ def ngroup(self, ascending=True):
     )
 
     def nunique(self, dropna=True):
-        return self._apply_agg_function(lambda df: df.nunique(dropna))
+        return self._apply_agg_function(lambda df: df.nunique(dropna), drop=False)
 
     def resample(self, rule, *args, **kwargs):
         return self._default_to_pandas(lambda df: df.resample(rule, *args, **kwargs))
 
     def median(self, **kwargs):
-        return self._apply_agg_function(lambda df: df.median(**kwargs), numeric=True)
+        return self._apply_agg_function(lambda df: df.median(**kwargs))
 
     def head(self, n=5):
         return self._default_to_pandas(lambda df: df.head(n))
@@ -409,7 +409,7 @@ def diff(self):
     def take(self, **kwargs):
         return self._default_to_pandas(lambda df: df.take(**kwargs))
 
-    def _apply_agg_function(self, f, numeric=False, **kwargs):
+    def _apply_agg_function(self, f, drop=True, **kwargs):
         """Perform aggregation and combine stages based on a given function.
 
         Args:
@@ -423,26 +423,18 @@ def _apply_agg_function(self, f, numeric=False, **kwargs):

         if self._is_multi_by:
             return self._default_to_pandas(f, **kwargs)
-
-        new_manager = self._query_compiler.groupby_agg(
+        # For aggregations, pandas behavior does this for the result.
+        # For other operations it does not, so we wait until there is an aggregation to
+        # actually perform this operation.
+        if self._idx_name is not None and drop:
+            groupby_qc = self._query_compiler.drop(columns=[self._idx_name])
+        else:
+            groupby_qc = self._query_compiler
+        new_manager = groupby_qc.groupby_agg(
             self._by, self._axis, f, self._kwargs, kwargs
         )
-        if self._idx_name != "":
+        if self._idx_name is not None and self._as_index:
             new_manager.index.name = self._idx_name
-            new_columns = self._df.columns.drop(self._idx_name)
-        # We preserve columns only if the grouped axis is the index
-        elif self._axis == 0:
-            new_columns = self._df.columns
-        # We just keep everything the same if it is column groups
-        else:
-            new_columns = new_manager.columns
-        if numeric and self._axis == 0:
-            new_columns = [
-                col
-                for col in new_columns
-                if np.issubdtype(self._df.dtypes[col], np.number)
-            ]
-        new_manager.columns = new_columns
         return DataFrame(query_compiler=new_manager)
 
     def _default_to_pandas(self, f, **kwargs):
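To make the comment in the diff concrete, a pandas-only sketch (toy data) of the distinction the `drop` logic relies on: an aggregation collapses to one row per group and folds the grouped column into the index, while a non-aggregation such as `rank` keeps the original shape, so the grouped column can only be dropped once a real aggregation happens:

import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [3.0, 4.0, 5.0]})

# Aggregation: one row per group; 'a' moves into the index.
print(df.groupby("a").sum())

# Non-aggregation: one row per original row; nothing collapses yet.
print(df.groupby("a").rank())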
86 changes: 83 additions & 3 deletions modin/pandas/test/test_groupby.py
@@ -19,8 +19,12 @@
 @pytest.fixture
 def ray_df_equals_pandas(ray_df, pandas_df):
     assert isinstance(ray_df, pd.DataFrame)
-    assert to_pandas(ray_df).equals(pandas_df) or (
-        all(ray_df.isna().all()) and all(pandas_df.isna().all())
+    # Order may not match here, but pandas behavior can change, so we will be consistent
+    # ourselves in keeping the columns in the order they were in before the groupby
+    assert (
+        to_pandas(ray_df).equals(pandas_df)
+        or (all(ray_df.isna().all()) and all(pandas_df.isna().all()))
+        or to_pandas(ray_df)[list(pandas_df.columns)].equals(pandas_df)
     )
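
For illustration, a minimal sketch (toy frames) of the order-insensitive comparison the fixture now allows:

import pandas as pd

left = pd.DataFrame({"b": [1, 2], "a": [3, 4]})
right = pd.DataFrame({"a": [3, 4], "b": [1, 2]})

# Same data, different column order: reindex one to the other's order.
print(left.equals(right))                       # False
print(left[list(right.columns)].equals(right))  # True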


@@ -53,6 +57,83 @@ def ray_groupby_equals_pandas(ray_groupby, pandas_groupby):
         ray_df_equals_pandas(g1[1], g2[1])
 
 
+def test_mixed_dtypes_groupby():
+    frame_data = np.random.randint(97, 198, size=(2 ** 6, 2 ** 4))
+    pandas_df = pandas.DataFrame(frame_data).add_prefix("col")
+    # Convert every other column to string
+    for col in pandas_df.iloc[
+        :, [i for i in range(len(pandas_df.columns)) if i % 2 == 0]
+    ]:
+        pandas_df[col] = [str(chr(i)) for i in pandas_df[col]]
+    ray_df = from_pandas(pandas_df)
+
+    n = 1
+
+    ray_groupby = ray_df.groupby(by="col1")
+    pandas_groupby = pandas_df.groupby(by="col1")
+
+    ray_groupby_equals_pandas(ray_groupby, pandas_groupby)
+    test_ngroups(ray_groupby, pandas_groupby)
+    test_skew(ray_groupby, pandas_groupby)
+    test_ffill(ray_groupby, pandas_groupby)
+    test_sem(ray_groupby, pandas_groupby)
+    test_mean(ray_groupby, pandas_groupby)
+    test_any(ray_groupby, pandas_groupby)
+    test_min(ray_groupby, pandas_groupby)
+    test_idxmax(ray_groupby, pandas_groupby)
+    test_ndim(ray_groupby, pandas_groupby)
+    test_cumsum(ray_groupby, pandas_groupby)
+    test_pct_change(ray_groupby, pandas_groupby)
+    test_cummax(ray_groupby, pandas_groupby)
+
+    # TODO Add more apply functions
+    apply_functions = [lambda df: df.sum(), min]
+    for func in apply_functions:
+        test_apply(ray_groupby, pandas_groupby, func)
+
+    test_dtypes(ray_groupby, pandas_groupby)
+    test_first(ray_groupby, pandas_groupby)
+    test_backfill(ray_groupby, pandas_groupby)
+    test_cummin(ray_groupby, pandas_groupby)
+    test_bfill(ray_groupby, pandas_groupby)
+    test_idxmin(ray_groupby, pandas_groupby)
+    test_prod(ray_groupby, pandas_groupby)
+    test_std(ray_groupby, pandas_groupby)
+
+    agg_functions = ["min", "max"]
+    for func in agg_functions:
+        test_agg(ray_groupby, pandas_groupby, func)
+        test_aggregate(ray_groupby, pandas_groupby, func)
+
+    test_last(ray_groupby, pandas_groupby)
+    test_mad(ray_groupby, pandas_groupby)
+    test_max(ray_groupby, pandas_groupby)
+    test_var(ray_groupby, pandas_groupby)
+    test_len(ray_groupby, pandas_groupby)
+    test_sum(ray_groupby, pandas_groupby)
+    test_ngroup(ray_groupby, pandas_groupby)
+    test_nunique(ray_groupby, pandas_groupby)
+    test_median(ray_groupby, pandas_groupby)
+    test_head(ray_groupby, pandas_groupby, n)
+    test_cumprod(ray_groupby, pandas_groupby)
+    test_cov(ray_groupby, pandas_groupby)
+
+    transform_functions = [lambda df: df + 4, lambda df: -df - 10]
+    for func in transform_functions:
+        test_transform(ray_groupby, pandas_groupby, func)
+
+    pipe_functions = [lambda dfgb: dfgb.sum()]
+    for func in pipe_functions:
+        test_pipe(ray_groupby, pandas_groupby, func)
+
+    test_corr(ray_groupby, pandas_groupby)
+    test_fillna(ray_groupby, pandas_groupby)
+    test_count(ray_groupby, pandas_groupby)
+    test_tail(ray_groupby, pandas_groupby, n)
+    test_quantile(ray_groupby, pandas_groupby)
+    test_take(ray_groupby, pandas_groupby)
+
+
 def test_simple_row_groupby():
     pandas_df = pandas.DataFrame(
         {
@@ -215,7 +296,6 @@ def test_single_group_row_groupby():
     test_take(ray_groupby, pandas_groupby)
 
 
-@pytest.mark.skip(reason="See Modin issue #21.")
 def test_large_row_groupby():
     pandas_df = pandas.DataFrame(
         np.random.randint(0, 8, size=(100, 4)), columns=list("ABCD")
