Skip to content

Commit

Permalink
add dt.buckets (#1859)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 22, 2021
1 parent 45de79c commit 922482f
Show file tree
Hide file tree
Showing 14 changed files with 373 additions and 28 deletions.
2 changes: 0 additions & 2 deletions polars/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,10 @@ pub struct ChunkedArray<T> {
}

impl<T> ChunkedArray<T> {
#[cfg(feature = "asof_join")]
pub(crate) fn is_sorted(&self) -> bool {
self.bit_settings & 1 != 0
}

#[cfg(feature = "asof_join")]
pub(crate) fn is_sorted_reverse(&self) -> bool {
self.bit_settings & 1 << 1 != 0
}
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/chunked_array/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ where
T::Native: Default,
{
fn sort_with(&self, options: SortOptions) -> ChunkedArray<T> {
if options.descending && self.is_sorted_reverse() || self.is_sorted() {
return self.clone();
}
if !self.has_validity() {
let mut vals = memcpy_values(self);
sort_branch(
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/chunked_array/temporal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod date;
mod datetime;
#[cfg(feature = "dtype-time")]
mod time;
pub mod timedelta;
mod utf8;

pub use self::conversion::*;
Expand Down
159 changes: 159 additions & 0 deletions polars/polars-core/src/chunked_array/temporal/timedelta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use crate::prelude::*;
use polars_arrow::trusted_len::PushUnchecked;

/// A duration split into components, mirroring the layout of Python's
/// `datetime.timedelta` (whole days, plus seconds and microseconds).
#[derive(Debug, Default, Copy, Clone)]
pub struct TimeDelta {
    days: i64,
    seconds: u32,
    microseconds: u32,
}

impl TimeDelta {
    /// Total length of this delta in whole milliseconds.
    /// Sub-millisecond precision in `microseconds` is truncated.
    fn to_milliseconds(self) -> i64 {
        const MS_PER_DAY: i64 = 24 * 3600 * 1000;
        self.days * MS_PER_DAY
            + i64::from(self.seconds) * 1000
            + i64::from(self.microseconds) / 1000
    }

    /// Total length of this delta in whole days.
    /// The `seconds` component is truncated to full days and
    /// `microseconds` are ignored entirely.
    fn to_days(self) -> i64 {
        const SECS_PER_DAY: i64 = 24 * 3600;
        self.days + i64::from(self.seconds) / SECS_PER_DAY
    }
}

/// Builder for [`TimeDelta`], accumulating duration components.
///
/// Every call *adds* to the component rather than replacing it, so
/// `.days(1).days(1)` yields a two-day delta. The `u32` components are
/// not normalized into larger units and can overflow when extreme
/// values are combined (e.g. `hours` beyond ~1.19 million); callers are
/// expected to pass sensible magnitudes.
#[derive(Debug, Default, Copy, Clone)]
pub struct TimeDeltaBuilder {
    days: i64,
    seconds: u32,
    microseconds: u32,
}

impl TimeDeltaBuilder {
    /// Create a builder with all components set to zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Add whole days.
    pub fn days(mut self, days: i64) -> Self {
        self.days += days;
        self
    }

    /// Add whole seconds.
    pub fn seconds(mut self, seconds: u32) -> Self {
        self.seconds += seconds;
        self
    }

    /// Add whole microseconds.
    pub fn microseconds(mut self, microseconds: u32) -> Self {
        self.microseconds += microseconds;
        self
    }

    /// Add whole milliseconds (stored as microseconds).
    pub fn milliseconds(mut self, milliseconds: u32) -> Self {
        self.microseconds += milliseconds * 1000;
        self
    }

    /// Add whole minutes (stored as seconds).
    pub fn minutes(mut self, minutes: u32) -> Self {
        self.seconds += minutes * 60;
        self
    }

    /// Add whole hours (stored as seconds).
    pub fn hours(mut self, hours: u32) -> Self {
        self.seconds += hours * 3600;
        self
    }

    /// Add whole weeks (stored as days).
    pub fn weeks(mut self, weeks: i64) -> Self {
        self.days += weeks * 7;
        self
    }

    /// Consume the builder and produce the final [`TimeDelta`].
    pub fn finish(self) -> TimeDelta {
        TimeDelta {
            days: self.days,
            seconds: self.seconds,
            microseconds: self.microseconds,
        }
    }
}

#[cfg(feature = "dtype-datetime")]
impl DatetimeChunked {
    /// Bucket the datetimes into fixed-width windows of `resolution`
    /// (expressed in milliseconds) and map every value to the lower
    /// bound of its window. The returned array is sorted ascending,
    /// with the null slots kept at the front of the sorted order.
    pub fn buckets(&self, resolution: TimeDelta) -> Self {
        // Sort ascending so bucket bounds can be advanced in one pass.
        let ca = self.sort(false);

        match ca.first_non_null() {
            // All-null (or empty) array: nothing to bucket.
            None => self.clone(),
            Some(idx) => {
                let arr = ca.downcast_iter().next().unwrap();
                let ms = arr.values().as_slice();

                let mut new_ms = AlignedVec::with_capacity(self.len());

                // Copy the (undefined) value slots under the leading nulls
                // verbatim; the validity bitmap marks them null.
                new_ms.extend_from_slice(&ms[..idx]);

                let timedelta = resolution.to_milliseconds();
                let mut current_lower = ms[idx];
                let mut current_higher = current_lower + timedelta;

                // BUGFIX: only walk the non-null tail. Iterating the whole
                // slice pushed `idx` extra elements past the preallocated
                // capacity, making `push_unchecked` UB whenever leading
                // nulls were present.
                for &val in &ms[idx..] {
                    // BUGFIX: advance as many whole buckets as needed; a
                    // single `if` step lagged behind when consecutive values
                    // were more than one bucket apart.
                    while val > current_higher {
                        current_lower = current_higher;
                        current_higher += timedelta;
                    }
                    // Safety:
                    // we preallocated `self.len()` and push exactly
                    // `idx + (len - idx)` elements in total.
                    unsafe { new_ms.push_unchecked(current_lower) };
                }
                let arr = PrimitiveArray::from_data(
                    ArrowDataType::Int64,
                    new_ms.into(),
                    arr.validity().cloned(),
                );
                let mut ca =
                    Int64Chunked::new_from_chunks(self.name(), vec![Arc::new(arr)]).into_date();
                ca.set_sorted(false);
                ca
            }
        }
    }
}

#[cfg(feature = "dtype-date")]
impl DateChunked {
    /// Bucket the dates into fixed-width windows of `resolution`
    /// (expressed in whole days) and map every value to the lower bound
    /// of its window. The returned array is sorted ascending, with the
    /// null slots kept at the front of the sorted order.
    pub fn buckets(&self, resolution: TimeDelta) -> Self {
        // Sort ascending so bucket bounds can be advanced in one pass.
        let ca = self.sort(false);

        match ca.first_non_null() {
            // All-null (or empty) array: nothing to bucket.
            None => self.clone(),
            Some(idx) => {
                let arr = ca.downcast_iter().next().unwrap();
                let days = arr.values().as_slice();

                let mut new_days = AlignedVec::with_capacity(self.len());

                // Copy the (undefined) value slots under the leading nulls
                // verbatim; the validity bitmap marks them null.
                new_days.extend_from_slice(&days[..idx]);

                let timedelta = resolution.to_days() as i32;
                let mut current_lower = days[idx];
                let mut current_higher = current_lower + timedelta;

                // BUGFIX: only walk the non-null tail. Iterating the whole
                // slice pushed `idx` extra elements past the preallocated
                // capacity, making `push_unchecked` UB whenever leading
                // nulls were present.
                for &val in &days[idx..] {
                    // BUGFIX: advance as many whole buckets as needed; a
                    // single `if` step lagged behind when consecutive values
                    // were more than one bucket apart.
                    while val > current_higher {
                        current_lower = current_higher;
                        current_higher += timedelta;
                    }
                    // Safety:
                    // we preallocated `self.len()` and push exactly
                    // `idx + (len - idx)` elements in total.
                    unsafe { new_days.push_unchecked(current_lower) };
                }
                let arr = PrimitiveArray::from_data(
                    ArrowDataType::Int32,
                    new_days.into(),
                    arr.validity().cloned(),
                );
                let mut ca =
                    Int32Chunked::new_from_chunks(self.name(), vec![Arc::new(arr)]).into_date();
                ca.set_sorted(false);
                ca
            }
        }
    }
}
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ impl PhysicalAggregation for ApplyExpr {
// if not flat, the flattening sorts by group, so we must create new group tuples
// and again aggregate.
let out = self.function.call_udf(&mut [ac.flat().into_owned()]);
if ac.is_flat() {

if ac.is_not_aggregated() || !matches!(ac.series().dtype(), DataType::List(_)) {
out.map(Some)
} else {
// TODO! maybe just apply over list?
Expand Down
47 changes: 28 additions & 19 deletions polars/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use polars_io::PhysicalIoExpr;
use std::borrow::Cow;

pub(crate) enum AggState {
List(Series),
Flat(Series),
Aggregated(Series),
NotAggregated(Series),
None,
}

Expand Down Expand Up @@ -141,18 +141,18 @@ impl<'a> AggregationContext<'a> {

pub(crate) fn series(&self) -> &Series {
match &self.series {
AggState::List(s) => s,
AggState::Flat(s) => s,
AggState::Aggregated(s) => s,
AggState::NotAggregated(s) => s,
_ => unreachable!(),
}
}

pub(crate) fn is_flat(&self) -> bool {
matches!(&self.series, AggState::Flat(_))
pub(crate) fn is_not_aggregated(&self) -> bool {
matches!(&self.series, AggState::NotAggregated(_))
}

pub fn is_aggregated(&self) -> bool {
!self.is_flat()
!self.is_not_aggregated()
}

pub(crate) fn combine_groups(&mut self, other: AggregationContext) -> &mut Self {
Expand All @@ -170,9 +170,10 @@ impl<'a> AggregationContext<'a> {
groups: Cow<'a, GroupTuples>,
aggregated: bool,
) -> AggregationContext<'a> {
let series = match (aggregated, series.dtype()) {
(true, &DataType::List(_)) => AggState::List(series),
_ => AggState::Flat(series),
let series = if aggregated {
AggState::Aggregated(series)
} else {
AggState::NotAggregated(series)
};

Self {
Expand All @@ -199,8 +200,8 @@ impl<'a> AggregationContext<'a> {
/// the columns dtype)
pub(crate) fn with_series(&mut self, series: Series, aggregated: bool) -> &mut Self {
self.series = match (aggregated, series.dtype()) {
(true, &DataType::List(_)) => AggState::List(series),
_ => AggState::Flat(series),
(true, &DataType::List(_)) => AggState::Aggregated(series),
_ => AggState::NotAggregated(series),
};
self
}
Expand All @@ -218,11 +219,11 @@ impl<'a> AggregationContext<'a> {
// series we use the groups to aggregate the list
// because this is lazy, we first must to update the groups
// by calling .groups()
if let AggState::Flat(_) = self.series {
if let AggState::NotAggregated(_) = self.series {
self.groups();
}
match &self.series {
AggState::Flat(s) => {
AggState::NotAggregated(s) => {
let out = Cow::Owned(
s.agg_list(&self.groups)
.expect("should be able to aggregate this to list"),
Expand All @@ -234,23 +235,31 @@ impl<'a> AggregationContext<'a> {
};
out
}
AggState::List(s) => Cow::Borrowed(s),
AggState::Aggregated(s) => Cow::Borrowed(s),
AggState::None => unreachable!(),
}
}

pub(crate) fn flat(&self) -> Cow<'_, Series> {
match &self.series {
AggState::Flat(s) => Cow::Borrowed(s),
AggState::List(s) => Cow::Owned(s.explode().unwrap()),
AggState::NotAggregated(s) => Cow::Borrowed(s),
AggState::Aggregated(s) => {
// it is not always aggregated as list
// could for instance also be f64 by mean aggregation
if let DataType::List(_) = s.dtype() {
Cow::Owned(s.explode().unwrap())
} else {
Cow::Borrowed(s)
}
}
AggState::None => unreachable!(),
}
}

pub(crate) fn take(&mut self) -> Series {
match std::mem::take(&mut self.series) {
AggState::Flat(s) => s,
AggState::List(s) => s,
AggState::NotAggregated(s) => s,
AggState::Aggregated(s) => s,
AggState::None => panic!("implementation error"),
}
}
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/expression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ The following methods are available under the `expr.dt` attribute.
ExprDateTimeNameSpace.round
ExprDateTimeNameSpace.to_python_datetime
ExprDateTimeNameSpace.timestamp
ExprDateTimeNameSpace.buckets

Strings
-------
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ The following methods are available under the `Series.dt` attribute.
DateTimeNameSpace.median
DateTimeNameSpace.mean
DateTimeNameSpace.round
DateTimeNameSpace.buckets


Strings
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/internals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
from .frame import DataFrame, wrap_df
from .functions import concat, date_range # DataFrame.describe() & DataFrame.upsample()
from .lazy_frame import LazyFrame, wrap_ldf
from .lazy_functions import argsort_by, col, concat_list, lit
from .lazy_functions import argsort_by, col, concat_list, lit, select
from .series import Series, wrap_s
from .whenthen import when # used in expr.clip()

0 comments on commit 922482f

Please sign in to comment.