Skip to content

Commit

Permalink
Improve list builders, iteration and construction
Browse files Browse the repository at this point in the history
- Greatly improves performance of the list builders
 (made possible by jorgecarleitao/arrow2#991)

- List builders now also support nested dtypes like List and Struct

- Python DataFrame and Series constructors now better support nested dtype
construction
  • Loading branch information
ritchie46 committed May 18, 2022
1 parent 22b01e9 commit 7b5480e
Show file tree
Hide file tree
Showing 20 changed files with 271 additions and 65 deletions.
4 changes: 3 additions & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "aafba7b4eb4991e016638cbc1d4df676912e8236", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
# arrow = { package = "arrow2", version = "0.11", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
Expand All @@ -20,4 +21,5 @@ thiserror = "^1.0"
[features]
strings = []
compute = ["arrow/compute_cast"]
temporal = ["arrow/compute_temporal"]
bigidx = []
1 change: 1 addition & 0 deletions polars/polars-arrow/src/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl<'a> AnonymousBuilder<'a> {
self.arrays.is_empty()
}

#[inline]
pub fn push(&mut self, arr: &'a dyn Array) {
self.size += arr.len() as i64;
self.offsets.push(self.size);
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ thiserror = "^1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184"
rev = "aafba7b4eb4991e016638cbc1d4df676912e8236"
# path = "../../../arrow2"
# branch = "polars"
# version = "0.11"
default-features = false
Expand Down
129 changes: 105 additions & 24 deletions polars/polars-core/src/chunked_array/builder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ where

pub struct ListPrimitiveChunkedBuilder<T>
where
T: NumericNative,
T: PolarsNumericType,
{
pub builder: LargePrimitiveBuilder<T>,
pub builder: LargePrimitiveBuilder<T::Native>,
field: Field,
fast_explode: bool,
}
Expand All @@ -62,16 +62,16 @@ macro_rules! finish_list_builder {

impl<T> ListPrimitiveChunkedBuilder<T>
where
T: NumericNative,
T: PolarsNumericType,
{
pub fn new(
name: &str,
capacity: usize,
values_capacity: usize,
logical_type: DataType,
) -> Self {
let values = MutablePrimitiveArray::<T>::with_capacity(values_capacity);
let builder = LargePrimitiveBuilder::<T>::new_with_capacity(values, capacity);
let values = MutablePrimitiveArray::<T::Native>::with_capacity(values_capacity);
let builder = LargePrimitiveBuilder::<T::Native>::new_with_capacity(values, capacity);
let field = Field::new(name, DataType::List(Box::new(logical_type)));

Self {
Expand All @@ -81,7 +81,7 @@ where
}
}

pub fn append_slice(&mut self, opt_v: Option<&[T]>) {
pub fn append_slice(&mut self, opt_v: Option<&[T::Native]>) {
match opt_v {
Some(items) => {
let values = self.builder.mut_values();
Expand All @@ -99,7 +99,7 @@ where
}
/// Appends from an iterator over values
#[inline]
pub fn append_iter_values<I: Iterator<Item = T> + TrustedLen>(&mut self, iter: I) {
pub fn append_iter_values<I: Iterator<Item = T::Native> + TrustedLen>(&mut self, iter: I) {
let values = self.builder.mut_values();

if iter.size_hint().0 == 0 {
Expand All @@ -113,7 +113,7 @@ where

/// Appends from an iterator over values
#[inline]
pub fn append_iter<I: Iterator<Item = Option<T>> + TrustedLen>(&mut self, iter: I) {
pub fn append_iter<I: Iterator<Item = Option<T::Native>> + TrustedLen>(&mut self, iter: I) {
let values = self.builder.mut_values();

if iter.size_hint().0 == 0 {
Expand All @@ -128,7 +128,7 @@ where

impl<T> ListBuilderTrait for ListPrimitiveChunkedBuilder<T>
where
T: NumericNative,
T: PolarsNumericType,
{
#[inline]
fn append_opt_series(&mut self, opt_s: Option<&Series>) {
Expand All @@ -151,12 +151,10 @@ where
if s.is_empty() {
self.fast_explode = false;
}
let arrays = s.chunks();
let ca = s.unpack::<T>().unwrap();
let values = self.builder.mut_values();

arrays.iter().for_each(|x| {
let arr = x.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();

ca.downcast_iter().for_each(|arr| {
if !arr.has_validity() {
values.extend_from_slice(arr.values().as_slice())
} else {
Expand Down Expand Up @@ -350,14 +348,23 @@ pub fn get_list_builder(
#[cfg(feature = "object")]
DataType::Object(_) => _err(),
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => _err(),
DataType::Struct(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
name,
list_capacity,
physical_type,
))),
DataType::List(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
name,
list_capacity,
physical_type,
))),
#[cfg(feature = "dtype-categorical")]
DataType::Categorical(_) => _err(),
_ => {
macro_rules! get_primitive_builder {
($type:ty) => {{
let builder = ListPrimitiveChunkedBuilder::<$type>::new(
&name,
name,
list_capacity,
value_capacity,
dt.clone(),
Expand All @@ -379,7 +386,7 @@ pub fn get_list_builder(
Box::new(builder)
}};
}
Ok(match_dtype_to_physical_apply_macro!(
Ok(match_dtype_to_logical_apply_macro!(
physical_type,
get_primitive_builder,
get_utf8_builder,
Expand All @@ -395,12 +402,18 @@ pub struct AnonymousListBuilder<'a> {
pub dtype: DataType,
}

impl Default for AnonymousListBuilder<'_> {
fn default() -> Self {
Self::new("", 0, Default::default())
}
}

impl<'a> AnonymousListBuilder<'a> {
pub fn new(name: &str, capacity: usize, dtype: DataType) -> Self {
pub fn new(name: &str, capacity: usize, inner_dtype: DataType) -> Self {
Self {
name: name.into(),
builder: AnonymousBuilder::new(capacity),
dtype,
dtype: inner_dtype,
}
}

Expand Down Expand Up @@ -440,17 +453,85 @@ impl<'a> AnonymousListBuilder<'a> {
}
}

pub fn finish(self) -> ListChunked {
if self.builder.is_empty() {
ListChunked::full_null_with_dtype(&self.name, 0, &self.dtype)
pub fn finish(&mut self) -> ListChunked {
let slf = std::mem::take(self);
if slf.builder.is_empty() {
ListChunked::full_null_with_dtype(&slf.name, 0, &slf.dtype)
} else {
let arr = self
let arr = slf
.builder
.finish(Some(&self.dtype.to_physical().to_arrow()))
.finish(Some(&slf.dtype.to_physical().to_arrow()))
.unwrap();
let mut ca = ListChunked::from_chunks("", vec![Arc::new(arr)]);
ca.field = Arc::new(Field::new(&self.name, DataType::List(Box::new(self.dtype))));
ca.field = Arc::new(Field::new(&slf.name, DataType::List(Box::new(slf.dtype))));
ca
}
}
}

/// List builder that keeps a clone of every appended `Series` in `owned`, so the array
/// references handed to the inner `AnonymousBuilder` remain valid until `finish` is called.
pub struct AnonymousOwnedListBuilder {
/// Name given to the field of the finished `ListChunked`.
name: String,
/// Collects references to the appended arrays; the `'static` lifetime is upheld
/// manually by keeping the source `Series` alive in `owned`.
builder: AnonymousBuilder<'static>,
/// Clones of all appended `Series`; they keep the referenced arrays from being dropped.
owned: Vec<Series>,
/// Dtype of the list elements; the finished array gets `DataType::List(inner_dtype)`.
inner_dtype: DataType,
}

impl Default for AnonymousOwnedListBuilder {
fn default() -> Self {
Self::new("", 0, Default::default())
}
}

impl ListBuilderTrait for AnonymousOwnedListBuilder {
    /// Appends `s` as the next list element.
    ///
    /// A struct series contributes its array at index 0; any other dtype contributes
    /// all of its chunks. A clone of `s` is stored afterwards to keep those arrays alive.
    fn append_series(&mut self, s: &Series) {
        match s.dtype() {
            #[cfg(feature = "dtype-struct")]
            DataType::Struct(_) => {
                // SAFETY: the reference is detached from the lifetime of `s` through a raw
                // pointer. The clone pushed into `self.owned` below holds on to the
                // underlying array, so it is not dropped while the builder refers to it.
                let arr = unsafe { &*(&**s.array_ref(0) as *const dyn Array) };
                self.builder.push(arr);
            }
            _ => {
                // SAFETY: same reasoning as above, applied to the slice of chunks.
                let chunks = unsafe { &*(s.chunks().as_ref() as *const [ArrayRef]) };
                self.builder.push_multiple(chunks);
            }
        }
        // Holding a clone makes sure the underlying `ArrayRef`s are not dropped.
        self.owned.push(s.clone());
    }

    /// Appends a null list element.
    fn append_null(&mut self) {
        self.builder.push_null()
    }

    /// Takes the accumulated state out of `self` (leaving a default builder behind) and
    /// turns it into a `ListChunked` named after this builder.
    fn finish(&mut self) -> ListChunked {
        // `taken.owned` stays alive until the end of this function, i.e. until after the
        // inner builder has produced the final array.
        let taken = std::mem::take(self);

        if taken.builder.is_empty() {
            return ListChunked::full_null_with_dtype(&taken.name, 0, &taken.inner_dtype);
        }

        let arrow_dtype = taken.inner_dtype.to_physical().to_arrow();
        let list_arr = taken.builder.finish(Some(&arrow_dtype)).unwrap();

        let mut out = ListChunked::from_chunks("", vec![Arc::new(list_arr)]);
        // Name and logical dtype are set by replacing the field.
        let list_dtype = DataType::List(Box::new(taken.inner_dtype));
        out.field = Arc::new(Field::new(&taken.name, list_dtype));
        out
    }
}

impl AnonymousOwnedListBuilder {
    /// Creates an empty builder.
    ///
    /// * `name` - name of the resulting `ListChunked`.
    /// * `capacity` - capacity passed to the inner `AnonymousBuilder` and used to
    ///   pre-allocate the vector of owned `Series`.
    /// * `inner_dtype` - dtype of the list elements.
    pub fn new(name: &str, capacity: usize, inner_dtype: DataType) -> Self {
        let name = String::from(name);
        let builder = AnonymousBuilder::new(capacity);
        let owned = Vec::with_capacity(capacity);

        Self {
            name,
            builder,
            owned,
            inner_dtype,
        }
    }
}
27 changes: 27 additions & 0 deletions polars/polars-core/src/chunked_array/list/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayBox>>> {
inner: NonNull<ArrayRef>,
lifetime: PhantomData<&'a ArrayRef>,
iter: I,
// used only if feature="dtype-struct"
#[allow(dead_code)]
inner_dtype: DataType,
}

impl<'a, I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'a, I> {
Expand All @@ -20,7 +23,30 @@ impl<'a, I: Iterator<Item = Option<ArrayBox>>> Iterator for AmortizedListIter<'a
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|opt_val| {
opt_val.map(|array_ref| {
#[cfg(feature = "dtype-struct")]
// structs arrays are bound to the series not to the arrayref
// so we must get a hold to the new array
if matches!(self.inner_dtype, DataType::Struct(_)) {
// Safety
// dtype is known
unsafe {
let array_ref = Arc::from(array_ref);
let mut s = Series::from_chunks_and_dtype_unchecked(
"",
vec![array_ref],
&self.inner_dtype,
);
// swap the new series with the container
std::mem::swap(&mut *self.series_container, &mut s);
// return a reference to the container
// this lifetime is now bound to 'a
return UnstableSeries::new(&*(&*self.series_container as *const Series));
}
}

// update the inner state
unsafe { *self.inner.as_mut() = array_ref.into() };

// Safety
// we cannot control the lifetime of an iterators `next` method.
// but as long as self is alive the reference to the series container is valid
Expand Down Expand Up @@ -85,6 +111,7 @@ impl ListChunked {
inner: NonNull::new(ptr).unwrap(),
lifetime: PhantomData,
iter: self.downcast_iter().flat_map(|arr| arr.iter()),
inner_dtype: self.inner_dtype(),
}
}

Expand Down
47 changes: 37 additions & 10 deletions polars/polars-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,38 @@ pub trait NumericNative:
+ FromPrimitive
+ NativeArithmetics
{
type POLARSTYPE;
}
// `NumericNative` implementations for the native numeric primitives.
// Each impl sets the associated `POLARSTYPE` to the Polars type named after the primitive.

// Signed integers.
impl NumericNative for i8 {
type POLARSTYPE = Int8Type;
}
impl NumericNative for i16 {
type POLARSTYPE = Int16Type;
}
impl NumericNative for i32 {
type POLARSTYPE = Int32Type;
}
impl NumericNative for i64 {
type POLARSTYPE = Int64Type;
}
// Unsigned integers.
impl NumericNative for u8 {
type POLARSTYPE = UInt8Type;
}
impl NumericNative for u16 {
type POLARSTYPE = UInt16Type;
}
impl NumericNative for u32 {
type POLARSTYPE = UInt32Type;
}
impl NumericNative for u64 {
type POLARSTYPE = UInt64Type;
}
// Floating point numbers.
impl NumericNative for f32 {
type POLARSTYPE = Float32Type;
}
impl NumericNative for f64 {
type POLARSTYPE = Float64Type;
}
impl NumericNative for i8 {}
impl NumericNative for i16 {}
impl NumericNative for i32 {}
impl NumericNative for i64 {}
impl NumericNative for u8 {}
impl NumericNative for u16 {}
impl NumericNative for u32 {}
impl NumericNative for u64 {}
impl NumericNative for f32 {}
impl NumericNative for f64 {}

pub trait PolarsNumericType: Send + Sync + PolarsDataType + 'static {
type Native: NumericNative;
Expand Down Expand Up @@ -678,6 +699,12 @@ pub enum DataType {
Unknown,
}

impl Default for DataType {
fn default() -> Self {
DataType::Unknown
}
}

impl Hash for DataType {
fn hash<H: Hasher>(&self, state: &mut H) {
std::mem::discriminant(self).hash(state)
Expand Down
16 changes: 11 additions & 5 deletions polars/polars-core/src/series/ops/to_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ use polars_arrow::kernels::list::array_to_unit_list;
use std::borrow::Cow;

fn reshape_fast_path(name: &str, s: &Series) -> Series {
let chunks = s
.chunks()
.iter()
.map(|arr| Arc::new(array_to_unit_list(arr.clone())) as ArrayRef)
.collect::<Vec<_>>();
let chunks = match s.dtype() {
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => {
vec![Arc::new(array_to_unit_list(s.array_ref(0).clone())) as ArrayRef]
}
_ => s
.chunks()
.iter()
.map(|arr| Arc::new(array_to_unit_list(arr.clone())) as ArrayRef)
.collect::<Vec<_>>(),
};

let mut ca = ListChunked::from_chunks(name, chunks);
ca.set_fast_explode();
Expand Down
Loading

0 comments on commit 7b5480e

Please sign in to comment.