Skip to content

Commit

Permalink
use duration for dt.buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2021
1 parent 74220df commit cba58fc
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 290 deletions.
159 changes: 11 additions & 148 deletions polars/polars-core/src/chunked_array/temporal/timedelta.rs
Original file line number Diff line number Diff line change
@@ -1,159 +1,22 @@
use arrow::temporal_conversions::NANOSECONDS;
use crate::prelude::*;
use polars_arrow::trusted_len::PushUnchecked;

/// A signed time span decomposed into days, whole seconds, and microseconds,
/// mirroring the field layout of Python's `datetime.timedelta`.
#[derive(Debug, Default, Copy, Clone)]
pub struct TimeDelta {
    days: i64,
    seconds: u32,
    microseconds: u32,
}

impl TimeDelta {
    /// Collapse the whole span into a single count of milliseconds.
    /// The sub-millisecond remainder of `microseconds` is truncated.
    fn to_milliseconds(self) -> i64 {
        const MS_PER_SEC: i64 = 1_000;
        const MS_PER_DAY: i64 = 24 * 3600 * MS_PER_SEC;
        self.days * MS_PER_DAY
            + i64::from(self.seconds) * MS_PER_SEC
            + i64::from(self.microseconds) / MS_PER_SEC
    }

    /// Collapse the whole span into whole days; any `seconds` short of a
    /// full day (and all `microseconds`) are truncated.
    fn to_days(self) -> i64 {
        const SECS_PER_DAY: i64 = 24 * 3600;
        self.days + i64::from(self.seconds) / SECS_PER_DAY
    }
}

/// Chainable accumulator for building a [`TimeDelta`] out of mixed units.
///
/// Every unit method *adds* onto the running totals, so repeated calls and
/// mixed units combine freely; call [`TimeDeltaBuilder::finish`] to obtain
/// the resulting [`TimeDelta`].
#[derive(Debug, Default, Copy, Clone)]
pub struct TimeDeltaBuilder {
    days: i64,
    seconds: u32,
    microseconds: u32,
}

impl TimeDeltaBuilder {
    /// Start from an all-zero delta.
    pub fn new() -> Self {
        Self::default()
    }

    /// Add whole days.
    pub fn days(self, days: i64) -> Self {
        Self {
            days: self.days + days,
            ..self
        }
    }

    /// Add seconds.
    pub fn seconds(self, seconds: u32) -> Self {
        Self {
            seconds: self.seconds + seconds,
            ..self
        }
    }

    /// Add microseconds.
    pub fn microseconds(self, microseconds: u32) -> Self {
        Self {
            microseconds: self.microseconds + microseconds,
            ..self
        }
    }

    /// Add milliseconds (stored internally as microseconds).
    // NOTE(review): `milliseconds * 1000` can overflow `u32` for very large
    // inputs (panic in debug, wrap in release) — unchanged from the original.
    pub fn milliseconds(self, milliseconds: u32) -> Self {
        Self {
            microseconds: self.microseconds + milliseconds * 1000,
            ..self
        }
    }

    /// Add hours (stored internally as seconds).
    pub fn hours(self, hours: u32) -> Self {
        Self {
            seconds: self.seconds + hours * 3600,
            ..self
        }
    }

    /// Add weeks (stored internally as days).
    pub fn weeks(self, weeks: i64) -> Self {
        Self {
            days: self.days + weeks * 7,
            ..self
        }
    }

    /// Freeze the accumulated totals into a [`TimeDelta`].
    pub fn finish(self) -> TimeDelta {
        let TimeDeltaBuilder {
            days,
            seconds,
            microseconds,
        } = self;
        TimeDelta {
            days,
            seconds,
            microseconds,
        }
    }
}
use polars_time::Window;

// NOTE(review): this region is a unified diff hunk rendered without +/- markers.
// The first `buckets` (TimeDelta/millisecond based) is the *removed* version; the
// second `buckets` (Duration/Window based) is its replacement. The two cannot
// coexist in one `impl` — only the Duration version survives this commit.
#[cfg(feature = "dtype-datetime")]
impl DatetimeChunked {
// Removed implementation: sorted the series, then assigned each timestamp to a
// fixed-width millisecond bucket by walking the sorted values.
pub fn buckets(&self, resolution: TimeDelta) -> Self {
let ca = self.sort(false);

match ca.first_non_null() {
// All-null (or empty) array: nothing to bucket.
None => self.clone(),
Some(idx) => {
let arr = ca.downcast_iter().next().unwrap();
let ms = arr.values().as_slice();

// Preallocate the output so push_unchecked below is in-bounds.
let mut new_ms = AlignedVec::with_capacity(self.len());

// extend nulls
new_ms.extend_from_slice(&ms[..idx]);

// Bucket width in milliseconds.
let timedelta = resolution.to_milliseconds();
let mut current_lower = ms[idx];
let mut current_higher = current_lower + timedelta;

// Walk the (sorted) values: advance the bucket window whenever a value
// passes the current upper bound, emitting the bucket's lower bound.
for &val in ms {
if val > current_higher {
current_lower = current_higher;
current_higher += timedelta;
}
// Safety:
// we preallocated
unsafe { new_ms.push_unchecked(current_lower) };
}
let arr = PrimitiveArray::from_data(
ArrowDataType::Int64,
new_ms.into(),
arr.validity().cloned(),
);
let mut ca =
Int64Chunked::new_from_chunks(self.name(), vec![Arc::new(arr)]).into_date();
// NOTE(review): output follows the sorted series, not the input order;
// the bool flag of `set_sorted` presumably means "reverse" — TODO confirm.
ca.set_sorted(false);
ca
}
}
// Replacement implementation: truncate every timestamp to the start of its
// window. `every` is passed as both window arguments (presumably period and
// length — TODO confirm `Window::new` argument order); `offset` shifts the
// window boundaries.
pub fn buckets(&self, every: Duration, offset: Duration) -> Self {
let w = Window::new(every, every, offset);
self.apply(|t| w.truncate(t)).into_date()
}
}

// NOTE(review): unified diff hunk — the TimeDelta/day-based `buckets` below was
// removed in this commit; the Duration/Window-based version at the end replaces it.
#[cfg(feature = "dtype-date")]
impl DateChunked {
// Removed implementation: same walking-bucket scheme as the datetime version,
// but operating on i32 day counts instead of i64 milliseconds.
pub fn buckets(&self, resolution: TimeDelta) -> Self {
let ca = self.sort(false);

match ca.first_non_null() {
// All-null (or empty) array: nothing to bucket.
None => self.clone(),
Some(idx) => {
let arr = ca.downcast_iter().next().unwrap();
let days = arr.values().as_slice();

// Preallocate the output so push_unchecked below is in-bounds.
let mut new_days = AlignedVec::with_capacity(self.len());

// extend nulls
new_days.extend_from_slice(&days[..idx]);

// Bucket width in whole days (sub-day precision is discarded).
let timedelta = resolution.to_days() as i32;
let mut current_lower = days[idx];
let mut current_higher = current_lower + timedelta;

// Walk the (sorted) values: advance the bucket window whenever a value
// passes the current upper bound, emitting the bucket's lower bound.
for &val in days {
if val > current_higher {
current_lower = current_higher;
current_higher += timedelta;
}
// Safety:
// we preallocated
unsafe { new_days.push_unchecked(current_lower) };
}
let arr = PrimitiveArray::from_data(
ArrowDataType::Int32,
new_days.into(),
arr.validity().cloned(),
);
let mut ca =
Int32Chunked::new_from_chunks(self.name(), vec![Arc::new(arr)]).into_date();
// NOTE(review): output follows the sorted series, not the input order.
ca.set_sorted(false);
ca
}
}
// Replacement implementation: dates are stored as day counts, but the window
// truncation operates on a finer timestamp unit, so each day count is scaled
// up by NSECS_IN_DAY, truncated, and scaled back down.
pub fn buckets(&self, every: Duration, offset: Duration) -> Self {
let w = Window::new(every, every, offset);
self.apply(|t| {
// NANOSECONDS and SECONDS_IN_DAY come from outside this block;
// presumably ns-per-second and 86_400 respectively — TODO confirm.
const NSECS_IN_DAY: i64 = NANOSECONDS * SECONDS_IN_DAY;
(w.truncate( NSECS_IN_DAY * t as i64) / NSECS_IN_DAY) as i32
}).into_date()
}
}
4 changes: 2 additions & 2 deletions py-polars/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pre-commit:
$(PYTHON_BIN)/black .
$(PYTHON_BIN)/blackdoc .
$(PYTHON_BIN)/mypy
$(PYTHON_BIN) flake8
$(PYTHON_BIN)/flake8
make -C .. fmt_toml
$(PYTHON) -m cargo fmt --all
cargo fmt --all

test: venv
$(PYTHON_BIN)/maturin develop
Expand Down
83 changes: 25 additions & 58 deletions py-polars/polars/internals/expr.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import copy
import typing as tp
from datetime import date, datetime, timedelta
from datetime import date, datetime
from typing import Any, Callable, Optional, Sequence, Type, Union

import numpy as np
Expand Down Expand Up @@ -2441,78 +2441,45 @@ class ExprDateTimeNameSpace:
# Bind this `dt` namespace to the wrapped expression's underlying PyExpr;
# the datetime methods below forward calls through `self._pyexpr`.
def __init__(self, expr: Expr):
self._pyexpr = expr._pyexpr

# NOTE(review): unified diff hunk — the `interval: timedelta` signature below was
# removed in this commit in favor of the string-interval signature that follows.
def buckets(self, interval: timedelta) -> Expr:
def buckets(self, every: str, offset: Optional[str] = None) -> Expr:
"""
.. warning::
This API is experimental and will likely change.
Divide the date/datetime range into buckets.
Data will be sorted by this operation.
The `every` and `offset` arguments are created with
the following string language:
1ns # 1 nanosecond
1us # 1 microsecond
1ms # 1 millisecond
1s # 1 second
1m # 1 minute
1h # 1 hour
1d # 1 day
1w # 1 week
1mo # 1 calendar month
1y # 1 calendar year
3d12h4m25s # 3 days, 12 hours, 4 minutes, and 25 seconds
Parameters
----------
interval
python timedelta to indicate bucket size
every
Every interval start and period length
offset
Offset the window
Returns
-------
Date/Datetime series
Examples
--------
>>> from datetime import datetime, timedelta
>>> date_range = pl.date_range(
... low=datetime(year=2000, month=10, day=1, hour=23, minute=30),
... high=datetime(year=2000, month=10, day=2, hour=0, minute=30),
... interval=timedelta(minutes=8),
... name="date_range",
... )
>>> date_range.dt.buckets(timedelta(minutes=8))
shape: (8,)
Series: 'date_range' [datetime]
[
2000-10-01 23:30:00
2000-10-01 23:30:00
2000-10-01 23:38:00
2000-10-01 23:46:00
2000-10-01 23:54:00
2000-10-02 00:02:00
2000-10-02 00:10:00
2000-10-02 00:18:00
]
Can be used to perform a downsample operation:
>>> (
... date_range.to_frame()
... .groupby(
... pl.col("date_range").dt.buckets(timedelta(minutes=16)),
... maintain_order=True,
... )
... .agg(pl.col("date_range").count())
... )
shape: (4, 2)
┌─────────────────────┬──────────────────┐
│ date_range ┆ date_range_count │
│ --- ┆ --- │
│ datetime ┆ u32 │
╞═════════════════════╪══════════════════╡
│ 2000-10-01 23:30:00 ┆ 3 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2000-10-01 23:46:00 ┆ 2 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2000-10-02 00:02:00 ┆ 2 │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2000-10-02 00:18:00 ┆ 1 │
└─────────────────────┴──────────────────┘
"""

# Removed body: forwarded the timedelta components to the old Rust binding.
return wrap_expr(
self._pyexpr.date_buckets(
interval.days, interval.seconds, interval.microseconds
)
)
# Replacement body: default the offset to zero nanoseconds and forward the
# interval strings to the new Duration-based binding.
if offset is None:
offset = "0ns"
return wrap_expr(self._pyexpr.date_buckets(every, offset))

def strftime(self, fmt: str) -> Expr:
"""
Expand Down

0 comments on commit cba58fc

Please sign in to comment.