Skip to content

Commit

Permalink
fix(rust, python): fix streaming empty join panic (#5534)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 17, 2022
1 parent 90da88e commit c59e9cb
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 2 deletions.
3 changes: 3 additions & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ ahash = "0.7"
criterion = "0.3"
rand = "0.8"

[build-dependencies]
version_check = "0.9.4"

# see: https://bheisler.github.io/criterion.rs/book/faq.html
[lib]
bench = false
Expand Down
7 changes: 7 additions & 0 deletions polars/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
fn main() {
println!("cargo:rerun-if-changed=build.rs");
let channel = version_check::Channel::read().unwrap();
if channel.is_nightly() {
println!("cargo:rustc-cfg=feature=\"nightly\"");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ pub(super) fn compare_fn(
}

impl GenericBuild {
fn is_empty(&self) -> bool {
match self.chunks.len() {
0 => true,
1 => self.chunks[0].data.height() == 0,
_ => false,
}
}

#[inline]
fn number_of_keys(&self) -> usize {
self.join_columns_left.len()
Expand Down Expand Up @@ -175,7 +183,17 @@ impl GenericBuild {

impl Sink for GenericBuild {
fn sink(&mut self, context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult> {
// we do some juggling here so that we don't
// end up with empty chunks
// But we always want one empty chunk if all is empty as we need
// to finish the join
if self.chunks.len() == 1 && self.chunks[0].data.height() == 0 {
self.chunks.pop().unwrap();
}
if chunk.is_empty() {
if self.chunks.is_empty() {
self.chunks.push(chunk)
}
return Ok(SinkResult::CanHaveMoreInput);
}
let mut hashes = std::mem::take(&mut self.hashes);
Expand Down Expand Up @@ -225,7 +243,15 @@ impl Sink for GenericBuild {
}

fn combine(&mut self, mut other: Box<dyn Sink>) {
if self.is_empty() {
let other = other.as_any().downcast_mut::<Self>().unwrap();
std::mem::swap(self, other);
return;
}
let other = other.as_any().downcast_ref::<Self>().unwrap();
if other.is_empty() {
return;
}
let mut tuple_buf = Vec::with_capacity(self.number_of_keys());

let chunks_offset = self.chunks.len() as IdxSize;
Expand Down Expand Up @@ -300,7 +326,9 @@ impl Sink for GenericBuild {
.map(|chunk| chunk.data),
);
if let Ok(n_chunks) = left_df.n_chunks() {
assert_eq!(n_chunks, chunks_len);
if left_df.height() > 0 {
assert_eq!(n_chunks, chunks_len);
}
}
let materialized_join_cols =
Arc::new(std::mem::take(&mut self.materialized_join_cols));
Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion py-polars/tests/unit/test_predicates.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ def test_streaming_empty_df() -> None:

assert df.lazy().join(df.lazy(), on="a", how="inner").filter(
2 == 1 # noqa: SIM300
).collect().to_dict(False) == {"a": [], "b": [], "b_right": []}
).collect(allow_streaming=True).to_dict(False) == {"a": [], "b": [], "b_right": []}

0 comments on commit c59e9cb

Please sign in to comment.