Skip to content

Commit

Permalink
feat(python): allow objects in struct types (#5925)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 28, 2022
1 parent eaeb703 commit fcd5835
Show file tree
Hide file tree
Showing 16 changed files with 175 additions and 30 deletions.
9 changes: 8 additions & 1 deletion polars/polars-core/src/chunked_array/logical/struct_/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ use crate::prelude::*;

// Converts a `StructChunked` into a `DataFrame` made of its field series.
// No validation is performed (`new_no_checks`).
impl From<StructChunked> for DataFrame {
fn from(ca: StructChunked) -> Self {
DataFrame::new_no_checks(ca.fields)
// With the `object` feature enabled `StructChunked` implements `Drop`,
// and Rust forbids moving a field out of a value whose type implements `Drop`,
// so the fields are cloned instead of moved.
#[cfg(feature = "object")]
{
DataFrame::new_no_checks(ca.fields.clone())
}
// Without the `object` feature there is no `Drop` impl and the fields can be moved out.
#[cfg(not(feature = "object"))]
{
DataFrame::new_no_checks(ca.fields)
}
}
}

Expand Down
53 changes: 45 additions & 8 deletions polars/polars-core/src/chunked_array/logical/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,24 @@ pub struct StructChunked {
chunks: Vec<ArrayRef>,
}

/// Builds one nullable `ArrowField` per (array, series) pair.
///
/// The field name is taken from the series, the data type from the arrow array.
/// Pairs are formed positionally; surplus entries on either side are ignored.
fn arrays_to_fields(field_arrays: &[ArrayRef], fields: &[Series]) -> Vec<ArrowField> {
    let mut arrow_fields = Vec::with_capacity(field_arrays.len().min(fields.len()));
    for (array, series) in field_arrays.iter().zip(fields) {
        let dtype = array.data_type().clone();
        arrow_fields.push(ArrowField::new(series.name(), dtype, true));
    }
    arrow_fields
}

/// Rechunks every field series and builds a single `StructArray` from chunk 0 of each.
///
/// Returns the boxed struct array together with the rechunked field series.
fn fields_to_struct_array(fields: &[Series]) -> (ArrayRef, Vec<Series>) {
let fields = fields.iter().map(|s| s.rechunk()).collect::<Vec<_>>();

let new_fields = fields.iter().map(|s| s.field().to_arrow()).collect();
let field_arrays = fields.iter().map(|s| s.to_arrow(0)).collect::<Vec<_>>();
// NOTE(review): `fields` was already rechunked above, so the `rechunk()` below
// looks redundant — confirm before removing.
let field_arrays = fields
.iter()
.map(|s| s.rechunk().to_arrow(0))
.collect::<Vec<_>>();
// we determine fields from arrays as there might be object arrays
// where the dtype is bound to that single array
let new_fields = arrays_to_fields(&field_arrays, &fields);
// `None`: the struct array is built without a validity bitmap of its own.
let arr = StructArray::new(ArrowDataType::Struct(new_fields), field_arrays, None);
(Box::new(arr), fields)
}
Expand Down Expand Up @@ -82,20 +95,19 @@ impl StructChunked {

// Should be called after append or extend
pub(crate) fn update_chunks(&mut self, offset: usize) {
let new_fields = self
.fields
.iter()
.map(|s| s.field().to_arrow())
.collect::<Vec<_>>();
let n_chunks = self.fields[0].chunks().len();
for i in offset..n_chunks {
let field_arrays = self
.fields
.iter()
.map(|s| s.to_arrow(i))
.collect::<Vec<_>>();

// we determine fields from arrays as there might be object arrays
// where the dtype is bound to that single array
let new_fields = arrays_to_fields(&field_arrays, &self.fields);
let arr = Box::new(StructArray::new(
ArrowDataType::Struct(new_fields.clone()),
ArrowDataType::Struct(new_fields),
field_arrays,
None,
)) as ArrayRef;
Expand Down Expand Up @@ -244,3 +256,28 @@ impl LogicalType for StructChunked {
}
}
}

/// Releases object arrays held inside the struct chunks.
///
/// Only does work when at least one field has dtype `Object`; in that case every
/// child array tagged with the polars extension name is handed to `drop_object_array`.
#[cfg(feature = "object")]
impl Drop for StructChunked {
    fn drop(&mut self) {
        use crate::chunked_array::object::extension::drop::drop_object_array;
        use crate::chunked_array::object::extension::EXTENSION_NAME;

        let holds_objects = self
            .fields
            .iter()
            .any(|field| matches!(field.dtype(), DataType::Object(_)));
        if !holds_objects {
            return;
        }

        // Take ownership of the chunks so each one is dropped right after its
        // object children have been processed.
        for chunk in std::mem::take(&mut self.chunks) {
            let struct_arr = chunk.as_any().downcast_ref::<StructArray>().unwrap();
            for child in struct_arr.values() {
                if let ArrowDataType::Extension(name, _, _) = child.data_type() {
                    if name == EXTENSION_NAME {
                        // NOTE(review): relies on every array carrying this extension
                        // name being a polars object array — confirm.
                        unsafe { drop_object_array(child.as_ref()) }
                    }
                }
            }
        }
    }
}
17 changes: 17 additions & 0 deletions polars/polars-core/src/chunked_array/object/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,20 @@ where
Self::new_from_vec(name, vec![])
}
}

/// Convert a Series of dtype object to an Arrow Array of FixedSizeBinary
///
/// The whole series is aggregated into a single list, and the values array of that
/// list is returned. Panics if the aggregated chunk is not a `ListArray<i64>`.
pub(crate) fn object_series_to_arrow_array(s: &Series) -> ArrayRef {
    // The list builder already knows how to create the arrow array,
    // so the series is aggregated as one group spanning all of it.
    let whole_series = GroupsProxy::Slice {
        groups: vec![[0, s.len() as IdxSize]],
        rolling: false,
    };
    // safety: 0..len is in bounds
    let aggregated = unsafe { s.agg_list(&whole_series) };

    let list_arr = aggregated.chunks()[0]
        .as_any()
        .downcast_ref::<ListArray<i64>>()
        .unwrap();
    list_arr.values().to_boxed()
}
27 changes: 15 additions & 12 deletions polars/polars-core/src/chunked_array/object/extension/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,22 @@ pub(crate) unsafe fn drop_list(ca: &mut ListChunked) {
let arr = lst_arr.as_any().downcast_ref::<LargeListArray>().unwrap();

let values = arr.values();

let arr = values
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.unwrap();

// if the buf is not shared with anyone but us
// we can deallocate
let buf = arr.values();
if buf.shared_count_strong() == 1 {
PolarsExtension::new(arr.clone());
};
drop_object_array(values.as_ref())
}
}
}
}

/// Deallocates the objects behind `values` when nobody else shares its buffer.
///
/// Panics if `values` is not a `FixedSizeBinaryArray`.
///
/// # Safety
/// NOTE(review): presumably `values` must be an array produced by the polars object
/// extension, since it is handed to `PolarsExtension::new` — confirm against callers.
pub(crate) unsafe fn drop_object_array(values: &dyn Array) {
    let fixed = values
        .as_any()
        .downcast_ref::<FixedSizeBinaryArray>()
        .unwrap();

    // Only when this is the sole strong reference to the buffer may we deallocate.
    let solely_owned = fixed.values().shared_count_strong() == 1;
    if solely_owned {
        // Building the extension and dropping it right away performs the deallocation.
        drop(PolarsExtension::new(fixed.clone()));
    }
}
9 changes: 4 additions & 5 deletions polars/polars-core/src/chunked_array/object/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use polars_extension::PolarsExtension;
use crate::prelude::*;
use crate::PROCESS_ID;

pub const EXTENSION_NAME: &str = "POLARS_EXTENSION_TYPE";

/// Invariants
/// `ptr` must point to start a `T` allocation
/// `n_t_vals` must represent the correct number of `T` values in that allocation
Expand Down Expand Up @@ -115,11 +117,8 @@ pub(crate) fn create_extension<
let metadata = format!("{};{}", *PROCESS_ID, et_ptr as usize);

let physical_type = ArrowDataType::FixedSizeBinary(t_size);
let extension_type = ArrowDataType::Extension(
"POLARS_EXTENSION_TYPE".into(),
physical_type.into(),
Some(metadata),
);
let extension_type =
ArrowDataType::Extension(EXTENSION_NAME.into(), physical_type.into(), Some(metadata));
// first freeze, otherwise we compute null
let validity = if null_count > 0 {
Some(validity.into())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ pub struct PolarsExtension {
}

impl PolarsExtension {
/// Reads the object at index `i` of `arr` and returns it as an owned `AnyValue`.
///
/// This is very expensive
///
/// Panics if the value cannot be fetched or cannot be converted to a static `AnyValue`.
///
/// # Safety
/// NOTE(review): presumably `arr` must be a polars object extension array and `i`
/// must be in bounds — confirm against callers.
pub(crate) unsafe fn arr_to_av(arr: &FixedSizeBinaryArray, i: usize) -> AnyValue {
    // `ManuallyDrop` keeps the temporary extension's destructor from running on
    // this one-element slice.
    let single = ManuallyDrop::new(Self::new(arr.slice(i, 1)));
    single.get_series("").get(0).unwrap().into_static().unwrap()
}

/// Wraps `array` in a `PolarsExtension`.
///
/// # Safety
/// NOTE(review): presumably `array` must have been created by the polars object
/// extension machinery — confirm.
pub(crate) unsafe fn new(array: FixedSizeBinaryArray) -> Self {
Self { array: Some(array) }
}
Expand Down
6 changes: 6 additions & 0 deletions polars/polars-core/src/chunked_array/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub trait PolarsObjectSafe: Any + Debug + Send + Sync + Display {
fn type_name(&self) -> &'static str;

fn as_any(&self) -> &dyn Any;

/// Returns this value as a new, owned boxed trait object.
fn to_boxed(&self) -> Box<dyn PolarsObjectSafe>;
}

/// Values need to implement this so that they can be stored into a Series and DataFrame
Expand All @@ -48,6 +50,10 @@ impl<T: PolarsObject> PolarsObjectSafe for T {
/// Exposes the value as `&dyn Any` so callers can downcast to the concrete type.
fn as_any(&self) -> &dyn Any {
self
}

/// Clones the concrete value and boxes the clone as a `PolarsObjectSafe` trait object.
fn to_boxed(&self) -> Box<dyn PolarsObjectSafe> {
Box::new(self.clone())
}
}

impl<T> ObjectArray<T>
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/chunked_array/object/registry.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! This is a heap allocated utility that can be used to register an object type.
//! That object type will know its own generic type parameter `T` and callers can simply
//! send `&Any` values and don't have to know the generic type themselves.
use std::any::Any;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
Expand Down
9 changes: 8 additions & 1 deletion polars/polars-core/src/chunked_array/ops/any_value.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::convert::TryFrom;

#[cfg(feature = "object")]
use crate::chunked_array::object::extension::polars_extension::PolarsExtension;
use crate::prelude::*;

#[inline]
Expand Down Expand Up @@ -100,7 +102,12 @@ pub(crate) unsafe fn arr_to_any_value<'a>(
AnyValue::Time(v)
}
#[cfg(feature = "object")]
DataType::Object(_) => panic!("should not be here"),
DataType::Object(_) => {
// We should almost never hit this. The only known exception is when we put objects in
// structs. Any other hit should be considered a bug.
let arr = &*(arr as *const dyn Array as *const FixedSizeBinaryArray);
PolarsExtension::arr_to_av(arr, idx)
}
dt => panic!("not implemented for {dt:?}"),
}
}
Expand Down
16 changes: 16 additions & 0 deletions polars/polars-core/src/datatypes/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ use polars_utils::unwrap::UnwrapUncheckedRelease;

use super::*;

/// An owned object value: a boxed `PolarsObjectSafe` trait object.
#[cfg(feature = "object")]
#[derive(Debug)]
pub struct OwnedObject(pub Box<dyn PolarsObjectSafe>);

/// Cloning delegates to `PolarsObjectSafe::to_boxed` on the wrapped value.
#[cfg(feature = "object")]
impl Clone for OwnedObject {
    fn clone(&self) -> Self {
        let Self(inner) = self;
        Self(inner.to_boxed())
    }
}

#[derive(Debug, Clone)]
pub enum AnyValue<'a> {
Null,
Expand Down Expand Up @@ -50,7 +61,10 @@ pub enum AnyValue<'a> {
List(Series),
#[cfg(feature = "object")]
/// Can be used to fmt and implements Any, so can be downcasted to the proper value type.
#[cfg(feature = "object")]
Object(&'a dyn PolarsObjectSafe),
#[cfg(feature = "object")]
ObjectOwned(OwnedObject),
#[cfg(feature = "dtype-struct")]
// 3 pointers and thus not larger than string/vec
Struct(usize, &'a StructArray, &'a [Field]),
Expand Down Expand Up @@ -546,6 +560,8 @@ impl<'a> AnyValue<'a> {
Binary(v) => BinaryOwned(v.to_vec()),
#[cfg(feature = "dtype-binary")]
BinaryOwned(v) => BinaryOwned(v),
#[cfg(feature = "object")]
Object(v) => ObjectOwned(OwnedObject(v.to_boxed())),
dt => {
return Err(PolarsError::ComputeError(
format!("cannot get static AnyValue from {dt}").into(),
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/src/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,8 @@ impl Display for AnyValue<'_> {
AnyValue::List(s) => write!(f, "{}", s.fmt_list()),
#[cfg(feature = "object")]
AnyValue::Object(v) => write!(f, "{v}"),
#[cfg(feature = "object")]
AnyValue::ObjectOwned(v) => write!(f, "{}", v.0.as_ref()),
#[cfg(feature = "dtype-struct")]
av @ AnyValue::Struct(_, _, _) => {
let mut avs = vec![];
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/src/series/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ impl<'a> From<&AnyValue<'a>> for DataType {
Categorical(_, rev_map) => DataType::Categorical(Some(Arc::new((*rev_map).clone()))),
#[cfg(feature = "object")]
Object(o) => DataType::Object(o.type_name()),
#[cfg(feature = "object")]
ObjectOwned(o) => DataType::Object(o.0.type_name()),
}
}
}
4 changes: 3 additions & 1 deletion polars/polars-core/src/series/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use polars_arrow::kernels::concatenate::concatenate_owned_unchecked;
use crate::chunked_array::cast::cast_chunks;
#[cfg(feature = "object")]
use crate::chunked_array::object::extension::polars_extension::PolarsExtension;
#[cfg(feature = "object")]
use crate::chunked_array::object::extension::EXTENSION_NAME;
use crate::prelude::*;

impl Series {
Expand Down Expand Up @@ -299,7 +301,7 @@ impl Series {
Ok(CategoricalChunked::from_keys_and_values(name, keys, values).into_series())
}
#[cfg(feature = "object")]
ArrowDataType::Extension(s, _, Some(_)) if s == "POLARS_EXTENSION_TYPE" => {
ArrowDataType::Extension(s, _, Some(_)) if s == EXTENSION_NAME => {
assert_eq!(chunks.len(), 1);
let arr = chunks[0]
.as_any()
Expand Down
16 changes: 16 additions & 0 deletions polars/polars-core/src/series/into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ impl Series {
}
#[cfg(feature = "dtype-time")]
DataType::Time => cast(&*self.chunks()[chunk_idx], &DataType::Time.to_arrow()).unwrap(),
#[cfg(feature = "object")]
DataType::Object(_) => {
use crate::chunked_array::object::builder::object_series_to_arrow_array;
if self.chunks().len() == 1 && chunk_idx == 0 {
object_series_to_arrow_array(self)
} else {
// we slice the series to only that chunk
let offset = self.chunks()[..chunk_idx]
.iter()
.map(|arr| arr.len())
.sum::<usize>() as i64;
let len = self.chunks()[chunk_idx].len();
let s = self.slice(offset, len);
object_series_to_arrow_array(&s)
}
}
_ => self.array_ref(chunk_idx).clone(),
}
}
Expand Down
9 changes: 7 additions & 2 deletions py-polars/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,13 @@ impl IntoPy<PyObject> for Wrap<AnyValue<'_>> {
AnyValue::StructOwned(payload) => struct_dict(py, payload.0.into_iter(), &payload.1),
#[cfg(feature = "object")]
AnyValue::Object(v) => {
let s = format!("{v}");
s.into_py(py)
let object = v.as_any().downcast_ref::<ObjectValue>().unwrap();
object.inner.clone()
}
#[cfg(feature = "object")]
AnyValue::ObjectOwned(v) => {
let object = v.0.as_any().downcast_ref::<ObjectValue>().unwrap();
object.inner.clone()
}
AnyValue::Binary(v) => v.into_py(py),
AnyValue::BinaryOwned(v) => v.into_py(py),
Expand Down
15 changes: 15 additions & 0 deletions py-polars/tests/unit/test_object.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import numpy as np

import polars as pl


Expand Down Expand Up @@ -34,3 +36,16 @@ def test_object_empty_filter_5911() -> None:
out = empty_df.select(["pet_obj"])
assert out.dtypes == [pl.Object]
assert out.shape == (0, 1)


def test_object_in_struct() -> None:
    """Object values placed in a struct column come back as the original numpy arrays."""
    first = np.array([1, 2, 3])
    second = np.array([4, 5, 6])
    df = pl.DataFrame({"A": [1, 2], "B": pl.Series([first, second], dtype=pl.Object)})

    rows = df.select([pl.struct(["B"]).alias("foo")]).to_dict(False)["foo"]

    got = rows[0]["B"]
    assert isinstance(got, np.ndarray)
    assert (got == first).sum() == 3
    got = rows[1]["B"]
    assert (got == second).sum() == 3

0 comments on commit fcd5835

Please sign in to comment.