Skip to content

Commit

Permalink
asof join
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 7, 2021
1 parent e4f809e commit a82b128
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 5 deletions.
4 changes: 3 additions & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ checked_arithmetic = ["polars-core/checked_arithmetic"]
repeat_by = ["polars-core/repeat_by", "polars-lazy/repeat_by"]
is_first = ["polars-core/is_first", "polars-lazy/is_first"]
is_last = ["polars-core/is_last"]
asof_join = ["polars-core/asof_join", "polars-lazy/asof_join"]

# don't use this
private = []
Expand Down Expand Up @@ -131,7 +132,8 @@ docs-selection = [
"downsample",
"repeat_by",
"is_first",
"is_last"
"is_last",
"asof_join"
]

[dependencies]
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ checked_arithmetic = []
repeat_by = []
is_first = []
is_last = []
asof_join = []


# opt-in datatypes for Series
Expand Down Expand Up @@ -89,6 +90,7 @@ docs-selection = [
"repeat_by",
"is_first",
"is_last",
"asof_join"
]

[dependencies]
Expand Down
6 changes: 4 additions & 2 deletions polars/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ pub struct ChunkedArray<T> {
}

impl<T> ChunkedArray<T> {
pub fn is_sorted(&self) -> bool {
#[cfg(feature = "asof_join")]
pub(crate) fn is_sorted(&self) -> bool {
self.bit_settings & 1 != 0
}

pub fn is_sorted_reverse(&self) -> bool {
#[cfg(feature = "asof_join")]
pub(crate) fn is_sorted_reverse(&self) -> bool {
self.bit_settings & 1 << 1 != 0
}

Expand Down
185 changes: 185 additions & 0 deletions polars/polars-core/src/frame/asof_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
use crate::prelude::*;
use crate::utils::CustomIterTools;
use num::Bounded;
use polars_arrow::trusted_len::PushUnchecked;

pub(crate) trait JoinAsof<T: PolarsDataType> {
fn join_asof(&self, _other: &ChunkedArray<T>) -> Result<Vec<Option<u32>>> {
Err(PolarsError::InvalidOperation(
format!(
"asof join not implemented for key with dtype: {:?}",
T::get_dtype()
)
.into(),
))
}
}

impl<T> JoinAsof<T> for ChunkedArray<T>
where
T: PolarsNumericType,
T::Native: Bounded + PartialOrd,
{
fn join_asof(&self, other: &ChunkedArray<T>) -> Result<Vec<Option<u32>>> {
let mut rhs_iter = other.into_iter();
let mut tuples = Vec::with_capacity(self.len());
if self.null_count() > 0 {
return Err(PolarsError::Other(
"keys of asof join should not have null values".into(),
));
}
if !(other.is_sorted_reverse() | other.is_sorted()) {
eprintln!("right key of asof join is not explicitly sorted, this may lead to unexpected results");
}

let mut count = 0;
let mut rhs_idx = 0;

let mut previous_rhs_val: T::Native = Bounded::min_value();
let mut previous_lhs_val: T::Native = Bounded::min_value();

for arr in self.downcast_iter() {
for &lhs_val in arr.values().as_slice() {
if lhs_val < previous_lhs_val {
return Err(PolarsError::Other(
"left key of asof join must be sorted".into(),
));
}
previous_lhs_val = lhs_val;

loop {
match rhs_iter.next() {
Some(Some(rhs_val)) => {
if rhs_val > lhs_val {
if previous_rhs_val <= lhs_val && rhs_idx > 0 {
tuples.push(Some(rhs_idx - 1));
rhs_idx += 1;
previous_rhs_val = rhs_val;
break;
} else {
rhs_idx += 1;
previous_rhs_val = rhs_val;
tuples.push(None);
break;
}
}
previous_rhs_val = rhs_val;
rhs_idx += 1;
}
Some(None) => {
rhs_idx += 1;
}
// exhausted rhs
None => {
let mut remaining = self.len() - count;
if previous_rhs_val < lhs_val {
remaining -= 1;
tuples.push(Some(rhs_idx - 1));
}

let iter = std::iter::repeat(None)
.take(remaining)
.trust_my_length(remaining);
tuples.extend_trusted_len(iter);
return Ok(tuples);
}
}
}
count += 1;
}
}

Ok(tuples)
}
}

impl JoinAsof<BooleanType> for BooleanChunked {}
impl JoinAsof<Utf8Type> for Utf8Chunked {}
impl JoinAsof<ListType> for ListChunked {}
impl JoinAsof<CategoricalType> for CategoricalChunked {}

impl DataFrame {
/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join
pub fn join_asof(&self, other: &DataFrame, left_on: &str, right_on: &str) -> Result<DataFrame> {
let left_key = self.column(left_on)?;
let right_key = other.column(right_on)?;

let take_idx = left_key.join_asof(right_key)?;
// Safety:
// join tuples are in bounds
let right_df = unsafe {
other.take_opt_iter_unchecked(
take_idx
.into_iter()
.map(|opt_idx| opt_idx.map(|idx| idx as usize)),
)
};

self.finish_join(self.clone(), right_df)
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::df;

#[test]
fn test_join_asof() -> Result<()> {
let left = df![
"a" => [1, 5, 10],
"left_val" => ["a", "b", "c"]
]?;

let right = df![
"b" => [1, 2, 3, 6, 7],
"right_val" => [1, 2, 3, 6, 7]
]?;

let out = left.join_asof(&right, "a", "b")?;
let expected = df![
"a" => [1, 5, 10],
"left_val" => ["a", "b", "c"],
"b" => [1, 3, 7],
"right_val" => [1, 3, 7]
]?;
assert!(out.frame_equal_missing(&expected));

let left = df![
"a" => [2, 5, 10, 12],
"left_val" => ["a", "b", "c", "d"]
]?;

let right = df![
"b" => [1, 2, 3],
"right_val" => [1, 2, 3]
]?;
let out = left.join_asof(&right, "a", "b")?;
let expected = df![
"a" => [2, 5, 10, 12],
"left_val" => ["a", "b", "c", "d"],
"b" => [Some(2), Some(3), None, None],
"right_val" => [Some(2), Some(3), None, None]
]?;
assert!(out.frame_equal_missing(&expected));

let left = df![
"a" => [-10, 5, 10],
"left_val" => ["a", "b", "c"]
]?;

let right = df![
"b" => [1, 2, 3, 6, 7]
]?;

let out = left.join_asof(&right, "a", "b")?;
let expected = df![
"a" => [-10, 5, 10],
"left_val" => ["a", "b", "c"],
"b" => [None, Some(3), Some(7)]
]?;
assert!(out.frame_equal_missing(&expected));
Ok(())
}
}
16 changes: 15 additions & 1 deletion polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum JoinType {
Left,
Inner,
Outer,
#[cfg(feature = "asof_join")]
AsOf,
}

unsafe fn get_hash_tbl_threaded_join_partitioned<T, H>(
Expand Down Expand Up @@ -947,7 +949,11 @@ impl_zip_outer_join!(Utf8Chunked);

impl DataFrame {
/// Utility method to finish a join.
fn finish_join(&self, mut df_left: DataFrame, mut df_right: DataFrame) -> Result<DataFrame> {
pub(crate) fn finish_join(
&self,
mut df_left: DataFrame,
mut df_right: DataFrame,
) -> Result<DataFrame> {
let mut left_names = HashSet::with_capacity_and_hasher(df_left.width(), RandomState::new());

df_left.columns.iter().for_each(|series| {
Expand Down Expand Up @@ -1007,6 +1013,10 @@ impl DataFrame {
JoinType::Outer => {
self.outer_join(other, selected_left[0].name(), selected_right[0].name())
}
#[cfg(feature = "asof_join")]
JoinType::AsOf => {
self.join_asof(other, selected_left[0].name(), selected_right[0].name())
}
};
}

Expand Down Expand Up @@ -1096,6 +1106,10 @@ impl DataFrame {
}
self.finish_join(df_left, df_right)
}
#[cfg(feature = "asof_join")]
JoinType::AsOf => Err(PolarsError::ValueError(
"asof join not supported for join on multiple keys".into(),
)),
}
}

Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ use crate::utils::{
};

mod arithmetic;
#[cfg(feature = "asof_join")]
pub(crate) mod asof_join;
pub mod explode;
pub mod groupby;
pub mod hash_join;
#[cfg(feature = "rows")]
pub mod row;
pub mod select;
mod upstream_traits;

#[cfg(feature = "sort_multiple")]
use crate::prelude::sort::prepare_argsort;
use crate::POOL;
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-core/src/series/implementations/dates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ macro_rules! impl_dyn_series {
}

impl private::PrivateSeries for SeriesWrap<$ca> {
#[cfg(feature = "asof_join")]
fn join_asof(&self, other: &Series) -> Result<Vec<Option<u32>>> {
cast_and_apply!(self, join_asof, other)
}

fn set_sorted(&mut self, reverse: bool) {
self.0.set_sorted(reverse)
}
Expand Down
7 changes: 7 additions & 0 deletions polars/polars-core/src/series/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use crate::chunked_array::{
AsSinglePtr, ChunkIdIter,
};
use crate::fmt::FmtList;
#[cfg(feature = "asof_join")]
use crate::frame::asof_join::JoinAsof;
#[cfg(feature = "pivot")]
use crate::frame::groupby::pivot::*;
use crate::frame::groupby::*;
Expand Down Expand Up @@ -59,6 +61,11 @@ macro_rules! impl_dyn_series {
}

impl private::PrivateSeries for SeriesWrap<$ca> {
#[cfg(feature = "asof_join")]
fn join_asof(&self, other: &Series) -> Result<Vec<Option<u32>>> {
self.0.join_asof(other.as_ref().as_ref())
}

fn set_sorted(&mut self, reverse: bool) {
self.0.set_sorted(reverse)
}
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub(crate) mod private {
}

pub trait PrivateSeries {
#[cfg(feature = "asof_join")]
fn join_asof(&self, _other: &Series) -> Result<Vec<Option<u32>>> {
unimplemented!()
}

fn set_sorted(&mut self, _reverse: bool) {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ is_in = ["polars-core/is_in"]
repeat_by = ["polars-core/repeat_by"]
round_series = ["polars-core/round_series"]
is_first = ["polars-core/is_first"]
asof_join = ["polars-core/asof_join"]

# no guarantees whatsoever
private = []
Expand Down
1 change: 1 addition & 0 deletions polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
//! - `sort_multiple` - Allow sorting a `DataFrame` on multiple columns
//! - `rows` - Create `DataFrame` from rows and extract rows from `DataFrames`.
//! - `downsample` - [downsample operation](crate::frame::DataFrame::downsample) on `DataFrame`s
//! - `asof_join` - Join as of, to join on nearest keys instead of exact equality match.
//! * `Series` operations:
//! - `is_in` - [Check for membership in `Series`](crate::chunked_array::ops::IsIn)
//! - `zip_with` - [Zip two Series/ ChunkedArrays](crate::chunked_array::ops::ChunkZip)
Expand Down
3 changes: 2 additions & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ features = [
"rows",
"private",
"round_series",
"is_first"
"is_first",
"asof_join"
]

#[patch.crates-io]
Expand Down
5 changes: 5 additions & 0 deletions py-polars/polars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,7 @@ def join(
- "inner"
- "left"
- "outer"
- "asof"
Example
---
Expand Down Expand Up @@ -1418,6 +1419,10 @@ def join(
╰──────┴──────┴─────┴───────╯
```
# Asof joins
This is similar to a left-join except that we match on nearest key rather than equal keys.
The keys must be sorted to perform an asof join
Returns
-------
Joined DataFrame
Expand Down

0 comments on commit a82b128

Please sign in to comment.