Skip to content

Commit

Permalink
reduce probability of quadratic behavior in min/max rolling (#3516)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 27, 2022
1 parent c6999fc commit a9d2298
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 82 deletions.
253 changes: 172 additions & 81 deletions polars/polars-arrow/src/kernels/rolling/no_nulls/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,56 +24,100 @@ impl<'a, T: NativeType + IsFloat + PartialOrd> RollingAggWindowNoNulls<'a, T> fo
}

unsafe fn update(&mut self, start: usize, end: usize) -> T {
// if we exceed the end, we have a completely new window
// so we recompute
let recompute_min = if start >= self.last_end {
true
} else {
let mut recompute_min = false;

// remove elements that should leave the window
for idx in self.last_start..start {
// safety
// we are in bounds
let leaving_value = self.slice.get_unchecked(idx);

// if the leaving value is the
// min value, we need to recompute the min.
if matches!(
compare_fn_nan_min(leaving_value, &self.min),
Ordering::Equal
) {
recompute_min = true;
break;
}
}
recompute_min
};

self.last_start = start;

// we traverse all values and compute
if recompute_min {
// recompute min
if start >= self.last_end {
self.min = *self
.slice
.get_unchecked(start..end)
.iter()
.min_by(|a, b| compare_fn_nan_min(*a, *b))
.unwrap();
.unwrap_or(&self.slice[start]);

self.last_start = start;
self.last_end = end;

return self.min;
}
// the min has not left the window, so we only check
// if the entering values are smaller
else if end > self.last_end {
let min_entering = self
.slice
.get_unchecked(self.last_end..end)
.iter()
.min_by(|a, b| compare_fn_nan_min(*a, *b))
.unwrap_unchecked();
if matches!(compare_fn_nan_min(min_entering, &self.min), Ordering::Less) {
self.min = *min_entering

let mut recompute_min = false;
// remove elements that should leave the window
for idx in self.last_start..start {
// safety
// we are in bounds
let leaving_value = self.slice.get_unchecked(idx);

// if the leaving value is the
// min value, we need to recompute the min.
if matches!(
compare_fn_nan_min(leaving_value, &self.min),
Ordering::Equal
) {
recompute_min = true;
break;
}
}

let entering_min = self
.slice
.get_unchecked(self.last_end..end)
.iter()
.min_by(|a, b| compare_fn_nan_min(*a, *b))
.unwrap_or(
&self.slice[std::cmp::max(self.last_start, self.last_end.saturating_sub(1))],
);

if recompute_min {
match compare_fn_nan_min(&self.min, entering_min) {
// do nothing
Ordering::Equal => {}
// leaving < entering
Ordering::Less => {
// leaving value could be the smallest, we might need to recompute

// sample two values that remain in the window (the first and the last) to prevent
// O(n^2) behavior that can occur when all values in the window are the same
let remaining_value1 = self.slice.get(start).unwrap_unchecked();
let remaining_value2 = self.slice.get(end.saturating_sub(1)).unwrap();

// we check those two values in the window; if either of them equals the current min,
// we know we don't need to traverse the whole window to recompute it
if !matches!(
compare_fn_nan_min(remaining_value1, &self.min),
Ordering::Equal
) && !matches!(
compare_fn_nan_min(remaining_value2, &self.min),
Ordering::Equal
) {
// the minimum value in the part of the window we did not yet scan
let min_in_between = self
.slice
.get_unchecked(start..self.last_end)
.iter()
.min_by(|a, b| compare_fn_nan_min(*a, *b))
.unwrap_or(&self.slice[start]);

if matches!(
compare_fn_nan_min(min_in_between, entering_min),
Ordering::Less
) {
self.min = *min_in_between
} else {
self.min = *entering_min
}
}
}
// leaving > entering
Ordering::Greater => {
if matches!(compare_fn_nan_min(entering_min, &self.min), Ordering::Less) {
self.min = *entering_min
}
}
}
} else if matches!(compare_fn_nan_min(entering_min, &self.min), Ordering::Less) {
self.min = *entering_min
}

self.last_start = start;
self.last_end = end;
self.min
}
Expand Down Expand Up @@ -101,55 +145,102 @@ impl<'a, T: NativeType + IsFloat + PartialOrd> RollingAggWindowNoNulls<'a, T> fo
}

unsafe fn update(&mut self, start: usize, end: usize) -> T {
// if we exceed the end, we have a completely new window
// so we recompute
let recompute_max = if start >= self.last_end {
true
} else {
// remove elements that should leave the window
let mut recompute_max = false;
for idx in self.last_start..start {
// safety
// we are in bounds
let leaving_value = self.slice.get_unchecked(idx);
// if the leaving value is the max value, we need to recompute the max.
if matches!(
compare_fn_nan_max(leaving_value, &self.max),
Ordering::Equal
) {
recompute_max = true;
break;
}
}
recompute_max
};
self.last_start = start;

// we traverse all values and compute
if recompute_max {
// recompute max
if start >= self.last_end {
self.max = *self
.slice
.get_unchecked(start..end)
.iter()
.max_by(|a, b| compare_fn_nan_max(*a, *b))
.unwrap_unchecked();
.unwrap_or(&self.slice[start]);

self.last_start = start;
self.last_end = end;

return self.max;
}
// the max has not left the window, so we only check
// if the entering values are larger
else if end > self.last_end {
let max_entering = self
.slice
.get_unchecked(self.last_end..end)
.iter()
.max_by(|a, b| compare_fn_nan_max(*a, *b))
.unwrap_unchecked();

let mut recompute_max = false;
for idx in self.last_start..start {
// safety
// we are in bounds
let leaving_value = self.slice.get_unchecked(idx);
// if the leaving value is the max value, we need to recompute the max.
if matches!(
compare_fn_nan_max(max_entering, &self.max),
Ordering::Greater
compare_fn_nan_max(leaving_value, &self.max),
Ordering::Equal
) {
self.max = *max_entering
recompute_max = true;
break;
}
}

let entering_max = self
.slice
.get_unchecked(self.last_end..end)
.iter()
.max_by(|a, b| compare_fn_nan_max(*a, *b))
.unwrap_or(
&self.slice[std::cmp::max(self.last_start, self.last_end.saturating_sub(1))],
);

if recompute_max {
match compare_fn_nan_max(&self.max, entering_max) {
// do nothing
Ordering::Equal => {}
// leaving < entering
Ordering::Less => {
if matches!(
compare_fn_nan_max(entering_max, &self.max),
Ordering::Greater
) {
self.max = *entering_max
}
}
// leaving > entering
Ordering::Greater => {
// leaving value could be the largest, we might need to recompute

// sample two values that remain in the window (the first and the last) to prevent
// O(n^2) behavior that can occur when all values in the window are the same
let remaining_value1 = self.slice.get(start).unwrap_unchecked();
let remaining_value2 = self.slice.get(end.saturating_sub(1)).unwrap();

// we check those two values in the window; if either of them equals the current max,
// we know we don't need to traverse the whole window to recompute it
if !matches!(
compare_fn_nan_max(remaining_value1, &self.max),
Ordering::Equal
) && !matches!(
compare_fn_nan_max(remaining_value2, &self.max),
Ordering::Equal
) {
// the maximum value in the part of the window we did not yet scan
let max_in_between = self
.slice
.get_unchecked(start..self.last_end)
.iter()
.max_by(|a, b| compare_fn_nan_max(*a, *b))
.unwrap_or(&self.slice[start]);

if matches!(
compare_fn_nan_max(max_in_between, entering_max),
Ordering::Greater
) {
self.max = *max_in_between
} else {
self.max = *entering_max
}
}
}
}
} else if matches!(
compare_fn_nan_max(entering_max, &self.max),
Ordering::Greater
) {
self.max = *entering_max
}
self.last_start = start;
self.last_end = end;
self.max
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-arrow/src/kernels/rolling/nulls/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a, T: NativeType + IsFloat + PartialOrd> RollingAggWindowNulls<'a, T> for
let leaving_value = self.slice.get_unchecked(idx);

// if the leaving value is the
// max value, we need to recompute the max.
// min value, we need to recompute the min.
if matches!(
compare_fn_nan_min(leaving_value, &self.min.unwrap()),
Ordering::Equal
Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a9d2298

Please sign in to comment.