Skip to content

Commit

Permalink
perf[rust]: reuse allocated buffer if available in window perferct so…
Browse files Browse the repository at this point in the history
…rt (#4428)
  • Loading branch information
ritchie46 committed Aug 15, 2022
1 parent b740e77 commit c3f4ac8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
17 changes: 12 additions & 5 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ impl PhysicalExpr for WindowExpr {
// idx (new-idx, original-idx)
let mut idx_mapping = Vec::with_capacity(out_column.len());

// we already set this buffer so we can reuse the `original_idx` buffer
// that saves an allocation
let mut take_idx = vec![];

// groups are not changed, we can map by doing a standard argsort.
if std::ptr::eq(ac.groups().as_ref(), gb.get_groups()) {
let mut iter = 0..flattened.len() as IdxSize;
Expand Down Expand Up @@ -399,26 +403,29 @@ impl PhysicalExpr for WindowExpr {
}
};

let mut original_idx = original_idx.into_iter();
let mut original_idx_iter = original_idx.iter().copied();

match ac.groups().as_ref() {
GroupsProxy::Idx(groups) => {
for g in groups.all() {
idx_mapping.extend(g.iter().copied().zip(&mut original_idx));
idx_mapping.extend(g.iter().copied().zip(&mut original_idx_iter));
}
}
GroupsProxy::Slice { groups, .. } => {
for &[first, len] in groups {
idx_mapping.extend((first..first + len).zip(&mut original_idx));
idx_mapping
.extend((first..first + len).zip(&mut original_idx_iter));
}
}
}
original_idx.clear();
take_idx = original_idx;
}
cache_gb(gb);
// Safety:
// we only have unique indices ranging from 0..len
let idx = unsafe { perfect_sort(&POOL, &idx_mapping) };
let idx = IdxCa::from_vec("", idx);
unsafe { perfect_sort(&POOL, &idx_mapping, &mut take_idx) };
let idx = IdxCa::from_vec("", take_idx);

// Safety:
// groups should always be in bounds.
Expand Down
5 changes: 2 additions & 3 deletions polars/polars-utils/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ use crate::IdxSize;
///
/// # Safety
/// The caller must ensure that the right indexes fo `&[(_, IdxSize)]` are integers ranging from `0..idx.len`
pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)]) -> Vec<IdxSize> {
pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)], out: &mut Vec<IdxSize>) {
let chunk_size = std::cmp::max(
idx.len() / pool.current_num_threads(),
pool.current_num_threads(),
);

let mut out: Vec<IdxSize> = Vec::with_capacity(idx.len());
out.reserve(idx.len());
let ptr = out.as_mut_ptr() as *const IdxSize as usize;

pool.install(|| {
Expand All @@ -38,5 +38,4 @@ pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(IdxSize, IdxSize)]) -> Vec
// Safety:
// all elements are written
out.set_len(idx.len());
out
}

0 comments on commit c3f4ac8

Please sign in to comment.