Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support args in groupby apply #10682

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 9 additions & 7 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def pipe(self, func, *args, **kwargs):
"""
return cudf.core.common.pipe(self, func, *args, **kwargs)

def apply(self, function):
def apply(self, function, *args):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we can't support **kwargs in the same way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I considered this, and the reason I did not add it was really roadmap considerations. I know we want to rethink how this API works in general, and if we start supporting this now, we could back ourselves into a corner where users are reliant upon it and we're limited in the avenues we can pursue to replace this pipeline without complicating things at the outset. The same thing makes me a little uneasy about even adding *args, but at least there's a direct feature request for that and precedent for it elsewhere (DataFrame.apply and Series.apply both support args=).

As you have observed, it would probably not take much work to add so I'm happy to add it if there's strong motivation to do so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation is to make this API align with pandas. https://pandas.pydata.org/docs/reference/api/pandas.core.groupby.GroupBy.apply.html#pandas.core.groupby.GroupBy.apply

What are we wanting to rethink? The public API, or the internal implementation? I would say that *args and **kwargs go together for this feature and we should implement both or neither -- not just *args.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, we should enable **kwargs for Series.apply and DataFrame.apply if the numba machinery is no more complicated than it is here. Parameters-by-keyword and parameters-by-position serve the same purpose and I don't see a fundamental distinction in complexity between them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the internal implementation that I expect to change, since this implementation does a loop over chunks which is what makes it potentially slow. Right now this API isn't backed by Numba at all, which is why we can pass through args and theoretically kwargs rather easily.

kwargs in a numba implementation is not trivial. For context, numba does not support kwargs at all in compiled functions. This is why we don't have it in the other apply APIs. Since we could end up exploring some kind of JIT compiled implementation of this API as well, it might be closing the door on some solutions for us that could otherwise cover a broad swath of use cases.

I would say that *args and **kwargs go together for this feature and we should implement both or neither -- not just *args.

I empathize with the feeling of asymmetry that comes along with having *args and not **kwargs, but not with the notion that we should not have *args unless we can also have **kwargs. For reasons similar to what you noted about there being little fundamental distinction, supporting *args unlocks a space of functions including many logical equivalents to **kwargs functions while still allowing us to tiptoe around this API a bit.

What do you think, @bdice ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that's the context I was missing. 😄 Let's move forward with this as-is, then. I'll review the rest of the PR.

"""Apply a python transformation function over the grouped chunk.

Parameters
Expand Down Expand Up @@ -595,17 +595,19 @@ def mult(df):
chunks = [
grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
]
chunk_results = [function(chk) for chk in chunks]

chunk_results = [function(chk, *args) for chk in chunks]
if not len(chunk_results):
return self.obj.head(0)

if cudf.api.types.is_scalar(chunk_results[0]):
result = cudf.Series(chunk_results, index=group_names)
result.index.names = self.grouping.names
elif isinstance(chunk_results[0], cudf.Series):
result = cudf.concat(chunk_results, axis=1).T
result.index.names = self.grouping.names
if isinstance(self.obj, cudf.DataFrame):
result = cudf.concat(chunk_results, axis=1).T
result.index.names = self.grouping.names
else:
result = cudf.concat(chunk_results)
else:
result = cudf.concat(chunk_results)

Expand Down Expand Up @@ -1577,8 +1579,8 @@ def agg(self, func):

return result

def apply(self, func):
result = super().apply(func)
def apply(self, func, *args):
result = super().apply(func, *args)

# apply Series name to result
result.name = self.obj.name
Expand Down
131 changes: 117 additions & 14 deletions python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,40 @@ def foo(df):
assert_groupby_results_equal(expect, got)


def create_test_groupby_apply_args_params():
def f1(df, k):
df["out"] = df["val1"] + df["val2"] + k
return df

def f2(df, k, L):
df["out"] = df["val1"] - df["val2"] + (k / L)
return df

def f3(df, k, L, m):
df["out"] = ((k * df["val1"]) + (L * df["val2"])) / m
return df

return [(f1, (42,)), (f2, (42, 119)), (f3, (42, 119, 212.1))]


@pytest.mark.parametrize("func,args", create_test_groupby_apply_args_params())
def test_groupby_apply_args(func, args):
np.random.seed(0)
df = DataFrame()
nelem = 20
df["key1"] = np.random.randint(0, 3, nelem)
df["key2"] = np.random.randint(0, 2, nelem)
df["val1"] = np.random.random(nelem)
df["val2"] = np.random.random(nelem)

expect_grpby = df.to_pandas().groupby(["key1", "key2"], as_index=False)
got_grpby = df.groupby(["key1", "key2"])

expect = expect_grpby.apply(func, *args)
got = got_grpby.apply(func, *args)
assert_groupby_results_equal(expect, got)


def test_groupby_apply_grouped():
np.random.seed(0)
df = DataFrame()
Expand Down Expand Up @@ -1595,7 +1629,38 @@ def test_groupby_pipe():
assert_groupby_results_equal(expected, actual)


def test_groupby_apply_return_scalars():
def create_test_groupby_apply_return_scalars_params():
def f0(x):
x = x[~x["B"].isna()]
ticker = x.shape[0]
full = ticker / 10
return full

def f1(x, k):
x = x[~x["B"].isna()]
ticker = x.shape[0]
full = ticker / k
return full

def f2(x, k, L):
x = x[~x["B"].isna()]
ticker = x.shape[0]
full = L * (ticker / k)
return full

def f3(x, k, L, m):
x = x[~x["B"].isna()]
ticker = x.shape[0]
full = L * (ticker / k) % m
return full

return [(f0, ()), (f1, (42,)), (f2, (42, 119)), (f3, (42, 119, 212.1))]


@pytest.mark.parametrize(
"func,args", create_test_groupby_apply_return_scalars_params()
)
def test_groupby_apply_return_scalars(func, args):
pdf = pd.DataFrame(
{
"A": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
Expand All @@ -1615,30 +1680,52 @@ def test_groupby_apply_return_scalars():
)
gdf = cudf.from_pandas(pdf)

def custom_map_func(x):
x = x[~x["B"].isna()]
ticker = x.shape[0]
full = ticker / 10
return full

expected = pdf.groupby("A").apply(lambda x: custom_map_func(x))
actual = gdf.groupby("A").apply(lambda x: custom_map_func(x))
expected = pdf.groupby("A").apply(func, *args)
actual = gdf.groupby("A").apply(func, *args)

assert_groupby_results_equal(expected, actual)


def create_test_groupby_apply_return_series_dataframe_params():
def f0(x):
return x - x.max()

def f1(x):
return x.min() - x.max()

def f2(x):
return x.min()

def f3(x, k):
return x - x.max() + k

def f4(x, k, L):
return x.min() - x.max() + (k / L)

def f5(x, k, L, m):
return m * x.min() + (k / L)

return [
(f0, ()),
(f1, ()),
(f2, ()),
(f3, (42,)),
(f4, (42, 119)),
(f5, (41, 119, 212.1)),
]


@pytest.mark.parametrize(
"cust_func",
[lambda x: x - x.max(), lambda x: x.min() - x.max(), lambda x: x.min()],
"func,args", create_test_groupby_apply_return_series_dataframe_params()
)
def test_groupby_apply_return_series_dataframe(cust_func):
def test_groupby_apply_return_series_dataframe(func, args):
pdf = pd.DataFrame(
{"key": [0, 0, 1, 1, 2, 2, 2], "val": [0, 1, 2, 3, 4, 5, 6]}
)
gdf = cudf.from_pandas(pdf)

expected = pdf.groupby(["key"]).apply(cust_func)
actual = gdf.groupby(["key"]).apply(cust_func)
expected = pdf.groupby(["key"]).apply(func, *args)
actual = gdf.groupby(["key"]).apply(func, *args)

assert_groupby_results_equal(expected, actual)

Expand Down Expand Up @@ -2213,6 +2300,22 @@ def foo(x):
assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize(
"func,args",
[
(lambda x, k: x + k, (42,)),
(lambda x, k, L: x + k - L, (42, 191)),
(lambda x, k, L, m: (x + k) / (L * m), (42, 191, 99.9)),
],
)
def test_groupby_apply_series_args(func, args):

got = make_frame(DataFrame, 100).groupby("x").y.apply(func, *args)
expect = make_frame(pd.DataFrame, 100).groupby("x").y.apply(func, *args)

assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("label", [None, "left", "right"])
@pytest.mark.parametrize("closed", [None, "left", "right"])
def test_groupby_freq_week(label, closed):
Expand Down