Skip to content

Commit

Permalink
Lazy: extend arguments in groupby context that do not belong to a group
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 31, 2021
1 parent b8957ce commit 968033e
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 26 deletions.
78 changes: 60 additions & 18 deletions polars/polars-lazy/src/dsl.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
//! Domain specific language for the Lazy api.
use crate::logical_plan::Context;
use crate::prelude::*;
#[cfg(feature = "is_in")]
use crate::utils::expr_to_root_column_name;
use crate::utils::{has_expr, has_wildcard};
use crate::utils::{has_expr, has_root_literal_expr, has_wildcard};
use polars_core::export::arrow::{array::BooleanArray, bitmap::MutableBitmap};
use polars_core::prelude::*;

Expand Down Expand Up @@ -886,7 +884,7 @@ impl Expr {

/// Apply a function/closure once the logical plan gets executed.
///
/// This function is very similar to [apply](Expr::apply), but differs in how it handles aggregations.
/// This function is very similar to [`apply`], but differs in how it handles aggregations.
///
/// * `map` should be used for operations that are independent of groups, e.g. `multiply * 2`, or `raise to the power`
/// * `apply` should be used for operations that work on a group of data. e.g. `sum`, `count`, etc.
Expand All @@ -910,6 +908,27 @@ impl Expr {
}
}

/// Apply a function/closure that takes several inputs once the logical plan gets executed.
///
/// The receiver becomes the first input of `function`; the expressions in `arguments`
/// follow in the order they are given. The resulting expression is flagged with
/// `ApplyOptions::ApplyFlat` and does not expand wildcards in its inputs.
///
/// See [`map`](Expr::map) for the differences between [`map`](Expr::map) and
/// [`apply`](Expr::apply).
pub fn map_many<F>(self, function: F, arguments: &[Expr], output_type: GetOutput) -> Self
where
    F: Fn(&mut [Series]) -> Result<Series> + 'static + Send + Sync,
{
    // `self` first, then a clone of every extra argument, preserving order.
    let input: Vec<Expr> = std::iter::once(self)
        .chain(arguments.iter().cloned())
        .collect();
    let options = FunctionOptions {
        collect_groups: ApplyOptions::ApplyFlat,
        input_wildcard_expansion: false,
    };

    Expr::Function {
        input,
        function: NoEq::new(Arc::new(function)),
        output_type,
        options,
    }
}

/// Apply a function/closure once the logical plan gets executed.
///
/// This function is very similar to [apply](Expr::apply), but differs in how it handles aggregations.
Expand Down Expand Up @@ -960,6 +979,27 @@ impl Expr {
}
}

/// Apply a function/closure that takes several inputs over the groups.
/// This should only be used in a groupby aggregation.
///
/// The receiver becomes the first input of `function`; the expressions in `arguments`
/// follow in the order they are given. The resulting expression is flagged with
/// `ApplyOptions::ApplyGroups` and does not expand wildcards in its inputs.
///
/// See [`apply`](Expr::apply) for the differences between [`map`](Expr::map) and
/// [`apply`](Expr::apply).
pub fn apply_many<F>(self, function: F, arguments: &[Expr], output_type: GetOutput) -> Self
where
    F: Fn(&mut [Series]) -> Result<Series> + 'static + Send + Sync,
{
    // `self` first, then a clone of every extra argument, preserving order.
    let input: Vec<Expr> = std::iter::once(self)
        .chain(arguments.iter().cloned())
        .collect();
    let options = FunctionOptions {
        collect_groups: ApplyOptions::ApplyGroups,
        input_wildcard_expansion: false,
    };

    Expr::Function {
        input,
        function: NoEq::new(Arc::new(function)),
        output_type,
        options,
    }
}

/// Get mask of finite values if dtype is Float
#[allow(clippy::wrong_self_convention)]
pub fn is_finite(self) -> Self {
Expand Down Expand Up @@ -1281,20 +1321,22 @@ impl Expr {
#[cfg(feature = "is_in")]
#[cfg_attr(docsrs, doc(cfg(feature = "is_in")))]
pub fn is_in(self, other: Expr) -> Self {
let name = expr_to_root_column_name(&self).unwrap();
let output_field = Some(Field::new(&name, DataType::Boolean));
map_binary(
self,
other,
move |left, other| {
left.is_in(&other).map(|ca| {
let mut s = ca.into_series();
s.rename(&name);
s
})
},
output_field,
)
let has_literal = has_root_literal_expr(&other);
let f = |s: &mut [Series]| {
let left = &s[0];
let other = &s[1];

left.is_in(other).map(|ca| ca.into_series())
};
let arguments = &[other];
let output_type = GetOutput::from_type(DataType::Boolean);

// we don't have to apply on groups, so this is faster
if has_literal {
self.map_many(f, arguments, output_type)
} else {
self.apply_many(f, arguments, output_type)
}
}

/// Get the year of a Date/Datetime
Expand Down
53 changes: 46 additions & 7 deletions polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::prelude::*;
use polars_core::frame::groupby::GroupTuples;
use polars_core::prelude::*;
use rayon::prelude::*;
use std::borrow::Cow;
use std::convert::TryFrom;
use std::sync::Arc;

pub struct ApplyExpr {
Expand Down Expand Up @@ -236,20 +238,57 @@ impl PhysicalAggregation for ApplyExpr {
ApplyOptions::ApplyGroups => {
let mut container = vec![Default::default(); acs.len()];
let name = acs[0].series().name().to_string();
let first_len = acs[0].len();

// aggregate representation of the aggregation contexts
// then unpack the lists and finally create iterators from these list chunked arrays.
let lists = acs
// The arguments of an apply can be a group, but can also be the result of a separate aggregation.
// In the latter case we must not aggregate, but treat that Series as its own input.
// Here we make sure that we get owned series in the proper state (aggregated or not aggregated)
// so that we can make iterators from them next.
let owned_series = acs
.iter_mut()
.map(|ac| {
let s = ac.aggregated();
s.list().unwrap().clone()
let not_aggregated_len = ac.len();
let original_len = ac.is_original_len();

// this branch we see the argument per group, so we must aggregate
// every group will have a different argument
let s = if not_aggregated_len == first_len && original_len {
ac.aggregated()
// this branch we see the argument as a constant, that will be applied per group
} else {
Cow::Borrowed(ac.series())
};
(s, not_aggregated_len, original_len)
})
.collect::<Vec<_>>();

// now we make the iterators
let mut iters = owned_series
.iter()
.map(|(s, not_aggregated_len, original_len)| {
// this branch we see the arguments per group. every group has a different argument
if *not_aggregated_len == first_len && *original_len {
let ca = s.list().unwrap();
Box::new(
ca.downcast_iter()
.map(|arr| arr.iter())
.flatten()
.map(|arr| {
arr.map(|arr| Series::try_from(("", arr)).unwrap())
}),
)
as Box<dyn Iterator<Item = Option<Series>>>
// this branch we repeat the argument per group
} else {
dbg!("here");
let s = s.clone().into_owned();
Box::new(std::iter::repeat(Some(s)))
}
})
.collect::<Vec<_>>();
let mut iters = lists.iter().map(|ca| ca.into_iter()).collect::<Vec<_>>();

// length of the items to iterate over
let len = lists[0].len();
let len = groups.len();

let mut ca: ListChunked = (0..len)
.map(|_| {
Expand Down
22 changes: 22 additions & 0 deletions polars/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) mod window;

use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_arrow::array::ValueSize;
use polars_core::frame::groupby::GroupTuples;
use polars_core::prelude::*;
use polars_io::PhysicalIoExpr;
Expand Down Expand Up @@ -205,6 +206,10 @@ impl<'a> AggregationContext<'a> {
}
}

/// Return the current value of the `original_len` flag of this context.
// NOTE(review): presumably the flag records whether the series still has the length
// of the original (ungrouped) column — confirm against the callers of `set_original_len`.
pub(crate) fn is_original_len(&self) -> bool {
self.original_len
}

pub(crate) fn set_original_len(&mut self, original_len: bool) -> &mut Self {
self.original_len = original_len;
self
Expand All @@ -229,6 +234,7 @@ impl<'a> AggregationContext<'a> {
self
}

/// Update the group tuples
pub(crate) fn with_groups(&mut self, groups: GroupTuples) -> &mut Self {
// In case of new groups, a series always needs to be flattened
self.with_series(self.flat().into_owned(), false);
Expand All @@ -238,6 +244,7 @@ impl<'a> AggregationContext<'a> {
self
}

/// Get the aggregated version of the series.
pub(crate) fn aggregated(&mut self) -> Cow<'_, Series> {
// we clone, because we only want to call `self.groups()` if needed.
// self groups may instantiate new groups and thus can be expensive.
Expand Down Expand Up @@ -278,6 +285,7 @@ impl<'a> AggregationContext<'a> {
}
}

/// Get the not-aggregated version of the series.
pub(crate) fn flat(&self) -> Cow<'_, Series> {
match &self.series {
AggState::NotAggregated(s) => Cow::Borrowed(s),
Expand All @@ -287,6 +295,20 @@ impl<'a> AggregationContext<'a> {
}
}

/// Length of the underlying Series in its not-aggregated form.
///
/// For an aggregated list this is the total size of the list's values rather than
/// the number of lists.
///
/// # Panics
///
/// Panics if the state is `AggState::None`, or if an `AggregatedList` does not
/// hold a list Series.
pub(crate) fn len(&self) -> usize {
    match &self.series {
        AggState::AggregatedList(grouped) => grouped.list().unwrap().get_values_size(),
        AggState::NotAggregated(flat) | AggState::AggregatedFlat(flat) => flat.len(),
        AggState::None => unreachable!(),
    }
}

/// Take the series.
pub(crate) fn take(&mut self) -> Series {
match std::mem::take(&mut self.series) {
AggState::NotAggregated(s)
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn scan_foods_csv() -> LazyFrame {
LazyCsvReader::new(path.to_string()).finish().unwrap()
}

#[cfg(feature = "parquet")]
fn scan_foods_parquet(par: bool) -> LazyFrame {
let path = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv";
let out_path = path.replace(".csv", ".parquet");
Expand Down
32 changes: 32 additions & 0 deletions polars/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2472,6 +2472,7 @@ fn test_agg_unique_first() -> Result<()> {
}

#[test]
#[cfg(feature = "parquet")]
fn test_parquet_exec() -> Result<()> {
// filter
for par in [true, false] {
Expand Down Expand Up @@ -2500,3 +2501,34 @@ fn test_parquet_exec() -> Result<()> {

Ok(())
}

#[test]
#[cfg(feature = "is_in")]
fn test_is_in() -> Result<()> {
    let df = fruits_cars();

    // TODO! fix this
    // The variant below is executed by `apply` (non-literal right-hand side) and is
    // still incorrect, so it stays disabled for now:
    // let out = df
    //     .lazy()
    //     .groupby_stable([col("fruits")])
    //     .agg([col("cars").is_in(col("cars").filter(col("cars").eq(lit("beetle"))))])
    //     .collect()?;

    // A literal right-hand side is executed by `map`.
    let candidates = lit(Series::new("a", ["beetle", "vw"]));
    let grouped = df
        .lazy()
        .groupby_stable([col("fruits")])
        .agg([col("cars").is_in(candidates)])
        .collect()?;

    // Flatten the per-group results back into a single boolean column before comparing.
    let exploded = grouped.column("cars").unwrap().explode()?;
    let mask = exploded.bool().unwrap();
    let expected = [Some(true), Some(false), Some(true), Some(true), Some(true)];
    assert_eq!(Vec::from(mask), &expected);

    Ok(())
}
5 changes: 5 additions & 0 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ where
current_expr.into_iter().any(matches)
}

/// Check whether the root of `e` is a literal.
///
/// The root is taken to be the last expression yielded when iterating over `e`;
/// an expression that yields nothing has no literal root.
pub(crate) fn has_root_literal_expr(e: &Expr) -> bool {
    e.into_iter()
        .last()
        .map_or(false, |root| matches!(root, Expr::Literal(_)))
}

// this one is used so much that it has its own function, to reduce inlining
pub(crate) fn has_wildcard(current_expr: &Expr) -> bool {
has_expr(current_expr, |e| matches!(e, Expr::Wildcard))
Expand Down
1 change: 0 additions & 1 deletion polars/src/docs/eager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
//! * [Joins](#joins)
//! * [GroupBy](#groupby)
//! - [pivot](#pivot)
//! - [downsample](#downsample)
//! * [Melt](#melt)
//! * [Explode](#explode)
//! * [IO](#IO)
Expand Down

0 comments on commit 968033e

Please sign in to comment.