Skip to content

Commit

Permalink
Refactor `take` implementations and fix slice aggregation in lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 21, 2021
1 parent 884dfaa commit 3bc322e
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 49 deletions.
12 changes: 3 additions & 9 deletions polars/polars-core/src/chunked_array/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,9 @@ where
}

if self.null_count() == 0 {
let mut av: AlignedVec<_> = self.into_no_null_iter().collect();
sort_branch(
av.as_mut_slice(),
sort_parallel,
reverse,
order_default,
order_reverse,
);
ChunkedArray::new_from_aligned_vec(self.name(), av)
// rechunk and call again, then it will fall in the contiguous slice path.
let ca = self.rechunk();
ca.sort(reverse)
} else {
let mut v = Vec::from_iter(self);
sort_branch(
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-core/src/chunked_array/ops/take_random.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[cfg(feature = "object")]
use crate::chunked_array::object::ObjectArray;
use crate::prelude::downcast::Chunks;
use crate::prelude::*;
Expand Down Expand Up @@ -414,6 +415,7 @@ pub struct ObjectTakeRandom<'a, T: PolarsObject> {
chunks: Vec<&'a ObjectArray<T>>,
}

#[cfg(feature = "object")]
impl<'a, T: PolarsObject> TakeRandom for ObjectTakeRandom<'a, T> {
type Item = &'a T;

Expand All @@ -433,6 +435,7 @@ pub struct ObjectTakeRandomSingleChunk<'a, T: PolarsObject> {
arr: &'a ObjectArray<T>,
}

#[cfg(feature = "object")]
impl<'a, T: PolarsObject> TakeRandom for ObjectTakeRandomSingleChunk<'a, T> {
type Item = &'a T;

Expand All @@ -447,6 +450,7 @@ impl<'a, T: PolarsObject> TakeRandom for ObjectTakeRandomSingleChunk<'a, T> {
}
}

#[cfg(feature = "object")]
impl<'a, T: PolarsObject> IntoTakeRandom<'a> for &'a ObjectChunked<T> {
type Item = &'a T;
type TakeRandom = TakeRandBranch2<ObjectTakeRandomSingleChunk<'a, T>, ObjectTakeRandom<'a, T>>;
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/groupby/aggregations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ where

// TODO! use collect, can be faster
// needed capacity for the list
let values_cap = groups.iter().fold(0, |acc, g| acc + g.1.len());
let values_cap = self.len();

macro_rules! impl_gb {
($type:ty, $agg_col:expr) => {{
Expand Down
110 changes: 89 additions & 21 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use crate::chunked_array::ops::unique::is_unique_helper;
use crate::frame::select::Selection;
use crate::prelude::*;
use crate::utils::{
accumulate_dataframes_horizontal, accumulate_dataframes_vertical, get_supertype, NoNull,
accumulate_dataframes_horizontal, accumulate_dataframes_vertical, get_supertype, split_ca,
split_df, NoNull,
};

mod arithmetic;
Expand Down Expand Up @@ -579,6 +580,36 @@ impl DataFrame {
}
}

/// Does a filter but splits thread chunks vertically instead of horizontally
/// This yields a DataFrame with `n_chunks == n_threads`.
/// Filter `self` with `mask`, parallelising over vertical (row-wise) slices
/// instead of over columns.
///
/// The DataFrame and the mask are split into `n_threads` contiguous chunks;
/// each chunk is filtered in parallel and the partial frames are stacked back
/// together, so the result has `n_chunks == n_threads`.
///
/// # Errors
/// Returns an error if splitting fails or if filtering any column fails
/// (e.g. a length mismatch between `mask` and a column).
fn filter_vertical(&self, mask: &BooleanChunked) -> Result<Self> {
    let n_threads = POOL.current_num_threads();

    // Split both the mask and the frame into one contiguous slice per thread.
    // Propagate errors instead of unwrapping: this function returns Result.
    let masks = split_ca(mask, n_threads)?;
    let dfs = split_df(self, n_threads)?;

    let dfs: Result<Vec<_>> = POOL.install(|| {
        masks
            .par_iter()
            .zip(dfs)
            .map(|(mask, df)| {
                let cols = df
                    .columns
                    .iter()
                    .map(|s| s.filter(mask))
                    .collect::<Result<_>>()?;
                Ok(DataFrame::new_no_checks(cols))
            })
            .collect()
    });

    // `n_threads >= 1`, so there is always at least one partial frame.
    let mut iter = dfs?.into_iter();
    let first = iter.next().expect("at least one thread chunk");
    Ok(iter.fold(first, |mut acc, df| {
        // Schemas are identical by construction, so vstack cannot fail.
        acc.vstack_mut(&df).unwrap();
        acc
    }))
}

/// Take DataFrame rows by a boolean mask.
///
/// # Example
Expand All @@ -592,6 +623,10 @@ impl DataFrame {
///
/// ```
pub fn filter(&self, mask: &BooleanChunked) -> Result<Self> {
if std::env::var("POLARS_VERT_PAR").is_ok() {
return self.filter_vertical(mask);
}

let new_col = POOL.install(|| {
self.columns
.par_iter()
Expand Down Expand Up @@ -643,6 +678,11 @@ impl DataFrame {
where
I: Iterator<Item = usize> + Clone + Sync,
{
if std::env::var("POLARS_VERT_PAR").is_ok() {
let idx_ca: NoNull<UInt32Chunked> = iter.into_iter().map(|idx| idx as u32).collect();
return self.take_unchecked_vectical(&idx_ca.into_inner());
}

let n_chunks = match self.n_chunks() {
Err(_) => return self.clone(),
Ok(n) => n,
Expand All @@ -655,16 +695,7 @@ impl DataFrame {
if n_chunks == 1 || has_utf8 {
let idx_ca: NoNull<UInt32Chunked> = iter.into_iter().map(|idx| idx as u32).collect();
let idx_ca = idx_ca.into_inner();
let cols = POOL.install(|| {
self.columns
.par_iter()
.map(|s| match s.dtype() {
DataType::Utf8 => s.take_unchecked_threaded(&idx_ca, true).unwrap(),
_ => s.take_unchecked(&idx_ca).unwrap(),
})
.collect()
});
return DataFrame::new_no_checks(cols);
return self.take_unchecked(&idx_ca);
}

let new_col = self
Expand Down Expand Up @@ -692,6 +723,11 @@ impl DataFrame {
where
I: Iterator<Item = Option<usize>> + Clone + Sync,
{
if std::env::var("POLARS_VERT_PAR").is_ok() {
let idx_ca: UInt32Chunked = iter.into_iter().map(|opt| opt.map(|v| v as u32)).collect();
return self.take_unchecked_vectical(&idx_ca);
}

let n_chunks = match self.n_chunks() {
Err(_) => return self.clone(),
Ok(n) => n,
Expand All @@ -704,16 +740,7 @@ impl DataFrame {

if n_chunks == 1 || has_utf8 {
let idx_ca: UInt32Chunked = iter.into_iter().map(|opt| opt.map(|v| v as u32)).collect();
let cols = POOL.install(|| {
self.columns
.par_iter()
.map(|s| match s.dtype() {
DataType::Utf8 => s.take_unchecked_threaded(&idx_ca, true).unwrap(),
_ => s.take_unchecked(&idx_ca).unwrap(),
})
.collect()
});
return DataFrame::new_no_checks(cols);
return self.take_unchecked(&idx_ca);
}

let new_col = self
Expand Down Expand Up @@ -764,6 +791,44 @@ impl DataFrame {
DataFrame::new_no_checks(new_col)
}

/// Take rows by index without bounds checking, parallelising over columns.
///
/// Utf8 columns are additionally split over threads
/// (`take_unchecked_threaded`) because gathering strings is comparatively
/// expensive; other dtypes are taken as a whole per column.
///
/// # Safety
/// Every index in `idx` must be in bounds for all columns; an out-of-bounds
/// index is undefined behavior.
unsafe fn take_unchecked(&self, idx: &UInt32Chunked) -> Self {
    let cols = POOL.install(|| {
        self.columns
            .par_iter()
            .map(|s| match s.dtype() {
                // `idx` is already a reference; the extra `&` was a
                // needless borrow (clippy::needless_borrow).
                DataType::Utf8 => s.take_unchecked_threaded(idx, true).unwrap(),
                _ => s.take_unchecked(idx).unwrap(),
            })
            .collect()
    });
    DataFrame::new_no_checks(cols)
}

/// Take rows by index without bounds checking, parallelising over vertical
/// slices of the index array instead of over columns.
///
/// The indices are split into `n_threads` chunks; each chunk gathers from all
/// columns in parallel and the partial frames are vstacked back together.
///
/// NOTE(review): "vectical" looks like a typo for "vertical"; the name is
/// kept as-is because call sites elsewhere in this file depend on it.
///
/// # Safety
/// Every index in `indices` must be in bounds for all columns; an
/// out-of-bounds index is undefined behavior.
unsafe fn take_unchecked_vectical(&self, indices: &UInt32Chunked) -> Self {
    let n_threads = POOL.current_num_threads();
    // `indices` is already a reference; avoid the needless extra borrow.
    let idxs = split_ca(indices, n_threads).unwrap();

    let dfs: Vec<_> = POOL.install(|| {
        idxs.par_iter()
            .map(|idx| {
                let cols = self
                    .columns
                    .iter()
                    .map(|s| s.take_unchecked(idx).unwrap())
                    .collect();
                DataFrame::new_no_checks(cols)
            })
            .collect()
    });

    // `n_threads >= 1`, so there is always at least one partial frame.
    let mut iter = dfs.into_iter();
    let first = iter.next().expect("at least one thread chunk");
    iter.fold(first, |mut acc, df| {
        // Schemas are identical by construction, so vstack cannot fail.
        acc.vstack_mut(&df).unwrap();
        acc
    })
}

/// Rename a column in the DataFrame
///
/// # Example
Expand Down Expand Up @@ -846,6 +911,9 @@ impl DataFrame {
}
}
};
if std::env::var("POLARS_VERT_PAR").is_ok() {
return Ok(unsafe { self.take_unchecked_vectical(&take) });
}
Ok(self.take(&take))
}

Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,7 @@ mod test {
.agg(vec![col("day").head(Some(2))])
.collect()
.unwrap();
dbg!(&out);
let s = out
.select_at_idx(1)
.unwrap()
Expand Down
5 changes: 2 additions & 3 deletions polars/polars-lazy/src/physical_plan/executors/filter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::physical_plan::executors::POLARS_VERBOSE;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::prelude::*;
Expand All @@ -20,8 +19,8 @@ impl Executor for FilterExec {
let s = self.predicate.evaluate(&df, state)?;
let mask = s.bool().expect("filter predicate wasn't of type boolean");
let df = df.filter(mask)?;
if std::env::var(POLARS_VERBOSE).is_ok() {
println!("dataframe filtered");
if state.verbose {
eprintln!("dataframe filtered");
}
Ok(df)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ pub(crate) struct BinaryFunctionExpr {

impl PhysicalExpr for BinaryFunctionExpr {
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let series_a = self.input_a.evaluate(df, state)?;
let series_b = self.input_b.evaluate(df, state)?;
let (series_a, series_b) = POOL.install(|| {
rayon::join(
|| self.input_a.evaluate(df, state),
|| self.input_b.evaluate(df, state),
)
});

self.function.call_udf(series_a, series_b).map(|mut s| {
self.function.call_udf(series_a?, series_b?).map(|mut s| {
s.rename("binary_function");
s
})
Expand Down
15 changes: 3 additions & 12 deletions polars/polars-lazy/src/physical_plan/expressions/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl PhysicalExpr for SliceExpr {
let groups = groups
.iter()
.map(|(first, idx)| {
let (offset, len) = slice_offsets(self.offset, self.len, s.len());
let (offset, len) = slice_offsets(self.offset, self.len, idx.len());
(*first, idx[offset..offset + len].to_vec())
})
.collect();
Expand All @@ -53,16 +53,7 @@ impl PhysicalAggregation for SliceExpr {
groups: &GroupTuples,
state: &ExecutionState,
) -> Result<Option<Series>> {
let s = self.input.evaluate(df, state)?;
let agg_s = s.agg_list(groups);
let out = agg_s.map(|s| {
s.list()
.unwrap()
.into_iter()
.map(|opt_s| opt_s.map(|s| s.slice(self.offset, self.len)))
.collect::<ListChunked>()
.into_series()
});
Ok(out)
let (s, groups) = self.evaluate_on_groups(df, groups, state)?;
Ok(s.agg_list(&groups))
}
}

0 comments on commit 3bc322e

Please sign in to comment.