Skip to content

Commit

Permalink
fix lazy schema names and properly test them
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 1, 2021
1 parent 173189b commit 8a74bb2
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 32 deletions.
2 changes: 2 additions & 0 deletions polars/polars-core/src/chunked_array/list/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl ListChunked {
})
.collect_trusted();

ca.rename(self.name());
if fast_explode {
ca.set_fast_explode();
}
Expand Down Expand Up @@ -151,6 +152,7 @@ impl ListChunked {
.transpose()
})
.collect::<Result<_>>()?;
ca.rename(self.name());
if fast_explode {
ca.set_fast_explode();
}
Expand Down
62 changes: 44 additions & 18 deletions polars/polars-lazy/src/logical_plan/aexpr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,21 @@ impl AExpr {
) -> Result<Field> {
use AExpr::*;
match self {
Window { function, .. } => arena.get(*function).to_field(schema, ctxt, arena),
Window { function, .. } => {
let e = arena.get(*function);

let field = e.to_field(schema, ctxt, arena);
match e {
Agg(_) => field,
_ => {
let field = field?;
Ok(Field::new(
field.name(),
DataType::List(Box::new(field.data_type().clone())),
))
}
}
}
IsUnique(expr) => {
let field = arena.get(*expr).to_field(schema, ctxt, arena)?;
Ok(Field::new(field.name(), DataType::Boolean))
Expand All @@ -137,7 +151,15 @@ impl AExpr {
Ok(Field::new(field.name(), DataType::Boolean))
}
Reverse(expr) => arena.get(*expr).to_field(schema, ctxt, arena),
Explode(expr) => arena.get(*expr).to_field(schema, ctxt, arena),
Explode(expr) => {
let field = arena.get(*expr).to_field(schema, ctxt, arena)?;

if let DataType::List(inner) = field.data_type() {
Ok(Field::new(field.name(), *inner.clone()))
} else {
Ok(field)
}
}
Alias(expr, name) => Ok(Field::new(
name,
arena.get(*expr).get_type(schema, ctxt, arena)?,
Expand All @@ -163,15 +185,10 @@ impl AExpr {
_ => get_supertype(&left_type, &right_type)?,
};

use Operator::*;
let out_field;
let out_name = match op {
Plus | Minus | Multiply | Divide | Modulus => {
out_field = arena.get(*left).to_field(schema, ctxt, arena)?;
out_field.name().as_str()
}
Eq | Lt | GtEq | LtEq => "",
_ => "binary_expr",
let out_name = {
out_field = arena.get(*left).to_field(schema, ctxt, arena)?;
out_field.name().as_str()
};

Ok(Field::new(out_name, expr_type))
Expand All @@ -196,11 +213,17 @@ impl AExpr {
ctxt,
GroupByMethod::Max,
),
Median(expr) => field_by_context(
arena.get(*expr).to_field(schema, ctxt, arena)?,
ctxt,
GroupByMethod::Median,
),
Median(expr) => {
let mut field = field_by_context(
arena.get(*expr).to_field(schema, ctxt, arena)?,
ctxt,
GroupByMethod::Median,
);
if field.data_type() != &DataType::Utf8 {
field.coerce(DataType::Float64);
}
field
}
Mean(expr) => {
let mut field = field_by_context(
arena.get(*expr).to_field(schema, ctxt, arena)?,
Expand Down Expand Up @@ -292,11 +315,14 @@ impl AExpr {
Ok(Field::new(field.name(), data_type.clone()))
}
Ternary { truthy, falsy, .. } => {
let truthy = arena.get(*truthy).to_field(schema, ctxt, arena)?;
let mut truthy = arena.get(*truthy).to_field(schema, ctxt, arena)?;
let falsy = arena.get(*falsy).to_field(schema, ctxt, arena)?;
if let DataType::Null = *truthy.data_type() {
let falsy = arena.get(*falsy).to_field(schema, ctxt, arena)?;
Ok(Field::new(truthy.name(), falsy.data_type().clone()))
truthy.coerce(falsy.data_type().clone());
Ok(truthy)
} else {
let st = get_supertype(truthy.data_type(), falsy.data_type())?;
truthy.coerce(st);
Ok(truthy)
}
}
Expand Down
23 changes: 15 additions & 8 deletions polars/polars-lazy/src/physical_plan/executors/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,28 @@ use polars_core::prelude::*;
/// Takes an input Executor (which creates the input DataFrame)
/// and multiple PhysicalExpressions (which create the output Series)
pub struct ProjectionExec {
input: Box<dyn Executor>,
expr: Vec<Arc<dyn PhysicalExpr>>,
}

impl ProjectionExec {
pub(crate) fn new(input: Box<dyn Executor>, expr: Vec<Arc<dyn PhysicalExpr>>) -> Self {
Self { input, expr }
}
pub(crate) input: Box<dyn Executor>,
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
#[cfg(test)]
pub(crate) schema: SchemaRef,
}

impl Executor for ProjectionExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;

let df = evaluate_physical_expressions(&df, &self.expr, state);

#[cfg(test)]
{
// TODO: check also the types.
df.as_ref().map(|df| {
for (l, r) in df.schema().fields().iter().zip(self.schema.fields()) {
assert_eq!(l.name(), r.name());
}
});
}

state.clear_expr_cache();
df
}
Expand Down
28 changes: 24 additions & 4 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,37 @@ impl DefaultPlanner {
cache,
)))
}
Projection { expr, input, .. } => {
Projection {
expr,
input,
schema: _schema,
..
} => {
let input = self.create_initial_physical_plan(input, lp_arena, expr_arena)?;
let phys_expr =
self.create_physical_expressions(&expr, Context::Default, expr_arena)?;
Ok(Box::new(ProjectionExec::new(input, phys_expr)))
Ok(Box::new(ProjectionExec {
input,
expr: phys_expr,
#[cfg(test)]
schema: _schema,
}))
}
LocalProjection { expr, input, .. } => {
LocalProjection {
expr,
input,
schema: _schema,
..
} => {
let input = self.create_initial_physical_plan(input, lp_arena, expr_arena)?;
let phys_expr =
self.create_physical_expressions(&expr, Context::Default, expr_arena)?;
Ok(Box::new(ProjectionExec::new(input, phys_expr)))
Ok(Box::new(ProjectionExec {
input,
expr: phys_expr,
#[cfg(test)]
schema: _schema,
}))
}
DataFrameScan {
df,
Expand Down
13 changes: 11 additions & 2 deletions polars/polars-lazy/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,8 @@ fn test_lazy_update_column() {
#[test]
fn test_lazy_fill_null() {
let df = df! {
"a" => &[None, Some(2)],
"b" => &[Some(1), None]
"a" => &[None, Some(2.0)],
"b" => &[Some(1.0), None]
}
.unwrap();
let out = df.lazy().fill_null(lit(10.0)).collect().unwrap();
Expand Down Expand Up @@ -1999,3 +1999,12 @@ pub fn test_select_by_dtypes() -> Result<()> {

Ok(())
}

#[test]
fn test_binary_expr() -> Result<()> {
    // Guards against a panic related to schema names when a binary
    // comparison expression is projected through the lazy engine.
    let df = fruits_cars();
    let out = df.lazy().select([col("A").neq(lit(1))]).collect()?;

    // A binary expression is named after its left-hand operand, so the
    // single output column must be called "A".
    assert_eq!(out.get_column_names(), &["A"]);

    Ok(())
}

0 comments on commit 8a74bb2

Please sign in to comment.