Skip to content

Commit

Permalink
quantile, median groupby agg and parallel n_unique execution; #44
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent 6d885a3 commit d5692eb
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 6 deletions.
21 changes: 21 additions & 0 deletions polars/src/chunked_array/upstream_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,27 @@ fn get_capacity_from_par_results<T>(ll: &LinkedList<Vec<T>>) -> usize {
ll.iter().map(|list| list.len()).sum()
}

impl<T> FromParallelIterator<T::Native> for Xob<ChunkedArray<T>>
where
    T: ArrowPrimitiveType,
{
    /// Collect a parallel iterator of non-null native values into a
    /// `Xob`-wrapped (no-null) `ChunkedArray`.
    fn from_par_iter<I: IntoParallelIterator<Item = T::Native>>(iter: I) -> Self {
        // Each rayon worker fills its own Vec; gather them into a linked list.
        let thread_results = collect_into_linked_list(iter);
        // Pre-size the builder so the sequential append phase never reallocates.
        let total_len = get_capacity_from_par_results(&thread_results);

        let mut builder = PrimitiveChunkedBuilder::new("", total_len);
        // Drain the per-thread vectors into the builder, single threaded.
        for vec in thread_results.iter() {
            for &val in vec {
                builder.append_value(val);
            }
        }

        Xob::new(builder.finish())
    }
}

impl<T> FromParallelIterator<Option<T::Native>> for ChunkedArray<T>
where
T: ArrowPrimitiveType,
Expand Down
8 changes: 8 additions & 0 deletions polars/src/doc/changelog/v0_7.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! # Changelog v0.7
//!
//! * More group by aggregations:
//! - n_unique
//! - quantile
//! - median
//! - last
//!
97 changes: 95 additions & 2 deletions polars/src/frame/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ trait AggNUnique {
macro_rules! impl_agg_n_unique {
($self:ident, $groups:ident, $ca_type:ty) => {{
$groups
.into_iter()
.into_par_iter()
.map(|(_first, idx)| {
if $self.null_count() == 0 {
let mut set = HashSet::with_hasher(FnvBuildHasher::default());
Expand All @@ -513,7 +513,7 @@ macro_rules! impl_agg_n_unique {

impl<T> AggNUnique for ChunkedArray<T>
where
T: PolarsIntegerType + Send,
T: PolarsIntegerType + Sync,
T::Native: Hash + Eq,
{
fn agg_n_unique(&self, groups: &Vec<(usize, Vec<usize>)>) -> UInt32Chunked {
Expand All @@ -524,6 +524,8 @@ where
impl AggNUnique for Float32Chunked {}
impl AggNUnique for Float64Chunked {}
impl AggNUnique for LargeListChunked {}

// TODO: could be faster as it can only be null, true, or false
impl AggNUnique for BooleanChunked {
fn agg_n_unique(&self, groups: &Vec<(usize, Vec<usize>)>) -> UInt32Chunked {
impl_agg_n_unique!(self, groups, Xob<UInt32Chunked>)
Expand All @@ -536,6 +538,35 @@ impl AggNUnique for Utf8Chunked {
}
}

#[enum_dispatch(Series)]
trait AggQuantile {
    /// For every group (the index lists in `_groups`), return the value at the
    /// requested `_quantile`. The default implementation panics; only numeric
    /// `ChunkedArray`s provide a real implementation below.
    fn agg_quantile(&self, _groups: &Vec<(usize, Vec<usize>)>, _quantile: f64) -> Series {
        unimplemented!()
    }
}

impl<T> AggQuantile for ChunkedArray<T>
where
    T: PolarsNumericType + Sync,
    T::Native: PartialEq,
{
    /// Per group, pick the element at `quantile` by sorting the group's values
    /// and indexing at `quantile * (len - 1)` (truncating).
    fn agg_quantile(&self, groups: &Vec<(usize, Vec<usize>)>, quantile: f64) -> Series {
        let per_group = groups.into_par_iter().map(|(_first, idx)| {
            // SAFETY: indices come from the groupby itself and are assumed
            // in-bounds — relies on `take_unchecked`'s contract.
            let vals = unsafe { self.take_unchecked(idx.iter().copied(), Some(idx.len())) };
            let order = vals.argsort(false);
            // Position of the quantile within the sorted order.
            let pos = (quantile * (order.len() - 1) as f64) as usize;
            vals.get(order[pos])
        });
        per_group.collect::<ChunkedArray<T>>().into_series()
    }
}

// Quantile is only defined for numeric types; these fall back to the trait's
// default method, which calls `unimplemented!()` and panics if ever invoked.
impl AggQuantile for Utf8Chunked {}
impl AggQuantile for BooleanChunked {}
impl AggQuantile for LargeListChunked {}



impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
/// Select the column by which the determine the groups.
/// You can select a single column or a slice of columns.
Expand Down Expand Up @@ -833,6 +864,52 @@ impl<'df, 'selection_str> GroupBy<'df, 'selection_str> {
DataFrame::new(cols)
}

/// Aggregate grouped `Series` and determine the quantile per group.
///
/// # Example
///
/// ```rust
/// # use polars::prelude::*;
/// fn example(df: DataFrame) -> Result<DataFrame> {
///     df.groupby("date")?.select("temp").quantile(0.2)
/// }
/// ```
pub fn quantile(&self, quantile: f64) -> Result<DataFrame> {
    // `contains` is false for NaN, so NaN is rejected here as well — the
    // manual `< 0.0 || > 1.0` comparison would have let NaN slip through.
    if !(0.0..=1.0).contains(&quantile) {
        return Err(PolarsError::Other(
            "quantile should be within 0.0 and 1.0".into(),
        ));
    }
    let (mut cols, agg_cols) = self.prepare_agg()?;
    for agg_col in agg_cols {
        // Encode the requested quantile in the result column name,
        // e.g. `temp_quantile_0.20`.
        let new_name = format!("{}_quantile_{:.2}", agg_col.name(), quantile);
        let mut agg = agg_col.agg_quantile(&self.groups, quantile).into_series();
        agg.rename(&new_name);
        cols.push(agg);
    }
    DataFrame::new(cols)
}

/// Aggregate grouped `Series` and determine the median per group.
///
/// # Example
///
/// ```rust
/// # use polars::prelude::*;
/// fn example(df: DataFrame) -> Result<DataFrame> {
///     df.groupby("date")?.select("temp").median()
/// }
/// ```
pub fn median(&self) -> Result<DataFrame> {
    let (mut cols, agg_cols) = self.prepare_agg()?;
    for agg_col in agg_cols {
        // Result column is suffixed, e.g. `temp_median`.
        let new_name = format!("{}_median", agg_col.name());
        // Median is the 0.5 quantile.
        let mut agg = agg_col.agg_quantile(&self.groups, 0.5).into_series();
        agg.rename(&new_name);
        cols.push(agg);
    }
    DataFrame::new(cols)
}


/// Aggregate grouped series and compute the number of values per group.
///
/// # Example
Expand Down Expand Up @@ -1370,6 +1447,22 @@ mod test {
.n_unique()
.unwrap()
);
println!(
"{:?}",
df.groupby("date")
.unwrap()
.select("temp")
.quantile(0.2)
.unwrap()
);
println!(
"{:?}",
df.groupby("date")
.unwrap()
.select("temp")
.median()
.unwrap()
);
}

#[test]
Expand Down
12 changes: 12 additions & 0 deletions py-polars/pypolars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,15 @@ def n_unique(self) -> DataFrame:
Count the unique values per group.
"""
return wrap_df(self._df.groupby(self.by, self.selection, "n_unique"))

def quantile(self, quantile: float) -> DataFrame:
    """
    Compute the quantile per group.

    Parameters
    ----------
    quantile
        Quantile in the range 0.0 to 1.0.
    """
    # Quantile takes an extra float argument, so it goes through a dedicated
    # binding instead of the string-dispatched `groupby` aggregation call.
    return wrap_df(self._df.groupby_quantile(self.by, self.selection, quantile))

def median(self) -> DataFrame:
    """
    Return the median per group.
    """
    out = self._df.groupby(self.by, self.selection, "median")
    return wrap_df(out)
14 changes: 14 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,26 @@ impl PyDataFrame {
"sum" => selection.sum(),
"count" => selection.count(),
"n_unique" => selection.n_unique(),
"median" => selection.median(),
a => Err(PolarsError::Other(format!("agg fn {} does not exists", a))),
};
let df = df.map_err(PyPolarsEr::from)?;
Ok(PyDataFrame::new(df))
}

/// Group by the `by` columns and aggregate the selected columns with the
/// given quantile; Rust errors are mapped onto Python exceptions.
pub fn groupby_quantile(
    &self,
    by: Vec<&str>,
    select: Vec<String>,
    quantile: f64,
) -> PyResult<Self> {
    let gb = self.df.groupby(&by).map_err(PyPolarsEr::from)?;
    let df = gb
        .select(&select)
        .quantile(quantile)
        .map_err(PyPolarsEr::from)?;
    Ok(PyDataFrame::new(df))
}

pub fn pivot(
&self,
by: Vec<String>,
Expand Down
10 changes: 6 additions & 4 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ def test_groupby():
.last()
.frame_equal(DataFrame({"a": ["a", "b", "c"], "": [3, 5, 6]}))
)
# check if it runs
(df.groupby("a").select("b").n_unique())

(df.groupby("a").select("b").quantile(0.3))

#
# # TODO: is false because count is u32
# df.groupby(by="a", select="b", agg="count").frame_equal(
Expand Down Expand Up @@ -165,9 +170,6 @@ def test_file_buffer():

def test_set():
np.random.seed(1)
df = DataFrame({"foo": np.random.rand(10),
"bar": np.arange(10),
"ham": ["h"] * 10})
df = DataFrame({"foo": np.random.rand(10), "bar": np.arange(10), "ham": ["h"] * 10})
df["new"] = np.random.rand(10)
df[df["new"] > 0.5, "new"] = 1

0 comments on commit d5692eb

Please sign in to comment.