Skip to content

Commit

Permalink
improve structs (#2836)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 6, 2022
1 parent 5d342a6 commit 7e85a3d
Show file tree
Hide file tree
Showing 23 changed files with 237 additions and 39 deletions.
2 changes: 1 addition & 1 deletion polars/benches/groupby.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lazy_static::lazy_static;
use polars::prelude::*;
use polars_lazy::functions::pearson_corr;
use polars_lazy::dsl::functions::pearson_corr;

lazy_static! {
static ref DATA: DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ impl From<StructChunked> for DataFrame {

impl DataFrame {
pub fn into_struct(self, name: &str) -> StructChunked {
StructChunked::new(name, self.columns)
StructChunked::new(name, &self.columns).unwrap()
}
}
55 changes: 46 additions & 9 deletions polars/polars-core/src/chunked_array/logical/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,53 @@ use super::*;
pub struct StructChunked {
fields: Vec<Series>,
field: Field,
// needed by iterators
arrow_array: ArrayRef,
}

/// Combine `fields` into one arrow [`StructArray`].
///
/// Each series is rechunked first and only the first chunk of the rechunked series is
/// placed in the struct array. The array is constructed with `None` as its last argument
/// (presumably the validity bitmap, i.e. no struct-level nulls).
///
/// Returns the struct array together with the rechunked series.
fn fields_to_struct_array(fields: &[Series]) -> (ArrayRef, Vec<Series>) {
    let rechunked = fields.iter().map(|s| s.rechunk()).collect::<Vec<_>>();

    // Gather the arrow field description and the backing array of every series in one pass.
    let (arrow_fields, values): (Vec<_>, Vec<_>) = rechunked
        .iter()
        .map(|s| (s.field().to_arrow(), s.chunks()[0].clone() as ArrayRef))
        .unzip();

    let struct_arr = StructArray::new(ArrowDataType::Struct(arrow_fields), values, None);
    (Arc::new(struct_arr), rechunked)
}

impl StructChunked {
pub fn new(name: &str, fields: Vec<Series>) -> Self {
/// Create a struct array called `name` from `fields`.
///
/// # Errors
///
/// Returns [`PolarsError::ShapeMisMatch`] when the fields do not all have the same length.
pub fn new(name: &str, fields: &[Series]) -> Result<Self> {
    let lengths_match = fields.iter().map(|s| s.len()).all_equal();
    if lengths_match {
        return Ok(Self::new_unchecked(name, fields));
    }
    Err(PolarsError::ShapeMisMatch(
        "expected all fields to have equal length".into(),
    ))
}

/// Borrow the arrow array stored in the `arrow_array` field of this struct array.
pub(crate) fn arrow_array(&self) -> &ArrayRef {
&self.arrow_array
}

/// Does not check the lengths of the fields
pub(crate) fn new_unchecked(name: &str, fields: &[Series]) -> Self {
let dtype = DataType::Struct(
fields
.iter()
.map(|s| Field::new(s.name(), s.dtype().clone()))
.collect(),
);
let field = Field::new(name, dtype);
let (arrow_array, fields) = fields_to_struct_array(fields);

Self { fields, field }
Self {
fields,
field,
arrow_array,
}
}

/// Get access to one of this `[StructChunked]`'s fields
Expand All @@ -37,9 +71,12 @@ impl StructChunked {
.map(|s| s.clone())
}

pub(crate) fn len(&self) -> usize {
/// Number of rows, taken from the length of the first field.
///
/// Returns `0` when there are no fields.
pub fn len(&self) -> usize {
    self.fields.first().map_or(0, |first| first.len())
}
/// Returns `true` when `len()` reports zero.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Get a reference to the [`Field`] of array.
pub fn ref_field(&self) -> &Field {
Expand All @@ -66,16 +103,16 @@ impl StructChunked {
where
F: Fn(&Series) -> Result<Series>,
{
let fields = self.fields.iter().map(func).collect::<Result<_>>()?;
Ok(Self::new(self.field.name(), fields))
let fields = self.fields.iter().map(func).collect::<Result<Vec<_>>>()?;
Ok(Self::new_unchecked(self.field.name(), &fields))
}

pub(crate) fn apply_fields<F>(&self, func: F) -> Self
where
F: Fn(&Series) -> Series,
{
let fields = self.fields.iter().map(func).collect();
Self::new(self.field.name(), fields)
let fields = self.fields.iter().map(func).collect::<Vec<_>>();
Self::new_unchecked(self.field.name(), &fields)
}
}

Expand All @@ -95,7 +132,7 @@ impl LogicalType for StructChunked {
.fields
.iter()
.map(|s| s.cast(dtype))
.collect::<Result<_>>()?;
Ok(Self::new(self.field.name(), fields).into_series())
.collect::<Result<Vec<_>>>()?;
Ok(Self::new_unchecked(self.field.name(), &fields).into_series())
}
}
11 changes: 11 additions & 0 deletions polars/polars-core/src/chunked_array/ops/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ pub(crate) unsafe fn arr_to_any_value<'a>(

AnyValue::List(s)
}
#[cfg(feature = "dtype-struct")]
DataType::Struct(flds) => {
let arr = &*(arr as *const dyn Array as *const StructArray);
let vals = arr
.values()
.iter()
.zip(flds)
.map(|(arr, fld)| arr_to_any_value(&**arr, idx, fld.data_type()))
.collect();
AnyValue::Struct(vals)
}
#[cfg(feature = "object")]
DataType::Object(_) => panic!("should not be here"),
_ => unimplemented!(),
Expand Down
30 changes: 30 additions & 0 deletions polars/polars-core/src/series/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use arrow::temporal_conversions::MILLISECONDS;
#[cfg(feature = "dtype-time")]
use arrow::temporal_conversions::NANOSECONDS;
use polars_arrow::compute::cast::cast;
#[cfg(feature = "dtype-struct")]
use polars_arrow::kernels::concatenate::concatenate_owned_unchecked;
use std::convert::TryFrom;

impl Series {
Expand Down Expand Up @@ -63,6 +65,8 @@ impl Series {
Boolean => BooleanChunked::from_chunks(name, chunks).into_series(),
Float32 => Float32Chunked::from_chunks(name, chunks).into_series(),
Float64 => Float64Chunked::from_chunks(name, chunks).into_series(),
#[cfg(feature = "dtype-struct")]
Struct(_) => Series::try_from_arrow_unchecked(name, chunks, &dtype.to_arrow()).unwrap(),
#[cfg(feature = "object")]
Object(_) => todo!(),
_ => unreachable!(),
Expand Down Expand Up @@ -376,6 +380,32 @@ impl Series {
};
Ok(s)
}
#[cfg(feature = "dtype-struct")]
ArrowDataType::Struct(_) => {
let arr = if chunks.len() > 1 {
concatenate_owned_unchecked(&chunks).unwrap() as ArrayRef
} else {
chunks[0].clone()
};
let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
assert!(
struct_arr.validity().is_none(),
"polars struct does not support validity"
);
let fields = struct_arr
.values()
.iter()
.zip(struct_arr.fields())
.map(|(arr, field)| {
Series::try_from_arrow_unchecked(
&field.name,
vec![arr.clone()],
&field.data_type,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(StructChunked::new_unchecked(name, &fields).into_series())
}
dt => Err(PolarsError::InvalidOperation(
format!("Cannot create polars series from {:?} type", dt).into(),
)),
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-core/src/series/implementations/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl private::PrivateSeries for SeriesWrap<StructChunked> {
.iter()
.zip(other.fields())
.map(|(lhs, rhs)| lhs.zip_with_same_type(mask, rhs))
.collect::<Result<_>>()?;
Ok(StructChunked::new(self.0.name(), fields).into_series())
.collect::<Result<Vec<_>>>()?;
Ok(StructChunked::new_unchecked(self.0.name(), &fields).into_series())
}

fn agg_list(&self, groups: &GroupsProxy) -> Option<Series> {
Expand All @@ -51,8 +51,8 @@ impl private::PrivateSeries for SeriesWrap<StructChunked> {
.fields()
.iter()
.map(|s| s.agg_list(groups))
.collect::<Option<_>>()?;
Some(StructChunked::new(self.name(), fields).into_series())
.collect::<Option<Vec<_>>>()?;
Some(StructChunked::new_unchecked(self.name(), &fields).into_series())
}

fn group_tuples(&self, multithreaded: bool, sorted: bool) -> GroupsProxy {
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-core/src/series/into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl Series {
let arr = cast(&*self.chunks()[chunk_idx], &DataType::Time.to_arrow()).unwrap();
Arc::from(arr)
}
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => {
let ca = self.struct_().unwrap();
ca.arrow_array().clone()
}
_ => self.chunks()[chunk_idx].clone(),
}
}
Expand Down
23 changes: 18 additions & 5 deletions polars/polars-core/src/series/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::prelude::any_value::arr_to_any_value;
use crate::prelude::*;
use crate::utils::NoNull;
use std::iter::FromIterator;

macro_rules! from_iterator {
($native:ty, $variant:ident) => {
Expand Down Expand Up @@ -60,11 +59,19 @@ impl FromIterator<String> for Series {

#[cfg(feature = "rows")]
impl Series {
pub(crate) fn iter(&self) -> impl Iterator<Item = AnyValue> {
assert_eq!(self.chunks().len(), 1, "impl error");
pub fn iter(&self) -> SeriesIter<'_> {
let dtype = self.dtype();

let arr = &*self.chunks()[0];
let arr = match dtype {
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => {
let ca = self.struct_().unwrap();
&**ca.arrow_array()
}
_ => {
assert_eq!(self.chunks().len(), 1, "impl error");
&*self.chunks()[0]
}
};
let len = arr.len();
SeriesIter {
arr,
Expand Down Expand Up @@ -95,8 +102,14 @@ impl<'a> Iterator for SeriesIter<'a> {
unsafe { Some(arr_to_any_value(self.arr, idx, self.dtype)) }
}
}

// Reports `self.len` as both the lower and the upper bound.
// NOTE(review): `len` appears to be the total array length captured when the iterator is
// created. Unless `next()` decrements it, this hint (and `ExactSizeIterator::len`) is too
// large once items have been yielded — TODO confirm, and report the remaining count if so.
fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl ExactSizeIterator for SeriesIter<'_> {}

#[cfg(test)]
mod test {
use crate::prelude::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,3 +733,18 @@ impl_into_range!(u32);
pub fn range<T: Range<T>>(low: T, high: T) -> Expr {
low.into_range(high)
}

/// Take several expressions and collect them into a [`StructChunked`].
///
/// The struct column and its output field are both given the empty name `""`. The output
/// dtype is a `DataType::Struct` built from the fields handed to the output-type callback
/// (presumably one field per input expression — confirm against `map_multiple`).
#[cfg(feature = "dtype-struct")]
pub fn as_struct(exprs: &[Expr]) -> Expr {
map_multiple(
// `StructChunked::new` returns a `Result`; its error is passed through unchanged.
|s| StructChunked::new("", s).map(|ca| ca.into_series()),
exprs,
GetOutput::map_fields(|fld| Field::new("", DataType::Struct(fld.to_vec()))),
)
.with_function_options(|mut options| {
// NOTE(review): presumably lets a wildcard among `exprs` expand into several inputs.
options.input_wildcard_expansion = true;
options.fmt_str = "as_struct";
options
})
}
10 changes: 9 additions & 1 deletion polars/polars-lazy/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub mod cat;
pub use cat::*;
#[cfg(feature = "temporal")]
mod dt;
#[cfg(feature = "compile")]
mod functions;
#[cfg(feature = "list")]
mod list;
mod options;
Expand Down Expand Up @@ -32,8 +34,8 @@ use std::{
};
// reexport the lazy method
pub use crate::frame::IntoLazy;
pub use crate::functions::*;
pub use crate::logical_plan::lit;
pub use functions::*;
pub use options::*;

use polars_arrow::array::default_arrays::FromData;
Expand Down Expand Up @@ -183,6 +185,12 @@ impl GetOutput {
}))
}

/// Build an output-type resolver that computes the output [`Field`] by calling `f` on all
/// input fields.
///
/// The schema and context arguments of the wrapped callback are ignored.
pub fn map_fields<F>(f: F) -> Self
where
    F: 'static + Fn(&[Field]) -> Field + Send + Sync,
{
    NoEq::new(Arc::new(
        move |_: &Schema, _: Context, fields: &[Field]| f(fields),
    ))
}

pub fn map_dtype<F: 'static + Fn(&DataType) -> DataType + Send + Sync>(f: F) -> Self {
NoEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
let mut fld = flds[0].clone();
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::functions::concat;
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::csv::{CsvEncoding, NullValues};
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::functions::concat;
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::RowCount;
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/src/frame/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::functions::concat;
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::RowCount;
Expand Down
2 changes: 0 additions & 2 deletions polars/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ pub mod frame;
#[cfg(feature = "compile")]
mod from;
#[cfg(feature = "compile")]
pub mod functions;
#[cfg(feature = "compile")]
pub mod logical_plan;
#[cfg(feature = "compile")]
pub mod physical_plan;
Expand Down
1 change: 0 additions & 1 deletion polars/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub struct RollingGroupOptions {
pub use crate::{
dsl::*,
frame::*,
functions::*,
logical_plan::{
optimizer::{type_coercion::TypeCoercionRule, Optimize, *},
options::*,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use polars_core::prelude::*;
use polars_io::prelude::*;
use std::io::Cursor;

use crate::functions::{argsort_by, pearson_corr};
use crate::dsl::{argsort_by, pearson_corr};
use crate::logical_plan::iterator::ArenaLpIter;
use crate::logical_plan::optimizer::simplify_expr::SimplifyExprRule;
use crate::logical_plan::optimizer::stack_opt::{OptimizationRule, StackOptimizer};
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/expression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ These functions can be used as expression and sometimes also in eager contexts.
exclude
datetime
date
struct

Constructor
-----------
Expand Down
1 change: 1 addition & 0 deletions py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def version() -> str:
select,
spearman_rank_corr,
std,
struct,
sum,
tail,
)
Expand Down

0 comments on commit 7e85a3d

Please sign in to comment.