Skip to content

Commit

Permalink
setup union median
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 28, 2023
1 parent 62a6e7f commit 213af31
Showing 1 changed file with 124 additions and 16 deletions.
140 changes: 124 additions & 16 deletions crates/polars-arrow/src/legacy/kernels/rolling/median_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ impl<'a, T: IsFloat + PartialOrd + NativeType> Block<'a, T> {
b
}

fn capacity(&self) -> usize {
self.alpha.len()
}

fn init_links(&mut self) {
let mut p = self.tail;

Expand Down Expand Up @@ -166,6 +170,7 @@ impl<'a, T: IsFloat + PartialOrd + NativeType> Block<'a, T> {

fn delete(&mut self, i: usize) {
let delete = self.get_pair(i);

let current = self.get_pair(self.m);

// delete from links
Expand All @@ -188,8 +193,15 @@ impl<'a, T: IsFloat + PartialOrd + NativeType> Block<'a, T> {
Ordering::Equal => {
// 1, 2, [3], 4, 5
// 1, 2, [4], 5
// go to next position because hte link was deleted
self.m = self.next[self.m as usize] as usize;
// go to next position because the link was deleted
if self.n_element > self.current_index {
self.m = self.next[self.m as usize] as usize;
} else {
// move to previous position because the link was deleted
// 1, [2],
// [1]
self.m = self.prev[self.m as usize] as usize
}
},
};
}
Expand Down Expand Up @@ -222,10 +234,10 @@ impl<'a, T: IsFloat + PartialOrd + NativeType> Block<'a, T> {
// index position remains unaffected
},
Ordering::Equal => {
// 1, 2, [4], 5
// 1, 2, 4, 5
// 1, 2, [3], 4, 5
// go to prev position because hte link was added
self.m = self.prev[self.m as usize] as usize;
// self.m = self.prev[self.m as usize] as usize;
},
};
}
Expand Down Expand Up @@ -286,6 +298,41 @@ impl<T: IsFloat + PartialOrd + NativeType> LenGet for &mut Block<'_, T> {
}
}

struct BlockUnion<'a, T: IsFloat + PartialOrd + NativeType> {
block_left: &'a mut Block<'a, T>,
block_right: &'a mut Block<'a, T>,
}

impl<'a, T: IsFloat + PartialOrd + NativeType> BlockUnion<'a, T> {
fn new(block_left: &'a mut Block<'a, T>, block_right: &'a mut Block<'a, T> ) -> Self {
Self {
block_left,
block_right
}
}
}

impl<T: IsFloat + PartialOrd + NativeType> LenGet for BlockUnion<'_, T> {
type Item = T;

fn len(&self) -> usize {
self.block_left.n_element + self.block_right.n_element
}

fn get(&mut self, i: usize) -> Self::Item {
let i= if i < self.block_left.n_element {
self.block_left.traverse_to_index(i);
return self.block_left.peek().unwrap()
} else if self.block_left.n_element == 0 {
i
} else{
i % self.block_left.n_element
};
self.block_right.traverse_to_index(i);
self.block_right.peek().unwrap()
}
}

struct MedianUpdate<M: LenGet> {
inner: M
}
Expand Down Expand Up @@ -333,26 +380,80 @@ where
+ Add<Output=T>
,
{
let mut scratch_a = vec![];
let mut prev_a = vec![];
let mut next_a = vec![];
let mut scratch_left = vec![];
let mut prev_left = vec![];
let mut next_left = vec![];

let mut scratch_b = vec![];
let mut prev_b = vec![];
let mut next_b = vec![];
let mut scratch_right = vec![];
let mut prev_right = vec![];
let mut next_right = vec![];

let k = std::cmp::min(k, slice.len());
let alpha = &slice[..k];

// let mut out = Vec::with_capacity(slice.len());
let mut block_left = Block::new(alpha, &mut scratch_a, &mut prev_a, &mut next_a);
let mut block_right = Block::new(&alpha[..1], &mut scratch_b, &mut prev_b, &mut next_b);
let mut out = Vec::with_capacity(slice.len());

let scratch_right_ptr = &mut scratch_right as *mut Vec<u8>;
let scratch_left_ptr = &mut scratch_left as *mut Vec<u8>;
let prev_right_ptr = &mut prev_right as *mut Vec<_>;
let prev_left_ptr = &mut prev_left as *mut Vec<_>;
let next_right_ptr = &mut next_right as *mut Vec<_>;
let next_left_ptr = &mut next_left as *mut Vec<_>;

let n_blocks = slice.len() / k;

let mut block_left = unsafe { Block::new(alpha, &mut *scratch_left_ptr, &mut *prev_left_ptr, &mut *next_left_ptr) };
let mut block_right = unsafe { Block::new(&alpha[..1], &mut *scratch_right_ptr, &mut *prev_right_ptr, &mut *next_right_ptr) };

let ptr_left = &mut block_left as *mut Block<'_, _>;
let ptr_right = &mut block_right as *mut Block<'_, _>;

block_left.unwind();
let mut mu = MedianUpdate::new(&mut block_left);
dbg!(mu.median());

for i in 0..block_left.capacity() {
block_left.undelete(i);

let mut mu = MedianUpdate::new(&mut block_left);
out.push(mu.median());
}
for i in 1..n_blocks {
// Block left is now completely full as it is completely filled coming from the boundary effects.
debug_assert!(block_left.n_element == k);

// Windows state at this point.
//
// - BLOCK_LEFT -- BLOCK_RIGHT -
// |-------------||-------------|
// - WINDOW -
// |--------------|
let alpha = &slice[i * k..(i + 1) *k];
block_right = unsafe { Block::new(alpha, &mut *scratch_right_ptr, &mut *prev_right_ptr, &mut *next_right_ptr) };

// Time reverse the rhs so we can undelete in sorted order.
block_right.unwind();

// Here the window will move from BLOCK_LEFT into BLOCK_RIGHT
for j in 0..block_right.capacity() {
dbg!(j);
block_left.delete(j);
block_right.undelete(j);
dbg!(&block_left);
dbg!(&block_right);

unsafe {
let union = BlockUnion::new(&mut *ptr_left, &mut *ptr_right);
out.push(dbg!(MedianUpdate::new(union).median()))

}
}
unsafe {
std::ptr::swap_nonoverlapping(ptr_left, ptr_right, 1);
std::ptr::swap_nonoverlapping(scratch_left_ptr, scratch_right_ptr, 1);
std::ptr::swap_nonoverlapping(prev_left_ptr, prev_right_ptr, 1);
std::ptr::swap_nonoverlapping(next_left_ptr, next_right_ptr, 1);
}
}
dbg!(&out);
// mu.

// let mut block_b = Block::new(h, alpha, &mut scratch_b, &mut prev_b, &mut next_b);
Expand Down Expand Up @@ -460,9 +561,16 @@ mod test {

#[test]
fn test_median() {
let values = [2, 8, 5, 9, 1, 3, 4, 10];
let values = [2.0, 8.0, 5.0, 9.0, 1.0, 2.0, 4.0, 2.0];
let k = 3;

// block 1
// i: 2, 8, 5
// s: 2, 5, 8
// block 2
// i: 5, 9, 1
// s: 1, 5, 9

rolling_median(k, &values);
}
}

0 comments on commit 213af31

Please sign in to comment.