Skip to content

Commit

Permalink
python apply can extract anyvalues (#3071)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 6, 2022
1 parent 0363e97 commit 7e085cd
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 1 deletion.
218 changes: 218 additions & 0 deletions py-polars/src/apply/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ fn infer_and_finish<'a, A: ApplyLambda<'a>>(
Some(first_value),
)
.map(|ca| ca.into_series().into())
} else if let Ok(av) = out.extract::<Wrap<AnyValue>>() {
applyer
.apply_extract_any_values(py, lambda, null_count, av.0)
.map(|s| s.into())
} else {
applyer
.apply_lambda_with_object_out_type(
Expand Down Expand Up @@ -150,6 +154,14 @@ pub trait ApplyLambda<'a> {
dt: &DataType,
) -> PyResult<ListChunked>;

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series>;

/// Apply a lambda with list output type
fn apply_lambda_with_object_out_type(
&'a self,
Expand Down Expand Up @@ -404,6 +416,40 @@ impl<'a> ApplyLambda<'a> for BooleanChunked {
}
}

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series> {
let mut avs = Vec::with_capacity(self.len());
avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count));
avs.push(first_value);

if self.null_count() > 0 {
let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| {
let out_wrapped = match opt_val {
None => Wrap(AnyValue::Null),
Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(),
};
out_wrapped.0
});
avs.extend(iter);
} else {
let iter = self
.into_no_null_iter()
.skip(init_null_count + 1)
.map(|val| {
call_lambda_and_extract::<_, Wrap<AnyValue>>(py, lambda, val)
.unwrap()
.0
});
avs.extend(iter);
}
Ok(Series::new(self.name(), &avs))
}

fn apply_lambda_with_object_out_type(
&'a self,
py: Python,
Expand Down Expand Up @@ -665,6 +711,40 @@ where
}
}

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series> {
let mut avs = Vec::with_capacity(self.len());
avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count));
avs.push(first_value);

if self.null_count() > 0 {
let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| {
let out_wrapped = match opt_val {
None => Wrap(AnyValue::Null),
Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(),
};
out_wrapped.0
});
avs.extend(iter);
} else {
let iter = self
.into_no_null_iter()
.skip(init_null_count + 1)
.map(|val| {
call_lambda_and_extract::<_, Wrap<AnyValue>>(py, lambda, val)
.unwrap()
.0
});
avs.extend(iter);
}
Ok(Series::new(self.name(), &avs))
}

fn apply_lambda_with_object_out_type(
&'a self,
py: Python,
Expand Down Expand Up @@ -920,6 +1000,40 @@ impl<'a> ApplyLambda<'a> for Utf8Chunked {
}
}

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series> {
let mut avs = Vec::with_capacity(self.len());
avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count));
avs.push(first_value);

if self.null_count() > 0 {
let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| {
let out_wrapped = match opt_val {
None => Wrap(AnyValue::Null),
Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(),
};
out_wrapped.0
});
avs.extend(iter);
} else {
let iter = self
.into_no_null_iter()
.skip(init_null_count + 1)
.map(|val| {
call_lambda_and_extract::<_, Wrap<AnyValue>>(py, lambda, val)
.unwrap()
.0
});
avs.extend(iter);
}
Ok(Series::new(self.name(), &avs))
}

fn apply_lambda_with_object_out_type(
&'a self,
py: Python,
Expand Down Expand Up @@ -1383,6 +1497,52 @@ impl<'a> ApplyLambda<'a> for ListChunked {
))
}
}

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series> {
let pypolars = PyModule::import(py, "polars")?;
let mut avs = Vec::with_capacity(self.len());
avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count));
avs.push(first_value);

let call_with_value = |val: Series| {
// create a PySeries struct/object for Python
let pyseries = PySeries::new(val);
// Wrap this PySeries object in the python side Series wrapper
let python_series_wrapper = pypolars
.getattr("wrap_s")
.unwrap()
.call1((pyseries,))
.unwrap();
call_lambda_and_extract::<_, Wrap<AnyValue>>(py, lambda, python_series_wrapper)
.unwrap()
.0
};

if self.null_count() > 0 {
let iter = self
.into_iter()
.skip(init_null_count + 1)
.map(|opt_val| match opt_val {
None => AnyValue::Null,
Some(val) => call_with_value(val),
});
avs.extend(iter);
} else {
let iter = self
.into_no_null_iter()
.skip(init_null_count + 1)
.map(|val| call_with_value(val));
avs.extend(iter);
}
Ok(Series::new(self.name(), &avs))
}

fn apply_lambda_with_object_out_type(
&'a self,
py: Python,
Expand Down Expand Up @@ -1647,6 +1807,40 @@ impl<'a> ApplyLambda<'a> for ObjectChunked<ObjectValue> {
}
}

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series> {
let mut avs = Vec::with_capacity(self.len());
avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count));
avs.push(first_value);

if self.null_count() > 0 {
let iter = self.into_iter().skip(init_null_count + 1).map(|opt_val| {
let out_wrapped = match opt_val {
None => Wrap(AnyValue::Null),
Some(val) => call_lambda_and_extract(py, lambda, val).unwrap(),
};
out_wrapped.0
});
avs.extend(iter);
} else {
let iter = self
.into_no_null_iter()
.skip(init_null_count + 1)
.map(|val| {
call_lambda_and_extract::<_, Wrap<AnyValue>>(py, lambda, val)
.unwrap()
.0
});
avs.extend(iter);
}
Ok(Series::new(self.name(), &avs))
}

fn apply_lambda_with_object_out_type(
&'a self,
py: Python,
Expand Down Expand Up @@ -1837,6 +2031,30 @@ impl<'a> ApplyLambda<'a> for StructChunked {
self.len(),
))
}

fn apply_extract_any_values(
&'a self,
py: Python,
lambda: &'a PyAny,
init_null_count: usize,
first_value: AnyValue<'a>,
) -> PyResult<Series> {
let names = self.fields().iter().map(|s| s.name()).collect::<Vec<_>>();
let mut avs = Vec::with_capacity(self.len());
avs.extend(std::iter::repeat(AnyValue::Null).take(init_null_count));
avs.push(first_value);

let iter = self.into_iter().skip(init_null_count + 1).map(|val| {
let arg = make_dict_arg(py, &names, val);
call_lambda_and_extract::<_, Wrap<AnyValue>>(py, lambda, arg)
.unwrap()
.0
});
avs.extend(iter);

Ok(Series::new(self.name(), &avs))
}

fn apply_lambda_with_object_out_type(
&'a self,
py: Python,
Expand Down
16 changes: 16 additions & 0 deletions py-polars/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,22 @@ impl ToPyObject for Wrap<&StructChunked> {
}
}

impl ToPyObject for Wrap<&DurationChunked> {
fn to_object(&self, py: Python) -> PyObject {
let pl = PyModule::import(py, "polars").unwrap();
let pl_utils = pl.getattr("utils").unwrap();
let convert = pl_utils.getattr("_to_python_timedelta").unwrap();

let tu = Wrap(self.0.time_unit()).to_object(py);

let iter = self
.0
.into_iter()
.map(|opt_v| opt_v.map(|v| convert.call1((v, &tu)).unwrap()));
PyList::new(py, iter).into_py(py)
}
}

impl ToPyObject for Wrap<&DatetimeChunked> {
fn to_object(&self, py: Python) -> PyObject {
let pl = PyModule::import(py, "polars").unwrap();
Expand Down
4 changes: 4 additions & 0 deletions py-polars/src/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,10 @@ impl PySeries {
let ca = series.struct_().unwrap();
return Wrap(ca).to_object(python);
}
DataType::Duration(_) => {
let ca = series.duration().unwrap();
return Wrap(ca).to_object(python);
}
dt => primitive_to_list(dt, series),
};
pylist.to_object(python)
Expand Down
11 changes: 11 additions & 0 deletions py-polars/tests/test_datelike.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,14 @@ def test_asof_join() -> None:
assert quotes.join_asof(trades, on="dates", strategy="forward", tolerance="5ms")[
"bid_right"
].to_list() == [51.95, 51.95, None, None, 720.77, None, None, None]


def test_lambda_with_python_datetime_return_type() -> None:
df = pl.DataFrame({"timestamp": [1284286794, 1234567890]})

assert df.with_column(
pl.col("timestamp").apply(lambda x: datetime(2010, 9, 12)).alias("my_date_time")
)["my_date_time"].to_list() == [
datetime(2010, 9, 12),
datetime(2010, 9, 12),
]
13 changes: 12 additions & 1 deletion py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,18 @@ def test_hashing_on_python_objects() -> None:
# this requires that the hashing and aggregations are done on python objects

df = pl.DataFrame({"a": [1, 1, 3, 4], "b": [1, 1, 2, 2]})
df = df.with_column(pl.col("a").apply(lambda x: datetime(2021, 1, 1)).alias("foo"))

class Foo:
def __init__(self): # type: ignore
pass

def __hash__(self): # type: ignore
return 0

def __eq__(self, other): # type: ignore
return True

df = df.with_column(pl.col("a").apply(lambda x: Foo()).alias("foo"))
assert df.groupby(["foo"]).first().shape == (1, 3)
assert df.distinct().shape == (3, 3)

Expand Down

0 comments on commit 7e085cd

Please sign in to comment.