Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): Handle generalized ufunc edge cases better #16086

Closed
47 changes: 35 additions & 12 deletions py-polars/polars/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
from polars.dependencies import numpy as np
from polars.dependencies import pandas as pd
from polars.dependencies import pyarrow as pa
from polars.exceptions import ModuleUpgradeRequired, ShapeError
from polars.exceptions import ComputeError, ModuleUpgradeRequired, ShapeError
from polars.meta import get_index_type
from polars.series.array import ArrayNameSpace
from polars.series.binary import BinaryNameSpace
Expand Down Expand Up @@ -1295,7 +1295,7 @@ def __getitem__(

def __getitem__(
self,
item: (int | Series | range | slice | np.ndarray[Any, Any] | list[int]),
item: int | Series | range | slice | np.ndarray[Any, Any] | list[int],
) -> Any:
if isinstance(item, Series) and item.dtype.is_integer():
return self._take_with_series(item._pos_idxs(self.len()))
Expand Down Expand Up @@ -1404,13 +1404,10 @@ def __array_ufunc__(
raise NotImplementedError(msg)

args: list[int | float | np.ndarray[Any, Any]] = []

validity_mask = self.is_not_null()
for arg in inputs:
if isinstance(arg, (int, float, np.ndarray)):
args.append(arg)
elif isinstance(arg, Series):
validity_mask &= arg.is_not_null()
args.append(arg.to_physical()._s.to_numpy_view())
else:
msg = f"unsupported type {type(arg).__name__!r} for {arg!r}"
Expand Down Expand Up @@ -1443,6 +1440,21 @@ def __array_ufunc__(
else dtype_char_minimum
)

if ufunc.signature:
# Only generalized ufuncs have a signature set, and they're the
# ones that have problems with missing data.
if self.null_count() > 0:
msg = "Can't pass a Series with missing data to a generalized ufunc, as it might give unexpected results. See https://docs.pola.rs/user-guide/expressions/missing-data/ for suggestions on how to remove or fill in missing data."
raise ComputeError(msg)
# If the input and output are the same size, e.g. "(n)->(n)" we
# can allocate ourselves and save a copy. If they're different,
# we let the ufunc do the allocation, since only it knows the
# output size.
ufunc_input, ufunc_output = ufunc.signature.split("->")
allocate_output = ufunc_input == ufunc_output
else:
allocate_output = True

f = get_ffi_func("apply_ufunc_<>", numpy_char_code_to_dtype(dtype_char), s)

if f is None:
Expand All @@ -1452,13 +1464,24 @@ def __array_ufunc__(
)
raise NotImplementedError(msg)

series = f(lambda out: ufunc(*args, out=out, dtype=dtype_char, **kwargs))
return (
self._from_pyseries(series)
.to_frame()
.select(F.when(validity_mask).then(F.col(self.name)))
.to_series(0)
series = f(
lambda out: ufunc(*args, out=out, dtype=dtype_char, **kwargs),
allocate_output,
)
result = self._from_pyseries(series)
if not ufunc.signature:
# Missing data is allowed, so filter it out:
validity_mask = self.is_not_null()
for arg in inputs:
if isinstance(arg, Series):
validity_mask &= arg.is_not_null()

result = (
result.to_frame()
.select(F.when(validity_mask).then(F.col(self.name)))
.to_series(0)
)
return result
else:
msg = (
"only `__call__` is implemented for numpy ufuncs on a Series, got "
Expand Down Expand Up @@ -4099,7 +4122,7 @@ def equals(

def cast(
self,
dtype: (PolarsDataType | type[int] | type[float] | type[str] | type[bool]),
dtype: PolarsDataType | type[int] | type[float] | type[str] | type[bool],
*,
strict: bool = True,
) -> Self:
Expand Down
1 change: 1 addition & 0 deletions py-polars/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pip

# Interoperability
numpy
numba; python_version < '3.13' # Numba can lag Python releases
pandas
pyarrow
pydantic>=2.0.0
Expand Down
32 changes: 25 additions & 7 deletions py-polars/src/series/numpy_ufunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use numpy::{Element, PyArray1, PyArrayDescrMethods, ToNpyDims, PY_ARRAY_API};
use polars_core::prelude::*;
use polars_core::utils::arrow::types::NativeType;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
use pyo3::types::{PyNone, PyTuple};

use crate::series::PySeries;

Expand Down Expand Up @@ -67,13 +67,31 @@ macro_rules! impl_ufuncs {
($name:ident, $type:ident, $unsafe_from_ptr_method:ident) => {
#[pymethods]
impl PySeries {
// applies a ufunc by accepting a lambda out: ufunc(*args, out=out)
// the out array is allocated in this method, send to Python and once the ufunc is applied
// ownership is taken by Rust again to prevent memory leak.
// if the ufunc fails, we first must take ownership back.
fn $name(&self, lambda: &PyAny) -> PyResult<PySeries> {
// numpy array object, and a *mut ptr
// Applies a ufunc by accepting a lambda out: ufunc(*args, out=out).
//
// If allocate_out is true, the out array is allocated in this
// method, send to Python and once the ufunc is applied ownership is
// taken by Rust again to prevent memory leak. if the ufunc fails,
// we first must take ownership back.
//
// If allocate_out is false, the out parameter to the lambda will be
// None, meaning the ufunc will allocate memory itself. We will then
// have to convert that NumPy array into a pl.Series.
fn $name(&self, lambda: &PyAny, allocate_out: bool) -> PyResult<PySeries> {
Python::with_gil(|py| {
if !allocate_out {
// We're not going to allocate the output array.
// Instead, we'll let the ufunc do it.
let result = lambda.call1((PyNone::get_bound(py),))?;
// Is there a way to do this with less Python overhead?
let series_factory =
PyModule::import_bound(py, "polars")?.getattr("Series")?;
return series_factory
.call((self.name(), result), None)?
.getattr("_s")?
.extract::<PySeries>();
}

let size = self.len();
let (out_array, av) =
unsafe { aligned_array::<<$type as PolarsNumericType>::Native>(py, size) };
Expand Down
28 changes: 28 additions & 0 deletions py-polars/tests/unit/interop/numpy/_numba.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Infrastructure for testing Numba.

Numba releases often lag for a few months after Python releases, so we don't
want Numba to be a blocker for Python 3.X support. So this minimally emulates
the Numba module, while allowing for the fact that Numba may not be installed.
"""

import pytest

try:
from numba import float64, guvectorize # type: ignore[import-untyped]
except ImportError:
float64 = []

def guvectorize(_a, _b): # type: ignore[no-untyped-def]
"""When Numba is unavailable, skip tests using the decorated function."""

def decorator(_): # type: ignore[no-untyped-def]
def skip(*_args, **_kwargs): # type: ignore[no-untyped-def]
pytest.skip("Numba not available")

return skip

return decorator


__all__ = ["guvectorize", "float64"]
22 changes: 22 additions & 0 deletions py-polars/tests/unit/interop/numpy/test_ufunc_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import polars as pl
from polars.testing import assert_frame_equal, assert_series_equal
from tests.unit.interop.numpy._numba import float64, guvectorize


def test_ufunc() -> None:
Expand Down Expand Up @@ -130,3 +131,24 @@ def test_ufunc_multiple_expressions() -> None:
def test_grouped_ufunc() -> None:
df = pl.DataFrame({"id": ["a", "a", "b", "b"], "values": [0.1, 0.1, -0.1, -0.1]})
df.group_by("id").agg(pl.col("values").log1p().sum().pipe(np.expm1))


@guvectorize([(float64[:], float64[:])], "(n)->(n)")
def gufunc_mean(arr, result): # type: ignore[no-untyped-def]
mean = arr.mean()
for i in range(len(arr)):
result[i] = mean + i


def test_generalized_ufunc() -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really testing Expr.array_ufunc if you use map_batches. I was thinking tests like df.select(gufunc_mean(pl.col('s')).alias('result') That said, I don't think you need to put those tests in this PR as it's only tangentially related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

df = pl.DataFrame({"s": [1.0, 2.0, 3.0]})
result = df.select([pl.col("s").map_batches(gufunc_mean).alias("result")])
expected = pl.DataFrame({"result": [2.0, 3.0, 4.0]})
assert_frame_equal(result, expected)


def test_grouped_generalized_ufunc() -> None:
df = pl.DataFrame({"id": ["a", "a", "b", "b"], "values": [1.0, 2.0, 3.0, 4.0]})
result = df.group_by("id").agg(pl.col("values").map_batches(gufunc_mean)).sort("id")
expected = pl.DataFrame({"id": ["a", "b"], "values": [[1.5, 2.5], [3.5, 4.5]]})
assert_frame_equal(result, expected)
55 changes: 55 additions & 0 deletions py-polars/tests/unit/interop/numpy/test_ufunc_series.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import cast

import numpy as np
import pytest
from numpy.testing import assert_array_equal

import polars as pl
from polars.testing import assert_series_equal
from tests.unit.interop.numpy._numba import float64, guvectorize


def test_ufunc() -> None:
Expand Down Expand Up @@ -119,3 +121,56 @@ def test_numpy_string_array() -> None:
np.char.capitalize(s_str),
np.array(["Aa", "Bb", "Cc", "Dd"], dtype="<U2"),
)


@guvectorize([(float64[:], float64[:])], "(n)->(n)")
def add_one(arr, result): # type: ignore[no-untyped-def]
for i in range(len(arr)):
result[i] = arr[i] + 1.0


def test_generalized_ufunc() -> None:
"""A generalized ufunc can be called on a pl.Series."""
s_float = pl.Series("f", [1.0, 2.0, 3.0])
result = add_one(s_float)
assert_series_equal(result, pl.Series("f", [2.0, 3.0, 4.0]))


def test_generalized_ufunc_missing_data() -> None:
"""
If a pl.Series is missing data, using a generalized ufunc is not allowed.

While this particular example isn't necessarily a semantic issue, consider
a mean() function running on integers: it will give wrong results if the
input is missing data, since NumPy has no way to model missing slots. In
the general case, we can't assume the function will handle missing data
correctly.
"""
s_float = pl.Series("f", [1.0, 2.0, 3.0, None], dtype=pl.Float64)
with pytest.raises(pl.ComputeError):
add_one(s_float)


@guvectorize([(float64[:], float64[:], float64[:])], "(n),(m)->(m)")
def divide_by_sum(arr, arr2, result): # type: ignore[no-untyped-def]
total = arr.sum()
for i in range(len(arr2)):
result[i] = arr2[i] / total


def test_generalized_ufunc_different_output_size() -> None:
"""
A generalized ufunc that takes pl.Series of different sizes.

The result has the correct size.
"""
series = pl.Series("s", [1.0, 3.0], dtype=pl.Float64)
series2 = pl.Series("s2", [8.0, 16.0, 32.0], dtype=pl.Float64)
assert_series_equal(
divide_by_sum(series, series2),
pl.Series("s", [2.0, 4.0, 8.0], dtype=pl.Float64),
)
assert_series_equal(
divide_by_sum(series2, series),
pl.Series("s2", [1.0 / 56, 3.0 / 56], dtype=pl.Float64),
)