Skip to content

Commit

Permalink
fix outer join schema (#3021)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 31, 2022
1 parent 12044c0 commit 42bd8a0
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 6 deletions.
10 changes: 7 additions & 3 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl DataFrame {
let opt_join_tuples = outer_join_multiple_keys(&left, &right, swap);

// Take the left and right dataframes by join tuples
let (mut df_left, df_right) = POOL.join(
let (df_left, df_right) = POOL.join(
|| unsafe {
remove_selected(self, &selected_left).take_opt_iter_unchecked(
opt_join_tuples
Expand All @@ -407,11 +407,15 @@ impl DataFrame {
)
},
);
// Allocate a new vec for df_left so that the key columns come first, followed by the other columns.
let mut keys = Vec::with_capacity(selected_left.len() + df_left.width());
for (s_left, s_right) in selected_left.iter().zip(&selected_right) {
let mut s = s_left.zip_outer_join_column(s_right, &opt_join_tuples);
s.rename(s_left.name());
df_left.with_column(s)?;
keys.push(s)
}
keys.extend_from_slice(df_left.get_columns());
let df_left = DataFrame::new_no_checks(keys);
self.finish_join(df_left, df_right, suffix)
}
#[cfg(feature = "asof_join")]
Expand Down Expand Up @@ -1198,9 +1202,9 @@ mod test {
assert_eq!(
out.dtypes(),
&[
DataType::Utf8,
DataType::Float64,
DataType::Float64,
DataType::Utf8,
DataType::Utf8
]
);
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,9 +1046,10 @@ impl DataFrame {
}

fn add_column_by_schema(&mut self, s: Series, schema: &Schema) -> Result<()> {
if let Some((idx, name, _)) = schema.get_full(s.name()) {
let name = s.name();
if let Some((idx, _, _)) = schema.get_full(name) {
// the schema is incorrect: fall back to a search by name
if name != s.name() {
if self.columns.get(idx).map(|s| s.name()) != Some(name) {
self.add_column_by_search(s)?;
} else {
self.replace_at_idx(idx, s)?;
Expand Down
46 changes: 46 additions & 0 deletions polars/tests/it/lazy/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,49 @@ fn test_swap_rename() -> Result<()> {
assert!(df.frame_equal(&expected));
Ok(())
}

/// Outer-joins two lazy frames on the column pair (`key1`, `key2`), re-selects
/// `key1` through `with_columns`, and checks both the resulting column order
/// and the content of every column.
///
/// NOTE(review): the numeric suffix presumably refers to issue #2988 — confirm.
#[test]
fn test_outer_join_with_column_2988() -> Result<()> {
    // Left side: shares only the ("bar", "bar") key pair with the right side.
    let left = df![
        "key1" => ["foo", "bar"],
        "key2" => ["foo", "bar"],
        "val1" => [3, 1]
    ]?
    .lazy();

    // Right side: contributes the extra ("baz", "baz") key pair.
    let right = df![
        "key1" => ["bar", "baz"],
        "key2" => ["bar", "baz"],
        "val2" => [6, 8]
    ]?
    .lazy();

    let joined = left
        .join(
            right,
            [col("key1"), col("key2")],
            [col("key1"), col("key2")],
            JoinType::Outer,
        )
        .with_columns([col("key1")])
        .collect()?;

    // The key columns are expected first, followed by the value columns.
    assert_eq!(joined.get_column_names(), &["key1", "key2", "val1", "val2"]);

    // Both key columns hold the same three values, in the same order.
    let key1 = joined.column("key1")?.utf8()?;
    assert_eq!(
        Vec::from(key1),
        &[Some("bar"), Some("baz"), Some("foo")]
    );
    let key2 = joined.column("key2")?.utf8()?;
    assert_eq!(
        Vec::from(key2),
        &[Some("bar"), Some("baz"), Some("foo")]
    );

    // A value column is `None` on rows whose key exists only in the other frame.
    let val1 = joined.column("val1")?.i32()?;
    assert_eq!(Vec::from(val1), &[Some(1), None, Some(3)]);
    let val2 = joined.column("val2")?.i32()?;
    assert_eq!(Vec::from(val2), &[Some(6), Some(8), None]);

    Ok(())
}
2 changes: 2 additions & 0 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,8 @@ def with_columns(self: LDF, exprs: Union[List["pli.Expr"], "pli.Expr"]) -> LDF:
pyexprs.append(e._pyexpr)
elif isinstance(e, pli.Series):
pyexprs.append(pli.lit(e)._pyexpr)
else:
raise ValueError(f"expected and expression, got {e}")

return self._from_pyldf(self._ldf.with_columns(pyexprs))

Expand Down
2 changes: 1 addition & 1 deletion py-polars/tests/test_categorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_categorical_outer_join() -> None:
).lazy()

out = df1.join(df2, on=["key1", "key2"], how="outer").collect()
expected = pl.DataFrame({"val1": [1], "key1": [42], "key2": ["bar"], "val2": [2]})
expected = pl.DataFrame({"key1": [42], "key2": ["bar"], "val1": [1], "val2": [2]})

assert out.frame_equal(expected)
with pl.StringCache():
Expand Down

0 comments on commit 42bd8a0

Please sign in to comment.