fix(python,rust!): Raise error when schema_overrides contains nonexistent column name (#15290)
stinodego committed Mar 27, 2024
1 parent 9fca017 commit e829823
Showing 8 changed files with 149 additions and 99 deletions.
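As a quick, hedged sketch of the user-facing change (assuming a Python build of Polars that includes this commit; before it, an override naming a column absent from the data was silently ignored):

import polars as pl

# "c" does not exist in the data, so this now raises pl.SchemaError
# instead of silently ignoring the override.
try:
    pl.DataFrame([{"a": 1, "b": 2}], schema_overrides={"c": pl.Int8})
except pl.SchemaError as exc:
    print(exc)  # nonexistent column specified in `schema_overrides`: c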
14 changes: 9 additions & 5 deletions crates/polars-core/src/frame/row/dataframe.rs
@@ -37,14 +37,18 @@ impl DataFrame {
         });
     }
 
-    /// Create a new [`DataFrame`] from rows. This should only be used when you have row wise data,
-    /// as this is a lot slower than creating the [`Series`] in a columnar fashion
+    /// Create a new [`DataFrame`] from rows.
+    ///
+    /// This should only be used when you have row wise data, as this is a lot slower
+    /// than creating the [`Series`] in a columnar fashion
     pub fn from_rows_and_schema(rows: &[Row], schema: &Schema) -> PolarsResult<Self> {
         Self::from_rows_iter_and_schema(rows.iter(), schema)
     }
 
-    /// Create a new [`DataFrame`] from an iterator over rows. This should only be used when you have row wise data,
-    /// as this is a lot slower than creating the [`Series`] in a columnar fashion
+    /// Create a new [`DataFrame`] from an iterator over rows.
+    ///
+    /// This should only be used when you have row wise data, as this is a lot slower
+    /// than creating the [`Series`] in a columnar fashion.
     pub fn from_rows_iter_and_schema<'a, I>(mut rows: I, schema: &Schema) -> PolarsResult<Self>
     where
         I: Iterator<Item = &'a Row<'a>>,
@@ -130,7 +134,7 @@ impl DataFrame {
     /// Create a new [`DataFrame`] from rows. This should only be used when you have row wise data,
     /// as this is a lot slower than creating the [`Series`] in a columnar fashion
     pub fn from_rows(rows: &[Row]) -> PolarsResult<Self> {
-        let schema = rows_to_schema_first_non_null(rows, Some(50));
+        let schema = rows_to_schema_first_non_null(rows, Some(50))?;
         let has_nulls = schema
             .iter_dtypes()
             .any(|dtype| matches!(dtype, DataType::Null));
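The doc comments above warn that row-wise construction is much slower than building each [`Series`] column-wise. A small Python-side sketch of the two paths (the column names "x" and "y" are illustrative assumptions; both calls should yield the same frame):

import polars as pl

# Row-wise: goes through from_rows and per-row schema handling.
df_rows = pl.DataFrame([(1, "a"), (2, "b")], schema=["x", "y"], orient="row")
# Columnar: each column is built as a single Series, the preferred path.
df_cols = pl.DataFrame({"x": [1, 2], "y": ["a", "b"]})
assert df_rows.equals(df_cols)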
45 changes: 26 additions & 19 deletions crates/polars-core/src/frame/row/mod.rs
@@ -11,7 +11,7 @@ pub use av_buffer::*;
 use rayon::prelude::*;
 
 use crate::prelude::*;
-use crate::utils::{dtypes_to_supertype, try_get_supertype};
+use crate::utils::{dtypes_to_schema, dtypes_to_supertype, try_get_supertype};
 use crate::POOL;
 
 #[derive(Debug, Clone, PartialEq, Eq, Default)]
@@ -83,40 +83,47 @@ pub fn coerce_data_type<A: Borrow<DataType>>(datatypes: &[A]) -> DataType {
     try_get_supertype(lhs, rhs).unwrap_or(String)
 }
 
-/// Infer schema from rows and set the supertypes of the columns as column data type.
+/// Infer the schema of rows by determining the supertype of the values.
+///
+/// Field names are set as `column_0`, `column_1`, and so on.
 pub fn rows_to_schema_supertypes(
     rows: &[Row],
     infer_schema_length: Option<usize>,
 ) -> PolarsResult<Schema> {
+    let dtypes = rows_to_supertypes(rows, infer_schema_length)?;
+    let schema = dtypes_to_schema(dtypes);
+    Ok(schema)
+}
+
+/// Infer the schema data types of rows by determining the supertype of the values.
+pub fn rows_to_supertypes(
+    rows: &[Row],
+    infer_schema_length: Option<usize>,
+) -> PolarsResult<Vec<DataType>> {
+    polars_ensure!(!rows.is_empty(), NoData: "no rows, cannot infer schema");
+
-    // no of rows to use to infer dtype
     let max_infer = infer_schema_length.unwrap_or(rows.len());
-    let mut dtypes: Vec<PlIndexSet<DataType>> = vec![PlIndexSet::new(); rows[0].0.len()];
 
+    let mut dtypes: Vec<PlIndexSet<DataType>> = vec![PlIndexSet::new(); rows[0].0.len()];
     for row in rows.iter().take(max_infer) {
-        for (val, types_set) in row.0.iter().zip(dtypes.iter_mut()) {
-            types_set.insert(val.into());
+        for (val, dtypes_set) in row.0.iter().zip(dtypes.iter_mut()) {
+            dtypes_set.insert(val.into());
         }
     }
 
     dtypes
         .into_iter()
-        .enumerate()
-        .map(|(i, types_set)| {
-            let dtype = if types_set.is_empty() {
-                DataType::Unknown
-            } else {
-                dtypes_to_supertype(&types_set)?
-            };
-            Ok(Field::new(format!("column_{i}").as_ref(), dtype))
-        })
-        .collect::<PolarsResult<_>>()
+        .map(|dtypes_set| dtypes_to_supertype(&dtypes_set))
+        .collect()
 }
 
 /// Infer schema from rows and set the first no null type as column data type.
-pub fn rows_to_schema_first_non_null(rows: &[Row], infer_schema_length: Option<usize>) -> Schema {
-    // no of rows to use to infer dtype
+pub fn rows_to_schema_first_non_null(
+    rows: &[Row],
+    infer_schema_length: Option<usize>,
+) -> PolarsResult<Schema> {
+    polars_ensure!(!rows.is_empty(), NoData: "no rows, cannot infer schema");
+
     let max_infer = infer_schema_length.unwrap_or(rows.len());
     let mut schema: Schema = (&rows[0]).into();
 
@@ -152,7 +159,7 @@ pub fn rows_to_schema_first_non_null(rows: &[Row], infer_schema_length: Option<usize>) -> Schema {
             }
         }
     }
-    schema
+    Ok(schema)
 }
 
 impl<'a> From<&AnyValue<'a>> for Field {
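The supertype behavior of rows_to_schema_supertypes is observable from Python when a column's rows mix value types; a sketch (assuming a build with this commit; the exact schema repr may differ by version):

import polars as pl

# Column "a" mixes an int and a float across rows, so inference should
# resolve to their supertype rather than raising.
df = pl.DataFrame([(1,), (2.5,)], schema=["a"], orient="row")
print(df.schema)  # expecting {'a': Float64}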
2 changes: 2 additions & 0 deletions crates/polars-core/src/utils/mod.rs
@@ -4,6 +4,7 @@ pub(crate) mod series;
 mod supertype;
 use std::borrow::Cow;
 use std::ops::{Deref, DerefMut};
+mod schema;
 
 pub use any_value::*;
 use arrow::bitmap::bitmask::BitMask;
@@ -13,6 +14,7 @@ pub use arrow::trusted_len::TrustMyLength;
 use flatten::*;
 use num_traits::{One, Zero};
 use rayon::prelude::*;
+pub use schema::*;
 pub use series::*;
 use smartstring::alias::String as SmartString;
 pub use supertype::*;
17 changes: 17 additions & 0 deletions crates/polars-core/src/utils/schema.rs
@@ -0,0 +1,17 @@
+use crate::prelude::*;
+
+/// Convert a collection of [`DataType`] into a schema.
+///
+/// Field names are set as `column_0`, `column_1`, and so on.
+///
+/// [`DataType`]: crate::datatypes::DataType
+pub fn dtypes_to_schema<I>(dtypes: I) -> Schema
+where
+    I: IntoIterator<Item = DataType>,
+{
+    dtypes
+        .into_iter()
+        .enumerate()
+        .map(|(i, dtype)| Field::new(format!("column_{i}").as_ref(), dtype))
+        .collect()
+}
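The column_0, column_1, ... fallback naming generated by dtypes_to_schema shows up from Python whenever row data is constructed without a schema; a brief sketch:

import polars as pl

df = pl.DataFrame([(1, "a"), (2, "b")], orient="row")
print(df.columns)  # expecting ['column_0', 'column_1']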
12 changes: 8 additions & 4 deletions py-polars/polars/_utils/construction/dataframe.py
@@ -560,8 +560,8 @@ def _sequence_of_sequence_to_pydf(
     else:
         pydf = PyDataFrame.from_rows(
             data,
-            local_schema_override or None,
-            infer_schema_length,
+            schema=local_schema_override or None,
+            infer_schema_length=infer_schema_length,
         )
     if column_names or schema_overrides:
         pydf = _post_apply_columns(
@@ -777,7 +777,9 @@ def _sequence_of_dataclasses_to_pydf(
         pydf = PyDataFrame.from_dicts(dicts, infer_schema_length=infer_schema_length)
     else:
         rows = [astuple(dc) for dc in data]
-        pydf = PyDataFrame.from_rows(rows, overrides or None, infer_schema_length)
+        pydf = PyDataFrame.from_rows(
+            rows, schema=overrides or None, infer_schema_length=infer_schema_length
+        )
 
     if overrides:
         structs = {c: tp for c, tp in overrides.items() if isinstance(tp, Struct)}
@@ -827,7 +829,9 @@ def _sequence_of_pydantic_models_to_pydf(
         # 'from_rows' is the faster codepath for models with a lot of fields...
         get_values = itemgetter(*model_fields)
         rows = [get_values(md.__dict__) for md in data]
-        pydf = PyDataFrame.from_rows(rows, overrides, infer_schema_length)
+        pydf = PyDataFrame.from_rows(
+            rows, schema=overrides, infer_schema_length=infer_schema_length
+        )
     else:
         # ...and 'from_dicts' is faster otherwise
         dicts = [md.__dict__ for md in data]
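These call sites are exercised when a DataFrame is built from dataclasses or pydantic models. A hedged sketch of the dataclass path (the Point class and its fields are made up for illustration):

import polars as pl
from dataclasses import dataclass

@dataclass
class Point:
    x: int
    y: float

# Dataclasses are converted to rows and passed through PyDataFrame.from_rows;
# schema_overrides behaves the same as for dicts.
df = pl.DataFrame([Point(1, 2.0), Point(3, 4.5)], schema_overrides={"y": pl.Float32})
print(df.schema)  # expecting {'x': Int64, 'y': Float32}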
151 changes: 81 additions & 70 deletions py-polars/src/dataframe/construction.rs
@@ -1,4 +1,4 @@
-use polars::frame::row::{rows_to_schema_supertypes, Row};
+use polars::frame::row::{rows_to_schema_supertypes, rows_to_supertypes, Row};
 use pyo3::prelude::*;
 
 use super::*;
@@ -14,10 +14,9 @@ impl PyDataFrame {
         schema: Option<Wrap<Schema>>,
         infer_schema_length: Option<usize>,
     ) -> PyResult<Self> {
-        let rows = vec_extract_wrapped(data);
-        py.allow_threads(move || {
-            finish_from_rows(rows, schema.map(|wrap| wrap.0), None, infer_schema_length)
-        })
+        let data = vec_extract_wrapped(data);
+        let schema = schema.map(|wrap| wrap.0);
+        py.allow_threads(move || finish_from_rows(data, schema, None, infer_schema_length))
     }
 
     #[staticmethod]
@@ -29,41 +28,25 @@ impl PyDataFrame {
         schema_overrides: Option<Wrap<Schema>>,
         infer_schema_length: Option<usize>,
     ) -> PyResult<Self> {
+        let schema = schema.map(|wrap| wrap.0);
+        let schema_overrides = schema_overrides.map(|wrap| wrap.0);
+
         // If given, read dict fields in schema order.
         let mut schema_columns = PlIndexSet::new();
-        if let Some(s) = &schema {
-            schema_columns.extend(s.0.iter_names().map(|n| n.to_string()))
+        if let Some(ref s) = schema {
+            schema_columns.extend(s.iter_names().map(|n| n.to_string()))
         }
 
         let (rows, names) = dicts_to_rows(data, infer_schema_length, schema_columns)?;
 
-        py.allow_threads(move || {
-            let mut schema_overrides_by_idx: Vec<(usize, DataType)> = Vec::new();
-            if let Some(overrides) = schema_overrides {
-                for (idx, name) in names.iter().enumerate() {
-                    if let Some(dtype) = overrides.0.get(name) {
-                        schema_overrides_by_idx.push((idx, dtype.clone()));
-                    }
-                }
-            }
-            let mut pydf = finish_from_rows(
-                rows,
-                schema.map(|wrap| wrap.0),
-                Some(schema_overrides_by_idx),
-                infer_schema_length,
-            )?;
-            unsafe {
-                for (s, name) in pydf.df.get_columns_mut().iter_mut().zip(&names) {
-                    s.rename(name);
-                }
-            }
-            let length = names.len();
-            if names.into_iter().collect::<PlHashSet<_>>().len() != length {
-                let err = PolarsError::Duplicate("duplicate column names found".into());
-                Err(PyPolarsErr::Polars(err))?;
-            }
+        let schema = schema.or_else(|| {
+            Some(columns_names_to_empty_schema(
+                names.iter().map(String::as_str),
+            ))
+        });
 
-            Ok(pydf)
-        })
+        py.allow_threads(move || {
+            finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
+        })
     }

@@ -77,54 +60,82 @@ impl PyDataFrame {
 fn finish_from_rows(
     rows: Vec<Row>,
     schema: Option<Schema>,
-    schema_overrides_by_idx: Option<Vec<(usize, DataType)>>,
+    schema_overrides: Option<Schema>,
     infer_schema_length: Option<usize>,
 ) -> PyResult<PyDataFrame> {
-    /// Infer the schema from the row values
-    fn infer_schema(rows: &[Row], infer_schema_length: Option<usize>) -> PolarsResult<Schema> {
-        let mut schema =
-            rows_to_schema_supertypes(rows, infer_schema_length.map(|n| std::cmp::max(1, n)))?;
-
-        // Erase scale from inferred decimals.
-        for dtype in schema.iter_dtypes_mut() {
-            if let DataType::Decimal(_, _) = dtype {
-                *dtype = DataType::Decimal(None, None)
-            }
-        }
-
-        Ok(schema)
-    }
-
-    let mut final_schema = infer_schema(&rows, infer_schema_length).map_err(PyPolarsErr::from)?;
-
-    // Integrate explicit/inferred schema.
-    if let Some(schema) = schema {
-        for (i, (name, dtype)) in schema.into_iter().enumerate() {
-            if let Some((name_, dtype_)) = final_schema.get_at_index_mut(i) {
-                *name_ = name;
-
-                // If schema dtype is Unknown, overwrite with inferred datatype.
-                if !matches!(dtype, DataType::Unknown) {
-                    *dtype_ = dtype;
-                }
-            } else {
-                final_schema.with_column(name, dtype);
-            }
-        }
-    }
-
-    // Optional per-field overrides; these supersede default/inferred dtypes.
-    if let Some(overrides) = schema_overrides_by_idx {
-        for (i, dtype) in overrides {
-            if let Some((_, dtype_)) = final_schema.get_at_index_mut(i) {
-                if !matches!(dtype, DataType::Unknown) {
-                    *dtype_ = dtype;
-                }
-            }
-        }
-    }
-    let df = DataFrame::from_rows_and_schema(&rows, &final_schema).map_err(PyPolarsErr::from)?;
+    let mut schema = if let Some(mut schema) = schema {
+        resolve_schema_overrides(&mut schema, schema_overrides)?;
+        update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
+        schema
+    } else {
+        rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
+    };
+
+    // TODO: Remove this step when Decimals are supported properly.
+    // Erasing the decimal precision/scale here will just require us to infer it again later.
+    // https://github.com/pola-rs/polars/issues/14427
+    erase_decimal_precision_scale(&mut schema);
+
+    let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
     Ok(df.into())
 }
 
+fn update_schema_from_rows(
+    schema: &mut Schema,
+    rows: &[Row],
+    infer_schema_length: Option<usize>,
+) -> PyResult<()> {
+    let schema_is_complete = schema.iter_dtypes().all(|dtype| dtype.is_known());
+    if schema_is_complete {
+        return Ok(());
+    }
+
+    // TODO: Only infer dtypes for columns with an unknown dtype
+    let inferred_dtypes =
+        rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
+    let inferred_dtypes_slice = inferred_dtypes.as_slice();
+
+    for (i, dtype) in schema.iter_dtypes_mut().enumerate() {
+        if !dtype.is_known() {
+            *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
+                polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
+            })
+            .map_err(PyPolarsErr::from)?
+            .clone();
+        }
+    }
+    Ok(())
+}
+
+/// Override the data type of certain schema fields.
+fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) -> PyResult<()> {
+    if let Some(overrides) = schema_overrides {
+        for (name, dtype) in overrides.into_iter() {
+            schema.set_dtype(name.as_str(), dtype).ok_or_else(|| {
+                polars_err!(SchemaMismatch: "nonexistent column specified in `schema_overrides`: {name}")
+            }).map_err(PyPolarsErr::from)?;
+        }
+    }
+    Ok(())
+}
+
+/// Erase precision/scale information from Decimal types.
+fn erase_decimal_precision_scale(schema: &mut Schema) {
+    for dtype in schema.iter_dtypes_mut() {
+        if let DataType::Decimal(_, _) = dtype {
+            *dtype = DataType::Decimal(None, None)
+        }
+    }
+}
+
+fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
+where
+    I: IntoIterator<Item = &'a str>,
+{
+    let fields = column_names
+        .into_iter()
+        .map(|c| Field::new(c, DataType::Unknown));
+    Schema::from_iter(fields)
+}
 
 fn dicts_to_rows(
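The refactor splits finish_from_rows into resolve_schema_overrides (applied to the explicit schema, now erroring on unknown names) and update_schema_from_rows (filling still-unknown dtypes from the data). A sketch of the inference-fill path from Python, assuming that a None dtype in a schema dict maps to an unknown dtype as in current Polars:

import polars as pl

# "x" has no declared dtype, so update_schema_from_rows infers it from
# the row values; "y" keeps its explicit dtype.
df = pl.DataFrame(
    [(1, "a"), (2, "b")],
    schema={"x": None, "y": pl.Utf8},
    orient="row",
)
print(df.schema)  # expecting {'x': Int64, 'y': String}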
2 changes: 1 addition & 1 deletion py-polars/src/map/dataframe.rs
@@ -289,7 +289,7 @@ pub fn apply_lambda_with_rows_output<'a>(
         buf.push(v?.clone());
     }
 
-    let schema = rows_to_schema_first_non_null(&buf, Some(50));
+    let schema = rows_to_schema_first_non_null(&buf, Some(50))?;
 
     if init_null_count > 0 {
         // SAFETY: we know the iterators size
5 changes: 5 additions & 0 deletions py-polars/tests/unit/constructors/test_dataframe.py
@@ -121,3 +121,8 @@ def test_df_init_from_series_strict() -> None:

assert df["a"].to_list() == [None, 0, 1]
assert df["a"].dtype == pl.UInt8


def test_df_init_rows_overrides_non_existing() -> None:
with pytest.raises(pl.SchemaError, match="nonexistent column"):
pl.DataFrame([{"a": 1, "b": 2}], schema_overrides={"c": pl.Int8})
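For contrast, a sketch of the complementary positive case: an override naming an existing column still applies as before.

import polars as pl

df = pl.DataFrame([{"a": 1, "b": 2}], schema_overrides={"b": pl.Int8})
print(df.schema)  # expecting {'a': Int64, 'b': Int8}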
