Skip to content

Commit

Permalink
cross join
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 14, 2021
1 parent 9941c0d commit 62c2903
Show file tree
Hide file tree
Showing 16 changed files with 160 additions and 15 deletions.
6 changes: 4 additions & 2 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ checked_arithmetic = ["polars-core/checked_arithmetic"]
repeat_by = ["polars-core/repeat_by", "polars-lazy/repeat_by"]
is_first = ["polars-core/is_first", "polars-lazy/is_first"]
is_last = ["polars-core/is_last"]
asof_join = ["polars-core/asof_join", "polars-lazy/asof_join"]
asof_join = ["polars-core/asof_join"]
cross_join = ["polars-core/cross_join", "polars-lazy/cross_join"]

# don't use this
private = []
Expand Down Expand Up @@ -133,7 +134,8 @@ docs-selection = [
"repeat_by",
"is_first",
"is_last",
"asof_join"
"asof_join",
"cross_join"
]

[dependencies]
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ repeat_by = []
is_first = []
is_last = []
asof_join = []
cross_join = []


# opt-in datatypes for Series
Expand Down Expand Up @@ -90,7 +91,8 @@ docs-selection = [
"repeat_by",
"is_first",
"is_last",
"asof_join"
"asof_join",
"cross_join"
]

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/chunked_array/ops/unique.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ mod test {
}

#[test]
#[cfg(feature = "is_first")]
fn is_first() {
let ca = UInt32Chunked::new_from_opt_slice(
"a",
Expand Down
66 changes: 66 additions & 0 deletions polars/polars-core/src/frame/cross_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::prelude::*;
use crate::utils::{concat_df, CustomIterTools, NoNull};
use crate::POOL;

impl DataFrame {
/// Creates the cartesian product from both frames, preserves the order of the left keys.
pub fn cross_join(&self, other: &DataFrame) -> Result<DataFrame> {
let n_rows_left = self.height() as u32;
let n_rows_right = other.height() as u32;
let total_rows = n_rows_right * n_rows_left;

// the left side has the Nth row combined with every row from right.
// So let's say we have the following no. of rows
// left: 3
// right: .as_slice()4
//
// left take idx: 000011112222
// right take idx: 012301230123

let create_left_df = || {
let take_left: NoNull<UInt32Chunked> =
(0..total_rows).map(|i| i / n_rows_right).collect_trusted();
// Safety:
// take left is in bounds
unsafe { self.take_unchecked(&take_left.into_inner()) }
};

let create_right_df = || {
let iter = (0..n_rows_left).map(|_| other);
concat_df(iter).unwrap()
};

let (l_df, r_df) = POOL.install(|| rayon::join(create_left_df, create_right_df));

self.finish_join(l_df, r_df)
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::df;

#[test]
fn test_cross_join() -> Result<()> {
let df_a = df![
"a" => [1, 2],
"b" => ["foo", "spam"]
]?;

let df_b = df![
"b" => ["a", "b", "c"]
]?;

let out = df_a.cross_join(&df_b)?;
let expected = df![
"a" => [1, 1, 1, 2, 2, 2],
"b" => ["foo", "foo", "foo", "spam", "spam", "spam"],
"b_right" => ["a", "b", "c", "a", "b", "c"]
]?;

assert!(out.frame_equal(&expected));

Ok(())
}
}
15 changes: 15 additions & 0 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub enum JoinType {
Outer,
#[cfg(feature = "asof_join")]
AsOf,
#[cfg(feature = "cross_join")]
Cross,
}

unsafe fn get_hash_tbl_threaded_join_partitioned<T, H>(
Expand Down Expand Up @@ -994,6 +996,11 @@ impl DataFrame {
right_on: S2,
how: JoinType,
) -> Result<DataFrame> {
#[cfg(feature = "cross_join")]
if let JoinType::Cross = how {
return self.cross_join(other);
}

let selected_left = self.select_series(left_on)?;
let selected_right = other.select_series(right_on)?;
assert_eq!(selected_right.len(), selected_left.len());
Expand All @@ -1017,6 +1024,10 @@ impl DataFrame {
JoinType::AsOf => {
self.join_asof(other, selected_left[0].name(), selected_right[0].name())
}
#[cfg(feature = "cross_join")]
JoinType::Cross => {
unreachable!()
}
};
}

Expand Down Expand Up @@ -1110,6 +1121,10 @@ impl DataFrame {
JoinType::AsOf => Err(PolarsError::ValueError(
"asof join not supported for join on multiple keys".into(),
)),
#[cfg(feature = "cross_join")]
JoinType::Cross => {
unreachable!()
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::utils::{
mod arithmetic;
#[cfg(feature = "asof_join")]
pub(crate) mod asof_join;
#[cfg(feature = "cross_join")]
pub(crate) mod cross_join;
pub mod explode;
pub mod groupby;
pub mod hash_join;
Expand Down
14 changes: 14 additions & 0 deletions polars/polars-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ fn _get_supertype(l: &DataType, r: &DataType) -> Option<DataType> {
}
}

/// This takes ownership of the DataFrame so that drop is called earlier.
pub fn accumulate_dataframes_vertical<I>(dfs: I) -> Result<DataFrame>
where
I: IntoIterator<Item = DataFrame>,
Expand All @@ -677,6 +678,19 @@ where
Ok(acc_df)
}

/// Concat the DataFrames to a single DataFrame.
pub fn concat_df<'a, I>(dfs: I) -> Result<DataFrame>
where
I: IntoIterator<Item = &'a DataFrame>,
{
let mut iter = dfs.into_iter();
let mut acc_df = iter.next().unwrap().clone();
for df in iter {
acc_df.vstack_mut(df)?;
}
Ok(acc_df)
}

pub fn accumulate_dataframes_horizontal(dfs: Vec<DataFrame>) -> Result<DataFrame> {
let mut iter = dfs.into_iter();
let mut acc_df = iter.next().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ is_in = ["polars-core/is_in"]
repeat_by = ["polars-core/repeat_by"]
round_series = ["polars-core/round_series"]
is_first = ["polars-core/is_first"]
asof_join = ["polars-core/asof_join"]
cross_join = ["polars-core/cross_join"]

# no guarantees whatsoever
private = []
Expand Down
24 changes: 24 additions & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,12 @@ impl LazyFrame {
self.join(other, vec![left_on], vec![right_on], JoinType::Inner)
}

/// Creates the cartesian product from both frames, preserves the order of the left keys.
#[cfg(feature = "cross_join")]
pub fn cross_join(self, other: LazyFrame) -> LazyFrame {
self.join(other, vec![], vec![], JoinType::Cross)
}

/// Generic join function that can join on multiple columns.
///
/// # Example
Expand Down Expand Up @@ -2343,4 +2349,22 @@ mod test {
assert_eq!(a.null_count(), 1);
Ok(())
}

#[cfg(feature = "cross_join")]
#[test]
fn test_cross_join() -> Result<()> {
let df1 = df![
"a" => ["a", "b", "a"],
"b" => [Some(1), None, None]
]?;

let df2 = df![
"a" => [1, 2],
"b" => [None, Some(12)]
]?;

let out = df1.lazy().cross_join(df2.lazy()).collect()?;
assert_eq!(out.shape(), (6, 4));
Ok(())
}
}
1 change: 1 addition & 0 deletions polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
//! - `rows` - Create `DataFrame` from rows and extract rows from `DataFrames`.
//! - `downsample` - [downsample operation](crate::frame::DataFrame::downsample) on `DataFrame`s
//! - `asof_join` - Join as of, to join on nearest keys instead of exact equality match.
//! - `cross_join` - Create the cartesian product of two DataFrames.
//! * `Series` operations:
//! - `is_in` - [Check for membership in `Series`](crate::chunked_array::ops::IsIn)
//! - `zip_with` - [Zip two Series/ ChunkedArrays](crate::chunked_array::ops::ChunkZip)
Expand Down
3 changes: 2 additions & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ features = [
"private",
"round_series",
"is_first",
"asof_join"
"asof_join",
"cross_join"
]

#[patch.crates-io]
Expand Down
8 changes: 5 additions & 3 deletions py-polars/polars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,7 @@ def join(
- "left"
- "outer"
- "asof"
- "cross"
Example
---
Expand Down Expand Up @@ -1442,6 +1443,9 @@ def join(
-------
Joined DataFrame
"""
if how == "cross":
return wrap_df(self._df.join(df._df, [], [], how))

if isinstance(left_on, str):
left_on = [left_on]
if isinstance(right_on, str):
Expand All @@ -1458,9 +1462,7 @@ def join(
if isinstance(left_on[0], pl.Expr) or isinstance(right_on[0], pl.Expr): # type: ignore
return self.lazy().join(df.lazy(), left_on, right_on, how=how)

out = self._df.join(df._df, left_on, right_on, how)

return wrap_df(out)
return wrap_df(self._df.join(df._df, left_on, right_on, how))

def apply(
self,
Expand Down
16 changes: 11 additions & 5 deletions py-polars/polars/lazy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ def join(
"inner"
"left"
"outer"
"join"
"asof",
"cross"
allow_parallel
Allow the physical plan to optionally evaluate the computation of both DataFrames up to the join in parallel.
Expand All @@ -499,6 +500,11 @@ def join(
The keys must be sorted to perform an asof join
"""
if how == "cross":
return wrap_ldf(
self._ldf.join(ldf._ldf, [], [], allow_parallel, force_parallel, how)
)

if isinstance(left_on, str):
left_on = [left_on]
if isinstance(right_on, str):
Expand All @@ -523,12 +529,12 @@ def join(
column = col(column)
new_right_on.append(column._pyexpr)

out = self._ldf.join(
ldf._ldf, new_left_on, new_right_on, allow_parallel, force_parallel, how
return wrap_ldf(
self._ldf.join(
ldf._ldf, new_left_on, new_right_on, allow_parallel, force_parallel, how
)
)

return wrap_ldf(out)

def with_columns(self, exprs: Union[tp.List["Expr"], "Expr"]) -> "LazyFrame":
"""
Add or overwrite multiple columns in a DataFrame.
Expand Down
1 change: 1 addition & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ impl PyDataFrame {
"inner" => JoinType::Inner,
"outer" => JoinType::Outer,
"asof" => JoinType::AsOf,
"cross" => JoinType::Cross,
_ => panic!("not supported"),
};

Expand Down
1 change: 1 addition & 0 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl PyLazyFrame {
"inner" => JoinType::Inner,
"outer" => JoinType::Outer,
"asof" => JoinType::AsOf,
"cross" => JoinType::Cross,
_ => panic!("not supported"),
};

Expand Down
11 changes: 9 additions & 2 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,17 +736,24 @@ def test_join_dates():
df.join(df, on="datetime")


def test_asof_join():
def test_asof_cross_join():
left = pl.DataFrame({"a": [-10, 5, 10], "left_val": ["a", "b", "c"]})
right = pl.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})

# only test dispatch
# only test dispatch of asof join
out = left.join(right, on="a", how="asof")
assert out.shape == (3, 4)

left.lazy().join(right.lazy(), on="a", how="asof").collect()
assert out.shape == (3, 4)

# only test dispatch of cross join
out = left.join(right, how="cross")
assert out.shape == (15, 4)

left.lazy().join(right.lazy(), how="cross").collect()
assert out.shape == (15, 4)


def test_str_concat():
df = pl.DataFrame(
Expand Down

0 comments on commit 62c2903

Please sign in to comment.