Skip to content

Commit

Permalink
parallelize horizontal aggregations (#2454)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 23, 2022
1 parent 90eb9e6 commit f5700ef
Showing 1 changed file with 79 additions and 52 deletions.
131 changes: 79 additions & 52 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2380,20 +2380,28 @@ impl DataFrame {
#[cfg(feature = "zip_with")]
#[cfg_attr(docsrs, doc(cfg(feature = "zip_with")))]
pub fn hmin(&self) -> Result<Option<Series>> {
let min_fn = |acc: &Series, s: &Series| {
let mask = acc.lt(s) & acc.is_not_null() | s.is_null();
acc.zip_with(&mask, s)
};

match self.columns.len() {
0 => Ok(None),
1 => Ok(Some(self.columns[0].clone())),
2 => min_fn(&self.columns[0], &self.columns[1]).map(Some),
_ => {
let first = Cow::Borrowed(&self.columns[0]);

self.columns[1..]
.iter()
.try_fold(first, |acc, s| {
let mask = acc.lt(s) & acc.is_not_null() | s.is_null();
let min = acc.zip_with(&mask, s)?;
Ok(Cow::Owned(min))
})
.map(|s| Some(s.into_owned()))
// the try_reduce_with is a bit slower in parallelism,
// but I don't think it matters here as we parallelize over columns, not over elements
POOL.install(|| {
self.columns
.par_iter()
.map(|s| Ok(Cow::Borrowed(s)))
.try_reduce_with(|l, r| min_fn(&l, &r).map(Cow::Owned))
// we can unwrap the option, because we are certain there is a column
// we started this operation on 3 columns
.unwrap()
.map(|cow| Some(cow.into_owned()))
})
}
}
}
Expand All @@ -2402,51 +2410,66 @@ impl DataFrame {
#[cfg(feature = "zip_with")]
#[cfg_attr(docsrs, doc(cfg(feature = "zip_with")))]
pub fn hmax(&self) -> Result<Option<Series>> {
let max_fn = |acc: &Series, s: &Series| {
let mask = acc.gt(s) & acc.is_not_null() | s.is_null();
acc.zip_with(&mask, s)
};

match self.columns.len() {
0 => Ok(None),
1 => Ok(Some(self.columns[0].clone())),
2 => max_fn(&self.columns[0], &self.columns[1]).map(Some),
_ => {
let first = Cow::Borrowed(&self.columns[0]);

self.columns[1..]
.iter()
.try_fold(first, |acc, s| {
let mask = acc.gt(s) & acc.is_not_null() | s.is_null();
let max = acc.zip_with(&mask, s)?;

Ok(Cow::Owned(max))
})
.map(|s| Some(s.into_owned()))
// the try_reduce_with is a bit slower in parallelism,
// but I don't think it matters here as we parallelize over columns, not over elements
POOL.install(|| {
self.columns
.par_iter()
.map(|s| Ok(Cow::Borrowed(s)))
.try_reduce_with(|l, r| max_fn(&l, &r).map(Cow::Owned))
// we can unwrap the option, because we are certain there is a column
// we started this operation on 3 columns
.unwrap()
.map(|cow| Some(cow.into_owned()))
})
}
}
}

/// Aggregate the column horizontally to their sum values.
pub fn hsum(&self, none_strategy: NullStrategy) -> Result<Option<Series>> {
let sum_fn = |acc: &Series, s: &Series, none_strategy: NullStrategy| -> Result<Series> {
let mut acc = acc.clone();
let mut s = s.clone();
if let NullStrategy::Ignore = none_strategy {
// if has nulls
if acc.has_validity() {
acc = acc.fill_null(FillNullStrategy::Zero)?;
}
if s.has_validity() {
s = s.fill_null(FillNullStrategy::Zero)?;
}
}
Ok(&acc + &s)
};

match self.columns.len() {
0 => Ok(None),
1 => Ok(Some(self.columns[0].clone())),
2 => sum_fn(&self.columns[0], &self.columns[1], none_strategy).map(Some),
_ => {
let first = Cow::Borrowed(&self.columns[0]);
self.columns[1..]
.iter()
.map(Cow::Borrowed)
.try_fold(first, |acc, s| {
let mut acc = acc.as_ref().clone();
let mut s = s.as_ref().clone();

if let NullStrategy::Ignore = none_strategy {
// if has nulls
if acc.has_validity() {
acc = acc.fill_null(FillNullStrategy::Zero)?;
}
if s.has_validity() {
s = s.fill_null(FillNullStrategy::Zero)?;
}
}
Ok(Cow::Owned(&acc + &s))
})
.map(|s| Some(s.into_owned()))
// the try_reduce_with is a bit slower in parallelism,
// but I don't think it matters here as we parallelize over columns, not over elements
POOL.install(|| {
self.columns
.par_iter()
.map(|s| Ok(Cow::Borrowed(s)))
.try_reduce_with(|l, r| sum_fn(&l, &r, none_strategy).map(Cow::Owned))
// we can unwrap the option, because we are certain there is a column
// we started this operation on 3 columns
.unwrap()
.map(|cow| Some(cow.into_owned()))
})
}
}
}
Expand All @@ -2457,17 +2480,20 @@ impl DataFrame {
0 => Ok(None),
1 => Ok(Some(self.columns[0].clone())),
_ => {
let sum = self.hsum(none_strategy)?;

let first: Cow<Series> =
Cow::Owned(self.columns[0].is_null().cast(&DataType::UInt32).unwrap());
let null_count = self.columns[1..]
.iter()
.map(Cow::Borrowed)
.fold(first, |acc, s| {
Cow::Owned(acc.as_ref() + &s.is_null().cast(&DataType::UInt32).unwrap())
})
.into_owned();
let sum = || self.hsum(none_strategy);

let null_count = || {
self.columns
.par_iter()
.map(|s| s.is_null().cast(&DataType::UInt32).unwrap())
.reduce_with(|l, r| &l + &r)
// we can unwrap the option, because we are certain there is a column
// we started this operation on 2 columns
.unwrap()
};

let (sum, null_count) = POOL.install(|| rayon::join(sum, null_count));
let sum = sum?;

// value lengths: len - null_count
let value_length: UInt32Chunked =
Expand Down Expand Up @@ -2877,6 +2903,7 @@ mod test {

#[test]
#[cfg(feature = "zip_with")]
#[cfg_attr(miri, ignore)]
fn test_h_agg() {
let a = Series::new("a", &[1, 2, 6]);
let b = Series::new("b", &[Some(1), None, None]);
Expand Down

0 comments on commit f5700ef

Please sign in to comment.