Skip to content

Commit

Permalink
make groupby on objects possible
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 29, 2021
1 parent db23fee commit ff78941
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 14 deletions.
27 changes: 26 additions & 1 deletion polars/polars-core/src/chunked_array/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use crate::prelude::*;
use arrow::bitmap::Bitmap;
use std::any::Any;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::sync::Arc;

#[derive(Debug, Clone)]
Expand All @@ -19,7 +20,9 @@ where
pub(crate) len: usize,
}

pub trait PolarsObject: Any + Debug + Clone + Send + Sync + Default + Display {
/// Contract for a user-defined value type that can be stored in an object array.
///
/// Besides the basic `Any + Debug + Clone + Send + Sync + Default + Display`
/// requirements, implementors must also be `Hash + PartialEq + Eq`: the
/// hashing and equality-comparison code for object arrays calls `hash` and
/// `==` directly on the stored values.
pub trait PolarsObject:
Any + Debug + Clone + Send + Sync + Default + Display + Hash + PartialEq + Eq
{
/// Static name identifying the implementing type.
fn type_name() -> &'static str;
}

Expand All @@ -46,6 +49,28 @@ where
pub unsafe fn value_unchecked(&self, index: usize) -> &T {
self.values.get_unchecked(index)
}

/// Returns `true` when the slot at index `i` is valid (i.e. not null).
///
/// An array without a null bitmap is treated as fully valid.
///
/// # Safety
/// No bounds checks are performed: the caller must guarantee that `i` is
/// within the bounds of the null bitmap.
#[inline]
pub unsafe fn is_valid_unchecked(&self, i: usize) -> bool {
    match &self.null_bitmap {
        Some(bitmap) => bitmap.get_bit_unchecked(i),
        // No bitmap at all: every slot counts as valid.
        None => true,
    }
}

/// Returns `true` when the slot at index `i` is null.
///
/// This is the exact negation of `is_valid_unchecked`.
///
/// # Safety
/// No bounds checks are performed: the caller must guarantee that `i` is
/// within the bounds of the null bitmap.
#[inline]
pub unsafe fn is_null_unchecked(&self, i: usize) -> bool {
    let valid = self.is_valid_unchecked(i);
    !valid
}
}

impl<T> Array for ObjectArray<T>
Expand Down
71 changes: 71 additions & 0 deletions polars/polars-core/src/chunked_array/ops/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,74 @@ impl<'a> ChunkApply<'a, Series, Series> for ListChunked {
});
}
}

#[cfg(feature = "object")]
impl<'a, T> ChunkApply<'a, &'a T, T> for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Casting object values to a numeric array is not implemented yet.
    ///
    /// # Panics
    /// Always panics (`todo!`).
    fn apply_cast_numeric<F, S>(&'a self, _f: F) -> ChunkedArray<S>
    where
        F: Fn(&'a T) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        todo!()
    }

    /// Casting optional object values to a numeric array is not implemented yet.
    ///
    /// # Panics
    /// Always panics (`todo!`).
    fn branch_apply_cast_numeric_no_null<F, S>(&'a self, _f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<&'a T>) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        todo!()
    }

    /// Apply `f` to every non-null value; nulls are passed through untouched.
    ///
    /// The result keeps the name of `self`.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a T) -> T + Copy,
    {
        let mut ca: ObjectChunked<T> = self.into_iter().map(|opt_v| opt_v.map(f)).collect();
        ca.rename(self.name());
        ca
    }

    /// Apply `f` to every slot, null or not; `f` decides the nullness of the output.
    ///
    /// The result keeps the name of `self`.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a T>) -> Option<T> + Copy,
    {
        let mut ca: ObjectChunked<T> = self.into_iter().map(f).collect();
        ca.rename(self.name());
        ca
    }

    /// Apply `f` to every non-null value together with its position in the array.
    ///
    /// Null slots are passed through untouched but still advance the index,
    /// so the index handed to `f` is always the position in `self`.
    /// The result keeps the name of `self`.
    fn apply_with_idx<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, &'a T)) -> T + Copy,
    {
        let mut ca: ObjectChunked<T> = self
            .into_iter()
            .enumerate()
            .map(|(idx, opt_v)| opt_v.map(|v| f((idx, v))))
            .collect();
        ca.rename(self.name());
        ca
    }

    /// Apply `f` to every slot (null or not) together with its position in the array.
    ///
    /// The result keeps the name of `self`.
    fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Option<&'a T>)) -> Option<T> + Copy,
    {
        let mut ca: ObjectChunked<T> = self.into_iter().enumerate().map(f).collect();
        ca.rename(self.name());
        ca
    }

    /// Combine every slot of the array with the element of `slice` at the same
    /// position, overwriting that element with the value returned by `f`.
    ///
    /// Elements of `slice` beyond `self.len()` are left untouched.
    ///
    /// # Panics
    /// Panics when `slice` is shorter than the array.
    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
    where
        F: Fn(Option<&'a T>, &V) -> V,
    {
        assert!(slice.len() >= self.len());
        let mut idx = 0;
        self.downcast_iter().for_each(|arr| {
            arr.into_iter().for_each(|opt_val| {
                // SAFETY: `idx` counts the values yielded by the chunks; the
                // assertion above checked that `slice` is at least
                // `self.len()` long.
                let item = unsafe { slice.get_unchecked_mut(idx) };
                *item = f(opt_val, item);
                idx += 1;
            })
        });
    }
}
45 changes: 45 additions & 0 deletions polars/polars-core/src/chunked_array/ops/compare_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::chunked_array::ops::take::take_random::{
BoolTakeRandom, BoolTakeRandomSingleChunk, NumTakeRandomChunked, NumTakeRandomCont,
NumTakeRandomSingleChunk, Utf8TakeRandom, Utf8TakeRandomSingleChunk,
};
#[cfg(feature = "object")]
use crate::chunked_array::ops::take::take_random::{ObjectTakeRandom, ObjectTakeRandomSingleChunk};
use crate::prelude::*;
use std::cmp::{Ordering, PartialEq};

Expand Down Expand Up @@ -284,3 +286,46 @@ impl<'a> IntoPartialOrdInner<'a> for &'a CategoricalChunked {
unimplemented!()
}
}

#[cfg(feature = "object")]
impl<'a, T> PartialEqInner for ObjectTakeRandom<'a, T>
where
    T: PolarsObject,
{
    /// Compare the elements at `idx_a` and `idx_b` of a multi-chunk object array.
    ///
    /// Both elements are looked up through `get` and the lookups are compared
    /// with `==`.
    #[inline]
    unsafe fn eq_element_unchecked(&self, idx_a: usize, idx_b: usize) -> bool {
        let lhs = self.get(idx_a);
        let rhs = self.get(idx_b);
        lhs == rhs
    }
}

#[cfg(feature = "object")]
impl<'a, T> PartialEqInner for ObjectTakeRandomSingleChunk<'a, T>
where
    T: PolarsObject,
{
    /// Compare the elements at `idx_a` and `idx_b` of a single-chunk object array.
    ///
    /// Both elements are looked up through `get` and the lookups are compared
    /// with `==`.
    #[inline]
    unsafe fn eq_element_unchecked(&self, idx_a: usize, idx_b: usize) -> bool {
        let lhs = self.get(idx_a);
        let rhs = self.get(idx_b);
        lhs == rhs
    }
}

#[cfg(feature = "object")]
impl<'a, T: PolarsObject> IntoPartialEqInner<'a> for &'a ObjectChunked<T> {
    /// Build a boxed element comparator over this object array.
    ///
    /// An array with exactly one chunk gets the single-chunk accessor; any
    /// other chunk count gets the multi-chunk accessor, which also records
    /// the length of every chunk.
    fn into_partial_eq_inner(self) -> Box<dyn PartialEqInner + 'a> {
        if self.chunks.len() == 1 {
            let arr = self.downcast_iter().next().unwrap();
            return Box::new(ObjectTakeRandomSingleChunk { arr });
        }

        let chunks = self.downcast_chunks();
        let chunk_lens = self.chunks.iter().map(|a| a.len() as u32).collect();
        Box::new(ObjectTakeRandom { chunks, chunk_lens })
    }
}
12 changes: 6 additions & 6 deletions polars/polars-core/src/chunked_array/ops/take/take_random.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,8 @@ impl<'a> TakeRandom for ListTakeRandomSingleChunk<'a> {

#[cfg(feature = "object")]
pub struct ObjectTakeRandom<'a, T: PolarsObject> {
chunks: Vec<&'a ObjectArray<T>>,
chunk_lens: Vec<u32>,
pub(crate) chunks: Chunks<'a, ObjectArray<T>>,
pub(crate) chunk_lens: Vec<u32>,
}

#[cfg(feature = "object")]
Expand All @@ -465,7 +465,7 @@ impl<'a, T: PolarsObject> TakeRandom for ObjectTakeRandom<'a, T> {

#[cfg(feature = "object")]
pub struct ObjectTakeRandomSingleChunk<'a, T: PolarsObject> {
arr: &'a ObjectArray<T>,
pub(crate) arr: &'a ObjectArray<T>,
}

#[cfg(feature = "object")]
Expand Down Expand Up @@ -493,15 +493,15 @@ impl<'a, T: PolarsObject> IntoTakeRandom<'a> for &'a ObjectChunked<T> {
type TakeRandom = TakeRandBranch2<ObjectTakeRandomSingleChunk<'a, T>, ObjectTakeRandom<'a, T>>;

fn take_rand(&self) -> Self::TakeRandom {
let mut chunks = self.downcast_iter();
let chunks = self.downcast_chunks();
if self.chunks.len() == 1 {
let t = ObjectTakeRandomSingleChunk {
arr: chunks.next().unwrap(),
arr: chunks.get(0).unwrap(),
};
TakeRandBranch2::Single(t)
} else {
let t = ObjectTakeRandom {
chunks: chunks.collect(),
chunks,
chunk_lens: self.chunks.iter().map(|a| a.len() as u32).collect(),
};
TakeRandBranch2::Multi(t)
Expand Down
9 changes: 8 additions & 1 deletion polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,14 @@ impl IntoGroupTuples for CategoricalChunked {

impl IntoGroupTuples for ListChunked {}
#[cfg(feature = "object")]
impl<T> IntoGroupTuples for ObjectChunked<T> {}
impl<T> IntoGroupTuples for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Compute the group tuples of this object array by feeding its
    /// (optional) values to `groupby`.
    ///
    /// The `_multithreaded` flag is ignored.
    fn group_tuples(&self, _multithreaded: bool) -> GroupTuples {
        let values = self.into_iter();
        groupby(values)
    }
}

/// Used to tightly pack two 32 bit values and null information
/// Only the bit values matter, not the meaning of the bits
Expand Down
18 changes: 18 additions & 0 deletions polars/polars-core/src/series/implementations/object.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::chunked_array::object::compare_inner::{IntoPartialEqInner, PartialEqInner};
use crate::chunked_array::ChunkIdIter;
use crate::fmt::FmtList;
use crate::frame::groupby::{GroupTuples, IntoGroupTuples};
use crate::prelude::*;
use crate::series::implementations::SeriesWrap;
use crate::series::private::{PrivateSeries, PrivateSeriesNumeric};
use ahash::RandomState;
use arrow::array::ArrayRef;
use std::any::Any;
use std::borrow::Cow;
Expand Down Expand Up @@ -33,6 +36,21 @@ where
Some(val) => Cow::Owned(format!("{}", val)),
}
}
/// Build a boxed element comparator by delegating to the
/// `IntoPartialEqInner` implementation on a reference to the wrapped
/// value (`self.0`).
fn into_partial_eq_inner<'a>(&'a self) -> Box<dyn PartialEqInner + 'a> {
(&self.0).into_partial_eq_inner()
}

/// Hash the values of the wrapped array; delegates to `vec_hash` on
/// `self.0`, forwarding `random_state` unchanged.
fn vec_hash(&self, random_state: RandomState) -> AlignedVec<u64> {
self.0.vec_hash(random_state)
}

/// Combine the hashes of the wrapped array's values into the existing
/// `hashes`; delegates to `vec_hash_combine` on `self.0`.
fn vec_hash_combine(&self, build_hasher: RandomState, hashes: &mut [u64]) {
self.0.vec_hash_combine(build_hasher, hashes)
}

/// Compute group tuples through the `IntoGroupTuples` implementation of
/// the wrapped value, forwarding `multithreaded` unchanged.
// NOTE(review): the fully qualified call presumably avoids resolving to this
// method of the same name — confirm.
fn group_tuples(&self, multithreaded: bool) -> GroupTuples {
IntoGroupTuples::group_tuples(&self.0, multithreaded)
}
}
#[cfg(feature = "object")]
#[cfg_attr(docsrs, doc(cfg(feature = "object")))]
Expand Down
36 changes: 36 additions & 0 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,42 @@ impl VecHash for Float64Chunked {

impl VecHash for ListChunked {}

#[cfg(feature = "object")]
impl<T> VecHash for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Hash every slot of the array (nulls included) into a new vector.
    ///
    /// Each slot gets its own hasher built from `random_state`.
    fn vec_hash(&self, random_state: RandomState) -> AlignedVec<u64> {
        // A dedicated no-null branch is deliberately avoided here, because it
        // can break in unexpected ways. With threading, for instance, an array
        // is split in n_threads parts; some parts may contain nulls while
        // others do not. One part would then be hashed as Option<T> and the
        // other as T, making the hashes incomparable. Always hashing
        // Option<T> leaves the random_state as the only deterministic seed.
        let mut hashes = AlignedVec::with_capacity(self.len());

        for arr in self.downcast_iter() {
            let chunk_hashes = arr.into_iter().map(|opt_v| {
                let mut state = random_state.build_hasher();
                opt_v.hash(&mut state);
                state.finish()
            });
            hashes.extend(chunk_hashes);
        }
        hashes
    }

    /// Hash every slot of the array (nulls included) and fold the result into
    /// the hash already stored at the same position in `hashes`.
    fn vec_hash_combine(&self, random_state: RandomState, hashes: &mut [u64]) {
        self.apply_to_slice(
            |opt_v, previous| {
                let mut state = random_state.build_hasher();
                opt_v.hash(&mut state);
                let value_hash = state.finish();
                boost_hash_combine(value_hash, *previous)
            },
            hashes,
        )
    }
}

// Used to get a u64 from the hashing keys
// We need to modify the hashing algorithm to use the hash for this and only compute the hash once.
pub(crate) trait AsU64 {
Expand Down
32 changes: 32 additions & 0 deletions py-polars/src/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use crate::prelude::*;
use crate::series::PySeries;
use polars::frame::row::Row;
use polars::prelude::AnyValue;
use pyo3::basic::CompareOp;
use pyo3::conversion::{FromPyObject, IntoPy};
use pyo3::prelude::*;
use pyo3::types::PySequence;
use pyo3::{PyAny, PyResult};
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};

#[repr(transparent)]
pub struct Wrap<T>(pub T);
Expand Down Expand Up @@ -181,6 +183,36 @@ pub struct ObjectValue {
pub inner: PyObject,
}

impl Hash for ObjectValue {
    /// Feed the Python-level hash of the wrapped object into `state`.
    ///
    /// The GIL is acquired for the duration of the call.
    ///
    /// # Panics
    /// Panics when the Python object cannot be hashed.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let guard = Python::acquire_gil();
        let py = guard.python();
        let object = self.inner.as_ref(py);
        let py_hash = object.hash().expect("should be hashable");
        state.write_isize(py_hash)
    }
}

// Marker impl: declares that `==` on `ObjectValue` is a full equivalence relation.
// NOTE(review): equality is decided by the Python object's own `==`, which is
// user-definable and not guaranteed to be reflexive — confirm this is acceptable.
impl Eq for ObjectValue {}

impl PartialEq for ObjectValue {
    /// Compare two wrapped Python objects with Python's `==` operator.
    ///
    /// The GIL is acquired for the duration of the call. The objects are
    /// considered *not equal* when either the comparison itself raises or the
    /// comparison result cannot be converted to a truth value (for example
    /// when `__eq__` returns an object whose truthiness is undefined), so
    /// this method never panics on a Python exception.
    fn eq(&self, other: &Self) -> bool {
        let gil = Python::acquire_gil();
        let py = gil.python();
        self.inner
            .as_ref(py)
            .rich_compare(other.inner.as_ref(py), CompareOp::Eq)
            // Evaluating the truthiness of the result can raise as well;
            // fold that failure into the same "not equal" outcome.
            .and_then(|result| result.is_true())
            .unwrap_or(false)
    }
}

impl Display for ObjectValue {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)
Expand Down

0 comments on commit ff78941

Please sign in to comment.