Skip to content

Commit

Permalink
multiple apply python
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 15, 2021
1 parent 04d41c8 commit 59386ea
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 141 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ arange = ["polars-lazy/arange"]
true_div = ["polars-lazy/true_div"]

# don't use this
private = []
private = ["polars-lazy/private"]

# all opt-in datatypes
dtype-full = [
Expand Down
3 changes: 2 additions & 1 deletion py-polars/docs/source/reference/expression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ These functions can be used as expression and sometimes also in eager contexts.
pearson_corr
spearman_rank_corr
cov
map_binary
map
apply
fold
any
all
Expand Down
65 changes: 65 additions & 0 deletions py-polars/polars/lazy/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from polars.polars import cov as pycov
from polars.polars import fold as pyfold
from polars.polars import lit as pylit
from polars.polars import map_mul as _map_mul
from polars.polars import pearson_corr as pypearson_corr
from polars.polars import spearman_rank_corr as pyspearman_rank_corr

Expand Down Expand Up @@ -48,6 +49,8 @@
"pearson_corr",
"spearman_rank_corr",
"cov",
"map",
"apply",
"map_binary",
"fold",
"any",
Expand Down Expand Up @@ -462,13 +465,75 @@ def cov(
return pl.lazy.expr.wrap_expr(pycov(a._pyexpr, b._pyexpr))


def map(
exprs: Union[tp.List[str], tp.List["pl.Expr"]],
f: Callable[[tp.List["pl.Series"]], "pl.Series"],
return_dtype: Optional[Type[DataType]] = None,
) -> "pl.Expr":
"""
Map a custom function over multiple columns/expressions and produce a single Series result.
Parameters
----------
columns
Input Series to f
f
Function to apply over the input
return_dtype
dtype of the output Series
Returns
-------
Expr
"""
exprs = pl.lazy.expr._selection_to_pyexpr_list(exprs)
return pl.lazy.expr.wrap_expr(_map_mul(exprs, f, return_dtype, apply_groups=False))


def apply(
exprs: Union[tp.List[str], tp.List["pl.Expr"]],
f: Callable[[tp.List["pl.Series"]], "pl.Series"],
return_dtype: Optional[Type[DataType]] = None,
) -> "pl.Expr":
"""
Apply a custom function in a GroupBy context.
Depending on the context it has the following behavior:
## Context
* Select/Project
Don't do this, use `map_mul`
* GroupBy
expected type `f`: Callable[[Series], Series]
Applies a python function over each group.
Parameters
----------
columns
Input Series to f
f
Function to apply over the input
return_dtype
dtype of the output Series
Returns
-------
Expr
"""
exprs = pl.lazy.expr._selection_to_pyexpr_list(exprs)
return pl.lazy.expr.wrap_expr(_map_mul(exprs, f, return_dtype, apply_groups=True))


def map_binary(
a: Union[str, "pl.Expr"],
b: Union[str, "pl.Expr"],
f: Callable[["pl.Series", "pl.Series"], "pl.Series"],
return_dtype: Optional[Type[DataType]] = None,
) -> "pl.Expr":
"""
.. deprecated:: 0.10.4
use `map` or `apply`
Map a custom function over two columns and produce a single Series result.
Parameters
Expand Down
45 changes: 44 additions & 1 deletion py-polars/src/datatypes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::utils::str_to_polarstype;
use polars::prelude::*;
use pyo3::{FromPyObject, PyAny, PyResult};

// Don't change the order of these!
#[repr(u8)]
Expand Down Expand Up @@ -45,11 +47,52 @@ impl From<&DataType> for PyDataType {
DataType::Time => Time,
DataType::Object(_) => Object,
DataType::Categorical => Categorical,
dt => panic!("datatype: {:?} not supported", dt),
DataType::Null => {
panic!("null not expected here")
}
}
}
}

impl From<DataType> for PyDataType {
fn from(dt: DataType) -> Self {
(&dt).into()
}
}

impl Into<DataType> for PyDataType {
fn into(self) -> DataType {
use DataType::*;
match self {
PyDataType::Int8 => Int8,
PyDataType::Int16 => Int16,
PyDataType::Int32 => Int32,
PyDataType::Int64 => Int64,
PyDataType::UInt8 => UInt8,
PyDataType::UInt16 => UInt16,
PyDataType::UInt32 => UInt32,
PyDataType::UInt64 => UInt64,
PyDataType::Float32 => Float32,
PyDataType::Float64 => Float64,
PyDataType::Bool => Boolean,
PyDataType::Utf8 => Utf8,
PyDataType::List => List(DataType::Null.into()),
PyDataType::Date => Date,
PyDataType::Datetime => Datetime,
PyDataType::Time => Time,
PyDataType::Object => Object("object"),
PyDataType::Categorical => Categorical,
}
}
}

impl FromPyObject<'_> for PyDataType {
fn extract(ob: &PyAny) -> PyResult<Self> {
let str_repr = ob.str().unwrap().to_str().unwrap();
Ok(str_to_polarstype(str_repr).into())
}
}

pub trait PyPolarsNumericType: PolarsNumericType {}
impl PyPolarsNumericType for UInt8Type {}
impl PyPolarsNumericType for UInt16Type {}
Expand Down
212 changes: 212 additions & 0 deletions py-polars/src/lazy/apply.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use crate::lazy::dsl::PyExpr;
use crate::prelude::PyDataType;
use crate::series::PySeries;
use crate::utils::str_to_polarstype;
use polars::prelude::*;
use pyo3::prelude::*;
use pyo3::types::PyList;

fn get_output_type(obj: &PyAny) -> Option<DataType> {
match obj.is_none() {
true => None,
false => Some(obj.extract::<PyDataType>().unwrap().into()),
}
}

pub(crate) fn call_lambda_with_series(
py: Python,
s: Series,
lambda: &PyObject,
polars_module: &PyObject,
) -> PyObject {
let pypolars = polars_module.cast_as::<PyModule>(py).unwrap();

// create a PySeries struct/object for Python
let pyseries = PySeries::new(s);
// Wrap this PySeries object in the python side Series wrapper
let python_series_wrapper = pypolars
.getattr("wrap_s")
.unwrap()
.call1((pyseries,))
.unwrap();
// call the lambda and get a python side Series wrapper
match lambda.call1(py, (python_series_wrapper,)) {
Ok(pyobj) => pyobj,
Err(e) => panic!("python apply failed: {}", e.pvalue(py).to_string()),
}
}

/// A python lambda taking two Series
pub(crate) fn binary_lambda(lambda: &PyObject, a: Series, b: Series) -> Result<Series> {
let gil = Python::acquire_gil();
let py = gil.python();
// get the pypolars module
let pypolars = PyModule::import(py, "polars").unwrap();
// create a PySeries struct/object for Python
let pyseries_a = PySeries::new(a);
let pyseries_b = PySeries::new(b);

// Wrap this PySeries object in the python side Series wrapper
let python_series_wrapper_a = pypolars
.getattr("wrap_s")
.unwrap()
.call1((pyseries_a,))
.unwrap();
let python_series_wrapper_b = pypolars
.getattr("wrap_s")
.unwrap()
.call1((pyseries_b,))
.unwrap();

// call the lambda and get a python side Series wrapper
let result_series_wrapper =
match lambda.call1(py, (python_series_wrapper_a, python_series_wrapper_b)) {
Ok(pyobj) => pyobj,
Err(e) => panic!(
"custom python function failed: {}",
e.pvalue(py).to_string()
),
};
// unpack the wrapper in a PySeries
let py_pyseries = result_series_wrapper
.getattr(py, "_s")
.expect("Could net get series attribute '_s'. Make sure that you return a Series object.");
// Downcast to Rust
let pyseries = py_pyseries.extract::<PySeries>(py).unwrap();
// Finally get the actual Series
Ok(pyseries.series)
}

pub fn binary_function(
input_a: PyExpr,
input_b: PyExpr,
lambda: PyObject,
output_type: &PyAny,
) -> PyExpr {
let input_a = input_a.inner;
let input_b = input_b.inner;

let output_field = match output_type.is_none() {
true => Field::new("binary_function", DataType::Null),
false => {
let str_repr = output_type.str().unwrap().to_str().unwrap();
let data_type = str_to_polarstype(str_repr);
Field::new("binary_function", data_type)
}
};

let func = move |a: Series, b: Series| binary_lambda(&lambda, a, b);

polars::lazy::dsl::map_binary(input_a, input_b, func, Some(output_field)).into()
}

pub fn map_single(
pyexpr: &PyExpr,
py: Python,
lambda: PyObject,
output_type: &PyAny,
agg_list: bool,
) -> PyExpr {
let output_type = get_output_type(output_type);
// get the pypolars module
// do the import outside of the function to prevent import side effects in a hot loop.
let pypolars = PyModule::import(py, "polars").unwrap().to_object(py);

let function = move |s: Series| {
let gil = Python::acquire_gil();
let py = gil.python();

// this is a python Series
let out = call_lambda_with_series(py, s, &lambda, &pypolars);

// unpack the wrapper in a PySeries
let py_pyseries = out.getattr(py, "_s").expect(
"Could net get series attribute '_s'. \
Make sure that you return a Series object from a custom function.",
);
// Downcast to Rust
let pyseries = py_pyseries.extract::<PySeries>(py).unwrap();
// Finally get the actual Series
Ok(pyseries.series)
};

let output_map = GetOutput::map_field(move |fld| match output_type {
Some(ref dt) => Field::new(fld.name(), dt.clone()),
None => fld.clone(),
});
if agg_list {
pyexpr.clone().inner.map_list(function, output_map).into()
} else {
pyexpr.clone().inner.map(function, output_map).into()
}
}

pub(crate) fn call_lambda_with_series_slice(
py: Python,
s: &mut [Series],
lambda: &PyObject,
polars_module: &PyObject,
) -> PyObject {
let pypolars = polars_module.cast_as::<PyModule>(py).unwrap();

// create a PySeries struct/object for Python
let iter = s.iter().map(|s| {
let ps = PySeries::new(s.clone());

// Wrap this PySeries object in the python side Series wrapper
let python_series_wrapper = pypolars.getattr("wrap_s").unwrap().call1((ps,)).unwrap();

python_series_wrapper
});
let wrapped_s = PyList::new(py, iter);

// call the lambda and get a python side Series wrapper
match lambda.call1(py, (wrapped_s,)) {
Ok(pyobj) => pyobj,
Err(e) => panic!("python apply failed: {}", e.pvalue(py).to_string()),
}
}

pub fn map_mul(
pyexpr: &[PyExpr],
py: Python,
lambda: PyObject,
output_type: &PyAny,
apply_groups: bool,
) -> PyExpr {
let output_type = get_output_type(output_type);

// get the pypolars module
// do the import outside of the function to prevent import side effects in a hot loop.
let pypolars = PyModule::import(py, "polars").unwrap().to_object(py);

let function = move |s: &mut [Series]| {
let gil = Python::acquire_gil();
let py = gil.python();

// this is a python Series
let out = call_lambda_with_series_slice(py, s, &lambda, &pypolars);

// unpack the wrapper in a PySeries
let py_pyseries = out.getattr(py, "_s").expect(
"Could net get series attribute '_s'. \
Make sure that you return a Series object from a custom function.",
);
// Downcast to Rust
let pyseries = py_pyseries.extract::<PySeries>(py).unwrap();
// Finally get the actual Series
Ok(pyseries.series)
};

let exprs = pyexpr.iter().map(|pe| pe.clone().inner).collect::<Vec<_>>();

let output_map = GetOutput::map_field(move |fld| match output_type {
Some(ref dt) => Field::new(fld.name(), dt.clone()),
None => fld.clone(),
});
if apply_groups {
polars::lazy::dsl::apply_mul(function, exprs, output_map).into()
} else {
polars::lazy::dsl::map_mul(function, exprs, output_map).into()
}
}

0 comments on commit 59386ea

Please sign in to comment.