
pivot init; todo, make work in type system
ritchie46 committed Sep 4, 2020
1 parent dfe67f5 commit 6f8ad5f
Showing 1 changed file with 126 additions and 5 deletions.
131 changes: 126 additions & 5 deletions polars/src/frame/group_by.rs
@@ -1,11 +1,15 @@
use super::hash_join::prepare_hashed_relation;
use crate::chunked_array::builder::AlignedAlloc;
use crate::chunked_array::builder::PrimitiveChunkedBuilder;
use crate::frame::select::Selection;
use crate::prelude::*;
use arrow::array::{PrimitiveBuilder, StringBuilder};
use enum_dispatch::enum_dispatch;
use fnv::FnvHashMap;
use itertools::Itertools;
use num::{Num, NumCast, ToPrimitive, Zero};
use rayon::prelude::*;
use std::collections::HashMap;
use std::hash::Hash;

fn groupby<T>(a: impl Iterator<Item = T>) -> Vec<(usize, Vec<usize>)>
@@ -238,13 +242,13 @@ impl DataFrame {
/// ```
///
#[derive(Debug, Clone)]
- pub struct GroupBy<'a, 'b> {
- df: &'a DataFrame,
+ pub struct GroupBy<'df, 'selection_str> {
+ df: &'df DataFrame,
selected_keys: Vec<Series>,
// [first idx, [other idx]]
groups: Vec<(usize, Vec<usize>)>,
// columns selected for aggregation
- selected_agg: Option<Vec<&'b str>>,
+ selected_agg: Option<Vec<&'selection_str str>>,
}
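// Editor's note (illustrative, not part of this commit): for a key column
// ["A", "A", "B", "B", "C"] the `groups` field above would hold
// [(0, vec![0, 1]), (2, vec![2, 3]), (4, vec![4])]: the first index of each
// group plus all of the group's member indexes.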

#[enum_dispatch(Series)]
@@ -429,12 +433,12 @@ impl AggFirst for Utf8Chunked {

impl AggFirst for LargeListChunked {}

- impl<'a, 'b> GroupBy<'a, 'b> {
+ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
/// Select the column(s) that should be aggregated.
/// You can select a single column or a slice of columns.
pub fn select<S>(mut self, selection: S) -> Self
where
- S: Selection<'b>,
+ S: Selection<'selection_str>,
{
self.selected_agg = Some(selection.to_selection_vec());
self
@@ -764,6 +768,123 @@ impl<'a, 'b> GroupBy<'a, 'b> {
}
DataFrame::new(cols)
}

/// Pivot a column of the current `DataFrame`: the unique values of `pivot_column` become
/// new columns, filled with the aggregated values of `values_column` per group.
///
/// ```rust
/// use polars::prelude::*;
/// let s0 = Series::new("foo", ["A", "A", "B", "B", "C"].as_ref());
/// let s1 = Series::new("N", [1, 2, 2, 4, 2].as_ref());
/// let s2 = Series::new("bar", ["k", "l", "m", "n", "o"].as_ref());
/// // create a new DataFrame
/// let df = DataFrame::new(vec![s0, s1, s2]).unwrap();
/// println!("{:?}", df);
///
/// fn example(df: DataFrame) -> Result<()> {
/// df.groupby("foo")?
/// .pivot("bar", "N")
/// .first();
///
/// Ok(())
/// }
/// example(df).unwrap();
/// ```
pub fn pivot(
&mut self,
pivot_column: &'selection_str str,
values_column: &'selection_str str,
) -> Pivot {
// same as select method
self.selected_agg = Some(vec![pivot_column, values_column]);

Pivot {
gb: self,
pivot_column,
values_column,
}
}
}

pub struct Pivot<'df, 'selection_str> {
gb: &'df GroupBy<'df, 'selection_str>,
pivot_column: &'selection_str str,
values_column: &'selection_str str,
}

impl<'df, 'selection_str> Pivot<'df, 'selection_str> {
pub fn first(&self) -> DataFrame {
let pivot_series = self.gb.df.column(self.pivot_column).unwrap();

let pivot_ca = pivot_series.utf8().unwrap();
let values_ca = self
.gb
.df
.column(self.values_column)
.unwrap()
.i32()
.unwrap();

let pivot_taker = pivot_ca.take_rand();
let values_taker = values_ca.take_rand();
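// Editor's note: `take_rand()` gives random-access lookup into the chunked arrays;
// `get(i)` returns the value at row `i` as an `Option` (None for missing values),
// which is how the results are used in the aggregation below.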

let new_column_map = |size| {
// create a new hashmap that will be filled with new Vecs that will later be aggregated
let mut columns_agg_map = HashMap::with_capacity(size);
for column_name in pivot_ca {
columns_agg_map
.entry(column_name)
.or_insert_with(Vec::new);
}

columns_agg_map
};
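// Illustrative note (not part of this commit): for the doc example above this yields
// one entry per value of the pivot column ("k", "l", "m", "n", "o"), each mapped to an
// empty Vec that is filled during the aggregation below.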

// create a hash map that will be filled with the results of the aggregation.
let mut columns_agg_map_main = new_column_map(self.gb.groups.len());

// iterate over the groups that need to be aggregated
// idx holds the row indexes of the group in the keys, pivot, and values columns
for (_first, idx) in &self.gb.groups {
// for every group, do the aggregation by adding the values to the vector belonging to that column;
// the columns are keyed by the pivot values
let mut columns_agg_map_group = new_column_map(idx.len());
for &i in idx {
let pivot_val = pivot_taker.get(i);
let values_val = values_taker.get(i);
if let Some(v) = columns_agg_map_group.get_mut(&pivot_val) {
v.push(values_val);
}
}

// After the vectors are filled we actually do the aggregation and add the result to the main
// hash map, mapping the pivot values (as column names) to the aggregated result.
for (k, v) in &columns_agg_map_group {
let main_vec = columns_agg_map_main.get_mut(k).unwrap();

match v.len() {
0 => main_vec.push(None),
// NOTE: for now we take the first value, but this is the place where all aggregations happen
_ => main_vec.push(v[0]),
}
}
}
// todo: increase capacity
let mut cols = self.gb.keys();

columns_agg_map_main.iter().for_each(|(k, v)| {
let s = Series::new(&format!("{:?}", k), v);
cols.push(s);
});

let df = DataFrame::new(cols).unwrap();
println!("{:?}", df);

todo!()
}
}
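// ---------------------------------------------------------------------------
// Editor's sketch (not part of this commit): the pivot-by-first idea from
// `Pivot::first` above, written with std only on the data from the doc example.
// `pivot_first_sketch` is a hypothetical helper; `groups` mirrors the layout of
// `GroupBy::groups`, i.e. (first index, all member indexes) per key.
#[allow(dead_code)]
fn pivot_first_sketch() {
    use std::collections::HashMap;

    let pivot = ["k", "l", "m", "n", "o"]; // the "bar" column
    let values = [1, 2, 2, 4, 2]; // the "N" column
    // groups of the "foo" column: A -> [0, 1], B -> [2, 3], C -> [4]
    let groups: Vec<(usize, Vec<usize>)> = vec![(0, vec![0, 1]), (2, vec![2, 3]), (4, vec![4])];

    // one output column per unique pivot value
    let mut out: HashMap<&str, Vec<Option<i32>>> = HashMap::new();
    for &p in &pivot {
        out.entry(p).or_insert_with(Vec::new);
    }

    for (_first, idx) in &groups {
        // collect the group's values per pivot value
        let mut per_group: HashMap<&str, Vec<i32>> = HashMap::new();
        for &p in &pivot {
            per_group.entry(p).or_insert_with(Vec::new);
        }
        for &i in idx {
            if let Some(v) = per_group.get_mut(pivot[i]) {
                v.push(values[i]);
            }
        }
        // "first" aggregation: take the first value of the group, or None if empty
        for &p in &pivot {
            out.get_mut(p).unwrap().push(per_group[p].first().copied());
        }
    }

    // e.g. column "k" gets [Some(1), None, None] for the groups A, B, C
    assert_eq!(out["k"], vec![Some(1), None, None]);
    assert_eq!(out["n"], vec![None, Some(4), None]);
}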

#[cfg(test)]
