pivot enum dispatch
ritchie46 committed Sep 4, 2020
1 parent 7b8ed48 commit fe3b634
Showing 1 changed file with 74 additions and 39 deletions.
113 changes: 74 additions & 39 deletions polars/src/frame/group_by.rs
@@ -11,7 +11,7 @@ use num::{Num, NumCast, ToPrimitive, Zero};
 use rayon::prelude::*;
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
-use std::hash::Hash;
+use std::hash::{Hash, Hasher};
 
 fn groupby<T>(a: impl Iterator<Item = T>) -> Vec<(usize, Vec<usize>)>
 where
@@ -798,15 +798,15 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
     /// let df = DataFrame::new(vec![s0, s1, s2]).unwrap();
     /// println!("{:?}", df);
     ///
-    /// fn example(df: DataFrame) -> Result<()> {
-    ///     df.groupby("foo")?
+    /// fn example(df: DataFrame) -> Result<DataFrame> {
+    ///     let df = df.groupby("foo")?
     ///     .pivot("bar", "N")
     ///     .first();
     ///
-    ///     Ok(())
+    ///     Ok(df)
     /// }
-    /// example(df);
-    ///
+    /// let df = example(df);
     /// println!("{:?}", df);
     ///
     /// assert!(false)
     ///
@@ -834,76 +834,111 @@ pub struct Pivot<'df, 'selection_str> {
     values_column: &'selection_str str,
 }
 
-impl<'df, 'sel_str> Pivot<'df, 'sel_str> {
-    pub fn first(&self) -> DataFrame {
-        let pivot_series = self.gb.df.column(self.pivot_column).unwrap();
+#[enum_dispatch(Series)]
+trait ChunkPivot {
+    fn pivot(
+        &self,
+        pivot_series: &Series,
+        keys: Vec<Series>,
+        groups: &Vec<(usize, Vec<usize>)>,
+    ) -> DataFrame {
+        unimplemented!()
+    }
+}
+
+impl<T> ChunkPivot for ChunkedArray<T>
+where
+    T: PolarsNumericType,
+    T::Native: Copy,
+{
+    fn pivot(
+        &self,
+        pivot_series: &Series,
+        keys: Vec<Series>,
+        groups: &Vec<(usize, Vec<usize>)>,
+    ) -> DataFrame {
         /// TODO: save an allocation by creating a random access struct for the Groupable utility type.
         let pivot_vec: Vec<_> = pivot_series.as_groupable_iter().collect();
 
-        let values_ca = self
-            .gb
-            .df
-            .column(self.values_column)
-            .unwrap()
-            .i32()
-            .unwrap();
 
-        let values_taker = values_ca.take_rand();
+        let values_taker = self.take_rand();
 
         let new_column_map = |size| {
             // create a new hashmap that will be filled with new Vecs that later will be aggegrated
             let mut columns_agg_map = HashMap::with_capacity(size);
-            for column_name in &pivot_vec {
-                columns_agg_map
-                    .entry(column_name)
-                    .or_insert_with(|| Vec::new());
+            for opt_column_name in &pivot_vec {
+                if let Some(column_name) = opt_column_name {
+                    columns_agg_map
+                        .entry(column_name)
+                        .or_insert_with(|| Vec::new());
+                }
             }
 
             columns_agg_map
         };
 
         // create a hash map that will be filled with the results of the aggregation.
-        let mut columns_agg_map_main = new_column_map(self.gb.groups.len());
+        // let mut columns_agg_map_main = new_column_map(groups.len());
+        let mut columns_agg_map_main = HashMap::with_capacity(pivot_vec.len());
+        for opt_column_name in &pivot_vec {
+            if let Some(column_name) = opt_column_name {
+                columns_agg_map_main.entry(column_name).or_insert_with(|| {
+                    PrimitiveChunkedBuilder::<T>::new(&format!("{:?}", column_name), groups.len())
+                });
+            }
+        }
 
         // iterate over the groups that need to be aggregated
        // idxes are the indexes of the groups in the keys, pivot, and values columns
-        for (_first, idx) in &self.gb.groups {
+        for (_first, idx) in groups {
            // for every group do the aggregation by adding them to the vector belonging by that column
            // the columns are hashed with the pivot values
             let mut columns_agg_map_group = new_column_map(idx.len());
             for &i in idx {
-                let pivot_val = unsafe { pivot_vec.get_unchecked(i) };
-                let values_val = values_taker.get(i);
-                columns_agg_map_group
-                    .get_mut(&pivot_val)
-                    .map(|v| v.push(values_val));
+                let opt_pivot_val = unsafe { pivot_vec.get_unchecked(i) };
+
+                if let Some(pivot_val) = opt_pivot_val {
+                    let values_val = values_taker.get(i);
+                    columns_agg_map_group
+                        .get_mut(&pivot_val)
+                        .map(|v| v.push(values_val));
+                }
             }
 
            // After the vectors are filled we really do the aggregation and add the result to the main
            // hash map, mapping pivot values as column to aggregate result.
             for (k, v) in &columns_agg_map_group {
-                let main_vec = columns_agg_map_main.get_mut(k).unwrap();
+                let main_builder = columns_agg_map_main.get_mut(k).unwrap();
 
                 match v.len() {
-                    0 => main_vec.push(None),
+                    0 => main_builder.append_null(),
                     /// NOTE: now we take first, but this is the place where all aggregations happen
-                    _ => main_vec.push(v[0]),
+                    _ => main_builder.append_option(v[0]),
                 }
             }
         }
         // todo: increase capacity
-        let mut cols = self.gb.keys();
+        let mut cols = keys;
 
-        columns_agg_map_main.iter().for_each(|(k, v)| {
-            let s = Series::new(&format!("{:?}", k), v);
-            cols.push(s);
-        });
+        for (_, builder) in columns_agg_map_main {
+            let ca = builder.finish();
+            cols.push(ca.into_series());
+        }
 
         let df = DataFrame::new(cols).unwrap();
         println!("{:?}", df);
         df
     }
 }
 
+impl ChunkPivot for BooleanChunked {}
+impl ChunkPivot for Utf8Chunked {}
+impl ChunkPivot for LargeListChunked {}
+
+impl<'df, 'sel_str> Pivot<'df, 'sel_str> {
+    pub fn first(&self) -> DataFrame {
+        let pivot_series = self.gb.df.column(self.pivot_column).unwrap();
+        let values_series = self.gb.df.column(self.values_column).unwrap();
 
-        todo!()
+        values_series.pivot(pivot_series, self.gb.keys(), &self.gb.groups)
     }
 }

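A note on the dispatch pattern, for readers who have not used the enum_dispatch crate: annotating the ChunkPivot trait with #[enum_dispatch(Series)] ties it to the Series enum (which is presumably annotated with the matching #[enum_dispatch] attribute where it is defined), so the values_series.pivot(...) call in Pivot::first expands to a match over the Series variants and forwards to the ChunkedArray implementation above. The following is a minimal, self-contained sketch of that mechanism using invented toy names (Describe, Ints, Texts, Container), not polars types; it only assumes the enum_dispatch crate as a dependency.

// Toy illustration of the enum_dispatch pattern used in the diff (not polars code).
// Assumes enum_dispatch (e.g. enum_dispatch = "0.3") is declared in Cargo.toml.
use enum_dispatch::enum_dispatch;

// Naming the enum on the trait links the two; enum_dispatch generates an
// implementation of Describe for Container that matches on the variant.
#[enum_dispatch(Container)]
trait Describe {
    fn describe(&self) -> String;
}

struct Ints(Vec<i32>);
struct Texts(Vec<String>);

impl Describe for Ints {
    fn describe(&self) -> String {
        format!("{} integers", self.0.len())
    }
}

impl Describe for Texts {
    fn describe(&self) -> String {
        format!("{} strings", self.0.len())
    }
}

// Each variant wraps one type that implements the trait; enum_dispatch also
// generates From impls, so `.into()` builds the enum from a concrete value.
#[enum_dispatch]
enum Container {
    Ints(Ints),
    Texts(Texts),
}

fn main() {
    let c: Container = Ints(vec![1, 2, 3]).into();
    // Resolves to Ints::describe through a generated match, not a vtable call.
    println!("{}", c.describe());
}

In the diff, BooleanChunked, Utf8Chunked and LargeListChunked only receive the empty impl ChunkPivot for ... {} lines, so they inherit the trait's default body and hit unimplemented!() if a pivot is attempted on such a values column.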

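Separately, the aggregation that moved into ChunkedArray::pivot may be easier to follow without the builder and Series machinery. Below is a rough plain-std sketch of the same "bucket by pivot value per group, then take the first value" logic; the function name pivot_first, the column types and the toy data are invented for illustration, while the real implementation works on ChunkedArray values, skips null pivot values coming from the Groupable iterator, collects each output column with PrimitiveChunkedBuilder, and prepends the group-key columns returned by self.gb.keys().

use std::collections::HashMap;

// Sketch of the aggregation ChunkPivot::pivot performs, using plain Vecs:
// per group, bucket the values under their pivot value, then emit the first
// value of each bucket (or a null) as that group's cell in the output column.
fn pivot_first(
    pivot: &[Option<&str>],         // pivot column; distinct values become output columns
    values: &[Option<i32>],         // values column
    groups: &[(usize, Vec<usize>)], // (first index, member indexes) per group, as in GroupBy
) -> HashMap<String, Vec<Option<i32>>> {
    // one output column per non-null pivot value
    let mut out: HashMap<String, Vec<Option<i32>>> = HashMap::new();
    for name in pivot.iter().flatten() {
        out.entry(name.to_string()).or_default();
    }

    for (_first, idx) in groups {
        // bucket this group's values under their pivot value, skipping null pivots
        let mut buckets: HashMap<&str, Vec<Option<i32>>> = HashMap::new();
        for &i in idx {
            if let Some(name) = pivot[i] {
                buckets.entry(name).or_default().push(values[i]);
            }
        }
        // "first" aggregation: one cell per output column for this group
        for (name, col) in out.iter_mut() {
            let cell = buckets
                .get(name.as_str())
                .and_then(|v| v.first().copied())
                .flatten();
            col.push(cell);
        }
    }
    out
}

fn main() {
    // toy data loosely mirroring the doc example's bar/N columns (values invented)
    let pivot = [Some("y"), Some("n"), Some("y")];
    let values = [Some(1), Some(2), Some(3)];
    // two groups: rows {0, 2} and row {1}
    let groups = vec![(0, vec![0, 2]), (1, vec![1])];
    println!("{:?}", pivot_first(&pivot, &values, &groups));
}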