Skip to content

Commit

Permalink
improve and extend UnstableSeries
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 25, 2021
1 parent f8de9ce commit 63fde06
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 73 deletions.
45 changes: 12 additions & 33 deletions polars/polars-core/src/chunked_array/list/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,34 @@
use crate::prelude::*;
use crate::series::unstable::{ArrayBox, UnstableSeries};
use crate::utils::CustomIterTools;
use arrow::array::ArrayRef;
use std::convert::TryFrom;
use std::marker::PhantomData;
use std::pin::Pin;

/// A wrapper type that should make it a bit more clear that we should not clone Series
// Newtype over a borrowed Series; the lifetime ties it to the owner so the
// wrapped chunk cannot outlive the iterator that produced it.
#[derive(Debug)]
#[cfg(feature = "private")]
pub struct UnsafeSeries<'a>(&'a Series);

/// We don't implement Deref so that the caller is aware of converting to Series
impl AsRef<Series> for UnsafeSeries<'_> {
    // Cheap borrow of the wrapped Series; no allocation, no clone.
    fn as_ref(&self) -> &Series {
        self.0
    }
}

type ArrayBox = Box<dyn Array>;

impl UnsafeSeries<'_> {
    // Intentionally shadows a `Clone`-style call: shallow-cloning this type
    // would alias a chunk that the amortized iterator mutates in place.
    pub fn clone(&self) {
        panic!("don't clone this type, use deep_clone")
    }

    /// Make an owned, independent copy of the wrapped data as a new Series.
    pub fn deep_clone(&self) -> Series {
        // Clone only the first chunk; the amortized iterator keeps the
        // container as a single-chunk Series.
        let array_ref = self.0.chunks()[0].clone();
        Series::try_from((self.0.name(), array_ref)).unwrap()
    }
}
use std::ptr::NonNull;

#[cfg(feature = "private")]
pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayBox>>> {
series_container: Pin<Box<Series>>,
inner: *mut ArrayRef,
inner: NonNull<ArrayRef>,
lifetime: PhantomData<&'a ArrayRef>,
iter: I,
}

impl<'a, I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'a, I> {
type Item = Option<UnsafeSeries<'a>>;
type Item = Option<UnstableSeries<'a>>;

fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|opt_val| {
opt_val.map(|array_ref| {
unsafe { *self.inner = array_ref.into() };
unsafe { *self.inner.as_mut() = array_ref.into() };
// Safety
// we cannot control the lifetime of an iterators `next` method.
// but as long as self is alive the reference to the series container is valid
let refer = &*self.series_container;
UnsafeSeries(unsafe { std::mem::transmute::<&Series, &'a Series>(refer) })
unsafe {
let s = std::mem::transmute::<&Series, &'a Series>(refer);
UnstableSeries::new_with_chunk(s, self.inner.as_ref())
}
})
})
}
Expand Down Expand Up @@ -88,7 +67,7 @@ impl ListChunked {

AmortizedListIter {
series_container,
inner: ptr,
inner: NonNull::new(ptr).unwrap(),
lifetime: PhantomData,
iter: self
.downcast_iter()
Expand All @@ -102,7 +81,7 @@ impl ListChunked {
#[cfg(feature = "private")]
pub fn apply_amortized<'a, F>(&'a self, mut f: F) -> Self
where
F: FnMut(UnsafeSeries<'a>) -> Series,
F: FnMut(UnstableSeries<'a>) -> Series,
{
if self.is_empty() {
return self.clone();
Expand Down Expand Up @@ -130,7 +109,7 @@ impl ListChunked {

pub fn try_apply_amortized<'a, F>(&'a self, mut f: F) -> Result<Self>
where
F: FnMut(UnsafeSeries<'a>) -> Result<Series>,
F: FnMut(UnstableSeries<'a>) -> Result<Series>,
{
if self.is_empty() {
return Ok(self.clone());
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod into;
pub(crate) mod iterator;
pub mod ops;
mod series_trait;
#[cfg(feature = "private")]
pub mod unstable;

use crate::chunked_array::ops::rolling_window::RollingOptions;
#[cfg(feature = "rank")]
Expand Down
58 changes: 58 additions & 0 deletions polars/polars-core/src/series/unstable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::prelude::*;
use std::convert::TryFrom;
use std::ptr::NonNull;

/// A wrapper type that should make it a bit more clear that we should not clone Series
#[derive(Debug)]
#[cfg(feature = "private")]
pub struct UnstableSeries<'a> {
    // A series containing a single chunk ArrayRef
    // the ArrayRef will be replaced by amortized_iter
    // use with caution!
    container: &'a Series,
    // the ptr to the inner chunk, this saves some ptr chasing
    // NOTE(review): must always point at `container.chunks()[0]`; the
    // constructors below establish this invariant.
    inner: NonNull<ArrayRef>,
}

/// We don't implement Deref so that the caller is aware of converting to Series
impl AsRef<Series> for UnstableSeries<'_> {
    // Cheap borrow of the wrapped container; no allocation, no clone.
    fn as_ref(&self) -> &Series {
        self.container
    }
}

/// A boxed, dynamically typed arrow array (the item type yielded to the amortized iterators).
pub type ArrayBox = Box<dyn Array>;

impl<'a> UnstableSeries<'a> {
    /// Wrap a [`Series`], caching a pointer to its first chunk so the hot
    /// loop can swap the chunk without pointer chasing through the Series.
    pub fn new(series: &'a Series) -> Self {
        let inner_chunk = &series.chunks()[0];
        UnstableSeries {
            container: series,
            // A pointer derived from a reference is never null, so
            // `NonNull::from` is infallible — no cast + unwrap needed.
            inner: NonNull::from(inner_chunk),
        }
    }

    /// Creates a new [`UnstableSeries`] from a series and a reference to
    /// its first chunk.
    /// # Panics
    /// panics if `inner_chunk` is not from `Series`.
    pub fn new_with_chunk(series: &'a Series, inner_chunk: &ArrayRef) -> Self {
        // NOTE(review): this compares array *contents*, not pointer identity;
        // assumed cheap because amortized chunks hold a single row — confirm.
        assert_eq!(series.chunks()[0].as_ref(), inner_chunk.as_ref());
        UnstableSeries {
            container: series,
            inner: NonNull::from(inner_chunk),
        }
    }

    // Intentionally shadows a `Clone`-style call: shallow-cloning this type
    // would alias a chunk that `swap` mutates in place.
    pub fn clone(&self) {
        panic!("don't clone this type, use deep_clone")
    }

    /// Make an owned, independent copy of the current chunk as a new Series.
    pub fn deep_clone(&self) -> Series {
        let array_ref = self.container.chunks()[0].clone();
        Series::try_from((self.container.name(), array_ref)).unwrap()
    }

    /// Replace the inner chunk of the container with `array`.
    pub fn swap(&mut self, array: ArrayRef) {
        // SAFETY: `inner` points at the first chunk of `container`, which the
        // `'a` borrow keeps alive. NOTE(review): this writes through a pointer
        // derived from a shared reference — sound only while no other
        // reference observes the old chunk concurrently; verify call sites.
        unsafe { *self.inner.as_mut() = array };
    }
}
29 changes: 10 additions & 19 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::physical_plan::state::ExecutionState;
use crate::physical_plan::PhysicalAggregation;
use crate::prelude::*;
use polars_arrow::arrow::array::ArrayRef;
use polars_core::frame::groupby::GroupTuples;
use polars_core::series::unstable::UnstableSeries;
use polars_core::{prelude::*, POOL};
use std::convert::TryFrom;
use std::sync::Arc;
Expand Down Expand Up @@ -109,12 +109,7 @@ impl PhysicalExpr for BinaryExpr {
// so we can swap the ArrayRef during the hot loop
// this prevents a series Arc alloc and a vec alloc per iteration
let dummy = Series::try_from(("dummy", vec![arr_l.clone()])).unwrap();
let chunks = unsafe {
let chunks = dummy.chunks();
let ptr = chunks.as_ptr() as *mut ArrayRef;
let len = chunks.len();
std::slice::from_raw_parts_mut(ptr, len)
};
let mut us = UnstableSeries::new(&dummy);

// this is now a list
let r = ac_r.aggregated();
Expand All @@ -131,9 +126,10 @@ impl PhysicalExpr for BinaryExpr {

// Safety:
// we are in bounds
let mut arr = unsafe { Arc::from(arr_l.slice_unchecked(idx, 1)) };
std::mem::swap(&mut chunks[0], &mut arr);
let l = &dummy;
let arr = unsafe { Arc::from(arr_l.slice_unchecked(idx, 1)) };
us.swap(arr);

let l = us.as_ref();

apply_operator(l, r, self.op)
})
Expand All @@ -160,12 +156,7 @@ impl PhysicalExpr for BinaryExpr {
// so we can swap the ArrayRef during the hot loop
// this prevents a series Arc alloc and a vec alloc per iteration
let dummy = Series::try_from(("dummy", vec![arr_r.clone()])).unwrap();
let chunks = unsafe {
let chunks = dummy.chunks();
let ptr = chunks.as_ptr() as *mut ArrayRef;
let len = chunks.len();
std::slice::from_raw_parts_mut(ptr, len)
};
let mut us = UnstableSeries::new(&dummy);

let mut ca: ListChunked = l
.amortized_iter()
Expand All @@ -177,9 +168,9 @@ impl PhysicalExpr for BinaryExpr {
// TODO: optimize this? Its slow.
// Safety:
// we are in bounds
let mut arr = unsafe { Arc::from(arr_r.slice_unchecked(idx, 1)) };
std::mem::swap(&mut chunks[0], &mut arr);
let r = &dummy;
let arr = unsafe { Arc::from(arr_r.slice_unchecked(idx, 1)) };
us.swap(arr);
let r = us.as_ref();

apply_operator(l, r, self.op)
})
Expand Down
31 changes: 10 additions & 21 deletions polars/polars-lazy/src/physical_plan/expressions/ternary.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_arrow::arrow::array::ArrayRef;
use polars_core::frame::groupby::GroupTuples;
use polars_core::prelude::*;
use polars_core::series::unstable::UnstableSeries;
use polars_core::POOL;
use std::convert::TryFrom;
use std::sync::Arc;
Expand Down Expand Up @@ -75,12 +75,7 @@ The predicate produced {} values. Where the original DataFrame has {} values",
// so we can swap the ArrayRef during the hot loop
// this prevents a series Arc alloc and a vec alloc per iteration
let dummy = Series::try_from(("dummy", vec![arr_truthy.clone()])).unwrap();
let chunks = unsafe {
let chunks = dummy.chunks();
let ptr = chunks.as_ptr() as *mut ArrayRef;
let len = chunks.len();
std::slice::from_raw_parts_mut(ptr, len)
};
let mut us = UnstableSeries::new(&dummy);

// this is now a list
let falsy = ac_falsy.aggregated();
Expand Down Expand Up @@ -109,10 +104,9 @@ The predicate produced {} values. Where the original DataFrame has {} values",

// Safety:
// we are in bounds
let mut arr =
unsafe { Arc::from(arr_truthy.slice_unchecked(idx, 1)) };
std::mem::swap(&mut chunks[0], &mut arr);
let truthy = &dummy;
let arr = unsafe { Arc::from(arr_truthy.slice_unchecked(idx, 1)) };
us.swap(arr);
let truthy = us.as_ref();

Some(truthy.zip_with(mask, falsy))
}
Expand Down Expand Up @@ -142,12 +136,8 @@ The predicate produced {} values. Where the original DataFrame has {} values",
// so we can swap the ArrayRef during the hot loop
// this prevents a series Arc alloc and a vec alloc per iteration
let dummy = Series::try_from(("dummy", vec![arr_falsy.clone()])).unwrap();
let chunks = unsafe {
let chunks = dummy.chunks();
let ptr = chunks.as_ptr() as *mut ArrayRef;
let len = chunks.len();
std::slice::from_raw_parts_mut(ptr, len)
};
let mut us = UnstableSeries::new(&dummy);

let mask = ac_mask.aggregated();
let mask = mask.as_ref();
let mask = mask.list()?;
Expand All @@ -170,10 +160,9 @@ The predicate produced {} values. Where the original DataFrame has {} values",

// Safety:
// we are in bounds
let mut arr =
unsafe { Arc::from(arr_falsy.slice_unchecked(idx, 1)) };
std::mem::swap(&mut chunks[0], &mut arr);
let falsy = &dummy;
let arr = unsafe { Arc::from(arr_falsy.slice_unchecked(idx, 1)) };
us.swap(arr);
let falsy = us.as_ref();

Some(truthy.zip_with(mask, falsy))
}
Expand Down

0 comments on commit 63fde06

Please sign in to comment.