add upsample in grouped context
ritchie46 committed Jan 27, 2022
1 parent 81af2e8 commit db4a90c
Showing 10 changed files with 350 additions and 70 deletions.
111 changes: 61 additions & 50 deletions polars/polars-core/src/frame/hash_join/mod.rs
@@ -1055,60 +1055,14 @@ impl DataFrame {
}
}

/// Generic join method. Can be used to join on multiple columns.
///
/// # Example
///
/// ```no_run
/// # use polars_core::prelude::*;
/// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
/// "Phosphorus (mg/100g)" => &[11, 22, 12])?;
/// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
/// "Potassium (mg/100g)" => &[107, 358, 115])?;
///
/// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinType::Inner, None)?;
/// assert_eq!(df3.shape(), (3, 3));
/// println!("{}", df3);
/// # Ok::<(), PolarsError>(())
/// ```
///
/// Output:
///
/// ```text
/// shape: (3, 3)
/// +--------+----------------------+---------------------+
/// | Fruit | Phosphorus (mg/100g) | Potassium (mg/100g) |
/// | --- | --- | --- |
/// | str | i32 | i32 |
/// +========+======================+=====================+
/// | Apple | 11 | 107 |
/// +--------+----------------------+---------------------+
/// | Banana | 22 | 358 |
/// +--------+----------------------+---------------------+
/// | Pear | 12 | 115 |
/// +--------+----------------------+---------------------+
/// ```
pub fn join<I, S>(
fn join_impl(
&self,
other: &DataFrame,
left_on: I,
right_on: I,
selected_left: Vec<Series>,
selected_right: Vec<Series>,
how: JoinType,
suffix: Option<String>,
) -> Result<DataFrame>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
return self.cross_join(other);
}

#[allow(unused_mut)]
let mut selected_left = self.select_series(left_on)?;
#[allow(unused_mut)]
let mut selected_right = other.select_series(right_on)?;
) -> Result<DataFrame> {
if selected_right.len() != selected_left.len() {
return Err(PolarsError::ValueError(
"the number of columns given as join key should be equal".into(),
@@ -1247,6 +1201,63 @@ impl DataFrame {
}
}

/// Generic join method. Can be used to join on multiple columns.
///
/// # Example
///
/// ```no_run
/// # use polars_core::prelude::*;
/// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
/// "Phosphorus (mg/100g)" => &[11, 22, 12])?;
/// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
/// "Potassium (mg/100g)" => &[107, 358, 115])?;
///
/// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinType::Inner, None)?;
/// assert_eq!(df3.shape(), (3, 3));
/// println!("{}", df3);
/// # Ok::<(), PolarsError>(())
/// ```
///
/// Output:
///
/// ```text
/// shape: (3, 3)
/// +--------+----------------------+---------------------+
/// | Fruit | Phosphorus (mg/100g) | Potassium (mg/100g) |
/// | --- | --- | --- |
/// | str | i32 | i32 |
/// +========+======================+=====================+
/// | Apple | 11 | 107 |
/// +--------+----------------------+---------------------+
/// | Banana | 22 | 358 |
/// +--------+----------------------+---------------------+
/// | Pear | 12 | 115 |
/// +--------+----------------------+---------------------+
/// ```
pub fn join<I, S>(
&self,
other: &DataFrame,
left_on: I,
right_on: I,
how: JoinType,
suffix: Option<String>,
) -> Result<DataFrame>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
return self.cross_join(other);
}

#[allow(unused_mut)]
let mut selected_left = self.select_series(left_on)?;
#[allow(unused_mut)]
let mut selected_right = other.select_series(right_on)?;
self.join_impl(other, selected_left, selected_right, how, suffix)
}

/// Perform an inner join on two DataFrames.
///
/// # Example
2 changes: 1 addition & 1 deletion polars/polars-core/src/lib.rs
@@ -19,7 +19,7 @@ pub mod series;
pub mod testing;
#[cfg(test)]
mod tests;
#[cfg(feature = "temporal")]
#[cfg(all(feature = "temporal", feature = "dtype-datetime"))]
pub mod time;
pub(crate) mod vector_hasher;

4 changes: 4 additions & 0 deletions polars/polars-core/src/series/mod.rs
@@ -155,6 +155,10 @@ impl Series {
Arc::get_mut(&mut self.0).expect("implementation error")
}

pub fn into_frame(self) -> DataFrame {
DataFrame::new_no_checks(vec![self])
}

/// Rename series.
pub fn rename(&mut self, name: &str) -> &mut Series {
self.get_inner_mut().rename(name);
151 changes: 151 additions & 0 deletions polars/polars-core/src/time.rs
@@ -1,5 +1,6 @@
use crate::datatypes::Int64Chunked;
use crate::export::chrono::NaiveDateTime;
use crate::prelude::*;
use crate::prelude::{DatetimeChunked, TimeUnit};
use polars_time::export::chrono::Datelike;
pub use polars_time::*;
@@ -23,3 +24,153 @@ pub fn date_range(
)
.into_datetime(tu, None)
}

impl DataFrame {
/// Upsample a DataFrame at a regular frequency.
///
/// # Arguments
/// * `by` - First group by these columns and then upsample for every group
/// * `time_column` - Will be used to determine a date_range.
/// Note that this column has to be sorted for the output to make sense.
/// * `every` - interval will start 'every' duration
/// * `offset` - change the start of the date_range by this offset.
///
    /// The `every` and `offset` arguments are created with
/// the following string language:
/// - 1ns (1 nanosecond)
/// - 1us (1 microsecond)
/// - 1ms (1 millisecond)
/// - 1s (1 second)
/// - 1m (1 minute)
/// - 1h (1 hour)
/// - 1d (1 day)
/// - 1w (1 week)
/// - 1mo (1 calendar month)
/// - 1y (1 calendar year)
/// - 1i (1 index count)
/// Or combine them:
/// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
pub fn upsample<I: IntoVec<String>>(
&self,
by: I,
time_column: &str,
every: Duration,
offset: Duration,
) -> Result<DataFrame> {
let by = by.into_vec();
self.upsample_impl(by, time_column, every, offset, false)
}

/// Upsample a DataFrame at a regular frequency.
///
/// # Arguments
/// * `by` - First group by these columns and then upsample for every group
/// * `time_column` - Will be used to determine a date_range.
/// Note that this column has to be sorted for the output to make sense.
/// * `every` - interval will start 'every' duration
/// * `offset` - change the start of the date_range by this offset.
///
    /// The `every` and `offset` arguments are created with
/// the following string language:
/// - 1ns (1 nanosecond)
/// - 1us (1 microsecond)
/// - 1ms (1 millisecond)
/// - 1s (1 second)
/// - 1m (1 minute)
/// - 1h (1 hour)
/// - 1d (1 day)
/// - 1w (1 week)
/// - 1mo (1 calendar month)
/// - 1y (1 calendar year)
/// - 1i (1 index count)
/// Or combine them:
/// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
pub fn upsample_stable<I: IntoVec<String>>(
&self,
by: I,
time_column: &str,
every: Duration,
offset: Duration,
) -> Result<DataFrame> {
let by = by.into_vec();
self.upsample_impl(by, time_column, every, offset, true)
}

fn upsample_impl(
&self,
by: Vec<String>,
index_column: &str,
every: Duration,
offset: Duration,
stable: bool,
) -> Result<DataFrame> {
let s = self.column(index_column)?;
if matches!(s.dtype(), DataType::Date) {
let mut df = self.clone();
df.try_apply(index_column, |s| {
s.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
})
.unwrap();
let mut out = df
.upsample_impl(by, index_column, every, offset, stable)
.unwrap();
out.try_apply(index_column, |s| s.cast(&DataType::Date))
.unwrap();
Ok(out)
} else if by.is_empty() {
let index_column = self.column(index_column)?;
self.upsample_single_impl(index_column, every, offset)
} else {
let gb = if stable {
self.groupby_stable(by)
} else {
self.groupby(by)
};
gb?.par_apply(|df| df.upsample_impl(vec![], index_column, every, offset, false))
}
}

fn upsample_single_impl(
&self,
index_column: &Series,
every: Duration,
offset: Duration,
) -> Result<DataFrame> {
let index_col_name = index_column.name();

use DataType::*;
match index_column.dtype() {
Datetime(tu, _) => {
let s = index_column.cast(&DataType::Int64).unwrap();
let ca = s.i64().unwrap();
let first = ca.into_iter().flatten().next();
let last = ca.into_iter().flatten().next_back();
match (first, last) {
(Some(first), Some(last)) => {
let first = match tu {
TimeUnit::Milliseconds => offset.add_ms(first),
TimeUnit::Nanoseconds => offset.add_ns(first),
};
let range =
date_range(index_col_name, first, last, every, ClosedWindow::Both, *tu)
.into_series()
.into_frame();
range.join(
self,
&[index_col_name],
&[index_col_name],
JoinType::Left,
None,
)
}
_ => Err(PolarsError::ComputeError(
"Cannot determine upsample boundaries. All elements are null.".into(),
)),
}
}
dt => Err(PolarsError::ComputeError(
format!("upsample not allowed for index_column of dtype {:?}", dt).into(),
)),
}
}
}
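
A minimal usage sketch of the new API (with the `temporal` and `dtype-datetime` features enabled, per the adjusted cfg in lib.rs). The frame layout and the column names `group` and `time` are hypothetical, and it assumes `Duration::parse` accepts the duration strings listed in the doc comment and that a plain `Vec<String>` satisfies the `IntoVec<String>` bound:

```rust
use polars_core::prelude::*;
use polars_core::time::Duration;

// Hypothetical frame with a sorted "time" column and a "group" key column,
// upsampled to one row every 6 hours within each group, preserving the
// original order of the groups.
fn upsample_six_hourly(df: &DataFrame) -> Result<DataFrame> {
    df.upsample_stable(
        vec!["group".to_string()], // `by`: group first, then upsample per group
        "time",                    // `time_column`: must be sorted ascending
        Duration::parse("6h"),     // `every`: interval between upsampled rows
        Duration::parse("0ns"),    // `offset`: leave the range start unchanged
    )
}
```

`upsample_stable` keeps the group order via `groupby_stable`; plain `upsample` takes the faster non-stable `groupby` path, as shown in `upsample_impl` above.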
3 changes: 2 additions & 1 deletion polars/polars-time/src/bounds.rs
@@ -11,7 +11,8 @@ impl Bounds {
pub fn new_checked(start: i64, stop: i64) -> Self {
assert!(
start <= stop,
"boundary start must be smaller than stop; is your time column sorted in ascending order?"
"boundary start must be smaller than stop; is your time column sorted in ascending order?\
\nIf you did a groupby, note that null values are a separate group."
);
Self::new(start, stop)
}
2 changes: 1 addition & 1 deletion polars/polars-time/src/duration.rs
@@ -255,7 +255,7 @@ impl Duration {
}
}

pub(crate) fn add_ms(&self, t: i64) -> i64 {
pub fn add_ms(&self, t: i64) -> i64 {
let d = self;
let mut new_t = t;

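
With `add_ms` now public, code outside `polars-time` can shift raw millisecond timestamps by a parsed duration. A small sketch, assuming `Duration::parse` handles the string language described in the upsample docs; the timestamp value is arbitrary:

```rust
use polars_time::Duration;

fn main() {
    // Parse a compound duration: 3 days, 12 hours, 4 minutes and 25 seconds.
    let every = Duration::parse("3d12h4m25s");
    // Shift a raw timestamp given in milliseconds since the Unix epoch.
    let t_ms: i64 = 1_643_241_600_000; // an arbitrary instant in January 2022
    let shifted = every.add_ms(t_ms);
    println!("{} -> {}", t_ms, shifted);
}
```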
