Commit: refactor rows apply
ritchie46 committed Jan 2, 2022
1 parent bdaf4d1 commit bdf5176
Showing 5 changed files with 87 additions and 85 deletions.
7 changes: 5 additions & 2 deletions polars/polars-core/src/frame/row.rs
@@ -2,7 +2,7 @@ use crate::prelude::*;
use crate::utils::get_supertype;
use std::fmt::{Debug, Formatter};

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Row<'a>(pub Vec<AnyValue<'a>>);

impl<'a> Row<'a> {
@@ -56,7 +56,10 @@ impl DataFrame {
Self::from_rows_iter_and_schema(rows.iter(), schema)
}

fn from_rows_iter_and_schema<'a, I>(mut rows: I, schema: &Schema) -> Result<Self>
/// Create a new DataFrame from an iterator over rows. This should only be used when you have row-wise data,
/// as this is a lot slower than creating the `Series` in a columnar fashion.
#[cfg_attr(docsrs, doc(cfg(feature = "rows")))]
pub fn from_rows_iter_and_schema<'a, I>(mut rows: I, schema: &Schema) -> Result<Self>
where
I: Iterator<Item = &'a Row<'a>>,
{
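The `Default` derive added to `Row` is what lets the apply path below keep a single reusable `Row::default()` buffer instead of allocating a fresh `Vec` per row, and making `from_rows_iter_and_schema` public lets callers stream rows without collecting them into a slice first. A minimal sketch of a call site, assuming the `rows` feature is enabled and that `Schema::new(Vec<Field>)` and `Row::new(Vec<AnyValue>)` are the constructors at this version:

use polars::prelude::*;
use polars_core::frame::row::Row;

fn rows_example() -> Result<DataFrame> {
    // Two rows of (i64, utf8) data; the schema pins the dtypes up front,
    // so no per-row type inference happens while the iterator is consumed.
    let rows = vec![
        Row::new(vec![AnyValue::Int64(1), AnyValue::Utf8("a")]),
        Row::new(vec![AnyValue::Int64(2), AnyValue::Utf8("b")]),
    ];
    let schema = Schema::new(vec![
        Field::new("x", DataType::Int64),
        Field::new("y", DataType::Utf8),
    ]);
    // `rows.iter()` yields `&Row`, matching the `I: Iterator<Item = &'a Row<'a>>` bound.
    DataFrame::from_rows_iter_and_schema(rows.iter(), &schema)
}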
20 changes: 8 additions & 12 deletions py-polars/polars/internals/frame.py
@@ -2716,11 +2716,10 @@ def join(

def apply(
self,
f: Callable[[Tuple[Any]], Any],
f: Callable[[Tuple[Any, ...]], Any],
return_dtype: Optional[Type[DataType]] = None,
batch_size: int = 2048,
rechunk: bool = True,
) -> Union["pli.Series", "DataFrame"]:
inference_size: int = 256,
) -> "DataFrame":
"""
Apply a custom function over the rows of the DataFrame. The rows are passed as tuples.
@@ -2732,19 +2731,16 @@ def apply(
Custom function or lambda function.
return_dtype
Output type of the operation. If none given, Polars tries to infer the type.
batch_size
Only used in the case when the custom function returns rows. This sets the batch size in which
sub dataframes are created
rechunk
Only used in the case when the custom function returns rows. This rechunks the DataFrame when the apply
is finished
inference_size
Only used when the custom function returns rows.
The first `n` rows are used to determine the output schema.
"""
out, is_df = self._df.apply(f, return_dtype, batch_size, rechunk)
out, is_df = self._df.apply(f, return_dtype, inference_size)
if is_df:
return wrap_df(out)
else:
return pli.wrap_s(out)
return pli.wrap_s(out).to_frame()

def with_column(self, column: Union["pli.Series", "pli.Expr"]) -> "DataFrame":
"""
134 changes: 69 additions & 65 deletions py-polars/src/apply/dataframe.rs
@@ -4,8 +4,7 @@ use crate::error::PyPolarsEr;
use crate::series::PySeries;
use crate::PyDataFrame;
use polars::prelude::*;
use polars_core::frame::row::Row;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::frame::row::{rows_to_schema, Row};
use pyo3::conversion::{FromPyObject, IntoPy};
use pyo3::prelude::*;
use pyo3::types::{PyBool, PyFloat, PyInt, PyList, PyString, PyTuple};
@@ -15,8 +14,7 @@ pub fn apply_lambda_unknown<'a>(
df: &'a DataFrame,
py: Python,
lambda: &'a PyAny,
batch_size: usize,
rechunk: bool,
inference_size: usize,
) -> PyResult<(PyObject, bool)> {
let columns = df.get_columns();
let mut null_count = 0;
@@ -104,8 +102,7 @@ pub fn apply_lambda_unknown<'a>(
lambda,
null_count,
first_value,
batch_size,
rechunk,
inference_size,
)
.map_err(PyPolarsEr::from)?,
)
@@ -126,10 +123,31 @@ Then return a Series object."
Err(PyPolarsEr::Other("Could not determine output type".into()).into())
}

fn apply_iter<'a, T>(
df: &'a DataFrame,
py: Python<'a>,
lambda: &'a PyAny,
init_null_count: usize,
skip: usize,
) -> impl Iterator<Item = Option<T>> + 'a
where
T: FromPyObject<'a>,
{
let columns = df.get_columns();
((init_null_count + skip)..df.height()).map(move |idx| {
let iter = columns.iter().map(|s: &Series| Wrap(s.get(idx)));
let tpl = (PyTuple::new(py, iter),);
match lambda.call1(tpl) {
Ok(val) => val.extract::<T>().ok(),
Err(e) => panic!("python function failed {}", e),
}
})
}
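This new `apply_iter` helper is the core of the refactor: the primitive, bool, and utf8 paths below each repeated the same index loop, tuple construction, and `extract` call, differing only in the output type. A standalone analogue of the pattern, with hypothetical names and `parse` standing in for the pyo3 `extract`:

use std::str::FromStr;

// One generic iterator factory replaces several near-identical loops that
// differed only in the extracted output type.
fn extract_iter<'a, T>(raw: &'a [&'a str], skip: usize) -> impl Iterator<Item = Option<T>> + 'a
where
    T: FromStr + 'a,
{
    // `.ok()` mirrors the original behaviour: a failed extraction becomes a null.
    raw.iter().skip(skip).map(|s| s.parse::<T>().ok())
}

fn main() {
    let raw = ["1", "2", "x"];
    let ints: Vec<Option<i32>> = extract_iter(&raw, 0).collect();
    let floats: Vec<Option<f64>> = extract_iter(&raw, 1).collect();
    assert_eq!(ints, vec![Some(1), Some(2), None]);
    assert_eq!(floats, vec![Some(2.0), None]);
}

Each typed call site then shrinks to a single `let iter = apply_iter(...)` line, as the hunks below show.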

/// Apply a lambda with a primitive output type
pub fn apply_lambda_with_primitive_out_type<'a, D>(
df: &'a DataFrame,
py: Python,
py: Python<'a>,
lambda: &'a PyAny,
init_null_count: usize,
first_value: Option<D::Native>,
@@ -138,20 +156,11 @@ where
D: PyArrowPrimitiveType,
D::Native: ToPyObject + FromPyObject<'a>,
{
let columns = df.get_columns();

let skip = if first_value.is_some() { 1 } else { 0 };
if init_null_count == df.height() {
ChunkedArray::full_null("apply", df.height())
} else {
let iter = ((init_null_count + skip)..df.height()).map(|idx| {
let iter = columns.iter().map(|s: &Series| Wrap(s.get(idx)));
let tpl = (PyTuple::new(py, iter),);
match lambda.call1(tpl) {
Ok(val) => val.extract::<D::Native>().ok(),
Err(e) => panic!("python function failed {}", e),
}
});
let iter = apply_iter(df, py, lambda, init_null_count, skip);
iterator_to_primitive(iter, init_null_count, first_value, "apply", df.height())
}
}
@@ -164,20 +173,11 @@ pub fn apply_lambda_with_bool_out_type<'a>(
init_null_count: usize,
first_value: Option<bool>,
) -> ChunkedArray<BooleanType> {
let columns = df.get_columns();

let skip = if first_value.is_some() { 1 } else { 0 };
if init_null_count == df.height() {
ChunkedArray::full_null("apply", df.height())
} else {
let iter = ((init_null_count + skip)..df.height()).map(|idx| {
let iter = columns.iter().map(|s: &Series| Wrap(s.get(idx)));
let tpl = (PyTuple::new(py, iter),);
match lambda.call1(tpl) {
Ok(val) => val.extract::<bool>().ok(),
Err(e) => panic!("python function failed {}", e),
}
});
let iter = apply_iter(df, py, lambda, init_null_count, skip);
iterator_to_bool(iter, init_null_count, first_value, "apply", df.height())
}
}
@@ -190,20 +190,11 @@ pub fn apply_lambda_with_utf8_out_type<'a>(
init_null_count: usize,
first_value: Option<&str>,
) -> Utf8Chunked {
let columns = df.get_columns();

let skip = if first_value.is_some() { 1 } else { 0 };
if init_null_count == df.height() {
ChunkedArray::full_null("apply", df.height())
} else {
let iter = ((init_null_count + skip)..df.height()).map(|idx| {
let iter = columns.iter().map(|s: &Series| Wrap(s.get(idx)));
let tpl = (PyTuple::new(py, iter),);
match lambda.call1(tpl) {
Ok(val) => val.extract::<&str>().ok(),
Err(e) => panic!("python function failed {}", e),
}
});
let iter = apply_iter::<&str>(df, py, lambda, init_null_count, skip);
iterator_to_utf8(iter, init_null_count, first_value, "apply", df.height())
}
}
@@ -250,48 +241,61 @@ pub fn apply_lambda_with_rows_output<'a>(
lambda: &'a PyAny,
init_null_count: usize,
first_value: Row<'a>,
batch_size: usize,
rechunk: bool,
inference_size: usize,
) -> Result<DataFrame> {
let columns = df.get_columns();
let width = first_value.0.len();
let null_row = Row::new(vec![AnyValue::Null; width]);

let mut row_buf = Row::default();

let skip = 1;
let mut row_iter = ((init_null_count + skip)..df.height()).map(|idx| {
let iter = columns.iter().map(|s: &Series| Wrap(s.get(idx)));
let tpl = (PyTuple::new(py, iter),);
match lambda.call1(tpl) {
Ok(val) => val
.extract::<Wrap<Row>>()
.map(|r| r.0)
.unwrap_or_else(|_| null_row.clone()),
Ok(val) => {
match val.downcast::<PyTuple>().ok() {
Some(tuple) => {
row_buf.0.clear();
for v in tuple {
let v = v.extract::<Wrap<AnyValue>>().unwrap().0;
row_buf.0.push(v);
}
let ptr = &row_buf as *const Row;
// Safety:
// we know that the row constructor of a polars DataFrame does not keep a reference
// to the row. Before we mutate the row buffer again, the reference is dropped;
// we just cannot prove that to the compiler.
// We still do this because it saves a Vec allocation in a hot loop.
unsafe { &*ptr }
}
None => &null_row,
}
}
Err(e) => panic!("python function failed {}", e),
}
});
let mut buf = Vec::with_capacity(batch_size);
buf.push(first_value);

buf.extend((&mut row_iter).take(batch_size));
let df = DataFrame::from_rows(&buf)?;
let schema = df.schema();

let mut dfs = Vec::with_capacity(df.height() / batch_size + 1);
dfs.push(df);

loop {
buf.clear();
buf.extend((&mut row_iter).take(batch_size));
if buf.is_empty() {
break;
}
let df = DataFrame::from_rows_and_schema(&buf, &schema)?;
dfs.push(df);
}
// first rows for schema inference
let mut buf = Vec::with_capacity(inference_size);
buf.push(first_value);
buf.extend((&mut row_iter).take(inference_size).cloned());
let schema = rows_to_schema(&buf);

let mut df = accumulate_dataframes_vertical(dfs.into_iter())?;
if rechunk {
df.rechunk();
if init_null_count > 0 {
// Safety: we know the iterator's size
let iter = unsafe {
(0..init_null_count)
.map(|_| &null_row)
.chain(buf.iter())
.chain(row_iter)
.trust_my_length(df.height())
};
DataFrame::from_rows_iter_and_schema(iter, &schema)
} else {
// Safety: we know the iterator's size
let iter = unsafe { buf.iter().chain(row_iter).trust_my_length(df.height()) };
DataFrame::from_rows_iter_and_schema(iter, &schema)
}
Ok(df)
}
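Taken together, this function swaps the old batch-and-concatenate strategy (collect `batch_size` rows, build a sub-DataFrame, accumulate vertically, optionally rechunk) for a single pass: the first `inference_size` rows are buffered once, `rows_to_schema` infers the schema from them, and then every row, including any leading null rows, streams through `from_rows_iter_and_schema`. As used here, `trust_my_length` appears to be an unsafe polars-core adapter that attaches a caller-supplied exact size hint so the column builders can pre-allocate; the caller must guarantee the claimed length. A condensed sketch of the shape, assuming `rows_to_schema(&[Row]) -> Schema`:

use polars::prelude::*;
use polars_core::frame::row::{rows_to_schema, Row};

// Infer the schema from a prefix of the rows, then stream all rows through
// the schema-aware constructor in one pass instead of batching sub-frames.
fn build_from_rows(rows: Vec<Row<'static>>, inference_size: usize) -> Result<DataFrame> {
    let sample = &rows[..inference_size.min(rows.len())];
    let schema = rows_to_schema(sample);
    DataFrame::from_rows_iter_and_schema(rows.iter(), &schema)
}

Rows beyond the inference window are assumed to match the inferred schema, which is why the Python docstring now tells users that the first `n` rows determine the output schema.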
5 changes: 2 additions & 3 deletions py-polars/src/dataframe.rs
@@ -932,8 +932,7 @@ impl PyDataFrame {
&self,
lambda: &PyAny,
output_type: &PyAny,
batch_size: usize,
rechunk: bool,
inference_size: usize,
) -> PyResult<(PyObject, bool)> {
let gil = Python::acquire_gil();
let py = gil.python();
@@ -987,7 +986,7 @@ impl PyDataFrame {
Some(DataType::Utf8) => {
apply_lambda_with_utf8_out_type(df, py, lambda, 0, None).into_series()
}
_ => return apply_lambda_unknown(df, py, lambda, batch_size, rechunk),
_ => return apply_lambda_unknown(df, py, lambda, inference_size),
};

Ok((PySeries::from(out).into_py(py), false))
6 changes: 3 additions & 3 deletions py-polars/tests/test_df.py
@@ -876,7 +876,7 @@ def test_read_csv_categorical() -> None:

def test_df_apply() -> None:
df = pl.DataFrame({"a": ["foo", "bar", "2"], "b": [1, 2, 3], "c": [1.0, 2.0, 3.0]})
out = df.apply(lambda x: len(x), None)
out = df.apply(lambda x: len(x), None).to_series()
assert out.sum() == 9


@@ -1251,7 +1251,7 @@ def test_slicing() -> None:

def test_apply_list_return() -> None:
df = pl.DataFrame({"start": [1, 2], "end": [3, 5]})
out = df.apply(lambda r: pl.Series(range(r[0], r[1] + 1))) # type: ignore
out = df.apply(lambda r: pl.Series(range(r[0], r[1] + 1))).to_series()
assert out.to_list() == [[1, 2, 3], [2, 3, 4, 5]]


@@ -1267,7 +1267,7 @@ def test_apply_dataframe_return() -> None:
"column_3": ["c", "d", None],
}
)
assert out.frame_equal(expected, null_equal=True) # type: ignore
assert out.frame_equal(expected, null_equal=True)


def test_groupby_cat_list() -> None: # noqa: W191,E101
