Skip to content

Commit

Permalink
Lazy: Don't materialize whole table in JOIN followed by SLICE (#3136)
Browse files Browse the repository at this point in the history
* Joins: accept slice argument

* implement slice pushdown for joins
  • Loading branch information
ritchie46 committed Apr 13, 2022
1 parent 2b7e463 commit e51ba24
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 58 deletions.
43 changes: 37 additions & 6 deletions polars/polars-core/src/frame/asof_join/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,10 @@ where
}

impl DataFrame {
/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join. This is a special implementation of an asof join
/// that searches for the nearest keys within a subgroup set by `by`.
#[cfg_attr(docsrs, doc(cfg(feature = "asof_join")))]
#[allow(clippy::too_many_arguments)]
pub fn join_asof_by<I, S>(
#[doc(hidden)]
pub fn _join_asof_by<I, S>(
&self,
other: &DataFrame,
left_on: &str,
Expand All @@ -460,6 +458,7 @@ impl DataFrame {
right_by: I,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame>
where
I: IntoIterator<Item = S>,
Expand Down Expand Up @@ -575,17 +574,49 @@ impl DataFrame {
.collect();
let other = DataFrame::new_no_checks(cols);

let mut left = self.clone();
let mut right_join_tuples = &*right_join_tuples;

if let Some((offset, len)) = slice {
left = left.slice(offset, len);
right_join_tuples = slice_slice(right_join_tuples, offset, len);
}

// Safety:
// join tuples are in bounds
let right_df = unsafe {
other.take_opt_iter_unchecked(
right_join_tuples
.into_iter()
.iter()
.map(|opt_idx| opt_idx.map(|idx| idx as usize)),
)
};

self.finish_join(self.clone(), right_df, None)
self.finish_join(left, right_df, None)
}

/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join. This is a special implementation of an asof join
/// that searches for the nearest keys within a subgroup set by `by`.
#[cfg_attr(docsrs, doc(cfg(feature = "asof_join")))]
#[allow(clippy::too_many_arguments)]
pub fn join_asof_by<I, S>(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
left_by: I,
right_by: I,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
) -> Result<DataFrame>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
self._join_asof_by(
other, left_on, right_on, left_by, right_by, strategy, tolerance, None,
)
}
}

Expand Down
36 changes: 30 additions & 6 deletions polars/polars-core/src/frame/asof_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod asof;
mod groups;

use crate::prelude::*;
use crate::utils::slice_slice;
use asof::*;
use num::Bounded;
use std::borrow::Cow;
Expand Down Expand Up @@ -92,17 +93,17 @@ where
}

impl DataFrame {
/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join
#[cfg_attr(docsrs, doc(cfg(feature = "asof_join")))]
pub fn join_asof(
#[doc(hidden)]
#[allow(clippy::too_many_arguments)]
pub fn _join_asof(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
let left_key = self.column(left_on)?;
let right_key = other.column(right_on)?;
Expand Down Expand Up @@ -150,16 +151,39 @@ impl DataFrame {
Cow::Borrowed(other)
};

let mut left = self.clone();
let mut take_idx = &*take_idx;

if let Some((offset, len)) = slice {
left = left.slice(offset, len);
take_idx = slice_slice(take_idx, offset, len);
}

// Safety:
// join tuples are in bounds
let right_df = unsafe {
other.take_opt_iter_unchecked(
take_idx
.into_iter()
.iter()
.map(|opt_idx| opt_idx.map(|idx| idx as usize)),
)
};

self.finish_join(self.clone(), right_df, suffix)
self.finish_join(left, right_df, suffix)
}

/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join
#[cfg_attr(docsrs, doc(cfg(feature = "asof_join")))]
pub fn join_asof(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<String>,
) -> Result<DataFrame> {
self._join_asof(other, left_on, right_on, strategy, tolerance, suffix, None)
}
}
88 changes: 69 additions & 19 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::frame::hash_join::multiple_keys::{
inner_join_multiple_keys, left_join_multiple_keys, outer_join_multiple_keys,
};
use crate::prelude::*;
use crate::utils::{set_partition_size, split_ca};
use crate::utils::{set_partition_size, slice_slice, split_ca};
use crate::vector_hasher::{
create_hash_and_keys_threaded_vectorized, prepare_hashed_relation_threaded, this_partition,
AsU64, StrHash,
Expand Down Expand Up @@ -256,14 +256,27 @@ impl DataFrame {
}
}

fn join_impl(
#[doc(hidden)]
pub fn _join_impl(
&self,
other: &DataFrame,
selected_left: Vec<Series>,
selected_right: Vec<Series>,
how: JoinType,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
let out = self.cross_join(other, suffix)?;
return Ok(if let Some((offset, len)) = slice {
// todo! don't materialize whole frame before slicing.
out.slice(offset, len)
} else {
out
});
}

if selected_right.len() != selected_left.len() {
return Err(PolarsError::ComputeError(
"the number of columns given as join key should be equal".into(),
Expand All @@ -287,31 +300,37 @@ impl DataFrame {
let s_left = self.column(selected_left[0].name())?;
let s_right = other.column(selected_right[0].name())?;
return match how {
JoinType::Inner => self.inner_join_from_series(other, s_left, s_right, suffix),
JoinType::Left => self.left_join_from_series(other, s_left, s_right, suffix),
JoinType::Outer => self.outer_join_from_series(other, s_left, s_right, suffix),
JoinType::Inner => {
self.inner_join_from_series(other, s_left, s_right, suffix, slice)
}
JoinType::Left => self.left_join_from_series(other, s_left, s_right, suffix, slice),
JoinType::Outer => {
self.outer_join_from_series(other, s_left, s_right, suffix, slice)
}
#[cfg(feature = "asof_join")]
JoinType::AsOf(options) => {
let left_on = selected_left[0].name();
let right_on = selected_right[0].name();

match (options.left_by, options.right_by) {
(Some(left_by), Some(right_by)) => self.join_asof_by(
(Some(left_by), Some(right_by)) => self._join_asof_by(
other,
left_on,
right_on,
left_by,
right_by,
options.strategy,
options.tolerance,
slice,
),
(None, None) => self.join_asof(
(None, None) => self._join_asof(
other,
left_on,
right_on,
options.strategy,
options.tolerance,
suffix,
slice,
),
_ => {
panic!("expected by arguments on both sides")
Expand Down Expand Up @@ -353,9 +372,14 @@ impl DataFrame {
let right = DataFrame::new_no_checks(selected_right_physical);
let (left, right, swap) = det_hash_prone_order!(left, right);
let join_tuples = inner_join_multiple_keys(&left, &right, swap);
let mut join_tuples = &*join_tuples;

if let Some((offset, len)) = slice {
join_tuples = slice_slice(join_tuples, offset, len);
}

let (df_left, df_right) = POOL.join(
|| self.create_left_df(&join_tuples, false),
|| self.create_left_df(join_tuples, false),
|| unsafe {
// remove join columns
remove_selected(other, &selected_right).take_iter_unchecked(
Expand All @@ -369,9 +393,14 @@ impl DataFrame {
let left = DataFrame::new_no_checks(selected_left_physical);
let right = DataFrame::new_no_checks(selected_right_physical);
let join_tuples = left_join_multiple_keys(&left, &right);
let mut join_tuples = &*join_tuples;

if let Some((offset, len)) = slice {
join_tuples = slice_slice(join_tuples, offset, len);
}

let (df_left, df_right) = POOL.join(
|| self.create_left_df(&join_tuples, true),
|| self.create_left_df(join_tuples, true),
|| unsafe {
// remove join columns
remove_selected(other, &selected_right).take_opt_iter_unchecked(
Expand All @@ -390,6 +419,12 @@ impl DataFrame {
let (left, right, swap) = det_hash_prone_order!(left, right);
let opt_join_tuples = outer_join_multiple_keys(&left, &right, swap);

let mut opt_join_tuples = &*opt_join_tuples;

if let Some((offset, len)) = slice {
opt_join_tuples = slice_slice(opt_join_tuples, offset, len);
}

// Take the left and right dataframes by join tuples
let (df_left, df_right) = POOL.join(
|| unsafe {
Expand All @@ -410,7 +445,7 @@ impl DataFrame {
// Allocate a new vec for df_left so that the keys are left and then other values.
let mut keys = Vec::with_capacity(selected_left.len() + df_left.width());
for (s_left, s_right) in selected_left.iter().zip(&selected_right) {
let mut s = s_left.zip_outer_join_column(s_right, &opt_join_tuples);
let mut s = s_left.zip_outer_join_column(s_right, opt_join_tuples);
s.rename(s_left.name());
keys.push(s)
}
Expand Down Expand Up @@ -477,12 +512,9 @@ impl DataFrame {
if let JoinType::Cross = how {
return self.cross_join(other, suffix);
}

#[allow(unused_mut)]
let mut selected_left = self.select_series(left_on)?;
#[allow(unused_mut)]
let mut selected_right = other.select_series(right_on)?;
self.join_impl(other, selected_left, selected_right, how, suffix)
let selected_left = self.select_series(left_on)?;
let selected_right = other.select_series(right_on)?;
self._join_impl(other, selected_left, selected_right, how, suffix, None)
}

/// Perform an inner join on two DataFrames.
Expand All @@ -509,14 +541,20 @@ impl DataFrame {
s_left: &Series,
s_right: &Series,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
#[cfg(feature = "dtype-categorical")]
check_categorical_src(s_left.dtype(), s_right.dtype())?;

let join_tuples = s_left.hash_join_inner(s_right);
let mut join_tuples = &*join_tuples;

if let Some((offset, len)) = slice {
join_tuples = slice_slice(join_tuples, offset, len);
}

let (df_left, df_right) = POOL.join(
|| self.create_left_df(&join_tuples, false),
|| self.create_left_df(join_tuples, false),
|| unsafe {
other
.drop(s_right.name())
Expand Down Expand Up @@ -575,14 +613,20 @@ impl DataFrame {
s_left: &Series,
s_right: &Series,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
#[cfg(feature = "dtype-categorical")]
check_categorical_src(s_left.dtype(), s_right.dtype())?;

let opt_join_tuples = s_left.hash_join_left(s_right);
let mut opt_join_tuples = &*opt_join_tuples;

if let Some((offset, len)) = slice {
opt_join_tuples = slice_slice(opt_join_tuples, offset, len);
}

let (df_left, df_right) = POOL.join(
|| self.create_left_df(&opt_join_tuples, true),
|| self.create_left_df(opt_join_tuples, true),
|| unsafe {
other.drop(s_right.name()).unwrap().take_opt_iter_unchecked(
opt_join_tuples
Expand Down Expand Up @@ -616,6 +660,7 @@ impl DataFrame {
s_left: &Series,
s_right: &Series,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
#[cfg(feature = "dtype-categorical")]
check_categorical_src(s_left.dtype(), s_right.dtype())?;
Expand All @@ -625,6 +670,11 @@ impl DataFrame {

// Get the indexes of the joined relations
let opt_join_tuples = s_left.hash_join_outer(s_right);
let mut opt_join_tuples = &*opt_join_tuples;

if let Some((offset, len)) = slice {
opt_join_tuples = slice_slice(opt_join_tuples, offset, len);
}

// Take the left and right dataframes by join tuples
let (mut df_left, df_right) = POOL.join(
Expand All @@ -646,7 +696,7 @@ impl DataFrame {

let mut s = s_left
.to_physical_repr()
.zip_outer_join_column(&s_right.to_physical_repr(), &opt_join_tuples);
.zip_outer_join_column(&s_right.to_physical_repr(), opt_join_tuples);
s.rename(s_left.name());
let s = match s_left.dtype() {
#[cfg(feature = "dtype-categorical")]
Expand Down

0 comments on commit e51ba24

Please sign in to comment.