Skip to content

Commit

Permalink
Lazy: update schema in explode op (#3084)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 7, 2022
1 parent 9e3b929 commit 02fa05f
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 12 deletions.
8 changes: 6 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub enum ALogicalPlan {
Explode {
input: Node,
columns: Vec<String>,
schema: SchemaRef,
},
Cache {
input: Node,
Expand Down Expand Up @@ -145,7 +146,7 @@ impl ALogicalPlan {
Union { inputs, .. } => arena.get(inputs[0]).schema(arena),
Cache { input } => arena.get(*input).schema(arena),
Sort { input, .. } => arena.get(*input).schema(arena),
Explode { input, .. } => arena.get(*input).schema(arena),
Explode { schema, .. } => schema,
#[cfg(feature = "parquet")]
ParquetScan {
schema,
Expand Down Expand Up @@ -258,9 +259,12 @@ impl ALogicalPlan {
by_column: by_column.clone(),
args: args.clone(),
},
Explode { columns, .. } => Explode {
Explode {
columns, schema, ..
} => Explode {
input: inputs[0],
columns: columns.clone(),
schema: schema.clone(),
},
Cache { .. } => Cache { input: inputs[0] },
Distinct { options, .. } => Distinct {
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,19 @@ impl LogicalPlanBuilder {

/// Append an `Explode` node on top of the current plan.
///
/// Besides collecting the names of the columns to explode, this computes the
/// schema of the node's output: every exploded column whose dtype in the
/// input schema is `List(inner)` is recorded as `inner` instead. The adjusted
/// schema is stored on the node itself.
///
/// # Panics
///
/// Panics with "expected column expression" if, after `rewrite_projections`,
/// any of the expressions is not an `Expr::Column`.
pub fn explode(self, columns: Vec<Expr>) -> Self {
// NOTE(review): presumably expands wildcard/regex selections into concrete
// column expressions against the input schema -- confirm in `rewrite_projections`.
let columns = rewrite_projections(columns, self.0.schema(), &[]);

// Work on an owned copy of the input schema so it can be modified below.
let mut schema = (**self.0.schema()).clone();

// Convert the column expressions to plain column names, updating the schema
// copy along the way.
let columns = columns
.iter()
.map(|e| {
if let Expr::Column(name) = e {
// Only list-typed columns change dtype; the entry is replaced by the
// list's inner dtype. Columns that are missing or not a list are left as-is.
if let Some(DataType::List(inner)) = schema.get(name) {
let inner = *inner.clone();
schema.with_column(name.to_string(), inner)
}

(**name).to_owned()
} else {
panic!("expected column expression")
Expand All @@ -386,6 +394,7 @@ impl LogicalPlanBuilder {
LogicalPlan::Explode {
input: Box::new(self.0),
columns,
// Schema with the exploded list columns replaced by their inner dtypes.
schema: Arc::new(schema),
}
.into()
}
Expand Down
24 changes: 20 additions & 4 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,17 @@ pub(crate) fn to_alp(
args,
}
}
LogicalPlan::Explode { input, columns } => {
LogicalPlan::Explode {
input,
columns,
schema,
} => {
let input = to_alp(*input, expr_arena, lp_arena)?;
ALogicalPlan::Explode { input, columns }
ALogicalPlan::Explode {
input,
columns,
schema,
}
}
LogicalPlan::Cache { input } => {
let input = to_alp(*input, expr_arena, lp_arena)?;
Expand Down Expand Up @@ -739,9 +747,17 @@ pub(crate) fn node_to_lp(
args,
}
}
ALogicalPlan::Explode { input, columns } => {
ALogicalPlan::Explode {
input,
columns,
schema,
} => {
let input = Box::new(node_to_lp(input, expr_arena, lp_arena));
LogicalPlan::Explode { input, columns }
LogicalPlan::Explode {
input,
columns,
schema,
}
}
ALogicalPlan::Cache { input } => {
let input = Box::new(node_to_lp(input, expr_arena, lp_arena));
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub enum LogicalPlan {
Explode {
input: Box<LogicalPlan>,
columns: Vec<String>,
schema: SchemaRef,
},
/// Slice the table
Slice {
Expand Down Expand Up @@ -199,7 +200,7 @@ impl LogicalPlan {
Union { inputs, .. } => inputs[0].schema(),
Cache { input } => input.schema(),
Sort { input, .. } => input.schema(),
Explode { input, .. } => input.schema(),
Explode { schema, .. } => schema,
#[cfg(feature = "parquet")]
ParquetScan { schema, .. } => schema,
#[cfg(feature = "ipc")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ impl PredicatePushDown {
};
Ok(lp)
}
Explode { input, columns } => {
Explode { input, columns, schema } => {
let condition = |name: Arc<str>| columns.iter().any(|s| s.as_str() == &*name);
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);

self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = Explode { input, columns };
let lp = Explode { input, columns, schema };
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Distinct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,11 @@ impl ProjectionPushDown {
args,
})
}
Explode { input, columns } => {
Explode {
input,
columns,
schema,
} => {
columns.iter().for_each(|name| {
add_str_to_accumulated(
name,
Expand All @@ -511,7 +515,11 @@ impl ProjectionPushDown {
lp_arena,
expr_arena,
)?;
Ok(Explode { input, columns })
Ok(Explode {
input,
columns,
schema,
})
}
Distinct { input, options } => {
// make sure that the set of unique columns is projected
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl DefaultPlanner {
args,
}))
}
Explode { input, columns } => {
Explode { input, columns, .. } => {
let input = self.create_physical_plan(input, lp_arena, expr_arena)?;
Ok(Box::new(ExplodeExec { input, columns }))
}
Expand Down
24 changes: 24 additions & 0 deletions polars/tests/it/lazy/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,27 @@ fn test_filter_sort_diff_2984() -> Result<()> {
assert_eq!(Vec::from(out.column("id")?.i32()?), &[Some(1), None]);
Ok(())
}

/// A filter applied after a stable group-by `tail(1)` must be evaluated on the
/// tail rows: only the `("bar", 3)` row is expected to remain. Predicate
/// pushdown is explicitly disabled for this query.
#[test]
fn test_filter_after_tail() -> Result<()> {
    let input = df![
        "a" => ["foo", "foo", "bar"],
        "b" => [1, 2, 3]
    ]?;

    // Last row of every group in "a", keeping the groups in first-seen order.
    let group_tails = input.lazy().groupby_stable([col("a")]).tail(Some(1));

    let result = group_tails
        .filter(col("b").eq(lit(3)))
        .with_predicate_pushdown(false)
        .collect()?;

    let wanted = df![
        "a" => ["bar"],
        "b" => [3]
    ]?;
    assert!(result.frame_equal(&wanted));

    Ok(())
}

0 comments on commit 02fa05f

Please sign in to comment.