Skip to content

Commit

Permalink
perfect sort optimization for window expressions (#2614)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 11, 2022
1 parent d27546e commit c65a6d5
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
17 changes: 7 additions & 10 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use polars_core::frame::groupby::{GroupBy, GroupsProxy};
use polars_core::frame::hash_join::private_left_join_multiple_keys;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::argsort_no_nulls;
use polars_core::POOL;
use polars_utils::sort::perfect_sort;
use std::sync::Arc;

pub struct WindowExpr {
Expand Down Expand Up @@ -395,18 +396,14 @@ impl PhysicalExpr for WindowExpr {
}
}
cache_gb(gb);
argsort_no_nulls(&mut idx_mapping, false);
// Safety:
// we only have unique indices ranging from 0..len
let idx = unsafe { perfect_sort(&POOL, &idx_mapping) };
let idx = UInt32Chunked::from_vec("", idx);

// Safety:
// groups should always be in bounds.
let out = unsafe {
flattened.take_iter_unchecked(&mut idx_mapping.into_iter().map(|(idx, _)| {
debug_assert!((idx as usize) < flattened.len());
idx as usize
}))
};

Ok(out)
unsafe { flattened.take_unchecked(&idx) }
}
Join => {
let out_column = ac.aggregated();
Expand Down
1 change: 1 addition & 0 deletions polars/polars-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ description = "private utils for the polars dataframe library"

[dependencies]
parking_lot = "0.11"
rayon = "1.5"
1 change: 1 addition & 0 deletions polars/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod arena;
pub mod contention_pool;
mod error;
pub mod mem;
pub mod sort;
40 changes: 40 additions & 0 deletions polars/polars-utils/src/sort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use rayon::{prelude::*, ThreadPool};

/// Scatter the left indices to the positions named by the right indices.
///
/// This is a "perfect sort", particularly useful for an argsort of an argsort:
/// the second argsort sorts indices from `0` to `len`, so each value can simply be
/// assigned to its new index location instead of being compared.
///
/// Besides that, we know that all target locations are unique and thus do not alias,
/// so the writes are split into chunks and executed in parallel on `pool`.
///
/// This sort does not sort in place and will allocate.
///
/// - The right indices are used for sorting.
/// - The left indices are placed at the location the right index points to.
///
/// # Safety
/// The caller must ensure that the right indices of `&[(_, u32)]` are *unique* integers
/// in the range `0..idx.len()`:
///
/// - an out-of-range right index writes outside the allocated buffer;
/// - a duplicated right index leaves another slot uninitialized (and makes two
///   threads potentially write the same slot).
///
/// In debug builds the range requirement is checked with a `debug_assert!`.
pub unsafe fn perfect_sort(pool: &ThreadPool, idx: &[(u32, u32)]) -> Vec<u32> {
    let len = idx.len();
    let n_threads = pool.current_num_threads();
    // Aim for roughly one chunk per thread. Taking the maximum with the thread count
    // keeps the chunk size non-zero for short inputs (`par_chunks` rejects a size of 0).
    let chunk_size = std::cmp::max(len / n_threads, n_threads);

    let mut out: Vec<u32> = Vec::with_capacity(len);
    // Raw pointers are neither `Send` nor `Sync`, so the address is carried into the
    // parallel closure as a plain integer and turned back into a pointer there.
    let ptr = out.as_mut_ptr() as usize;

    pool.install(|| {
        idx.par_chunks(chunk_size).for_each(|indices| {
            let ptr = ptr as *mut u32;
            for (idx_val, idx_location) in indices {
                debug_assert!(
                    (*idx_location as usize) < len,
                    "perfect_sort: target location out of bounds"
                );
                // Safety:
                // idx_location is in bounds by invariant of this function
                // and we ensured we have at least `idx.len()` capacity
                *ptr.add(*idx_location as usize) = *idx_val;
            }
        });
    });
    // Safety:
    // the right indices are a permutation of `0..len` by invariant of this function,
    // so every one of the `len` slots has been written exactly once.
    out.set_len(len);
    out
}
1 change: 1 addition & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c65a6d5

Please sign in to comment.