Skip to content

Commit

Permalink
New window expression rules (#2604)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 11, 2022
1 parent 5843694 commit dfac08d
Show file tree
Hide file tree
Showing 14 changed files with 618 additions and 290 deletions.
21 changes: 15 additions & 6 deletions polars/polars-core/src/chunked_array/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ fn sort_branch<T, Fd, Fr>(
}
}

#[cfg(feature = "private")]
pub fn argsort_no_nulls<Idx, T>(slice: &mut [(Idx, T)], reverse: bool)
where
T: PartialOrd + Send,
Idx: PartialOrd + Send,
{
argsort_branch(
slice,
reverse,
|(_, a), (_, b)| order_default(a, b),
|(_, a), (_, b)| order_reverse(a, b),
);
}

fn argsort_branch<T, Fd, Fr>(
slice: &mut [T],
reverse: bool,
Expand Down Expand Up @@ -304,12 +318,7 @@ where
vals.extend_trusted_len(iter);
});

argsort_branch(
vals.as_mut_slice(),
reverse,
|(_, a), (_, b)| order_default(a, b),
|(_, a), (_, b)| order_reverse(a, b),
);
argsort_no_nulls(vals.as_mut_slice(), reverse);

let ca: NoNull<UInt32Chunked> = vals.into_iter().map(|(idx, _v)| idx).collect_trusted();
let mut ca = ca.into_inner();
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use rayon::prelude::*;
use std::borrow::Cow;
use std::ops::{Deref, DerefMut};

#[cfg(feature = "private")]
pub use crate::chunked_array::ops::sort::argsort_no_nulls;

#[repr(transparent)]
pub struct Wrap<T>(pub T);

Expand Down
2 changes: 0 additions & 2 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
//! fn example() -> Result<DataFrame> {
//! // always prefer `from_path` as that is fastest.
//! CsvReader::from_path("iris_csv")?
//! .infer_schema(None)
//! .has_header(true)
//! .finish()
//! }
Expand Down Expand Up @@ -189,7 +188,6 @@ impl NullValues {
///
/// fn example() -> Result<DataFrame> {
/// CsvReader::from_path("iris_csv")?
/// .infer_schema(None)
/// .has_header(true)
/// .finish()
/// }
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@
//! }
//! ```
#![cfg_attr(docsrs, feature(doc_cfg))]
extern crate core;

#[cfg(all(feature = "dot_diagram", feature = "compile"))]
mod dot;
#[cfg(feature = "compile")]
Expand Down
16 changes: 15 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl PhysicalExpr for BinaryExpr {
(AggState::AggregatedFlat(s), AggState::NotAggregated(_) | AggState::Literal(_))
if s.len() != df.height() =>
{
dbg!("HIER");
// this is a flat series of len eq to group tuples
let l = ac_l.aggregated();
let l = l.as_ref();
Expand Down Expand Up @@ -145,6 +146,7 @@ impl PhysicalExpr for BinaryExpr {
ca.rename(l.name());

ac_l.with_series(ca.into_series(), true);
ac_l.with_update_groups(UpdateGroups::WithGroupsLen);
Ok(ac_l)
}
// if the groups_len == df.len we can just apply all flat.
Expand Down Expand Up @@ -191,6 +193,7 @@ impl PhysicalExpr for BinaryExpr {
ca.rename(l.name());

ac_l.with_series(ca.into_series(), true);
ac_l.with_update_groups(UpdateGroups::WithGroupsLen);
Ok(ac_l)
}
(AggState::AggregatedList(_), AggState::NotAggregated(_) | AggState::Literal(_))
Expand All @@ -209,8 +212,19 @@ impl PhysicalExpr for BinaryExpr {
ac_l.with_series(out, false);
Ok(ac_l)
}
// flatten the Series and apply the operators
(AggState::AggregatedList(_), AggState::AggregatedList(_)) => {
let out = apply_operator(
ac_l.flat_naive().as_ref(),
ac_r.flat_naive().as_ref(),
self.op,
)?;

// Both are or a flat series or aggregated into a list
ac_l.combine_groups(ac_r).with_series(out, false);
ac_l.with_update_groups(UpdateGroups::WithGroupsLen);
Ok(ac_l)
}
// Both are or a flat series
// so we can flatten the Series and apply the operators
_ => {
let out = apply_operator(
Expand Down
19 changes: 8 additions & 11 deletions polars/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use polars_core::prelude::*;
use polars_io::predicates::PhysicalIoExpr;
use std::borrow::Cow;

#[cfg_attr(debug_assertions, derive(Debug))]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) enum AggState {
/// Already aggregated: `.agg_list(group_tuples` is called
/// and produced a `Series` of dtype `List`
Expand Down Expand Up @@ -72,12 +71,6 @@ pub(crate) enum UpdateGroups {
WithSeriesLen,
}

impl Default for AggState {
fn default() -> Self {
AggState::Literal(Series::default())
}
}

#[cfg_attr(debug_assertions, derive(Debug))]
pub struct AggregationContext<'a> {
/// Can be in one of two states
Expand Down Expand Up @@ -224,7 +217,10 @@ impl<'a> AggregationContext<'a> {
}

pub(crate) fn is_not_aggregated(&self) -> bool {
matches!(&self.state, AggState::NotAggregated(_))
matches!(
&self.state,
AggState::NotAggregated(_) | AggState::Literal(_)
)
}

pub(crate) fn is_aggregated(&self) -> bool {
Expand Down Expand Up @@ -393,12 +389,13 @@ impl<'a> AggregationContext<'a> {

/// Take the series.
pub(crate) fn take(&mut self) -> Series {
match std::mem::take(&mut self.state) {
let s = match &mut self.state {
AggState::NotAggregated(s)
| AggState::AggregatedFlat(s)
| AggState::AggregatedList(s) => s,
AggState::Literal(s) => s,
}
};
std::mem::take(s)
}
}

Expand Down

0 comments on commit dfac08d

Please sign in to comment.