Skip to content

Commit

Permalink
fast path for explicitly sorted dataframes and window expressions (#2613
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ritchie46 committed Feb 11, 2022
1 parent 5acad95 commit d27546e
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 35 deletions.
8 changes: 6 additions & 2 deletions polars/polars-core/src/frame/groupby/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ fn finish_group_order(out: Vec<Vec<IdxItem>>, sorted: bool) -> GroupsProxy {
if sorted {
let mut out = out.into_iter().flatten().collect::<Vec<_>>();
out.sort_unstable_by_key(|g| g.0);
GroupsProxy::Idx(GroupsIdx::from_iter(out.into_iter()))
let mut idx = GroupsIdx::from_iter(out.into_iter());
idx.sorted = true;
GroupsProxy::Idx(idx)
} else {
GroupsProxy::Idx(GroupsIdx::from(out))
}
Expand Down Expand Up @@ -54,7 +56,9 @@ where
.map(|(_k, v)| v)
.collect_trusted::<Vec<_>>();
groups.sort_unstable_by_key(|g| g.0);
GroupsProxy::Idx(groups.into_iter().collect())
let mut idx: GroupsIdx = groups.into_iter().collect();
idx.sorted = true;
GroupsProxy::Idx(idx)
} else {
GroupsProxy::Idx(hash_tbl.into_iter().map(|(_k, v)| v).collect())
}
Expand Down
18 changes: 16 additions & 2 deletions polars/polars-core/src/frame/groupby/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use rayon::prelude::*;
/// this make sorting fast.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct GroupsIdx {
pub(crate) sorted: bool,
first: Vec<u32>,
all: Vec<Vec<u32>>,
}
Expand Down Expand Up @@ -69,7 +70,12 @@ impl GroupsIdx {
let (first, all) = POOL.install(|| rayon::join(take_first, take_all));
self.first = first;
self.all = all;
self.sorted = true
}
pub fn is_sorted(&self) -> bool {
self.sorted
}

pub fn iter(
&self,
) -> std::iter::Zip<std::iter::Copied<std::slice::Iter<u32>>, std::slice::Iter<Vec<u32>>> {
Expand Down Expand Up @@ -98,7 +104,11 @@ impl GroupsIdx {
impl FromIterator<IdxItem> for GroupsIdx {
fn from_iter<T: IntoIterator<Item = IdxItem>>(iter: T) -> Self {
let (first, all) = iter.into_iter().unzip();
GroupsIdx { first, all }
GroupsIdx {
sorted: false,
first,
all,
}
}
}

Expand Down Expand Up @@ -131,7 +141,11 @@ impl FromParallelIterator<IdxItem> for GroupsIdx {
I: IntoParallelIterator<Item = IdxItem>,
{
let (first, all) = par_iter.into_par_iter().unzip();
GroupsIdx { first, all }
GroupsIdx {
sorted: false,
first,
all,
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3043,7 +3043,7 @@ mod test {

#[test]
#[cfg_attr(miri, ignore)]
fn drop_duplicates() {
fn distinct() {
let df = df! {
"flt" => [1., 1., 2., 2., 3., 3.],
"int" => [1, 1, 2, 2, 3, 3, ],
Expand All @@ -3052,7 +3052,7 @@ mod test {
.unwrap();
dbg!(&df);
let df = df
.drop_duplicates(true, None)
.distinct_stable(None, DistinctKeepStrategy::First)
.unwrap()
.sort(["flt"], false)
.unwrap();
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::*;
use crate::chunked_array::comparison::*;
use crate::chunked_array::{
ops::{
Expand Down Expand Up @@ -107,6 +108,16 @@ impl private::PrivateSeries for SeriesWrap<BooleanChunked> {
}

impl SeriesTrait for SeriesWrap<BooleanChunked> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "interpolate")]
fn interpolate(&self) -> Series {
self.0.clone().into_series()
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/categorical.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::*;
use crate::chunked_array::comparison::*;
use crate::chunked_array::{
ops::{
Expand Down Expand Up @@ -111,6 +112,16 @@ impl private::PrivateSeries for SeriesWrap<CategoricalChunked> {
}

impl SeriesTrait for SeriesWrap<CategoricalChunked> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "interpolate")]
fn interpolate(&self) -> Series {
self.0.interpolate().into_series()
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/dates_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::SeriesWrap;
use super::*;
use crate::chunked_array::{
comparison::*,
ops::{explode::ExplodeByOffsets, ToBitRepr},
Expand Down Expand Up @@ -226,6 +227,16 @@ macro_rules! impl_dyn_series {
}

impl SeriesTrait for SeriesWrap<$ca> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "interpolate")]
fn interpolate(&self) -> Series {
self.0.interpolate().$into_logical().into_series()
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::SeriesWrap;
use super::*;
use crate::chunked_array::{
comparison::*, ops::explode::ExplodeByOffsets, AsSinglePtr, ChunkIdIter,
};
Expand Down Expand Up @@ -244,6 +245,16 @@ impl private::PrivateSeries for SeriesWrap<DatetimeChunked> {
}

impl SeriesTrait for SeriesWrap<DatetimeChunked> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "interpolate")]
fn interpolate(&self) -> Series {
self.0
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::SeriesWrap;
use super::*;
use crate::chunked_array::{
comparison::*, ops::explode::ExplodeByOffsets, AsSinglePtr, ChunkIdIter,
};
Expand Down Expand Up @@ -236,6 +237,16 @@ impl private::PrivateSeries for SeriesWrap<DurationChunked> {
}

impl SeriesTrait for SeriesWrap<DurationChunked> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "interpolate")]
fn interpolate(&self) -> Series {
self.0
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/floats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::SeriesWrap;
use super::*;
use crate::chunked_array::comparison::*;
#[cfg(feature = "rolling_window")]
use crate::chunked_array::ops::rolling_window::RollingOptions;
Expand Down Expand Up @@ -214,6 +215,16 @@ macro_rules! impl_dyn_series {
}

impl SeriesTrait for SeriesWrap<$ca> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "rolling_window")]
fn rolling_apply(
&self,
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::any::Any;
use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::*;
use crate::chunked_array::comparison::*;
#[cfg(feature = "rolling_window")]
use crate::chunked_array::ops::rolling_window::RollingOptions;
Expand Down Expand Up @@ -260,6 +261,16 @@ macro_rules! impl_dyn_series {
}

impl SeriesTrait for SeriesWrap<$ca> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "rolling_window")]
fn rolling_apply(
&self,
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-core/src/series/implementations/utf8.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::private;
use super::IntoSeries;
use super::SeriesTrait;
use super::*;
use crate::chunked_array::comparison::*;
use crate::chunked_array::{
ops::{
Expand Down Expand Up @@ -109,6 +110,16 @@ impl private::PrivateSeries for SeriesWrap<Utf8Chunked> {
}

impl SeriesTrait for SeriesWrap<Utf8Chunked> {
fn is_sorted(&self) -> IsSorted {
if self.0.is_sorted() {
IsSorted::Ascending
} else if self.0.is_sorted_reverse() {
IsSorted::Descending
} else {
IsSorted::Not
}
}

#[cfg(feature = "interpolate")]
fn interpolate(&self) -> Series {
self.0.clone().into_series()
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;

pub use series_trait::IsSorted;

/// # Series
/// The columnar data type for a DataFrame.
///
Expand Down
12 changes: 12 additions & 0 deletions polars/polars-core/src/series/series_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ use std::borrow::Cow;
use std::ops::Deref;
use std::sync::Arc;

#[derive(Debug, Copy, Clone)]
pub enum IsSorted {
Ascending,
Descending,
Not,
}

macro_rules! invalid_operation {
($s:expr) => {
Err(PolarsError::InvalidOperation(
Expand Down Expand Up @@ -254,6 +261,11 @@ pub(crate) mod private {
pub trait SeriesTrait:
Send + Sync + private::PrivateSeries + private::PrivateSeriesNumeric
{
/// Check if [`Series`] is sorted.
fn is_sorted(&self) -> IsSorted {
IsSorted::Not
}

#[cfg(feature = "interpolate")]
#[cfg_attr(docsrs, doc(cfg(feature = "interpolate")))]
fn interpolate(&self) -> Series;
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ impl PhysicalExpr for BinaryExpr {
(AggState::AggregatedFlat(s), AggState::NotAggregated(_) | AggState::Literal(_))
if s.len() != df.height() =>
{
dbg!("HIER");
// this is a flat series of len eq to group tuples
let l = ac_l.aggregated();
let l = l.as_ref();
Expand Down

0 comments on commit d27546e

Please sign in to comment.