Skip to content

Commit

Permalink
Improve list builders, iteration and construction (#3419)
Browse files Browse the repository at this point in the history
* Move list namespace to polars-ops

* Improve list builders, iteration and construction

- Greatly improves performance of the list builders
  (enabled by jorgecarleitao/arrow2#991)

- List builders now also support nested dtypes like List and Struct

- Python DataFrame and Series constructors now better support nested dtype
construction

* fix tests and fix struct::agg_list
  • Loading branch information
ritchie46 committed May 18, 2022
1 parent 50f7bd9 commit 6e71b89
Show file tree
Hide file tree
Showing 36 changed files with 800 additions and 511 deletions.
2 changes: 1 addition & 1 deletion nodejs-polars/src/list_construction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub fn js_arr_to_list(name: &str, arr: &Array, dtype: &DataType) -> napi::Result
builder.finish().into_series()
}
DataType::Datetime(_, _) => {
let mut builder = ListPrimitiveChunkedBuilder::<i64>::new(
let mut builder = ListPrimitiveChunkedBuilder::<Int64Type>::new(
name,
len as usize,
(len as usize) * 5,
Expand Down
4 changes: 2 additions & 2 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ lazy_regex = ["polars-lazy/regex"]
cum_agg = ["polars-core/cum_agg", "polars-core/cum_agg"]
rolling_window = ["polars-core/rolling_window", "polars-lazy/rolling_window"]
interpolate = ["polars-core/interpolate", "polars-lazy/interpolate"]
list = ["polars-core/list", "polars-lazy/list"]
list = ["polars-lazy/list", "polars-ops/list"]
rank = ["polars-core/rank", "polars-lazy/rank"]
diff = ["polars-core/diff", "polars-lazy/diff"]
diff = ["polars-core/diff", "polars-lazy/diff", "polars-ops/diff"]
pct_change = ["polars-core/pct_change", "polars-lazy/pct_change"]
moment = ["polars-core/moment", "polars-lazy/moment"]
arange = ["polars-lazy/arange"]
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "aafba7b4eb4991e016638cbc1d4df676912e8236", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.11", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
Expand All @@ -20,4 +21,5 @@ thiserror = "^1.0"
[features]
strings = []
compute = ["arrow/compute_cast"]
temporal = ["arrow/compute_temporal"]
bigidx = []
1 change: 1 addition & 0 deletions polars/polars-arrow/src/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl<'a> AnonymousBuilder<'a> {
self.arrays.is_empty()
}

#[inline]
pub fn push(&mut self, arr: &'a dyn Array) {
self.size += arr.len() as i64;
self.offsets.push(self.size);
Expand Down
6 changes: 2 additions & 4 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ cum_agg = []
# rolling window functions
rolling_window = []
interpolate = []
# additional list utils
list = []
rank = []
diff = []
pct_change = ["diff"]
Expand Down Expand Up @@ -133,7 +131,6 @@ docs-selection = [
"moment",
"dtype-categorical",
"rank",
"list",
"diagonal_concat",
"horizontal_concat",
"abs",
Expand Down Expand Up @@ -176,7 +173,8 @@ thiserror = "^1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184"
rev = "aafba7b4eb4991e016638cbc1d4df676912e8236"
# path = "../../../arrow2"
# branch = "polars"
# version = "0.11"
default-features = false
Expand Down
129 changes: 105 additions & 24 deletions polars/polars-core/src/chunked_array/builder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ where

pub struct ListPrimitiveChunkedBuilder<T>
where
T: NumericNative,
T: PolarsNumericType,
{
pub builder: LargePrimitiveBuilder<T>,
pub builder: LargePrimitiveBuilder<T::Native>,
field: Field,
fast_explode: bool,
}
Expand All @@ -62,16 +62,16 @@ macro_rules! finish_list_builder {

impl<T> ListPrimitiveChunkedBuilder<T>
where
T: NumericNative,
T: PolarsNumericType,
{
pub fn new(
name: &str,
capacity: usize,
values_capacity: usize,
logical_type: DataType,
) -> Self {
let values = MutablePrimitiveArray::<T>::with_capacity(values_capacity);
let builder = LargePrimitiveBuilder::<T>::new_with_capacity(values, capacity);
let values = MutablePrimitiveArray::<T::Native>::with_capacity(values_capacity);
let builder = LargePrimitiveBuilder::<T::Native>::new_with_capacity(values, capacity);
let field = Field::new(name, DataType::List(Box::new(logical_type)));

Self {
Expand All @@ -81,7 +81,7 @@ where
}
}

pub fn append_slice(&mut self, opt_v: Option<&[T]>) {
pub fn append_slice(&mut self, opt_v: Option<&[T::Native]>) {
match opt_v {
Some(items) => {
let values = self.builder.mut_values();
Expand All @@ -99,7 +99,7 @@ where
}
/// Appends from an iterator over values
#[inline]
pub fn append_iter_values<I: Iterator<Item = T> + TrustedLen>(&mut self, iter: I) {
pub fn append_iter_values<I: Iterator<Item = T::Native> + TrustedLen>(&mut self, iter: I) {
let values = self.builder.mut_values();

if iter.size_hint().0 == 0 {
Expand All @@ -113,7 +113,7 @@ where

/// Appends from an iterator over values
#[inline]
pub fn append_iter<I: Iterator<Item = Option<T>> + TrustedLen>(&mut self, iter: I) {
pub fn append_iter<I: Iterator<Item = Option<T::Native>> + TrustedLen>(&mut self, iter: I) {
let values = self.builder.mut_values();

if iter.size_hint().0 == 0 {
Expand All @@ -128,7 +128,7 @@ where

impl<T> ListBuilderTrait for ListPrimitiveChunkedBuilder<T>
where
T: NumericNative,
T: PolarsNumericType,
{
#[inline]
fn append_opt_series(&mut self, opt_s: Option<&Series>) {
Expand All @@ -151,12 +151,10 @@ where
if s.is_empty() {
self.fast_explode = false;
}
let arrays = s.chunks();
let ca = s.unpack::<T>().unwrap();
let values = self.builder.mut_values();

arrays.iter().for_each(|x| {
let arr = x.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();

ca.downcast_iter().for_each(|arr| {
if !arr.has_validity() {
values.extend_from_slice(arr.values().as_slice())
} else {
Expand Down Expand Up @@ -350,14 +348,23 @@ pub fn get_list_builder(
#[cfg(feature = "object")]
DataType::Object(_) => _err(),
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => _err(),
DataType::Struct(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
name,
list_capacity,
physical_type,
))),
DataType::List(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
name,
list_capacity,
physical_type,
))),
#[cfg(feature = "dtype-categorical")]
DataType::Categorical(_) => _err(),
_ => {
macro_rules! get_primitive_builder {
($type:ty) => {{
let builder = ListPrimitiveChunkedBuilder::<$type>::new(
&name,
name,
list_capacity,
value_capacity,
dt.clone(),
Expand All @@ -379,7 +386,7 @@ pub fn get_list_builder(
Box::new(builder)
}};
}
Ok(match_dtype_to_physical_apply_macro!(
Ok(match_dtype_to_logical_apply_macro!(
physical_type,
get_primitive_builder,
get_utf8_builder,
Expand All @@ -395,12 +402,18 @@ pub struct AnonymousListBuilder<'a> {
pub dtype: DataType,
}

/// Default builder: empty name, zero capacity and the default `DataType` as dtype.
///
/// `finish` takes `&mut self` and moves the builder out with `std::mem::take`,
/// which requires a `Default` implementation to leave behind.
impl Default for AnonymousListBuilder<'_> {
fn default() -> Self {
Self::new("", 0, Default::default())
}
}

impl<'a> AnonymousListBuilder<'a> {
pub fn new(name: &str, capacity: usize, dtype: DataType) -> Self {
pub fn new(name: &str, capacity: usize, inner_dtype: DataType) -> Self {
Self {
name: name.into(),
builder: AnonymousBuilder::new(capacity),
dtype,
dtype: inner_dtype,
}
}

Expand Down Expand Up @@ -440,17 +453,85 @@ impl<'a> AnonymousListBuilder<'a> {
}
}

pub fn finish(self) -> ListChunked {
if self.builder.is_empty() {
ListChunked::full_null_with_dtype(&self.name, 0, &self.dtype)
pub fn finish(&mut self) -> ListChunked {
let slf = std::mem::take(self);
if slf.builder.is_empty() {
ListChunked::full_null_with_dtype(&slf.name, 0, &slf.dtype)
} else {
let arr = self
let arr = slf
.builder
.finish(Some(&self.dtype.to_physical().to_arrow()))
.finish(Some(&slf.dtype.to_physical().to_arrow()))
.unwrap();
let mut ca = ListChunked::from_chunks("", vec![Arc::new(arr)]);
ca.field = Arc::new(Field::new(&self.name, DataType::List(Box::new(self.dtype))));
ca.field = Arc::new(Field::new(&slf.name, DataType::List(Box::new(slf.dtype))));
ca
}
}
}

/// List builder that stores a clone of every appended `Series`, so that the
/// inner `AnonymousBuilder` can hold references to their arrays under a
/// `'static` lifetime.
pub struct AnonymousOwnedListBuilder {
// name given to the `ListChunked` produced by `finish`
name: String,
// collects references to the arrays of the appended series
builder: AnonymousBuilder<'static>,
// clones of the appended series; they keep the arrays referenced by `builder` alive
owned: Vec<Series>,
// dtype of the list elements; `finish` wraps it in `DataType::List`
inner_dtype: DataType,
}

/// Default builder: empty name, zero capacity and the default `DataType` as inner dtype.
///
/// `finish` moves the builder out of `&mut self` with `std::mem::take`, which
/// requires a `Default` implementation to leave behind.
impl Default for AnonymousOwnedListBuilder {
fn default() -> Self {
Self::new("", 0, Default::default())
}
}

/// `ListBuilderTrait` implementation that appends whole `Series` by reference
/// to their arrays, while keeping a clone of each series alive in `self.owned`.
impl ListBuilderTrait for AnonymousOwnedListBuilder {
/// Appends the arrays of `s` as one list element and stores a clone of `s`.
fn append_series(&mut self, s: &Series) {
// SAFETY:
// We dereference raw pointers to extend the borrows of the arrays to the
// `'static` lifetime expected by `self.builder`, although the arrays are
// not actually static.
// This is sound because the series is cloned below (Arc += 1) and stored
// in `self.owned`, so the `&dyn Array`s are not dropped until the owned
// series are dropped.
unsafe {
match s.dtype() {
// Struct series: only the array returned by `array_ref(0)` is pushed.
// NOTE(review): presumably a struct series exposes its data as a
// single array at index 0 — confirm against `array_ref`.
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => self.builder.push(&*(&**s.array_ref(0) as *const dyn Array)),
// Every other dtype: push all chunks of the series at once.
_ => {
self.builder
.push_multiple(&*(s.chunks().as_ref() as *const [ArrayRef]));
}
}
}
// This makes sure that the underlying `ArrayRef`s are not dropped
// while `self.builder` still references them.
self.owned.push(s.clone());
}

/// Appends a null list element.
fn append_null(&mut self) {
self.builder.push_null()
}

/// Builds the `ListChunked` and resets `self` to a default (empty) builder.
///
/// Panics if the inner builder's `finish` returns an error (`unwrap`).
fn finish(&mut self) -> ListChunked {
// Move the whole builder out of `&mut self`, leaving `Default::default()` behind.
let slf = std::mem::take(self);
if slf.builder.is_empty() {
// No arrays were collected: return a zero-length list column of the requested dtype.
ListChunked::full_null_with_dtype(&slf.name, 0, &slf.inner_dtype)
} else {
// The arrow array is built against the physical representation of the inner dtype.
let arr = slf
.builder
.finish(Some(&slf.inner_dtype.to_physical().to_arrow()))
.unwrap();
let mut ca = ListChunked::from_chunks("", vec![Arc::new(arr)]);
// Overwrite the field so the result carries the requested name and
// the (logical) inner dtype wrapped in `DataType::List`.
ca.field = Arc::new(Field::new(
&slf.name,
DataType::List(Box::new(slf.inner_dtype)),
));
ca
}
}
}

impl AnonymousOwnedListBuilder {
/// Creates a builder named `name` whose list elements have dtype `inner_dtype`.
///
/// `capacity` is passed to the inner `AnonymousBuilder` and is also used to
/// pre-allocate the vector of owned series (one entry is pushed per appended series).
pub fn new(name: &str, capacity: usize, inner_dtype: DataType) -> Self {
Self {
name: name.into(),
builder: AnonymousBuilder::new(capacity),
owned: Vec::with_capacity(capacity),
inner_dtype,
}
}
}
6 changes: 4 additions & 2 deletions polars/polars-core/src/chunked_array/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ mod test {

#[test]
fn test_list_builder() {
let mut builder = ListPrimitiveChunkedBuilder::<i32>::new("a", 10, 5, DataType::Int32);
let mut builder =
ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 5, DataType::Int32);

// create a series containing two chunks
let mut s1 = Int32Chunked::from_slice("a", &[1, 2, 3]).into_series();
Expand All @@ -212,7 +213,8 @@ mod test {
assert_eq!(out.get(0).unwrap().len(), 6);
assert_eq!(out.get(1).unwrap().len(), 3);

let mut builder = ListPrimitiveChunkedBuilder::<i32>::new("a", 10, 5, DataType::Int32);
let mut builder =
ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 5, DataType::Int32);
builder.append_series(&s1);
builder.append_null();

Expand Down
3 changes: 2 additions & 1 deletion polars/polars-core/src/chunked_array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ mod test {

#[test]
fn test_cast_list() -> Result<()> {
let mut builder = ListPrimitiveChunkedBuilder::<i32>::new("a", 10, 10, DataType::Int32);
let mut builder =
ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 10, DataType::Int32);
builder.append_slice(Some(&[1i32, 2, 3]));
builder.append_slice(Some(&[1i32, 2, 3]));
let ca = builder.finish();
Expand Down
27 changes: 27 additions & 0 deletions polars/polars-core/src/chunked_array/list/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayBox>>> {
inner: NonNull<ArrayRef>,
lifetime: PhantomData<&'a ArrayRef>,
iter: I,
// used only if feature="dtype-struct"
#[allow(dead_code)]
inner_dtype: DataType,
}

impl<'a, I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'a, I> {
Expand All @@ -20,7 +23,30 @@ impl<'a, I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'a
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|opt_val| {
opt_val.map(|array_ref| {
#[cfg(feature = "dtype-struct")]
// structs arrays are bound to the series not to the arrayref
// so we must get a hold to the new array
if matches!(self.inner_dtype, DataType::Struct(_)) {
// Safety
// dtype is known
unsafe {
let array_ref = Arc::from(array_ref);
let mut s = Series::from_chunks_and_dtype_unchecked(
"",
vec![array_ref],
&self.inner_dtype,
);
// swap the new series with the container
std::mem::swap(&mut *self.series_container, &mut s);
// return a reference to the container
// this lifetime is now bound to 'a
return UnstableSeries::new(&*(&*self.series_container as *const Series));
}
}

// update the inner state
unsafe { *self.inner.as_mut() = array_ref.into() };

// Safety
// we cannot control the lifetime of an iterators `next` method.
// but as long as self is alive the reference to the series container is valid
Expand Down Expand Up @@ -85,6 +111,7 @@ impl ListChunked {
inner: NonNull::new(ptr).unwrap(),
lifetime: PhantomData,
iter: self.downcast_iter().flat_map(|arr| arr.iter()),
inner_dtype: self.inner_dtype(),
}
}

Expand Down
3 changes: 0 additions & 3 deletions polars/polars-core/src/chunked_array/list/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Special list utility methods
mod iterator;
#[cfg(feature = "list")]
#[cfg_attr(docsrs, doc(cfg(feature = "list")))]
pub mod namespace;

use crate::prelude::*;

Expand Down

0 comments on commit 6e71b89

Please sign in to comment.