Skip to content

Commit

Permalink
throw error on schema failure (#4178)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 29, 2022
1 parent bf92899 commit f5617af
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 56 deletions.
20 changes: 13 additions & 7 deletions polars/polars-lazy/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,13 @@ impl LogicalPlan {
}
}
Projection { expr, input, .. } => {
let current_node = format!(
"π {}/{} [{:?}]",
expr.len(),
input.schema().len(),
(branch, id)
);
let schema = input.schema().map_err(|_| {
eprintln!("could not determine schema");
std::fmt::Error
})?;

let current_node =
format!("π {}/{} [{:?}]", expr.len(), schema.len(), (branch, id));
self.write_dot(acc_str, prev_node, &current_node, id)?;
input.dot(acc_str, (branch, id + 1), &current_node)
}
Expand All @@ -251,10 +252,15 @@ impl LogicalPlan {
input.dot(acc_str, (branch, id + 1), &current_node)
}
LocalProjection { expr, input, .. } => {
let schema = input.schema().map_err(|_| {
eprintln!("could not determine schema");
std::fmt::Error
})?;

let current_node = format!(
"LOCAL π {}/{} [{:?}]",
expr.len(),
input.schema().len(),
schema.len(),
(branch, id)
);
self.write_dot(acc_str, prev_node, &current_node, id)?;
Expand Down
13 changes: 8 additions & 5 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ pub type AllowedOptimizations = OptState;

impl LazyFrame {
/// Get a hold on the schema of the current LazyFrame computation.
pub fn schema(&self) -> SchemaRef {
pub fn schema(&self) -> Result<SchemaRef> {
let logical_plan = self.clone().get_plan_builder().build();
logical_plan.schema().into_owned()
logical_plan.schema().map(|schema| schema.into_owned())
}

pub(crate) fn get_plan_builder(self) -> LogicalPlanBuilder {
Expand Down Expand Up @@ -351,7 +351,9 @@ impl LazyFrame {
// schema after renaming
let mut new_schema = s.clone();
for (old, new) in existing2.iter().zip(new2.iter()) {
new_schema.rename(old, new.to_string()).unwrap();
new_schema
.rename(old, new.to_string())
.ok_or_else(|| PolarsError::NotFound(old.into()))?
}
Ok(Arc::new(new_schema))
};
Expand Down Expand Up @@ -460,7 +462,8 @@ impl LazyFrame {
.into_iter()
.map(|a| a.as_ref().to_string())
.collect::<Vec<_>>();
let schema = &*self.schema();
// todo! make delayed
let schema = &*self.schema().unwrap();
// a column gets swapped
if new.iter().any(|name| schema.get(name).is_some()) {
self.rename_impl_swapping(existing, new)
Expand Down Expand Up @@ -564,7 +567,7 @@ impl LazyFrame {

// during debug we check if the optimizations have not modified the final schema
#[cfg(debug_assertions)]
let prev_schema = logical_plan.schema().into_owned();
let prev_schema = logical_plan.schema()?.into_owned();

let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena)?;

Expand Down
34 changes: 19 additions & 15 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ impl LogicalPlanBuilder {
}

pub fn project(self, exprs: Vec<Expr>) -> Self {
let (exprs, schema) =
try_delayed!(prepare_projection(exprs, &self.0.schema()), &self.0, into);
let schema = try_delayed!(self.0.schema(), &self.0, into);
let (exprs, schema) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into);

if exprs.is_empty() {
self.map(
Expand All @@ -250,8 +250,8 @@ impl LogicalPlanBuilder {
}

pub fn project_local(self, exprs: Vec<Expr>) -> Self {
let (exprs, schema) =
try_delayed!(prepare_projection(exprs, &self.0.schema()), &self.0, into);
let schema = try_delayed!(self.0.schema(), &self.0, into);
let (exprs, schema) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into);
LogicalPlan::LocalProjection {
expr: exprs,
input: Box::new(self.0),
Expand All @@ -261,7 +261,7 @@ impl LogicalPlanBuilder {
}

pub fn fill_null(self, fill_value: Expr) -> Self {
let schema = self.0.schema();
let schema = try_delayed!(self.0.schema(), &self.0, into);
let exprs = schema
.iter_names()
.map(|name| {
Expand All @@ -275,7 +275,7 @@ impl LogicalPlanBuilder {
}

pub fn fill_nan(self, fill_value: Expr) -> Self {
let schema = self.0.schema();
let schema = try_delayed!(self.0.schema(), &self.0, into);

let exprs = schema
.iter()
Expand All @@ -291,7 +291,7 @@ impl LogicalPlanBuilder {

pub fn with_columns(self, exprs: Vec<Expr>) -> Self {
// current schema
let schema = self.0.schema();
let schema = try_delayed!(self.0.schema(), &self.0, into);
let mut new_schema = (**schema).clone();
let (exprs, _) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into);

Expand All @@ -315,7 +315,8 @@ impl LogicalPlanBuilder {
Expr::Wildcard | Expr::RenameAlias { .. } | Expr::Columns(_) => true,
_ => false,
}) {
let rewritten = rewrite_projections(vec![predicate], &self.0.schema(), &[]);
let schema = try_delayed!(self.0.schema(), &self.0, into);
let rewritten = rewrite_projections(vec![predicate], &schema, &[]);
combine_predicates_expr(rewritten.into_iter())
} else {
predicate
Expand All @@ -336,7 +337,7 @@ impl LogicalPlanBuilder {
dynamic_options: Option<DynamicGroupOptions>,
rolling_options: Option<RollingGroupOptions>,
) -> Self {
let current_schema = self.0.schema();
let current_schema = try_delayed!(self.0.schema(), &self.0, into);
let current_schema = current_schema.as_ref();
let aggs = rewrite_projections(aggs.as_ref().to_vec(), current_schema, keys.as_ref());

Expand Down Expand Up @@ -403,7 +404,8 @@ impl LogicalPlanBuilder {
}

pub fn sort(self, by_column: Vec<Expr>, reverse: Vec<bool>, null_last: bool) -> Self {
let by_column = rewrite_projections(by_column, &self.0.schema(), &[]);
let schema = try_delayed!(self.0.schema(), &self.0, into);
let by_column = rewrite_projections(by_column, &schema, &[]);
LogicalPlan::Sort {
input: Box::new(self.0),
by_column,
Expand All @@ -417,9 +419,10 @@ impl LogicalPlanBuilder {
}

pub fn explode(self, columns: Vec<Expr>) -> Self {
let columns = rewrite_projections(columns, &self.0.schema(), &[]);
let schema = try_delayed!(self.0.schema(), &self.0, into);
let columns = rewrite_projections(columns, &schema, &[]);

let mut schema = (**self.0.schema()).clone();
let mut schema = (**schema).clone();

// columns to string
let columns = columns
Expand All @@ -446,7 +449,8 @@ impl LogicalPlanBuilder {
}

pub fn melt(self, args: Arc<MeltArgs>) -> Self {
let schema = det_melt_schema(&args, &self.0.schema());
let schema = try_delayed!(self.0.schema(), &self.0, into);
let schema = det_melt_schema(&args, &schema);
LogicalPlan::Melt {
input: Box::new(self.0),
args,
Expand Down Expand Up @@ -479,8 +483,8 @@ impl LogicalPlanBuilder {
right_on: Vec<Expr>,
options: JoinOptions,
) -> Self {
let schema_left = self.0.schema();
let schema_right = other.schema();
let schema_left = try_delayed!(self.0.schema(), &self.0, into);
let schema_right = try_delayed!(other.schema(), &self.0, into);

// column names of left table
let mut names: PlHashSet<&str> = PlHashSet::default();
Expand Down
38 changes: 19 additions & 19 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,37 +215,37 @@ impl LogicalPlan {
}

impl LogicalPlan {
pub(crate) fn schema(&self) -> Cow<'_, SchemaRef> {
pub(crate) fn schema(&self) -> Result<Cow<'_, SchemaRef>> {
use LogicalPlan::*;
match self {
#[cfg(feature = "python")]
PythonScan { options } => Cow::Borrowed(&options.schema),
PythonScan { options } => Ok(Cow::Borrowed(&options.schema)),
Union { inputs, .. } => inputs[0].schema(),
Cache { input } => input.schema(),
Sort { input, .. } => input.schema(),
Explode { schema, .. } => Cow::Borrowed(schema),
Explode { schema, .. } => Ok(Cow::Borrowed(schema)),
#[cfg(feature = "parquet")]
ParquetScan { schema, .. } => Cow::Borrowed(schema),
ParquetScan { schema, .. } => Ok(Cow::Borrowed(schema)),
#[cfg(feature = "ipc")]
IpcScan { schema, .. } => Cow::Borrowed(schema),
DataFrameScan { schema, .. } => Cow::Borrowed(schema),
AnonymousScan { schema, .. } => Cow::Borrowed(schema),
IpcScan { schema, .. } => Ok(Cow::Borrowed(schema)),
DataFrameScan { schema, .. } => Ok(Cow::Borrowed(schema)),
AnonymousScan { schema, .. } => Ok(Cow::Borrowed(schema)),
Selection { input, .. } => input.schema(),
#[cfg(feature = "csv-file")]
CsvScan { schema, .. } => Cow::Borrowed(schema),
Projection { schema, .. } => Cow::Borrowed(schema),
LocalProjection { schema, .. } => Cow::Borrowed(schema),
Aggregate { schema, .. } => Cow::Borrowed(schema),
Join { schema, .. } => Cow::Borrowed(schema),
HStack { schema, .. } => Cow::Borrowed(schema),
CsvScan { schema, .. } => Ok(Cow::Borrowed(schema)),
Projection { schema, .. } => Ok(Cow::Borrowed(schema)),
LocalProjection { schema, .. } => Ok(Cow::Borrowed(schema)),
Aggregate { schema, .. } => Ok(Cow::Borrowed(schema)),
Join { schema, .. } => Ok(Cow::Borrowed(schema)),
HStack { schema, .. } => Ok(Cow::Borrowed(schema)),
Distinct { input, .. } => input.schema(),
Slice { input, .. } => input.schema(),
Melt { schema, .. } => Cow::Borrowed(schema),
Melt { schema, .. } => Ok(Cow::Borrowed(schema)),
Udf { input, schema, .. } => {
let input_schema = input.schema();
let input_schema = input.schema()?;
match schema {
Some(schema) => Cow::Owned(schema.get_schema(&input_schema).unwrap()),
None => input_schema,
Some(schema) => schema.get_schema(&input_schema).map(Cow::Owned),
None => Ok(input_schema),
}
}
Error { input, .. } => input.schema(),
Expand Down Expand Up @@ -313,14 +313,14 @@ mod test {
.select(&[col("variety").alias("foo")])
.logical_plan;

assert!(lp.schema().get("foo").is_some());
assert!(lp.schema().unwrap().get("foo").is_some());

let lp = df
.lazy()
.groupby([col("variety")])
.agg([col("sepal.width").min()])
.logical_plan;
assert!(lp.schema().get("sepal.width").is_some());
assert!(lp.schema().unwrap().get("sepal.width").is_some());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ fn test_when_then_schema() -> Result<()> {
.then(Null {}.lit())
.otherwise(col("A"))])
.schema();
assert_ne!(schema.get_index(0).unwrap().1, &DataType::Null);
assert_ne!(schema?.get_index(0).unwrap().1, &DataType::Null);

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jemallocator = { version = "0.5", features = ["disable_initial_exec_tls"] }
ahash = "0.7"
bincode = "1.3"
# todo: unfix when compilation problem is solved
libc = "=0.2.125"
libc = "0.2"
ndarray = "0.15"
numpy = "0.16"
once_cell = "1"
Expand Down
23 changes: 15 additions & 8 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ pub struct PyLazyFrame {
pub ldf: LazyFrame,
}

impl PyLazyFrame {
fn get_schema(&self) -> PyResult<SchemaRef> {
let schema = self.ldf.schema().map_err(PyPolarsErr::from)?;
Ok(schema)
}
}

impl From<LazyFrame> for PyLazyFrame {
fn from(ldf: LazyFrame) -> Self {
PyLazyFrame { ldf }
Expand Down Expand Up @@ -741,28 +748,28 @@ impl PyLazyFrame {
self.ldf.clone().into()
}

pub fn columns(&self) -> Vec<String> {
self.ldf.schema().iter_names().cloned().collect()
pub fn columns(&self) -> PyResult<Vec<String>> {
Ok(self.get_schema()?.iter_names().cloned().collect())
}

pub fn dtypes(&self, py: Python) -> PyObject {
let schema = self.ldf.schema();
pub fn dtypes(&self, py: Python) -> PyResult<PyObject> {
let schema = self.get_schema()?;
let iter = schema
.iter_dtypes()
.map(|dt| Wrap(dt.clone()).to_object(py));
PyList::new(py, iter).to_object(py)
Ok(PyList::new(py, iter).to_object(py))
}

pub fn schema(&self, py: Python) -> PyObject {
let schema = self.ldf.schema();
pub fn schema(&self, py: Python) -> PyResult<PyObject> {
let schema = self.get_schema()?;
let schema_dict = PyDict::new(py);

schema.iter_fields().for_each(|fld| {
schema_dict
.set_item(fld.name(), Wrap(fld.data_type().clone()))
.unwrap()
});
schema_dict.to_object(py)
Ok(schema_dict.to_object(py))
}

pub fn unnest(&self, cols: Vec<String>) -> PyLazyFrame {
Expand Down
11 changes: 11 additions & 0 deletions py-polars/tests/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,14 @@ def test_projection_update_schema_missing_column() -> None:
.agg([pl.col("colB").sum().alias("result")])
.collect()
)


def test_not_found_on_rename() -> None:
df = pl.DataFrame({"exists": [1, 2, 3]})

with pytest.raises(pl.NotFoundError):
df.rename(
{
"does_not_exist": "exists",
}
)

0 comments on commit f5617af

Please sign in to comment.