Skip to content

Commit

Permalink
fill_null limits (#3559)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 2, 2022
1 parent ff4a318 commit a98438b
Show file tree
Hide file tree
Showing 21 changed files with 350 additions and 93 deletions.
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl<T> ChunkedArray<T> {
}

/// Set the 'sorted' bit meta info.
pub(crate) fn set_sorted(&mut self, reverse: bool) {
pub fn set_sorted(&mut self, reverse: bool) {
if reverse {
self.bit_settings |= 1 << 1
} else {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/ops/chunkops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl ChunkOps for Utf8Chunked {
)
.unwrap()
.into()];
ChunkedArray::from_chunks(self.name(), chunks)
self.copy_with_chunks(chunks)
}
}
#[inline]
Expand Down
177 changes: 163 additions & 14 deletions polars/polars-core/src/chunked_array/ops/fill_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,105 @@ use polars_arrow::kernels::set::set_at_nulls;
use polars_arrow::utils::CustomIterTools;
use std::ops::Add;

/// Forward-fills nulls in a numeric array, filling at most `limit`
/// consecutive nulls after each valid value.
///
/// Nulls that come before the first valid value stay null, because there is
/// nothing to carry forward yet.
fn fill_forward_limit<T>(ca: &ChunkedArray<T>, limit: IdxSize) -> ChunkedArray<T>
where
    T: PolarsNumericType,
{
    // Number of nulls seen since the last valid value.
    let mut run_len: IdxSize = 0;
    // Most recent valid value, carried forward into nulls.
    let mut last_valid = None;
    ca.into_iter()
        .map(|item| {
            if item.is_some() {
                // A valid value restarts the run and becomes the new fill value.
                run_len = 0;
                last_valid = item;
                item
            } else if run_len < limit {
                run_len += 1;
                last_valid
            } else {
                // Limit exhausted for this run of nulls.
                None
            }
        })
        .collect_trusted()
}

/// Backward-fills nulls in a numeric array, filling at most `limit`
/// consecutive nulls before each valid value.
///
/// The array is walked back to front so the "next" valid value is already
/// known when a null is reached; `collect_reversed` consumes that reversed
/// stream when building the result.
fn fill_backward_limit<T>(ca: &ChunkedArray<T>, limit: IdxSize) -> ChunkedArray<T>
where
    T: PolarsNumericType,
{
    // Number of nulls seen since the last valid value (in reverse order).
    let mut run_len: IdxSize = 0;
    // Nearest valid value that follows the current position in the original order.
    let mut next_valid = None;
    ca.into_iter()
        .rev()
        .map(|item| {
            if item.is_some() {
                run_len = 0;
                next_valid = item;
                item
            } else if run_len < limit {
                run_len += 1;
                next_valid
            } else {
                // Limit exhausted for this run of nulls.
                None
            }
        })
        .collect_reversed()
}

/// Backward-fills nulls in a boolean array, filling at most `limit`
/// consecutive nulls before each valid value.
///
/// Iterates back to front and collects with `collect_reversed`.
fn fill_backward_limit_bool(ca: &BooleanChunked, limit: IdxSize) -> BooleanChunked {
    // Number of nulls seen since the last valid value (in reverse order).
    let mut run_len: IdxSize = 0;
    // Nearest valid value that follows the current position in the original order.
    let mut next_valid = None;
    ca.into_iter()
        .rev()
        .map(|item| {
            if item.is_some() {
                run_len = 0;
                next_valid = item;
                item
            } else if run_len < limit {
                run_len += 1;
                next_valid
            } else {
                // Limit exhausted for this run of nulls.
                None
            }
        })
        .collect_reversed()
}

/// Backward-fills nulls in a string array, filling at most `limit`
/// consecutive nulls before each valid value.
///
/// This takes two passes: the first walks the array back to front and
/// collects the filled values in reversed order, the second reverses that
/// intermediate array again.
fn fill_backward_limit_utf8(ca: &Utf8Chunked, limit: IdxSize) -> Utf8Chunked {
    // Number of nulls seen since the last valid value (in reverse order).
    let mut run_len: IdxSize = 0;
    // Nearest valid value that follows the current position in the original order.
    let mut next_valid = None;
    let reversed: Utf8Chunked = ca
        .into_iter()
        .rev()
        .map(|item| {
            if item.is_some() {
                run_len = 0;
                next_valid = item;
                item
            } else if run_len < limit {
                run_len += 1;
                next_valid
            } else {
                // Limit exhausted for this run of nulls.
                None
            }
        })
        .collect_trusted();
    // Second pass: flip the reversed intermediate back.
    reversed.into_iter().rev().collect_trusted()
}

fn fill_forward<T>(ca: &ChunkedArray<T>) -> ChunkedArray<T>
where
T: PolarsNumericType,
Expand Down Expand Up @@ -35,6 +134,30 @@ macro_rules! impl_fill_forward {
}};
}

// Forward-fill with a limit for array types that are not covered by the
// generic numeric helper. Expands to an expression that walks `$ca` front to
// back and fills at most `$limit` consecutive nulls with the last valid value
// seen. The target collection type is inferred at the expansion site.
macro_rules! impl_fill_forward_limit {
    ($ca:ident, $limit:expr) => {{
        // Number of nulls seen since the last valid value.
        let mut run_len = 0;
        // Most recent valid value, carried forward into nulls.
        let mut last_valid = None;
        $ca.into_iter()
            .map(|item| {
                if item.is_some() {
                    run_len = 0;
                    last_valid = item;
                    item
                } else if run_len < $limit {
                    run_len += 1;
                    last_valid
                } else {
                    // Limit exhausted for this run of nulls.
                    None
                }
            })
            .collect_trusted()
    }};
}

fn fill_backward<T>(ca: &ChunkedArray<T>) -> ChunkedArray<T>
where
T: PolarsNumericType,
Expand All @@ -51,6 +174,19 @@ where
.collect_reversed()
}

/// Backward-fills every null in a boolean array with the next valid value.
///
/// Walks the array back to front, remembering the last valid value seen, and
/// collects with `collect_reversed`. Nulls after the final valid value stay
/// null.
fn fill_backward_bool(ca: &BooleanChunked) -> BooleanChunked {
    ca.into_iter()
        .rev()
        .scan(None, |next_valid, item| {
            // Only a valid value replaces the remembered fill value.
            if item.is_some() {
                *next_valid = item;
            }
            // Valid items yield themselves (just stored); nulls yield the
            // remembered value, which is still `None` if nothing valid was seen.
            Some(*next_valid)
        })
        .collect_reversed()
}

macro_rules! impl_fill_backward {
($ca:ident, $ChunkedArray:ty) => {{
let ca: $ChunkedArray = $ca
Expand Down Expand Up @@ -81,8 +217,10 @@ where
return Ok(self.clone());
}
let mut ca = match strategy {
FillNullStrategy::Forward => fill_forward(self),
FillNullStrategy::Backward => fill_backward(self),
FillNullStrategy::Forward(None) => fill_forward(self),
FillNullStrategy::Forward(Some(limit)) => fill_forward_limit(self, limit),
FillNullStrategy::Backward(None) => fill_backward(self),
FillNullStrategy::Backward(Some(limit)) => fill_backward_limit(self, limit),
FillNullStrategy::Min => {
self.fill_null_with_values(self.min().ok_or_else(|| {
PolarsError::ComputeError("Could not determine fill value".into())
Expand Down Expand Up @@ -126,14 +264,19 @@ impl ChunkFillNull for BooleanChunked {
return Ok(self.clone());
}
match strategy {
FillNullStrategy::Forward => {
let mut out: Self = impl_fill_forward!(self);
FillNullStrategy::Forward(limit) => {
let mut out: Self = match limit {
Some(limit) => impl_fill_forward_limit!(self, limit),
None => impl_fill_forward!(self),
};
out.rename(self.name());
Ok(out)
}
FillNullStrategy::Backward => {
// TODO: still a double scan. impl collect_reversed for boolean
let mut out: Self = impl_fill_backward!(self, BooleanChunked);
FillNullStrategy::Backward(limit) => {
let mut out: Self = match limit {
None => fill_backward_bool(self),
Some(limit) => fill_backward_limit_bool(self, limit),
};
out.rename(self.name());
Ok(out)
}
Expand Down Expand Up @@ -171,13 +314,19 @@ impl ChunkFillNull for Utf8Chunked {
return Ok(self.clone());
}
match strategy {
FillNullStrategy::Forward => {
let mut out: Self = impl_fill_forward!(self);
FillNullStrategy::Forward(limit) => {
let mut out: Self = match limit {
Some(limit) => impl_fill_forward_limit!(self, limit),
None => impl_fill_forward!(self),
};
out.rename(self.name());
Ok(out)
}
FillNullStrategy::Backward => {
let mut out: Self = impl_fill_backward!(self, Utf8Chunked);
FillNullStrategy::Backward(limit) => {
let mut out = match limit {
None => impl_fill_backward!(self, Utf8Chunked),
Some(limit) => fill_backward_limit_utf8(self, limit),
};
out.rename(self.name());
Ok(out)
}
Expand Down Expand Up @@ -234,14 +383,14 @@ mod test {
#[test]
fn test_fill_null() {
let ca = Int32Chunked::new("a", &[None, Some(2), Some(3), None, Some(4), None]);
let filled = ca.fill_null(FillNullStrategy::Forward).unwrap();
let filled = ca.fill_null(FillNullStrategy::Forward(None)).unwrap();
assert_eq!(filled.name(), "a");

assert_eq!(
Vec::from(&filled),
&[None, Some(2), Some(3), Some(3), Some(4), Some(4)]
);
let filled = ca.fill_null(FillNullStrategy::Backward).unwrap();
let filled = ca.fill_null(FillNullStrategy::Backward(None)).unwrap();
assert_eq!(filled.name(), "a");
assert_eq!(
Vec::from(&filled),
Expand All @@ -266,7 +415,7 @@ mod test {
&[Some(3), Some(2), Some(3), Some(3), Some(4), Some(3)]
);
let ca = Int32Chunked::new("a", &[None, None, None, None, Some(4), None]);
let filled = ca.fill_null(FillNullStrategy::Backward).unwrap();
let filled = ca.fill_null(FillNullStrategy::Backward(None)).unwrap();
assert_eq!(filled.name(), "a");
assert_eq!(
Vec::from(&filled),
Expand Down
7 changes: 4 additions & 3 deletions polars/polars-core/src/chunked_array/ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
.zip(filter.downcast_iter())
.map(|(left, mask)| filter_fn(left, mask).unwrap().into())
.collect::<Vec<_>>();
Ok(ChunkedArray::from_chunks(self.name(), chunks))
Ok(self.copy_with_chunks(chunks))
}
}

Expand All @@ -66,7 +66,7 @@ impl ChunkFilter<BooleanType> for BooleanChunked {
.zip(filter.downcast_iter())
.map(|(left, mask)| filter_fn(left, mask).unwrap().into())
.collect::<Vec<_>>();
Ok(ChunkedArray::from_chunks(self.name(), chunks))
Ok(self.copy_with_chunks(chunks))
}
}

Expand All @@ -87,7 +87,8 @@ impl ChunkFilter<Utf8Type> for Utf8Chunked {
.zip(filter.downcast_iter())
.map(|(left, mask)| filter_fn(left, mask).unwrap().into())
.collect::<Vec<_>>();
Ok(ChunkedArray::from_chunks(self.name(), chunks))

Ok(self.copy_with_chunks(chunks))
}
}

Expand Down
6 changes: 4 additions & 2 deletions polars/polars-core/src/chunked_array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,14 @@ pub trait ChunkSort<T> {
}
}

pub type FillNullLimit = Option<IdxSize>;

#[derive(Copy, Clone, Debug)]
pub enum FillNullStrategy {
/// fill nulls with the next non-null value in the array (backward fill)
Backward,
Backward(FillNullLimit),
/// fill nulls with the previous non-null value in the array (forward fill)
Forward,
Forward(FillNullLimit),
/// mean value of array
Mean,
/// minimal value in array
Expand Down
37 changes: 36 additions & 1 deletion polars/polars-core/src/chunked_array/trusted_len.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::chunked_array::upstream_traits::PolarsAsRef;
use crate::prelude::*;
use crate::utils::{CustomIterTools, FromTrustedLenIterator, NoNull};
use arrow::bitmap::MutableBitmap;
use polars_arrow::bit_util::unset_bit_raw;
use polars_arrow::bit_util::{set_bit_raw, unset_bit_raw};
use polars_arrow::trusted_len::{FromIteratorReversed, PushUnchecked};
use std::borrow::Borrow;

Expand Down Expand Up @@ -76,6 +76,41 @@ where
}
}

impl FromIteratorReversed<Option<bool>> for BooleanChunked {
    /// Builds a `BooleanChunked` from an iterator whose items arrive in
    /// reverse order: the first item yielded is written to the last slot,
    /// the second to the one before it, and so on.
    ///
    /// The resulting array has an empty name (`""`).
    ///
    /// # Panics
    ///
    /// Panics if the iterator reports no upper bound in `size_hint`, or if it
    /// yields more items than that bound (the write position would fall
    /// outside the bitmaps).
    fn from_trusted_len_iter_rev<I: TrustedLen<Item = Option<bool>>>(iter: I) -> Self {
        let size = iter
            .size_hint()
            .1
            .expect("TrustedLen iterator must report an upper bound");

        // Values start as all `false`, validity as all `true`; only the
        // exceptions (`Some(true)` and `None`) need a write below.
        let mut vals = MutableBitmap::from_len_zeroed(size);
        let mut validity = MutableBitmap::with_capacity(size);
        validity.extend_constant(size, true);

        // Write position, moving from the back of the buffers to the front.
        // The bitmaps are mutated through the safe, bounds-checked `set` API
        // rather than through raw pointers cast from a shared borrow.
        let mut offset = size;
        iter.for_each(|opt_item| {
            offset -= 1;
            match opt_item {
                // Validity bit is already true; only the value bit needs setting.
                Some(true) => vals.set(offset, true),
                // Value bit is already false and validity already true.
                Some(false) => {}
                // Mark the slot as null.
                None => validity.set(offset, false),
            }
        });

        let arr =
            BooleanArray::from_data(ArrowDataType::Boolean, vals.into(), Some(validity.into()));
        ChunkedArray::from_chunks("", vec![Arc::new(arr)])
    }
}

impl<T> FromIteratorReversed<T::Native> for NoNull<ChunkedArray<T>>
where
T: PolarsNumericType,
Expand Down
25 changes: 12 additions & 13 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,13 @@ impl DataFrame {
pub fn with_row_count(&self, name: &str, offset: Option<IdxSize>) -> Result<Self> {
let mut columns = Vec::with_capacity(self.columns.len() + 1);
let offset = offset.unwrap_or(0);
columns.push(
IdxCa::from_vec(
name,
(offset..(self.height() as IdxSize) + offset).collect(),
)
.into_series(),

let mut ca = IdxCa::from_vec(
name,
(offset..(self.height() as IdxSize) + offset).collect(),
);
ca.set_sorted(false);
columns.push(ca.into_series());

columns.extend_from_slice(&self.columns);
DataFrame::new(columns)
Expand All @@ -342,14 +342,13 @@ impl DataFrame {
/// Add a row count in place.
pub fn with_row_count_mut(&mut self, name: &str, offset: Option<IdxSize>) -> &mut Self {
let offset = offset.unwrap_or(0);
self.columns.insert(
0,
IdxCa::from_vec(
name,
(offset..(self.height() as IdxSize) + offset).collect(),
)
.into_series(),
let mut ca = IdxCa::from_vec(
name,
(offset..(self.height() as IdxSize) + offset).collect(),
);
ca.set_sorted(false);

self.columns.insert(0, ca.into_series());
self
}

Expand Down

0 comments on commit a98438b

Please sign in to comment.