Skip to content

Commit

Permalink
fix and improve rolling_skew (#4232)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 3, 2022
1 parent c745df1 commit ac916d2
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 24 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ list = ["polars-lazy/list", "polars-ops/list"]
rank = ["polars-core/rank", "polars-lazy/rank"]
diff = ["polars-core/diff", "polars-lazy/diff", "polars-ops/diff"]
pct_change = ["polars-core/pct_change", "polars-lazy/pct_change"]
moment = ["polars-core/moment", "polars-lazy/moment"]
moment = ["polars-core/moment", "polars-lazy/moment", "polars-ops/moment"]
arange = ["polars-lazy/arange"]
true_div = ["polars-lazy/true_div"]
diagonal_concat = ["polars-core/diagonal_concat"]
Expand Down
10 changes: 10 additions & 0 deletions polars/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,20 @@ impl<T: PolarsDataType> ChunkedArray<T> {
}

/// A reference to the chunks
#[inline]
pub fn chunks(&self) -> &Vec<ArrayRef> {
&self.chunks
}

/// A mutable reference to the chunks
///
/// # Safety
/// The caller must ensure to not change the `DataType` or `length` of any of the chunks.
#[inline]
pub unsafe fn chunks_mut(&mut self) -> &mut Vec<ArrayRef> {
&mut self.chunks
}

/// Returns true if contains a single chunk and has no null values
pub fn is_optimal_aligned(&self) -> bool {
self.chunks.len() == 1 && !self.has_validity()
Expand Down
18 changes: 12 additions & 6 deletions polars/polars-core/src/chunked_array/ops/rolling_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,21 @@ mod inner_mod {
T::Native: Float + IsFloat + SubAssign + num::pow::Pow<T::Native, Output = T::Native>,
{
/// Apply a rolling custom function. This is pretty slow because of dynamic dispatch.
pub fn rolling_apply_float<F>(&self, window_size: usize, f: F) -> Result<Self>
pub fn rolling_apply_float<F>(&self, window_size: usize, mut f: F) -> Result<Self>
where
F: Fn(&ChunkedArray<T>) -> Option<T::Native>,
F: FnMut(&mut ChunkedArray<T>) -> Option<T::Native>,
{
if window_size >= self.len() {
return Ok(Self::full_null(self.name(), self.len()));
}
let ca = self.rechunk();
let arr = ca.downcast_iter().next().unwrap();

let arr_container = ChunkedArray::<T>::from_slice("", &[T::Native::zero()]);
let array_ptr = &arr_container.chunks()[0];
// we create a temporary dummy ChunkedArray
// this will be a container where we swap the window contents every iteration
// doing so will save a lot of heap allocations.
let mut heap_container = ChunkedArray::<T>::from_slice("", &[T::Native::zero()]);
let array_ptr = &heap_container.chunks()[0];
let ptr = array_ptr.as_ref() as *const dyn Array as *mut dyn Array
as *mut PrimitiveArray<T::Native>;

Expand All @@ -207,7 +210,10 @@ mod inner_mod {
values.extend(std::iter::repeat(T::Native::default()).take(window_size - 1));

for offset in 0..self.len() + 1 - window_size {
let arr_window = arr.slice(offset, window_size);
debug_assert!(offset + window_size <= arr.len());
let arr_window = unsafe { arr.slice_unchecked(offset, window_size) };
// the lengths are cached, so we must update them
heap_container.length = arr_window.len() as IdxSize;

// Safety.
// ptr is not dropped as we are in scope
Expand All @@ -217,7 +223,7 @@ mod inner_mod {
*ptr = arr_window;
}

let out = f(&arr_container);
let out = f(&mut heap_container);
match out {
Some(v) => unsafe { values.push_unchecked(v) },
None => unsafe { unset_bit_raw(validity_ptr, offset + window_size - 1) },
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/series/ops/moment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Series {
/// Compute the sample skewness of a data set.
///
/// For normally distributed data, the skewness should be about zero. For
/// unimodal continuous distributions, a skewness value greater than zero means
/// uni-modal continuous distributions, a skewness value greater than zero means
/// that there is more weight in the right tail of the distribution. The
/// function `skewtest` can be used to determine if the skewness value
/// is close enough to zero, statistically speaking.
Expand Down
23 changes: 21 additions & 2 deletions polars/polars-core/src/series/unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub type ArrayBox = Box<dyn Array>;

impl<'a> UnstableSeries<'a> {
pub fn new(series: &'a mut Series) -> Self {
debug_assert_eq!(series.chunks().len(), 1);
let container = series as *mut Series;
let inner_chunk = series.array_ref(0);
UnstableSeries {
Expand All @@ -59,7 +60,25 @@ impl<'a> UnstableSeries<'a> {
Series::try_from((name, array_ref)).unwrap()
}

pub fn swap(&mut self, array: ArrayRef) {
unsafe { *self.inner.as_mut() = array };
#[inline]
/// Swaps inner state with the `array`. Prefer `UnstableSeries::with_array` as this
/// restores the state.
pub fn swap(&mut self, array: &mut ArrayRef) {
unsafe { std::mem::swap(self.inner.as_mut(), array) }
// ensure lengths are correct.
self.as_mut()._get_inner_mut().compute_len();
}

/// Temporary swaps out the array, and restores the original state
/// when application of the function `f` is done.
#[inline]
pub fn with_array<F, T>(&mut self, array: &mut ArrayRef, f: F) -> T
where
F: Fn(&UnstableSeries) -> T,
{
self.swap(array);
let out = f(self);
self.swap(array);
out
}
}
1 change: 1 addition & 0 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::ops::{Deref, DerefMut};

#[cfg(feature = "private")]
pub use crate::chunked_array::ops::sort::argsort_no_nulls;
pub use series::*;

#[repr(transparent)]
pub struct Wrap<T>(pub T);
Expand Down
13 changes: 13 additions & 0 deletions polars/polars-core/src/utils/series.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::prelude::*;
use crate::series::unstable::UnstableSeries;

/// Transform to physical type and coerce floating point and similar sized integer to a bit representation
/// to reduce compiler bloat
Expand All @@ -16,3 +17,15 @@ pub(crate) fn to_physical_and_bit_repr(s: &[Series]) -> Vec<Series> {
})
.collect()
}

/// A utility that allocates an `UnstableSeries`. The applied function can then use that
/// series container to save heap allocations and swap arrow arrays.
pub fn with_unstable_series<F, T>(dtype: &DataType, f: F) -> T
where
F: Fn(&mut UnstableSeries) -> T,
{
let mut container = Series::full_null("", 0, dtype);
let mut us = UnstableSeries::new(&mut container);

f(&mut us)
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ arange = []
mode = ["polars-core/mode"]
cum_agg = ["polars-core/cum_agg"]
interpolate = ["polars-core/interpolate"]
rolling_window = ["polars-core/rolling_window", "polars-time/rolling_window"]
rolling_window = ["polars-core/rolling_window", "polars-time/rolling_window", "polars-ops/rolling_window"]
rank = ["polars-core/rank"]
diff = ["polars-core/diff", "polars-ops/diff"]
pct_change = ["polars-core/pct_change"]
Expand Down
14 changes: 14 additions & 0 deletions polars/polars-lazy/src/dsl/function_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod fill_null;
mod is_in;
mod list;
mod pow;
#[cfg(all(feature = "rolling_window", feature = "moment"))]
mod rolling;
#[cfg(feature = "row_hash")]
mod row_hash;
#[cfg(feature = "sign")]
Expand Down Expand Up @@ -52,6 +54,12 @@ pub enum FunctionExpr {
},
#[cfg(feature = "is_in")]
ListContains,
#[cfg(all(feature = "rolling_window", feature = "moment"))]
// if we add more, make a sub enum
RollingSkew {
window_size: usize,
bias: bool,
},
}

#[cfg(feature = "trigonometry")]
Expand Down Expand Up @@ -117,6 +125,8 @@ impl FunctionExpr {
FillNull { super_type, .. } => with_dtype(super_type.clone()),
#[cfg(feature = "is_in")]
ListContains => with_dtype(DataType::Boolean),
#[cfg(all(feature = "rolling_window", feature = "moment"))]
RollingSkew { .. } => float_dtype(),
}
}
}
Expand Down Expand Up @@ -233,6 +243,10 @@ impl From<FunctionExpr> for SpecialEq<Arc<dyn SeriesUdf>> {
ListContains => {
wrap!(list::contains)
}
#[cfg(all(feature = "rolling_window", feature = "moment"))]
RollingSkew { window_size, bias } => {
map_with_args!(rolling::rolling_skew, window_size, bias)
}
}
}
}
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/dsl/function_expr/rolling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use super::*;

pub(super) fn rolling_skew(s: &Series, window_size: usize, bias: bool) -> Result<Series> {
s.rolling_skew(window_size, bias)
}
22 changes: 21 additions & 1 deletion polars/polars-lazy/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1765,6 +1765,17 @@ impl Expr {
)
}

/// Apply a rolling skew
#[cfg_attr(docsrs, doc(cfg(all(feature = "rolling_window", feature = "moment"))))]
#[cfg(feature = "rolling_window")]
#[cfg(feature = "moment")]
pub fn rolling_skew(self, window_size: usize, bias: bool) -> Expr {
self.apply_private(
FunctionExpr::RollingSkew { window_size, bias },
"rolling_skew",
)
}

#[cfg_attr(docsrs, doc(cfg(feature = "rolling_window")))]
#[cfg(feature = "rolling_window")]
/// Apply a custom function over a rolling/ moving window of the array.
Expand All @@ -1789,7 +1800,7 @@ impl Expr {
/// This has quite some dynamic dispatch, so prefer rolling_min, max, mean, sum over this.
pub fn rolling_apply_float<F>(self, window_size: usize, f: F) -> Expr
where
F: 'static + Fn(&Float64Chunked) -> Option<f64> + Send + Sync + Copy,
F: 'static + FnMut(&mut Float64Chunked) -> Option<f64> + Send + Sync + Copy,
{
self.apply(
move |s| {
Expand Down Expand Up @@ -1860,6 +1871,15 @@ impl Expr {

#[cfg(feature = "moment")]
#[cfg_attr(docsrs, doc(cfg(feature = "moment")))]
/// Compute the sample skewness of a data set.
///
/// For normally distributed data, the skewness should be about zero. For
/// uni-modal continuous distributions, a skewness value greater than zero means
/// that there is more weight in the right tail of the distribution. The
/// function `skewtest` can be used to determine if the skewness value
/// is close enough to zero, statistically speaking.
///
/// see: https://github.com/scipy/scipy/blob/47bb6febaa10658c72962b9615d5d5aa2513fa3a/scipy/stats/stats.py#L1024
pub fn skew(self, bias: bool) -> Expr {
self.apply(
move |s| s.skew(bias).map(|opt_v| Series::new(s.name(), &[opt_v])),
Expand Down
12 changes: 4 additions & 8 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,8 @@ impl PhysicalExpr for BinaryExpr {

// Safety:
// we are in bounds
let arr = unsafe { arr_l.slice_unchecked(idx, 1) };
us.swap(arr);
// ensure the length is correct
us.as_mut()._get_inner_mut().compute_len();
let mut arr = unsafe { arr_l.slice_unchecked(idx, 1) };
us.swap(&mut arr);

let l = us.as_ref();

Expand Down Expand Up @@ -272,10 +270,8 @@ impl PhysicalExpr for BinaryExpr {
// TODO: optimize this? Its slow.
// Safety:
// we are in bounds
let arr = unsafe { arr_r.slice_unchecked(idx, 1) };
us.swap(arr);
// ensure the length is correct
us.as_mut()._get_inner_mut().compute_len();
let mut arr = unsafe { arr_r.slice_unchecked(idx, 1) };
us.swap(&mut arr);
let r = us.as_ref();

apply_operator(l, r, self.op)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,9 @@ impl<'a> Iterator for FlatIter<'a> {
if self.len == self.offset {
None
} else {
let arr = unsafe { self.array.slice_unchecked(self.offset, 1) };
let mut arr = unsafe { self.array.slice_unchecked(self.offset, 1) };
self.offset += 1;
self.item.swap(arr);
// ensure lengths are correct.
self.series_container._get_inner_mut().compute_len();
self.item.swap(&mut arr);
Some(Some(self.item))
}
}
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-ops/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ strings = ["polars-core/strings"]
string_justify = ["polars-core/strings"]
log = []
hash = []
rolling_window = ["polars-core/rolling_window"]
moment = ["polars-core/moment"]
5 changes: 5 additions & 0 deletions polars/polars-ops/src/series/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
#[cfg(feature = "log")]
mod log;
#[cfg(feature = "rolling_window")]
mod rolling;
mod various;

#[cfg(feature = "log")]
pub use log::*;
use polars_core::prelude::*;

#[cfg(feature = "rolling_window")]
pub use rolling::*;
pub use various::*;

pub trait SeriesSealed {
Expand Down
58 changes: 58 additions & 0 deletions polars/polars-ops/src/series/ops/rolling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::series::ops::SeriesSealed;
use polars_core::export::num;
use polars_core::export::num::{Float, FromPrimitive};
use polars_core::prelude::*;
use polars_core::utils::with_unstable_series;
use std::ops::SubAssign;

#[cfg(feature = "moment")]
fn rolling_skew<T>(ca: &ChunkedArray<T>, window_size: usize, bias: bool) -> Result<ChunkedArray<T>>
where
ChunkedArray<T>: IntoSeries,
T: PolarsFloatType,
T::Native: Float + IsFloat + SubAssign + num::pow::Pow<T::Native, Output = T::Native>,
{
with_unstable_series(ca.dtype(), |us| {
ca.rolling_apply_float(window_size, |arr| {
let arr = unsafe { arr.chunks_mut().get_mut(0).unwrap() };

us.with_array(arr, |us| {
us.as_ref()
.skew(bias)
.unwrap()
.map(|flt| T::Native::from_f64(flt).unwrap())
})
})
})
}

pub trait RollingSeries: SeriesSealed {
#[cfg(feature = "moment")]
fn rolling_skew(&self, window_size: usize, bias: bool) -> Result<Series> {
let s = self.as_series();

match s.dtype() {
DataType::Float64 => {
let ca = s.f64().unwrap();
rolling_skew(ca, window_size, bias).map(|ca| ca.into_series())
}
DataType::Float32 => {
let ca = s.f32().unwrap();
rolling_skew(ca, window_size, bias).map(|ca| ca.into_series())
}
dt if dt.is_numeric() => {
let s = s.cast(&DataType::Float64).unwrap();
s.rolling_skew(window_size, bias)
}
dt => Err(PolarsError::ComputeError(
format!(
"cannot use rolling_skew function on Series of dtype: {:?}",
dt
)
.into(),
)),
}
}
}

impl RollingSeries for Series {}
23 changes: 23 additions & 0 deletions py-polars/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,26 @@ def test_rolling_kernels_and_groupby_rolling() -> None:
]
)
pl.testing.assert_frame_equal(out1, out2)


def test_rolling_skew() -> None:
s = pl.Series([1, 2, 3, 3, 2, 10, 8])
assert s.rolling_skew(window_size=4, bias=True).to_list() == [
None,
None,
None,
-0.49338220021815865,
0.0,
1.097025449363867,
0.09770939201338157,
]

assert s.rolling_skew(window_size=4, bias=False).to_list() == [
None,
None,
None,
-0.8545630383279711,
0.0,
1.9001038154942962,
0.16923763134384154,
]

0 comments on commit ac916d2

Please sign in to comment.