Skip to content

Commit

Permalink
partition init
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 14, 2021
1 parent 2978ce5 commit 800dc3b
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 43 deletions.
94 changes: 94 additions & 0 deletions polars/polars-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,100 @@ pub enum AnyValue<'a> {
Object(&'a str),
}

impl From<f64> for AnyValue<'_> {
fn from(a: f64) -> Self {
AnyValue::Float64(a)
}
}
impl From<f32> for AnyValue<'_> {
fn from(a: f32) -> Self {
AnyValue::Float32(a)
}
}
impl From<u32> for AnyValue<'_> {
fn from(a: u32) -> Self {
AnyValue::UInt32(a)
}
}
impl From<u64> for AnyValue<'_> {
fn from(a: u64) -> Self {
AnyValue::UInt64(a)
}
}
impl From<i64> for AnyValue<'_> {
fn from(a: i64) -> Self {
AnyValue::Int64(a)
}
}
impl From<i32> for AnyValue<'_> {
fn from(a: i32) -> Self {
AnyValue::Int32(a)
}
}
impl From<i16> for AnyValue<'_> {
fn from(a: i16) -> Self {
AnyValue::Int16(a)
}
}
impl From<u16> for AnyValue<'_> {
fn from(a: u16) -> Self {
AnyValue::UInt16(a)
}
}

impl From<i8> for AnyValue<'_> {
fn from(a: i8) -> Self {
AnyValue::Int8(a)
}
}
impl From<u8> for AnyValue<'_> {
fn from(a: u8) -> Self {
AnyValue::UInt8(a)
}
}

impl<'a, T> From<Option<T>> for AnyValue<'a>
where
    T: Into<AnyValue<'a>>,
{
    /// Converts an optional value: `None` maps to `AnyValue::Null`, while
    /// `Some(inner)` is converted through `inner.into()`.
    fn from(maybe: Option<T>) -> Self {
        maybe.map_or(AnyValue::Null, Into::into)
    }
}

impl<'a> AnyValue<'a> {
    /// Adds two values that hold the same numeric variant.
    ///
    /// If either operand is `Null` the result is `Null`. Otherwise both operands
    /// must be the same integer or float variant, and the sum is returned in
    /// that variant. No implicit casting between variants is performed.
    ///
    /// # Panics
    ///
    /// Panics when the operands are different variants or a variant that has no
    /// addition defined here. Integer sums use the plain `+` operator, so
    /// overflow behaves as the build's overflow checks dictate.
    pub fn add<'b>(&self, rhs: &AnyValue<'b>) -> AnyValue<'a> {
        use AnyValue::*;
        match (self, rhs) {
            // A null on either side makes the whole sum null.
            (Null, _) | (_, Null) => Null,
            // Every primitive that has a `From` conversion is summable with itself.
            (Int8(l), Int8(r)) => Int8(l + r),
            (Int16(l), Int16(r)) => Int16(l + r),
            (Int32(l), Int32(r)) => Int32(l + r),
            (Int64(l), Int64(r)) => Int64(l + r),
            (UInt8(l), UInt8(r)) => UInt8(l + r),
            (UInt16(l), UInt16(r)) => UInt16(l + r),
            (UInt32(l), UInt32(r)) => UInt32(l + r),
            (UInt64(l), UInt64(r)) => UInt64(l + r),
            (Float32(l), Float32(r)) => Float32(l + r),
            (Float64(l), Float64(r)) => Float64(l + r),
            _ => unimplemented!(
                "AnyValue::add is only defined for Null and same-typed numeric operands"
            ),
        }
    }
}

impl<'a> From<AnyValue<'a>> for Option<i64> {
    /// Extracts an integer value as `i64`.
    ///
    /// `Null` becomes `None`. Every integer variant that widens losslessly to
    /// `i64` (signed up to 64 bits, unsigned up to 32 bits) becomes `Some`.
    ///
    /// # Panics
    ///
    /// Panics for any other variant; no lossy conversion is attempted.
    fn from(val: AnyValue<'a>) -> Self {
        use AnyValue::*;
        match val {
            Null => None,
            Int8(v) => Some(i64::from(v)),
            Int16(v) => Some(i64::from(v)),
            Int32(v) => Some(i64::from(v)),
            Int64(v) => Some(v),
            UInt8(v) => Some(i64::from(v)),
            UInt16(v) => Some(i64::from(v)),
            UInt32(v) => Some(i64::from(v)),
            _ => unimplemented!(
                "only Null and integer variants that fit losslessly in i64 convert to Option<i64>"
            ),
        }
    }
}

impl Display for DataType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = match self {
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) mod pivot;
pub mod resample;

pub type GroupTuples = Vec<(u32, Vec<u32>)>;
pub type GroupedMap<T> = HashMap<T, Vec<u32>, RandomState>;

fn groupby<T>(a: impl Iterator<Item = T>) -> GroupTuples
where
Expand Down
19 changes: 12 additions & 7 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,21 @@ where
.enumerate()
.for_each(|(idx, (h, t))| {
let idx = (idx + offset) as u32;
hash_tbl

let entry = hash_tbl
.raw_entry_mut()
// uses the key to check equality to find an entry
.from_key_hashed_nocheck(h, &t)
// if entry is found modify it
.and_modify(|_k, v| {
.from_key_hashed_nocheck(h, &t);

match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(h, t, vec![idx]);
}
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
v.push(idx);
})
// otherwise we insert both the key and new Vec without hashing
.or_insert_with(|| (t, vec![idx]));
}
}
});
hash_tbl
}
Expand Down
24 changes: 24 additions & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2020,4 +2020,28 @@ mod test {
[Some(6), Some(0), Some(0)]
);
}

#[test]
fn test_lazy_partitioned_grouping() {
    // NOTE(review): the code that reads POLARS_NEW_PARTITION is not visible here;
    // presumably it routes the groupby through the new partitioned path — confirm.
    std::env::set_var("POLARS_NEW_PARTITION", "1");

    let frame = df! {
        "a" => [1, 1, 1, 2, 2, 3, 1],
        "b" => [1i64, 2, 3, 4, 5, 6, 7]
    }
    .unwrap();

    // Group on "a", sum "b", then sort on the key so the row order is fixed.
    let grouped = frame
        .lazy()
        .groupby(vec![col("a")])
        .agg(vec![col("b").sum()]);
    let result = grouped.sort("a", false).collect().unwrap();

    // Keys 1, 2 and 3 sum to 1+2+3+7, 4+5 and 6 respectively.
    let sums = Vec::from(result.column("b_sum").unwrap().i64().unwrap());
    assert_eq!(sums, [Some(13), Some(9), Some(6)]);
}
}
54 changes: 23 additions & 31 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::utils::rename_aexpr_root_name;
use polars_core::utils::{accumulate_dataframes_vertical, num_cpus, split_df};
use polars_core::POOL;
use rayon::prelude::*;
use std::time::Instant;

/// Take an input Executor and multiple expressions
pub struct GroupByExec {
Expand Down Expand Up @@ -86,21 +87,21 @@ impl Executor for GroupByExec {
/// Take an input Executor and multiple expressions
pub struct PartitionGroupByExec {
input: Box<dyn Executor>,
keys: Vec<Arc<dyn PhysicalExpr>>,
key: Arc<dyn PhysicalExpr>,
phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Expr>,
}

impl PartitionGroupByExec {
pub(crate) fn new(
input: Box<dyn Executor>,
keys: Vec<Arc<dyn PhysicalExpr>>,
key: Arc<dyn PhysicalExpr>,
phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
aggs: Vec<Expr>,
) -> Self {
Self {
input,
keys,
key,
phys_aggs,
aggs,
}
Expand All @@ -115,24 +116,26 @@ impl Executor for PartitionGroupByExec {
// If the column is a categorical, we know the number of groups we have and can decide to continue
// partitioned or go for the standard groupby. The partitioned is likely to be faster on a small number
// of groups.
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&original_df, state))
.collect::<Result<Vec<_>>>()?;
let key = self.key.evaluate(&original_df, state)?;

debug_assert_eq!(keys.len(), 1);
let s = &keys[0];
if let Ok(ca) = s.categorical() {
if let Ok(ca) = key.categorical() {
let cat_map = ca
.get_categorical_map()
.expect("categorical type has categorical_map");
let frac = cat_map.len() as f32 / ca.len() as f32;
// TODO! proper benchmark which boundary should be chosen.
if frac > 0.3 {
return groupby_helper(original_df, keys, &self.phys_aggs, None, state);
return groupby_helper(original_df, vec![key], &self.phys_aggs, None, state);
}
}
if std::env::var("POLARS_NO_PARTITION").is_ok() {
dbg!("RUN STANDARD");
let now = Instant::now();
let a = groupby_helper(original_df, vec![key], &self.phys_aggs, None, state);
println!("standard took {} ms", now.elapsed().as_millis());
return a;
}

let mut expr_arena = Arena::with_capacity(64);

// This will be the aggregation on the partition results. Due to the groupby
Expand Down Expand Up @@ -160,25 +163,22 @@ impl Executor for PartitionGroupByExec {
.collect::<Result<Vec<_>>>()?;

let n_threads = num_cpus::get();
// We do a partitioned groupby. Meaning that we first do the groupby operation arbitrarily
// We do a partitioned groupby.
// Meaning that we first do the groupby operation on arbitrary splits of the
// data across several threads. Then we apply the same groupby again on the combined result.
let dfs = split_df(&original_df, n_threads)?;

let dfs = POOL.install(|| {
dfs.into_par_iter()
.map(|df| {
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&df, state))
.collect::<Result<Vec<_>>>()?;
let key = self.key.evaluate(&original_df, state)?;
let phys_aggs = &self.phys_aggs;
let gb = df.groupby_with_series(keys, false)?;
let gb = df.groupby_with_series(vec![key], false)?;
let groups = gb.get_groups();

let mut columns = gb.keys();
let agg_columns = phys_aggs
.iter()
.par_iter()
.map(|expr| {
let agg_expr = expr.as_agg_expr()?;
let opt_agg = agg_expr.evaluate_partitioned(&df, groups, state)?;
Expand All @@ -194,11 +194,7 @@ impl Executor for PartitionGroupByExec {
Ok(opt_agg)
}).collect::<Result<Vec<_>>>()?;

for agg in agg_columns.into_iter().flatten() {
for agg in agg {
columns.push(agg)
}
}
columns.extend(agg_columns.into_iter().flatten().map(|v| v.into_iter()).flatten());

let df = DataFrame::new_no_checks(columns);
Ok(df)
Expand All @@ -207,14 +203,10 @@ impl Executor for PartitionGroupByExec {

let df = accumulate_dataframes_vertical(dfs)?;

let keys = self
.keys
.iter()
.map(|e| e.evaluate(&df, state))
.collect::<Result<Vec<_>>>()?;
let key = self.key.evaluate(&df, state)?;

// do the same on the outer results
let gb = df.groupby_with_series(keys, true)?;
let gb = df.groupby_with_series(vec![key], true)?;
let groups = gb.get_groups();

let mut columns = gb.keys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ use crate::physical_plan::PhysicalAggregation;
use crate::prelude::*;
use polars_arrow::array::ValueSize;
use polars_core::chunked_array::builder::get_list_builder;
use polars_core::frame::groupby::{fmt_groupby_column, GroupByMethod, GroupTuples};
use polars_core::frame::groupby::{fmt_groupby_column, GroupByMethod, GroupTuples, GroupedMap};
use polars_core::prelude::*;
use polars_core::utils::NoNull;
use polars_core::{utils::NoNull, POOL};
use rayon::prelude::*;
use std::sync::Arc;
use std::time::Instant;

pub(crate) struct AggregationExpr {
pub(crate) expr: Arc<dyn PhysicalExpr>,
Expand Down Expand Up @@ -209,6 +211,7 @@ impl PhysicalAggregation for AggregationExpr {
}
}
}

impl PhysicalAggregation for AggQuantileExpr {
fn aggregate(
&self,
Expand Down
12 changes: 11 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) mod window;

use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::frame::groupby::GroupTuples;
use polars_core::frame::groupby::{GroupTuples, GroupedMap};
use polars_core::prelude::*;
use polars_io::PhysicalIoExpr;
use std::borrow::Cow;
Expand Down Expand Up @@ -111,4 +111,14 @@ pub trait PhysicalAggregation {
) -> Result<Option<Series>> {
self.aggregate(final_df, groups, state)
}

/// Placeholder for an aggregation evaluated over per-partition group maps.
///
/// Takes the source frame, one `GroupedMap` per partition keyed by
/// `Option<u64>`, and the execution state. The default body ignores all of
/// them.
///
/// # Panics
///
/// The default implementation always panics; implementors that support
/// this path must override it.
fn evaluate_partitioned_2(
    &self,
    _df: &DataFrame,
    _g_maps: &[GroupedMap<Option<u64>>],
    _state: &ExecutionState,
) -> Result<Option<Series>> {
    // No `clippy::ptr_arg` allowance is needed: the group maps arrive as a slice.
    unimplemented!("evaluate_partitioned_2 has no default implementation")
}
}
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl DefaultPlanner {
if apply.is_some() {
partitionable = false;
}
let phys_keys =
let mut phys_keys =
self.create_physical_expressions(keys, Context::Default, expr_arena)?;
if partitionable {
let phys_aggs = self.create_physical_expressions(
Expand All @@ -308,7 +308,7 @@ impl DefaultPlanner {
)?;
Ok(Box::new(PartitionGroupByExec::new(
input,
phys_keys,
phys_keys.pop().unwrap(),
phys_aggs,
aggs.into_iter()
.map(|n| node_to_exp(n, expr_arena))
Expand Down

0 comments on commit 800dc3b

Please sign in to comment.