Skip to content

Commit

Permalink
slice pushdown for cross joins (#4194)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 31, 2022
1 parent fad1c77 commit a1eff29
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 19 deletions.
58 changes: 47 additions & 11 deletions polars/polars-core/src/frame/cross_join.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,52 @@
use crate::prelude::*;
use crate::series::IsSorted;
use crate::utils::{concat_df_unchecked, CustomIterTools, NoNull};
use crate::utils::{concat_df_unchecked, slice_offsets, CustomIterTools, NoNull};
use crate::POOL;

fn slice_take(
total_rows: IdxSize,
n_rows_right: IdxSize,
slice: Option<(i64, usize)>,
inner: fn(IdxSize, IdxSize, IdxSize) -> IdxCa,
) -> IdxCa {
match slice {
None => inner(0, total_rows, n_rows_right),
Some((offset, len)) => {
let (offset, len) = slice_offsets(offset, len, total_rows as usize);
inner(offset as IdxSize, (len + offset) as IdxSize, n_rows_right)
}
}
}

fn take_left(total_rows: IdxSize, n_rows_right: IdxSize, slice: Option<(i64, usize)>) -> IdxCa {
fn inner(offset: IdxSize, total_rows: IdxSize, n_rows_right: IdxSize) -> IdxCa {
let mut take: NoNull<IdxCa> = (offset..total_rows)
.map(|i| i / n_rows_right)
.collect_trusted();
take.set_sorted2(IsSorted::Ascending);
take.into_inner()
}
slice_take(total_rows, n_rows_right, slice, inner)
}

fn take_right(total_rows: IdxSize, n_rows_right: IdxSize, slice: Option<(i64, usize)>) -> IdxCa {
fn inner(offset: IdxSize, total_rows: IdxSize, n_rows_right: IdxSize) -> IdxCa {
let take: NoNull<IdxCa> = (offset..total_rows)
.map(|i| i % n_rows_right)
.collect_trusted();
take.into_inner()
}
slice_take(total_rows, n_rows_right, slice, inner)
}

impl DataFrame {
/// Creates the cartesian product from both frames, preserves the order of the left keys.
pub fn cross_join(&self, other: &DataFrame, suffix: Option<String>) -> Result<DataFrame> {
pub(crate) fn cross_join(
&self,
other: &DataFrame,
suffix: Option<String>,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
let n_rows_left = self.height() as IdxSize;
let n_rows_right = other.height() as IdxSize;
let total_rows = n_rows_right * n_rows_left;
Expand All @@ -19,24 +60,19 @@ impl DataFrame {
// right take idx: 012301230123

let create_left_df = || {
let mut take_left: NoNull<IdxCa> =
(0..total_rows).map(|i| i / n_rows_right).collect_trusted();
take_left.set_sorted2(IsSorted::Ascending);
// Safety:
// take left is in bounds
unsafe { self.take_unchecked(&take_left.into_inner()) }
unsafe { self.take_unchecked(&take_left(total_rows, n_rows_right, slice)) }
};

let create_right_df = || {
// concatenation of dataframes is very expensive if we need to make the series mutable
// many times, these are atomic operations
// so we choose a different strategy at > 100 rows (arbitrarily small number)
if n_rows_left > 100 {
let take_right: NoNull<IdxCa> =
(0..total_rows).map(|i| i % n_rows_right).collect_trusted();
if n_rows_left > 100 || slice.is_some() {
// Safety:
// take right is in bounds
unsafe { other.take_unchecked(&take_right.into_inner()) }
unsafe { other.take_unchecked(&take_right(total_rows, n_rows_right, slice)) }
} else {
let iter = (0..n_rows_left).map(|_| other);
concat_df_unchecked(iter)
Expand Down Expand Up @@ -65,7 +101,7 @@ mod test {
"b" => ["a", "b", "c"]
]?;

let out = df_a.cross_join(&df_b, None)?;
let out = df_a.cross_join(&df_b, None, None)?;
let expected = df![
"a" => [1, 1, 1, 2, 2, 2],
"b" => ["foo", "foo", "foo", "spam", "spam", "spam"],
Expand Down
10 changes: 2 additions & 8 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,7 @@ impl DataFrame {
) -> Result<DataFrame> {
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
let out = self.cross_join(other, suffix)?;
return Ok(if let Some((offset, len)) = slice {
// todo! don't materialize whole frame before slicing.
out.slice(offset, len)
} else {
out
});
return self.cross_join(other, suffix, slice);
}

#[cfg(feature = "chunked_ids")]
Expand Down Expand Up @@ -620,7 +614,7 @@ impl DataFrame {
{
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
return self.cross_join(other, suffix);
return self.cross_join(other, suffix, None);
}
let selected_left = self.select_series(left_on)?;
let selected_right = other.select_series(right_on)?;
Expand Down
25 changes: 25 additions & 0 deletions py-polars/tests/db-benchmark/various.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,31 @@ def test_cross_join() -> None:
assert df2.join(df1, how="cross").slice(0, 100).frame_equal(out)


def test_cross_join_slice_pushdown() -> None:
# this will likely go out of memory if we did not pushdown the slice
df = pl.DataFrame(
[
pl.Series("x", pl.arange(0, 2**16 - 1, eager=True) % 2**15).cast(
pl.UInt16
)
]
)

assert df.lazy().join(df.lazy(), how="cross", suffix="_").slice(
-5, 10
).collect().to_dict(False) == {
"x": [32766, 32766, 32766, 32766, 32766],
"x_": [32762, 32763, 32764, 32765, 32766],
}

assert df.lazy().join(df.lazy(), how="cross", suffix="_").slice(
2, 10
).collect().to_dict(False) == {
"x": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"x_": [2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
}


if __name__ == "__main__":
test_windows_not_cached()
test_cross_join()

0 comments on commit a1eff29

Please sign in to comment.