Skip to content

Commit

Permalink
python: recursive from_dicts (#3270)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 1, 2022
1 parent 308a1f4 commit 69a03e5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 35 deletions.
57 changes: 23 additions & 34 deletions polars/polars-core/src/frame/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::POOL;
use arrow::bitmap::Bitmap;
use rayon::prelude::*;
use std::borrow::Borrow;
use std::fmt::{Debug, Formatter};
use std::fmt::Debug;
/// Newtype wrapper around a `Vec` of [`AnyValue`]s.
///
/// The inner vector is public and is accessed as `.0`.
/// NOTE(review): presumably holds one value per column of a single row — confirm against callers.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Row<'a>(pub Vec<AnyValue<'a>>);

Expand Down Expand Up @@ -80,7 +80,7 @@ impl DataFrame {

rows.try_for_each::<_, Result<()>>(|row| {
for (value, buf) in row.0.iter().zip(&mut buffers) {
buf.add_falible(value)?
buf.add_fallible(value)?
}
Ok(())
})?;
Expand Down Expand Up @@ -139,10 +139,15 @@ impl DataFrame {
})
.collect::<Vec<_>>();

let columns = self
.columns
.iter()
.map(|s| s.cast(dtype).unwrap())
.collect::<Vec<_>>();

// This loop is very expensive: it causes a lot of cache misses.
// It is the performance-critical part.
self.columns.iter().for_each(|s| {
let s = s.cast(dtype).unwrap();
columns.iter().for_each(|s| {
s.iter().zip(buffers.iter_mut()).for_each(|(av, buf)| {
let _out = buf.add(av);
debug_assert!(_out.is_some());
Expand Down Expand Up @@ -294,7 +299,11 @@ impl<'a> From<&AnyValue<'a>> for DataType {
#[cfg(feature = "dtype-time")]
Time(_) => DataType::Time,
List(s) => DataType::List(Box::new(s.dtype().clone())),
_ => unimplemented!(),
#[cfg(feature = "dtype-struct")]
StructOwned(payload) => DataType::Struct(payload.1.to_vec()),
#[cfg(feature = "dtype-struct")]
Struct(_, fields) => DataType::Struct(fields.to_vec()),
av => panic!("{:?} not implemented", av),
}
}
}
Expand All @@ -310,7 +319,7 @@ impl From<&Row<'_>> for Schema {
}
}

pub(crate) enum AnyValueBuffer {
pub(crate) enum AnyValueBuffer<'a> {
Boolean(BooleanChunkedBuilder),
Int32(PrimitiveChunkedBuilder<Int32Type>),
Int64(PrimitiveChunkedBuilder<Int64Type>),
Expand All @@ -330,33 +339,11 @@ pub(crate) enum AnyValueBuffer {
Float64(PrimitiveChunkedBuilder<Float64Type>),
Utf8(Utf8ChunkedBuilder),
List(Box<dyn ListBuilderTrait>),
All(Vec<AnyValue<'a>>),
}

impl Debug for AnyValueBuffer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
use AnyValueBuffer::*;
match self {
Boolean(_) => f.write_str("boolean"),
Int32(_) => f.write_str("i32"),
Int64(_) => f.write_str("i64"),
UInt32(_) => f.write_str("u32"),
UInt64(_) => f.write_str("u64"),
#[cfg(feature = "dtype-date")]
Date(_) => f.write_str("Date"),
#[cfg(feature = "dtype-datetime")]
Datetime(_, _, _) => f.write_str("datetime"),
#[cfg(feature = "dtype-time")]
Time(_) => f.write_str("time"),
Float32(_) => f.write_str("f32"),
Float64(_) => f.write_str("f64"),
Utf8(_) => f.write_str("utf8"),
List(_) => f.write_str("list"),
}
}
}

impl AnyValueBuffer {
pub(crate) fn add(&mut self, val: AnyValue) -> Option<()> {
impl<'a> AnyValueBuffer<'a> {
pub(crate) fn add(&mut self, val: AnyValue<'a>) -> Option<()> {
use AnyValueBuffer::*;
match (self, val) {
(Boolean(builder), AnyValue::Boolean(v)) => builder.append_value(v),
Expand Down Expand Up @@ -389,12 +376,13 @@ impl AnyValueBuffer {
(Utf8(builder), AnyValue::Null) => builder.append_null(),
(List(builder), AnyValue::List(v)) => builder.append_series(&v),
(List(builder), AnyValue::Null) => builder.append_null(),
(All(vals), v) => vals.push(v),
_ => return None,
};
Some(())
}

pub(crate) fn add_falible(&mut self, val: &AnyValue) -> Result<()> {
pub(crate) fn add_fallible(&mut self, val: &AnyValue<'a>) -> Result<()> {
self.add(val.clone()).ok_or_else(|| {
PolarsError::ComputeError(format!("Could not append {:?} to builder; make sure that all rows have the same schema.", val).into())
})
Expand All @@ -418,12 +406,13 @@ impl AnyValueBuffer {
Float64(b) => b.finish().into_series(),
Utf8(b) => b.finish().into_series(),
List(mut b) => b.finish().into_series(),
All(vals) => Series::new("", vals),
}
}
}

// datatype and length
impl From<(&DataType, usize)> for AnyValueBuffer {
impl From<(&DataType, usize)> for AnyValueBuffer<'_> {
fn from(a: (&DataType, usize)) -> Self {
let (dt, len) = a;
use DataType::*;
Expand All @@ -445,7 +434,7 @@ impl From<(&DataType, usize)> for AnyValueBuffer {
Float64 => AnyValueBuffer::Float64(PrimitiveChunkedBuilder::new("", len)),
Utf8 => AnyValueBuffer::Utf8(Utf8ChunkedBuilder::new("", len, len * 5)),
List(inner) => AnyValueBuffer::List(get_list_builder(inner, len * 10, len, "")),
_ => unimplemented!(),
_ => AnyValueBuffer::All(Vec::with_capacity(len)),
}
}
}
Expand Down
25 changes: 24 additions & 1 deletion polars/polars-core/src/series/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,30 @@ impl Series {
AnyValue::Duration(_, tu) => any_values_to_primitive::<Int64Type>(av)
.into_duration(*tu)
.into_series(),
_ => todo!(),
#[cfg(feature = "dtype-struct")]
AnyValue::StructOwned(payload) => {
let vals = &payload.0;
let fields = &payload.1;

// the fields of the struct
let mut series_fields = Vec::with_capacity(vals.len());
for (i, field) in fields.iter().enumerate() {
let mut field_avs = Vec::with_capacity(av.len());

av.iter().for_each(|av| match av {
AnyValue::StructOwned(pl) => {
let av_val = pl.0[i].clone();
field_avs.push(av_val)
}
_ => field_avs.push(AnyValue::Null),
});
series_fields.push(Series::new(field.name(), &field_avs))
}
return StructChunked::new(name, &series_fields)
.unwrap()
.into_series();
}
av => panic!("av {:?} not implemented", av),
};
s.rename(name);
s
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/test_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,13 @@ def test_struct_with_validity() -> None:
df = pl.from_arrow(tbl)
assert isinstance(df, pl.DataFrame)
assert df["a"].to_list() == [{"b": 1}, {"b": None}]


def test_from_dicts_struct() -> None:
    """Check that nested dicts given to `pl.from_dicts` round-trip via `to_series(1).to_list()`."""
    # One level of nesting: column "b" holds a flat dict.
    shallow_rows = [{"a": 1, "b": {"a": 1, "b": 2}}]
    shallow_result = pl.from_dicts(shallow_rows).to_series(1).to_list()
    assert shallow_result == [{"a": 1, "b": 2}]

    # Deeper nesting: a dict inside a dict, with a list at the innermost level.
    deep_rows = [{"a": 1, "b": {"a_deep": 1, "b_deep": {"a_deeper": [1, 2, 4]}}}]
    deep_result = pl.from_dicts(deep_rows).to_series(1).to_list()
    assert deep_result == [{"a_deep": 1, "b_deep": {"a_deeper": [1, 2, 4]}}]

0 comments on commit 69a03e5

Please sign in to comment.