Skip to content

Commit

Permalink
fix(rust, python): streaming hstack allow duplicates (#5538)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 17, 2022
1 parent ae9f969 commit f54da71
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 11 deletions.
13 changes: 13 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,19 @@ impl DataFrame {
Ok(())
}

/// Adds every series in `columns` to this `DataFrame`, in iteration order.
///
/// The first series, and any series whose name is present in `schema`, is
/// added through `with_column_and_schema`; every other series is added
/// through `with_column`.
///
/// # Errors
///
/// Returns the first error produced by `with_column_and_schema` or
/// `with_column`. Series added before the failing one remain in `self`.
pub fn _add_columns(&mut self, columns: Vec<Series>, schema: &Schema) -> PolarsResult<()> {
    for (i, s) in columns.into_iter().enumerate() {
        // We need to branch here because users can add multiple columns
        // with the same name.
        // NOTE(review): presumably a name missing from `schema` would make the
        // schema-assisted lookup useless, hence the plain `with_column` path —
        // confirm against `with_column_and_schema`.
        if i == 0 || schema.get(s.name()).is_some() {
            self.with_column_and_schema(s, schema)?;
        } else {
            // `s` is owned by this iteration, so it is moved instead of cloned.
            self.with_column(s)?;
        }
    }
    Ok(())
}

/// Add a new column to this `DataFrame` or replace an existing one.
/// Uses an existing schema to amortize lookups.
/// If the schema is incorrect, we will fallback to linear search.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use polars_core::error::PolarsResult;
use polars_core::frame::DataFrame;
use polars_core::schema::SchemaRef;

use crate::expressions::PhysicalPipedExpr;
use crate::operators::{DataChunk, Operator, OperatorResult, PExecutionContext};
Expand Down Expand Up @@ -53,6 +54,7 @@ impl Operator for ProjectionOperator {
/// Pipeline operator that evaluates `exprs` on each incoming chunk and adds
/// the resulting columns to that chunk's data.
#[derive(Clone)]
pub(crate) struct HstackOperator {
/// Expressions evaluated against every chunk; their results are the columns
/// that get added.
pub(crate) exprs: Vec<Arc<dyn PhysicalPipedExpr>>,
/// Schema passed to `DataFrame::_add_columns` when the evaluated columns are
/// added to the chunk.
pub(crate) input_schema: SchemaRef,
}

impl Operator for HstackOperator {
Expand All @@ -67,7 +69,9 @@ impl Operator for HstackOperator {
.map(|e| e.evaluate(chunk, context.execution_state.as_ref()))
.collect::<PolarsResult<Vec<_>>>()?;

let df = chunk.data.hstack(&projected)?;
let mut df = chunk.data.clone();
let schema = &*self.input_schema;
df._add_columns(projected, schema)?;

let chunk = chunk.with_data(df);
Ok(OperatorResult::Finished(chunk))
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ where
};
Box::new(op) as Box<dyn Operator>
}
HStack { exprs, .. } => {
HStack { exprs, schema, .. } => {
let op = operators::HstackOperator {
exprs: exprs_to_physical(exprs, expr_arena, &to_physical)?,
input_schema: schema.clone(),
};
Box::new(op) as Box<dyn Operator>
}
Expand Down
10 changes: 1 addition & 9 deletions polars/polars-lazy/src/physical_plan/executors/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,7 @@ impl StackExec {
state.clear_expr_cache();

let schema = &*self.input_schema;
for (i, s) in res.into_iter().enumerate() {
// we need to branch here
// because users can add multiple columns with the same name
if i == 0 || schema.get(s.name()).is_some() {
df.with_column_and_schema(s, schema)?;
} else {
df.with_column(s.clone())?;
}
}
df._add_columns(res, schema)?;

Ok(df)
}
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,13 @@ def test_unnest_columns_available() -> None:
"genre3": ["Fantasy", "Thriller", "Drama"],
"genre4": ["Sci-Fi", None, "Romance"],
}


def test_streaming_duplicate_cols_5537() -> None:
    """Check `with_columns` under `allow_streaming=True` when an expression reuses an input name.

    One expression adds a new column ``foo``; the other keeps the name ``a``
    and is expected to replace the existing column ``a``.
    """
    lazy_frame = pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).lazy()
    new_columns = [(pl.col("a") * 2).alias("foo"), (pl.col("a") * 3)]

    result = lazy_frame.with_columns(new_columns).collect(allow_streaming=True)

    expected = {
        "a": [3, 6, 9],
        "b": [1, 2, 3],
        "foo": [2, 4, 6],
    }
    assert result.to_dict(False) == expected

0 comments on commit f54da71

Please sign in to comment.