diff --git a/nodejs-polars/src/list_construction.rs b/nodejs-polars/src/list_construction.rs
index b581b4f70a64..085672f3320b 100644
--- a/nodejs-polars/src/list_construction.rs
+++ b/nodejs-polars/src/list_construction.rs
@@ -135,7 +135,7 @@ pub fn js_arr_to_list(name: &str, arr: &Array, dtype: &DataType) -> napi::Result
             builder.finish().into_series()
         }
         DataType::Datetime(_, _) => {
-            let mut builder = ListPrimitiveChunkedBuilder::<i64>::new(
+            let mut builder = ListPrimitiveChunkedBuilder::<Int64Type>::new(
                 name,
                 len as usize,
                 (len as usize) * 5,
diff --git a/polars/Cargo.toml b/polars/Cargo.toml
index 5cd7a0063962..e9d9677a5cd8 100644
--- a/polars/Cargo.toml
+++ b/polars/Cargo.toml
@@ -85,9 +85,9 @@ lazy_regex = ["polars-lazy/regex"]
 cum_agg = ["polars-core/cum_agg", "polars-core/cum_agg"]
 rolling_window = ["polars-core/rolling_window", "polars-lazy/rolling_window"]
 interpolate = ["polars-core/interpolate", "polars-lazy/interpolate"]
-list = ["polars-core/list", "polars-lazy/list"]
+list = ["polars-lazy/list", "polars-ops/list"]
 rank = ["polars-core/rank", "polars-lazy/rank"]
-diff = ["polars-core/diff", "polars-lazy/diff"]
+diff = ["polars-core/diff", "polars-lazy/diff", "polars-ops/diff"]
 pct_change = ["polars-core/pct_change", "polars-lazy/pct_change"]
 moment = ["polars-core/moment", "polars-lazy/moment"]
 arange = ["polars-lazy/arange"]
diff --git a/polars/polars-arrow/Cargo.toml b/polars/polars-arrow/Cargo.toml
index 51a04ad39d53..1262e936338d 100644
--- a/polars/polars-arrow/Cargo.toml
+++ b/polars/polars-arrow/Cargo.toml
@@ -9,7 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", features = ["compute_concatenate"], default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "aafba7b4eb4991e016638cbc1d4df676912e8236", features = ["compute_concatenate"], default-features = false }
+# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
 # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
 # arrow = { package = "arrow2", version = "0.11", default-features = false, features = ["compute_concatenate"] }
 hashbrown = "0.12"
@@ -20,4 +21,5 @@ thiserror = "^1.0"
 [features]
 strings = []
 compute = ["arrow/compute_cast"]
+temporal = ["arrow/compute_temporal"]
 bigidx = []
diff --git a/polars/polars-arrow/src/array/list.rs b/polars/polars-arrow/src/array/list.rs
index 91947e469ddb..9fe835c42d6e 100644
--- a/polars/polars-arrow/src/array/list.rs
+++ b/polars/polars-arrow/src/array/list.rs
@@ -31,6 +31,7 @@ impl<'a> AnonymousBuilder<'a> {
         self.arrays.is_empty()
     }
 
+    #[inline]
     pub fn push(&mut self, arr: &'a dyn Array) {
         self.size += arr.len() as i64;
         self.offsets.push(self.size);
diff --git a/polars/polars-core/Cargo.toml b/polars/polars-core/Cargo.toml
index 3add486b003c..aade4f50bb19 100644
--- a/polars/polars-core/Cargo.toml
+++ b/polars/polars-core/Cargo.toml
@@ -62,8 +62,6 @@ cum_agg = []
 # rolling window functions
 rolling_window = []
 interpolate = []
-# additional list utils
-list = []
 rank = []
 diff = []
 pct_change = ["diff"]
@@ -133,7 +131,6 @@ docs-selection = [
   "moment",
   "dtype-categorical",
   "rank",
-  "list",
   "diagonal_concat",
   "horizontal_concat",
   "abs",
@@ -176,7 +173,8 @@ thiserror = "^1.0"
 package = "arrow2"
 git = "https://github.com/jorgecarleitao/arrow2"
 # git = "https://github.com/ritchie46/arrow2"
-rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184"
+rev = "aafba7b4eb4991e016638cbc1d4df676912e8236"
+# path = "../../../arrow2"
 # branch = "polars"
 # version = "0.11"
 default-features = false
diff --git a/polars/polars-core/src/chunked_array/builder/list.rs b/polars/polars-core/src/chunked_array/builder/list.rs
index 62283fb614ff..1e76545fbbeb 100644
--- a/polars/polars-core/src/chunked_array/builder/list.rs
+++ b/polars/polars-core/src/chunked_array/builder/list.rs
@@ -36,9 +36,9 @@ where
 
 pub struct ListPrimitiveChunkedBuilder<T>
 where
-    T: NumericNative,
+    T: PolarsNumericType,
 {
-    pub builder: LargePrimitiveBuilder<T>,
+    pub builder: LargePrimitiveBuilder<T::Native>,
     field: Field,
     fast_explode: bool,
 }
@@ -62,7 +62,7 @@ macro_rules! finish_list_builder {
 
 impl<T> ListPrimitiveChunkedBuilder<T>
 where
-    T: NumericNative,
+    T: PolarsNumericType,
 {
     pub fn new(
         name: &str,
@@ -70,8 +70,8 @@ where
         values_capacity: usize,
         logical_type: DataType,
     ) -> Self {
-        let values = MutablePrimitiveArray::<T>::with_capacity(values_capacity);
-        let builder = LargePrimitiveBuilder::<T>::new_with_capacity(values, capacity);
+        let values = MutablePrimitiveArray::<T::Native>::with_capacity(values_capacity);
+        let builder = LargePrimitiveBuilder::<T::Native>::new_with_capacity(values, capacity);
         let field = Field::new(name, DataType::List(Box::new(logical_type)));
 
         Self {
@@ -81,7 +81,7 @@ where
         }
     }
 
-    pub fn append_slice(&mut self, opt_v: Option<&[T]>) {
+    pub fn append_slice(&mut self, opt_v: Option<&[T::Native]>) {
         match opt_v {
             Some(items) => {
                 let values = self.builder.mut_values();
@@ -99,7 +99,7 @@ where
     }
     /// Appends from an iterator over values
     #[inline]
-    pub fn append_iter_values<I: Iterator<Item = T> + TrustedLen>(&mut self, iter: I) {
+    pub fn append_iter_values<I: Iterator<Item = T::Native> + TrustedLen>(&mut self, iter: I) {
         let values = self.builder.mut_values();
 
         if iter.size_hint().0 == 0 {
@@ -113,7 +113,7 @@ where
 
     /// Appends from an iterator over values
     #[inline]
-    pub fn append_iter<I: Iterator<Item = Option<T>> + TrustedLen>(&mut self, iter: I) {
+    pub fn append_iter<I: Iterator<Item = Option<T::Native>> + TrustedLen>(&mut self, iter: I) {
         let values = self.builder.mut_values();
 
         if iter.size_hint().0 == 0 {
@@ -128,7 +128,7 @@ where
 
 impl<T> ListBuilderTrait for ListPrimitiveChunkedBuilder<T>
 where
-    T: NumericNative,
+    T: PolarsNumericType,
 {
     #[inline]
     fn append_opt_series(&mut self, opt_s: Option<&Series>) {
@@ -151,12 +151,10 @@ where
         if s.is_empty() {
             self.fast_explode = false;
         }
-        let arrays = s.chunks();
+        let ca = s.unpack::<T>().unwrap();
 
         let values = self.builder.mut_values();
 
-        arrays.iter().for_each(|x| {
-            let arr = x.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-
+        ca.downcast_iter().for_each(|arr| {
             if !arr.has_validity() {
                 values.extend_from_slice(arr.values().as_slice())
             } else {
@@ -350,14 +348,23 @@ pub fn get_list_builder(
         #[cfg(feature = "object")]
         DataType::Object(_) => _err(),
         #[cfg(feature = "dtype-struct")]
-        DataType::Struct(_) => _err(),
+        DataType::Struct(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
+            name,
+            list_capacity,
+            physical_type,
+        ))),
+        DataType::List(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
+            name,
+            list_capacity,
+            physical_type,
+        ))),
         #[cfg(feature = "dtype-categorical")]
         DataType::Categorical(_) => _err(),
         _ => {
             macro_rules! get_primitive_builder {
                 ($type:ty) => {{
                     let builder = ListPrimitiveChunkedBuilder::<$type>::new(
-                        &name,
+                        name,
                         list_capacity,
                         value_capacity,
                         dt.clone(),
@@ -379,7 +386,7 @@ pub fn get_list_builder(
                     Box::new(builder)
                 }};
             }
-            Ok(match_dtype_to_physical_apply_macro!(
+            Ok(match_dtype_to_logical_apply_macro!(
                 physical_type,
                 get_primitive_builder,
                 get_utf8_builder,
@@ -395,12 +402,18 @@ pub struct AnonymousListBuilder<'a> {
     pub dtype: DataType,
 }
 
+impl Default for AnonymousListBuilder<'_> {
+    fn default() -> Self {
+        Self::new("", 0, Default::default())
+    }
+}
+
 impl<'a> AnonymousListBuilder<'a> {
-    pub fn new(name: &str, capacity: usize, dtype: DataType) -> Self {
+    pub fn new(name: &str, capacity: usize, inner_dtype: DataType) -> Self {
         Self {
             name: name.into(),
             builder: AnonymousBuilder::new(capacity),
-            dtype,
+            dtype: inner_dtype,
         }
     }
 
@@ -440,17 +453,85 @@ impl<'a> AnonymousListBuilder<'a> {
         }
     }
 
-    pub fn finish(self) -> ListChunked {
-        if self.builder.is_empty() {
-            ListChunked::full_null_with_dtype(&self.name, 0, &self.dtype)
+    pub fn finish(&mut self) -> ListChunked {
+        let slf = std::mem::take(self);
+        if slf.builder.is_empty() {
+            ListChunked::full_null_with_dtype(&slf.name, 0, &slf.dtype)
         } else {
-            let arr = self
+            let arr = slf
                 .builder
-                .finish(Some(&self.dtype.to_physical().to_arrow()))
+                .finish(Some(&slf.dtype.to_physical().to_arrow()))
                 .unwrap();
             let mut ca = ListChunked::from_chunks("", vec![Arc::new(arr)]);
-            ca.field = Arc::new(Field::new(&self.name, DataType::List(Box::new(self.dtype))));
+            ca.field = Arc::new(Field::new(&slf.name, DataType::List(Box::new(slf.dtype))));
             ca
         }
     }
 }
+
+pub struct AnonymousOwnedListBuilder {
+    name: String,
+    builder: AnonymousBuilder<'static>,
+    owned: Vec<Series>,
+    inner_dtype: DataType,
+}
+
+impl Default for AnonymousOwnedListBuilder {
+    fn default() -> Self {
+        Self::new("", 0, Default::default())
+    }
+}
+
+impl ListBuilderTrait for AnonymousOwnedListBuilder {
+    fn append_series(&mut self, s: &Series) {
+        // Safety
+        // we deref a raw pointer with a lifetime that is not static
+        // it is safe because we also clone Series (Arc +=1) and therefore the &dyn Arrays
+        // will not be dropped until the owned series are dropped
+        unsafe {
+            match s.dtype() {
+                #[cfg(feature = "dtype-struct")]
+                DataType::Struct(_) => self.builder.push(&*(&**s.array_ref(0) as *const dyn Array)),
+                _ => {
+                    self.builder
+                        .push_multiple(&*(s.chunks().as_ref() as *const [ArrayRef]));
+                }
+            }
+        }
+        // this make sure that the underlying ArrayRef's are not dropped
+        self.owned.push(s.clone());
+    }
+
+    fn append_null(&mut self) {
+        self.builder.push_null()
+    }
+
+    fn finish(&mut self) -> ListChunked {
+        let slf = std::mem::take(self);
+        if slf.builder.is_empty() {
+            ListChunked::full_null_with_dtype(&slf.name, 0, &slf.inner_dtype)
+        } else {
+            let arr = slf
+                .builder
+                .finish(Some(&slf.inner_dtype.to_physical().to_arrow()))
+                .unwrap();
+            let mut ca = ListChunked::from_chunks("", vec![Arc::new(arr)]);
+            ca.field = Arc::new(Field::new(
+                &slf.name,
+                DataType::List(Box::new(slf.inner_dtype)),
+            ));
+            ca
+        }
+    }
+}
+
+impl AnonymousOwnedListBuilder {
+    pub fn new(name: &str, capacity: usize, inner_dtype: DataType) -> Self {
+        Self {
+            name: name.into(),
+            builder: AnonymousBuilder::new(capacity),
+            owned: Vec::with_capacity(capacity),
+            inner_dtype,
+        }
+    }
+}
diff --git a/polars/polars-core/src/chunked_array/builder/mod.rs b/polars/polars-core/src/chunked_array/builder/mod.rs
index 581c04c4f609..841df7144e37 100644
--- a/polars/polars-core/src/chunked_array/builder/mod.rs
+++ b/polars/polars-core/src/chunked_array/builder/mod.rs
@@ -186,7 +186,8 @@ mod test {
     #[test]
     fn test_list_builder() {
-        let mut builder = ListPrimitiveChunkedBuilder::<i32>::new("a", 10, 5, DataType::Int32);
+        let mut builder =
+            ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 5, DataType::Int32);
 
         // create a series containing two chunks
         let mut s1 = Int32Chunked::from_slice("a", &[1, 2, 3]).into_series();
@@ -212,7 +213,8 @@ mod test {
         assert_eq!(out.get(0).unwrap().len(), 6);
         assert_eq!(out.get(1).unwrap().len(), 3);
 
-        let mut builder = ListPrimitiveChunkedBuilder::<i32>::new("a", 10, 5, DataType::Int32);
+        let mut builder =
+            ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 5, DataType::Int32);
         builder.append_series(&s1);
         builder.append_null();
diff --git a/polars/polars-core/src/chunked_array/cast.rs b/polars/polars-core/src/chunked_array/cast.rs
index 114b9415cb5a..e090e7a07267 100644
--- a/polars/polars-core/src/chunked_array/cast.rs
+++ b/polars/polars-core/src/chunked_array/cast.rs
@@ -122,7 +122,8 @@ mod test {
     #[test]
     fn test_cast_list() -> Result<()> {
-        let mut builder = ListPrimitiveChunkedBuilder::<i32>::new("a", 10, 10, DataType::Int32);
+        let mut builder =
+            ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 10, DataType::Int32);
         builder.append_slice(Some(&[1i32, 2, 3]));
         builder.append_slice(Some(&[1i32, 2, 3]));
         let ca = builder.finish();
diff --git a/polars/polars-core/src/chunked_array/list/iterator.rs b/polars/polars-core/src/chunked_array/list/iterator.rs
index 91b86c220c38..13363d6207a7 100644
--- a/polars/polars-core/src/chunked_array/list/iterator.rs
+++ b/polars/polars-core/src/chunked_array/list/iterator.rs
@@ -12,6 +12,9 @@ pub struct AmortizedListIter<'a, I: Iterator<Item = Option<ArrayRef>>> {
     inner: NonNull<ArrayRef>,
     lifetime: PhantomData<&'a ArrayRef>,
     iter: I,
+    // used only if feature="dtype-struct"
+    #[allow(dead_code)]
+    inner_dtype: DataType,
 }
 
 impl<'a, I: Iterator<Item = Option<ArrayRef>>> Iterator for AmortizedListIter<'a, I> {
@@ -20,7 +23,30 @@ impl<'a, I: Iterator<Item = Option<ArrayRef>>> Iterator for AmortizedListIter<'a, I> {
     fn next(&mut self) -> Option<Self::Item> {
         self.iter.next().map(|opt_val| {
             opt_val.map(|array_ref| {
+                #[cfg(feature = "dtype-struct")]
+                // structs arrays are bound to the series not to the arrayref
+                // so we must get a hold to the new array
+                if matches!(self.inner_dtype, DataType::Struct(_)) {
+                    // Safety
+                    // dtype is known
+                    unsafe {
+                        let array_ref = Arc::from(array_ref);
+                        let mut s = Series::from_chunks_and_dtype_unchecked(
+                            "",
+                            vec![array_ref],
+                            &self.inner_dtype,
+                        );
+                        // swap the new series with the container
+                        std::mem::swap(&mut *self.series_container, &mut s);
+                        // return a reference to the container
+                        // this lifetime is now bound to 'a
+                        return UnstableSeries::new(&*(&*self.series_container as *const Series));
+                    }
+                }
+
+                // update the inner state
                 unsafe { *self.inner.as_mut() = array_ref.into() };
+
                 // Safety
                 // we cannot control the lifetime of an iterators `next` method.
                 // but as long as self is alive the reference to the series container is valid
@@ -85,6 +111,7 @@ impl ListChunked {
             inner: NonNull::new(ptr).unwrap(),
             lifetime: PhantomData,
             iter: self.downcast_iter().flat_map(|arr| arr.iter()),
+            inner_dtype: self.inner_dtype(),
         }
     }
diff --git a/polars/polars-core/src/chunked_array/list/mod.rs b/polars/polars-core/src/chunked_array/list/mod.rs
index 00b6bd1344dc..0d0d20cdec57 100644
--- a/polars/polars-core/src/chunked_array/list/mod.rs
+++ b/polars/polars-core/src/chunked_array/list/mod.rs
@@ -1,8 +1,5 @@
 //! Special list utility methods
 mod iterator;
-#[cfg(feature = "list")]
-#[cfg_attr(docsrs, doc(cfg(feature = "list")))]
-pub mod namespace;
 
 use crate::prelude::*;
diff --git a/polars/polars-core/src/chunked_array/ndarray.rs b/polars/polars-core/src/chunked_array/ndarray.rs
index 4ab7153ae09a..746e39854b44 100644
--- a/polars/polars-core/src/chunked_array/ndarray.rs
+++ b/polars/polars-core/src/chunked_array/ndarray.rs
@@ -172,7 +172,8 @@ mod test {
         let ndarr = ca.to_ndarray()?;
         assert_eq!(ndarr, ArrayView1::from(&[1.0, 2.0, 3.0]));
 
-        let mut builder = ListPrimitiveChunkedBuilder::new("", 10, 10, DataType::Float64);
+        let mut builder =
+            ListPrimitiveChunkedBuilder::<Float64Type>::new("", 10, 10, DataType::Float64);
         builder.append_slice(Some(&[1.0, 2.0, 3.0]));
         builder.append_slice(Some(&[2.0, 4.0, 5.0]));
         builder.append_slice(Some(&[6.0, 7.0, 8.0]));
@@ -183,7 +184,8 @@ mod test {
         assert_eq!(ndarr, expected);
 
         // test list array that is not square
-        let mut builder = ListPrimitiveChunkedBuilder::new("", 10, 10, DataType::Float64);
+        let mut builder =
+            ListPrimitiveChunkedBuilder::<Float64Type>::new("", 10, 10, DataType::Float64);
         builder.append_slice(Some(&[1.0, 2.0, 3.0]));
         builder.append_slice(Some(&[2.0]));
         builder.append_slice(Some(&[6.0, 7.0, 8.0]));
diff --git a/polars/polars-core/src/chunked_array/ops/full.rs b/polars/polars-core/src/chunked_array/ops/full.rs
index 41f79abf447d..a8be315aca86 100644
--- a/polars/polars-core/src/chunked_array/ops/full.rs
+++ b/polars/polars-core/src/chunked_array/ops/full.rs
@@ -78,11 +78,7 @@ impl ChunkFullNull for ListChunked {
 }
 
 impl ListChunked {
-    pub(crate) fn full_null_with_dtype(
-        name: &str,
-        length: usize,
-        inner_dtype: &DataType,
-    ) -> ListChunked {
+    pub fn full_null_with_dtype(name: &str, length: usize, inner_dtype: &DataType) -> ListChunked {
         let arr = new_null_array(
             ArrowDataType::LargeList(Box::new(ArrowField::new(
                 "item",
diff --git a/polars/polars-core/src/datatypes.rs b/polars/polars-core/src/datatypes.rs
index e74de81b8306..0c7a422301e4 100644
--- a/polars/polars-core/src/datatypes.rs
+++ b/polars/polars-core/src/datatypes.rs
@@ -141,17 +141,38 @@ pub trait NumericNative:
     + FromPrimitive
     + NativeArithmetics
 {
+    type POLARSTYPE;
+}
+impl NumericNative for i8 {
+    type POLARSTYPE = Int8Type;
+}
+impl NumericNative for i16 {
+    type POLARSTYPE = Int16Type;
+}
+impl NumericNative for i32 {
+    type POLARSTYPE = Int32Type;
+}
+impl NumericNative for i64 {
+    type POLARSTYPE = Int64Type;
+}
+impl NumericNative for u8 {
+    type POLARSTYPE = UInt8Type;
+}
+impl NumericNative for u16 {
+    type POLARSTYPE = UInt16Type;
+}
+impl NumericNative for u32 {
+    type POLARSTYPE = UInt32Type;
+}
+impl NumericNative for u64 {
+    type POLARSTYPE = UInt64Type;
+}
+impl NumericNative for f32 {
+    type POLARSTYPE = Float32Type;
+}
+impl NumericNative for f64 {
+    type POLARSTYPE = Float64Type;
 }
-impl NumericNative for i8 {}
-impl NumericNative for i16 {}
-impl NumericNative for i32 {}
-impl NumericNative for i64 {}
-impl NumericNative for u8 {}
-impl NumericNative for u16 {}
-impl NumericNative for u32 {}
-impl NumericNative for u64 {}
-impl NumericNative for f32 {}
-impl NumericNative for f64 {}
 
 pub trait PolarsNumericType: Send + Sync + PolarsDataType + 'static {
     type Native: NumericNative;
@@ -678,6 +699,12 @@ pub enum DataType {
     Unknown,
 }
 
+impl Default for DataType {
+    fn default() -> Self {
+        DataType::Unknown
+    }
+}
+
 impl Hash for DataType {
     fn hash<H: Hasher>(&self, state: &mut H) {
         std::mem::discriminant(self).hash(state)
diff --git a/polars/polars-core/src/fmt.rs
b/polars/polars-core/src/fmt.rs index 0c5dc11caaa3..4119dd77be6f 100644 --- a/polars/polars-core/src/fmt.rs +++ b/polars/polars-core/src/fmt.rs @@ -789,7 +789,8 @@ mod test { #[test] fn test_fmt_list() { - let mut builder = ListPrimitiveChunkedBuilder::::new("a", 10, 10, DataType::Int32); + let mut builder = + ListPrimitiveChunkedBuilder::::new("a", 10, 10, DataType::Int32); builder.append_slice(Some(&[1, 2, 3])); builder.append_slice(None); let list = builder.finish().into_series(); diff --git a/polars/polars-core/src/frame/groupby/aggregations/agg_list.rs b/polars/polars-core/src/frame/groupby/aggregations/agg_list.rs new file mode 100644 index 000000000000..629f41c54d8f --- /dev/null +++ b/polars/polars-core/src/frame/groupby/aggregations/agg_list.rs @@ -0,0 +1,390 @@ +use super::*; +#[cfg(feature = "dtype-struct")] +use crate::chunked_array::builder::AnonymousOwnedListBuilder; + +pub trait AggList { + fn agg_list(&self, _groups: &GroupsProxy) -> Series; +} + +impl AggList for ChunkedArray +where + T: PolarsNumericType, + ChunkedArray: IntoSeries, +{ + fn agg_list(&self, groups: &GroupsProxy) -> Series { + let ca = self.rechunk(); + + match groups { + GroupsProxy::Idx(groups) => { + let mut can_fast_explode = true; + + let arr = ca.downcast_iter().next().unwrap(); + let values = arr.values(); + + let mut offsets = Vec::::with_capacity(groups.len() + 1); + let mut length_so_far = 0i64; + offsets.push(length_so_far); + + let mut list_values = Vec::::with_capacity(self.len()); + groups.iter().for_each(|(_, idx)| { + let idx_len = idx.len(); + if idx_len == 0 { + can_fast_explode = false; + } + + length_so_far += idx_len as i64; + // Safety: + // group tuples are in bounds + unsafe { + list_values.extend(idx.iter().map(|idx| { + debug_assert!((*idx as usize) < values.len()); + *values.get_unchecked(*idx as usize) + })); + // Safety: + // we know that offsets has allocated enough slots + offsets.push_unchecked(length_so_far); + } + }); + + let validity = if arr.null_count() > 0 { + let old_validity = arr.validity().unwrap(); + let mut validity = MutableBitmap::from_len_set(list_values.len()); + + let mut count = 0; + groups.iter().for_each(|(_, idx)| unsafe { + for i in idx { + if !old_validity.get_bit_unchecked(*i as usize) { + validity.set_bit_unchecked(count, false) + } + count += 1; + } + }); + Some(validity.into()) + } else { + None + }; + + let array = PrimitiveArray::from_data( + T::get_dtype().to_arrow(), + list_values.into(), + validity, + ); + let data_type = ListArray::::default_datatype(T::get_dtype().to_arrow()); + let arr = + ListArray::::from_data(data_type, offsets.into(), Arc::new(array), None); + + let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]); + if can_fast_explode { + ca.set_fast_explode() + } + ca.into() + } + GroupsProxy::Slice(groups) => { + let mut can_fast_explode = true; + let arr = ca.downcast_iter().next().unwrap(); + let values = arr.values(); + + let mut offsets = Vec::::with_capacity(groups.len() + 1); + let mut length_so_far = 0i64; + offsets.push(length_so_far); + + let mut list_values = Vec::::with_capacity(self.len()); + groups.iter().for_each(|&[first, len]| { + if len == 0 { + can_fast_explode = false; + } + + length_so_far += len as i64; + list_values.extend_from_slice(&values[first as usize..(first + len) as usize]); + unsafe { + // Safety: + // we know that offsets has allocated enough slots + offsets.push_unchecked(length_so_far); + } + }); + + let validity = if arr.null_count() > 0 { + let old_validity = 
arr.validity().unwrap(); + let mut validity = MutableBitmap::from_len_set(list_values.len()); + + let mut count = 0; + groups.iter().for_each(|[first, len]| unsafe { + for i in *first..(*first + *len) { + if !old_validity.get_bit_unchecked(i as usize) { + validity.set_bit_unchecked(count, false) + } + count += 1; + } + }); + Some(validity.into()) + } else { + None + }; + + let array = PrimitiveArray::from_data( + T::get_dtype().to_arrow(), + list_values.into(), + validity, + ); + let data_type = ListArray::::default_datatype(T::get_dtype().to_arrow()); + let arr = + ListArray::::from_data(data_type, offsets.into(), Arc::new(array), None); + let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]); + if can_fast_explode { + ca.set_fast_explode() + } + ca.into() + } + } + } +} + +impl AggList for BooleanChunked { + fn agg_list(&self, groups: &GroupsProxy) -> Series { + match groups { + GroupsProxy::Idx(groups) => { + let mut builder = + ListBooleanChunkedBuilder::new(self.name(), groups.len(), self.len()); + for idx in groups.all().iter() { + let ca = unsafe { self.take_unchecked(idx.into()) }; + builder.append(&ca) + } + builder.finish().into_series() + } + GroupsProxy::Slice(groups) => { + let mut builder = + ListBooleanChunkedBuilder::new(self.name(), groups.len(), self.len()); + for [first, len] in groups { + let ca = self.slice(*first as i64, *len as usize); + builder.append(&ca) + } + builder.finish().into_series() + } + } + } +} + +impl AggList for Utf8Chunked { + fn agg_list(&self, groups: &GroupsProxy) -> Series { + match groups { + GroupsProxy::Idx(groups) => { + let mut builder = + ListUtf8ChunkedBuilder::new(self.name(), groups.len(), self.len()); + for idx in groups.all().iter() { + let ca = unsafe { self.take_unchecked(idx.into()) }; + builder.append(&ca) + } + builder.finish().into_series() + } + GroupsProxy::Slice(groups) => { + let mut builder = + ListUtf8ChunkedBuilder::new(self.name(), groups.len(), self.len()); + for [first, len] in groups { + let ca = self.slice(*first as i64, *len as usize); + builder.append(&ca) + } + builder.finish().into_series() + } + } + } +} + +fn agg_list_list, &mut i64, &mut Vec) -> bool>( + ca: &ListChunked, + groups_len: usize, + func: F, +) -> Series { + let can_fast_explode = true; + let mut offsets = Vec::::with_capacity(groups_len + 1); + let mut length_so_far = 0i64; + offsets.push(length_so_far); + + let mut list_values = Vec::with_capacity(groups_len); + + let can_fast_explode = func( + ca, + can_fast_explode, + &mut offsets, + &mut length_so_far, + &mut list_values, + ); + if groups_len == 0 { + list_values.push(ca.chunks[0].slice(0, 0).into()) + } + let arrays = list_values.iter().map(|arr| &**arr).collect::>(); + let list_values: ArrayRef = arrow::compute::concatenate::concatenate(&arrays) + .unwrap() + .into(); + let data_type = ListArray::::default_datatype(list_values.data_type().clone()); + let arr = Arc::new(ListArray::::from_data( + data_type, + offsets.into(), + list_values, + None, + )) as ArrayRef; + let mut listarr = ListChunked::from_chunks(ca.name(), vec![arr]); + if can_fast_explode { + listarr.set_fast_explode() + } + listarr.into_series() +} + +impl AggList for ListChunked { + fn agg_list(&self, groups: &GroupsProxy) -> Series { + match groups { + GroupsProxy::Idx(groups) => { + let func = |ca: &ListChunked, + mut can_fast_explode: bool, + offsets: &mut Vec, + length_so_far: &mut i64, + list_values: &mut Vec| { + groups.iter().for_each(|(_, idx)| { + let idx_len = idx.len(); + if idx_len == 0 { + 
can_fast_explode = false; + } + + *length_so_far += idx_len as i64; + // Safety: + // group tuples are in bounds + unsafe { + let mut s = ca.take_unchecked(idx.into()); + let arr = s.chunks.pop().unwrap(); + list_values.push(arr); + + // Safety: + // we know that offsets has allocated enough slots + offsets.push_unchecked(*length_so_far); + } + }); + can_fast_explode + }; + + agg_list_list(self, groups.len(), func) + } + GroupsProxy::Slice(groups) => { + let func = |ca: &ListChunked, + mut can_fast_explode: bool, + offsets: &mut Vec, + length_so_far: &mut i64, + list_values: &mut Vec| { + groups.iter().for_each(|&[first, len]| { + if len == 0 { + can_fast_explode = false; + } + + *length_so_far += len as i64; + let mut s = ca.slice(first as i64, len as usize); + let arr = s.chunks.pop().unwrap(); + list_values.push(arr); + + unsafe { + // Safety: + // we know that offsets has allocated enough slots + offsets.push_unchecked(*length_so_far); + } + }); + can_fast_explode + }; + + agg_list_list(self, groups.len(), func) + } + } + } +} + +#[cfg(feature = "object")] +impl AggList for ObjectChunked { + fn agg_list(&self, groups: &GroupsProxy) -> Series { + let mut can_fast_explode = true; + let mut offsets = Vec::::with_capacity(groups.len() + 1); + let mut length_so_far = 0i64; + offsets.push(length_so_far); + + // we know that iterators length + let iter = unsafe { + groups + .iter() + .flat_map(|indicator| { + let (group_vals, len) = match indicator { + GroupsIndicator::Idx((_first, idx)) => { + // Safety: + // group tuples always in bounds + let group_vals = self.take_unchecked(idx.into()); + + (group_vals, idx.len() as IdxSize) + } + GroupsIndicator::Slice([first, len]) => { + let group_vals = slice_from_offsets(self, first, len); + + (group_vals, len) + } + }; + + if len == 0 { + can_fast_explode = false; + } + length_so_far += len as i64; + // Safety: + // we know that offsets has allocated enough slots + offsets.push_unchecked(length_so_far); + + let arr = group_vals.downcast_iter().next().unwrap().clone(); + arr.into_iter_cloned() + }) + .trust_my_length(self.len()) + }; + + let mut pe = create_extension(iter); + + // Safety: + // this is safe because we just created the PolarsExtension + // meaning that the sentinel is heap allocated and the dereference of the + // pointer does not fail + unsafe { pe.set_to_series_fn::() }; + let extension_array = Arc::new(pe.take_and_forget()) as ArrayRef; + let extension_dtype = extension_array.data_type(); + + let data_type = ListArray::::default_datatype(extension_dtype.clone()); + let arr = Arc::new(ListArray::::from_data( + data_type, + offsets.into(), + extension_array, + None, + )) as ArrayRef; + + let mut listarr = ListChunked::from_chunks(self.name(), vec![arr]); + if can_fast_explode { + listarr.set_fast_explode() + } + listarr.into_series() + } +} + +#[cfg(feature = "dtype-struct")] +impl AggList for StructChunked { + fn agg_list(&self, groups: &GroupsProxy) -> Series { + let s = self.clone().into_series(); + match groups { + GroupsProxy::Idx(groups) => { + let mut builder = + AnonymousOwnedListBuilder::new(self.name(), groups.len(), self.dtype().clone()); + for idx in groups.all().iter() { + let taken = + unsafe { s.take_iter_unchecked(&mut idx.iter().map(|i| *i as usize)) }; + builder.append_series(&taken) + } + builder.finish().into_series() + } + GroupsProxy::Slice(groups) => { + let mut builder = + AnonymousOwnedListBuilder::new(self.name(), groups.len(), self.dtype().clone()); + for [first, len] in groups { + let taken = s.slice(*first 
as i64, *len as usize); + builder.append_series(&taken) + } + builder.finish().into_series() + } + } + } +} diff --git a/polars/polars-core/src/frame/groupby/aggregations.rs b/polars/polars-core/src/frame/groupby/aggregations/mod.rs similarity index 69% rename from polars/polars-core/src/frame/groupby/aggregations.rs rename to polars/polars-core/src/frame/groupby/aggregations/mod.rs index 6ef08c84ebec..75f52337a5e7 100644 --- a/polars/polars-core/src/frame/groupby/aggregations.rs +++ b/polars/polars-core/src/frame/groupby/aggregations/mod.rs @@ -1,4 +1,7 @@ +mod agg_list; + use crate::POOL; +pub use agg_list::*; use arrow::bitmap::MutableBitmap; use num::{Bounded, Num, NumCast, ToPrimitive, Zero}; use rayon::prelude::*; @@ -748,362 +751,3 @@ where } impl ChunkedArray where ChunkedArray: ChunkTake + IntoSeries {} - -pub trait AggList { - fn agg_list(&self, _groups: &GroupsProxy) -> Series; -} - -impl AggList for ChunkedArray -where - T: PolarsNumericType, - ChunkedArray: IntoSeries, -{ - fn agg_list(&self, groups: &GroupsProxy) -> Series { - let ca = self.rechunk(); - - match groups { - GroupsProxy::Idx(groups) => { - let mut can_fast_explode = true; - - let arr = ca.downcast_iter().next().unwrap(); - let values = arr.values(); - - let mut offsets = Vec::::with_capacity(groups.len() + 1); - let mut length_so_far = 0i64; - offsets.push(length_so_far); - - let mut list_values = Vec::::with_capacity(self.len()); - groups.iter().for_each(|(_, idx)| { - let idx_len = idx.len(); - if idx_len == 0 { - can_fast_explode = false; - } - - length_so_far += idx_len as i64; - // Safety: - // group tuples are in bounds - unsafe { - list_values.extend(idx.iter().map(|idx| { - debug_assert!((*idx as usize) < values.len()); - *values.get_unchecked(*idx as usize) - })); - // Safety: - // we know that offsets has allocated enough slots - offsets.push_unchecked(length_so_far); - } - }); - - let validity = if arr.null_count() > 0 { - let old_validity = arr.validity().unwrap(); - let mut validity = MutableBitmap::from_len_set(list_values.len()); - - let mut count = 0; - groups.iter().for_each(|(_, idx)| unsafe { - for i in idx { - if !old_validity.get_bit_unchecked(*i as usize) { - validity.set_bit_unchecked(count, false) - } - count += 1; - } - }); - Some(validity.into()) - } else { - None - }; - - let array = PrimitiveArray::from_data( - T::get_dtype().to_arrow(), - list_values.into(), - validity, - ); - let data_type = ListArray::::default_datatype(T::get_dtype().to_arrow()); - let arr = - ListArray::::from_data(data_type, offsets.into(), Arc::new(array), None); - - let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]); - if can_fast_explode { - ca.set_fast_explode() - } - ca.into() - } - GroupsProxy::Slice(groups) => { - let mut can_fast_explode = true; - let arr = ca.downcast_iter().next().unwrap(); - let values = arr.values(); - - let mut offsets = Vec::::with_capacity(groups.len() + 1); - let mut length_so_far = 0i64; - offsets.push(length_so_far); - - let mut list_values = Vec::::with_capacity(self.len()); - groups.iter().for_each(|&[first, len]| { - if len == 0 { - can_fast_explode = false; - } - - length_so_far += len as i64; - list_values.extend_from_slice(&values[first as usize..(first + len) as usize]); - unsafe { - // Safety: - // we know that offsets has allocated enough slots - offsets.push_unchecked(length_so_far); - } - }); - - let validity = if arr.null_count() > 0 { - let old_validity = arr.validity().unwrap(); - let mut validity = 
MutableBitmap::from_len_set(list_values.len()); - - let mut count = 0; - groups.iter().for_each(|[first, len]| unsafe { - for i in *first..(*first + *len) { - if !old_validity.get_bit_unchecked(i as usize) { - validity.set_bit_unchecked(count, false) - } - count += 1; - } - }); - Some(validity.into()) - } else { - None - }; - - let array = PrimitiveArray::from_data( - T::get_dtype().to_arrow(), - list_values.into(), - validity, - ); - let data_type = ListArray::::default_datatype(T::get_dtype().to_arrow()); - let arr = - ListArray::::from_data(data_type, offsets.into(), Arc::new(array), None); - let mut ca = ListChunked::from_chunks(self.name(), vec![Arc::new(arr)]); - if can_fast_explode { - ca.set_fast_explode() - } - ca.into() - } - } - } -} - -impl AggList for BooleanChunked { - fn agg_list(&self, groups: &GroupsProxy) -> Series { - match groups { - GroupsProxy::Idx(groups) => { - let mut builder = - ListBooleanChunkedBuilder::new(self.name(), groups.len(), self.len()); - for idx in groups.all().iter() { - let ca = unsafe { self.take_unchecked(idx.into()) }; - builder.append(&ca) - } - builder.finish().into_series() - } - GroupsProxy::Slice(groups) => { - let mut builder = - ListBooleanChunkedBuilder::new(self.name(), groups.len(), self.len()); - for [first, len] in groups { - let ca = self.slice(*first as i64, *len as usize); - builder.append(&ca) - } - builder.finish().into_series() - } - } - } -} - -impl AggList for Utf8Chunked { - fn agg_list(&self, groups: &GroupsProxy) -> Series { - match groups { - GroupsProxy::Idx(groups) => { - let mut builder = - ListUtf8ChunkedBuilder::new(self.name(), groups.len(), self.len()); - for idx in groups.all().iter() { - let ca = unsafe { self.take_unchecked(idx.into()) }; - builder.append(&ca) - } - builder.finish().into_series() - } - GroupsProxy::Slice(groups) => { - let mut builder = - ListUtf8ChunkedBuilder::new(self.name(), groups.len(), self.len()); - for [first, len] in groups { - let ca = self.slice(*first as i64, *len as usize); - builder.append(&ca) - } - builder.finish().into_series() - } - } - } -} - -fn agg_list_list, &mut i64, &mut Vec) -> bool>( - ca: &ListChunked, - groups_len: usize, - func: F, -) -> Series { - let can_fast_explode = true; - let mut offsets = Vec::::with_capacity(groups_len + 1); - let mut length_so_far = 0i64; - offsets.push(length_so_far); - - let mut list_values = Vec::with_capacity(groups_len); - - let can_fast_explode = func( - ca, - can_fast_explode, - &mut offsets, - &mut length_so_far, - &mut list_values, - ); - if groups_len == 0 { - list_values.push(ca.chunks[0].slice(0, 0).into()) - } - let arrays = list_values.iter().map(|arr| &**arr).collect::>(); - let list_values: ArrayRef = arrow::compute::concatenate::concatenate(&arrays) - .unwrap() - .into(); - let data_type = ListArray::::default_datatype(list_values.data_type().clone()); - let arr = Arc::new(ListArray::::from_data( - data_type, - offsets.into(), - list_values, - None, - )) as ArrayRef; - let mut listarr = ListChunked::from_chunks(ca.name(), vec![arr]); - if can_fast_explode { - listarr.set_fast_explode() - } - listarr.into_series() -} - -impl AggList for ListChunked { - fn agg_list(&self, groups: &GroupsProxy) -> Series { - match groups { - GroupsProxy::Idx(groups) => { - let func = |ca: &ListChunked, - mut can_fast_explode: bool, - offsets: &mut Vec, - length_so_far: &mut i64, - list_values: &mut Vec| { - groups.iter().for_each(|(_, idx)| { - let idx_len = idx.len(); - if idx_len == 0 { - can_fast_explode = false; - } - - *length_so_far += 
idx_len as i64; - // Safety: - // group tuples are in bounds - unsafe { - let mut s = ca.take_unchecked(idx.into()); - let arr = s.chunks.pop().unwrap(); - list_values.push(arr); - - // Safety: - // we know that offsets has allocated enough slots - offsets.push_unchecked(*length_so_far); - } - }); - can_fast_explode - }; - - agg_list_list(self, groups.len(), func) - } - GroupsProxy::Slice(groups) => { - let func = |ca: &ListChunked, - mut can_fast_explode: bool, - offsets: &mut Vec, - length_so_far: &mut i64, - list_values: &mut Vec| { - groups.iter().for_each(|&[first, len]| { - if len == 0 { - can_fast_explode = false; - } - - *length_so_far += len as i64; - let mut s = ca.slice(first as i64, len as usize); - let arr = s.chunks.pop().unwrap(); - list_values.push(arr); - - unsafe { - // Safety: - // we know that offsets has allocated enough slots - offsets.push_unchecked(*length_so_far); - } - }); - can_fast_explode - }; - - agg_list_list(self, groups.len(), func) - } - } - } -} - -#[cfg(feature = "object")] -impl AggList for ObjectChunked { - fn agg_list(&self, groups: &GroupsProxy) -> Series { - let mut can_fast_explode = true; - let mut offsets = Vec::::with_capacity(groups.len() + 1); - let mut length_so_far = 0i64; - offsets.push(length_so_far); - - // we know that iterators length - let iter = unsafe { - groups - .iter() - .flat_map(|indicator| { - let (group_vals, len) = match indicator { - GroupsIndicator::Idx((_first, idx)) => { - // Safety: - // group tuples always in bounds - let group_vals = self.take_unchecked(idx.into()); - - (group_vals, idx.len() as IdxSize) - } - GroupsIndicator::Slice([first, len]) => { - let group_vals = slice_from_offsets(self, first, len); - - (group_vals, len) - } - }; - - if len == 0 { - can_fast_explode = false; - } - length_so_far += len as i64; - // Safety: - // we know that offsets has allocated enough slots - offsets.push_unchecked(length_so_far); - - let arr = group_vals.downcast_iter().next().unwrap().clone(); - arr.into_iter_cloned() - }) - .trust_my_length(self.len()) - }; - - let mut pe = create_extension(iter); - - // Safety: - // this is safe because we just created the PolarsExtension - // meaning that the sentinel is heap allocated and the dereference of the - // pointer does not fail - unsafe { pe.set_to_series_fn::() }; - let extension_array = Arc::new(pe.take_and_forget()) as ArrayRef; - let extension_dtype = extension_array.data_type(); - - let data_type = ListArray::::default_datatype(extension_dtype.clone()); - let arr = Arc::new(ListArray::::from_data( - data_type, - offsets.into(), - extension_array, - None, - )) as ArrayRef; - - let mut listarr = ListChunked::from_chunks(self.name(), vec![arr]); - if can_fast_explode { - listarr.set_fast_explode() - } - listarr.into_series() - } -} diff --git a/polars/polars-core/src/series/implementations/struct_.rs b/polars/polars-core/src/series/implementations/struct_.rs index 3fc3d9484daa..4b583debd4d6 100644 --- a/polars/polars-core/src/series/implementations/struct_.rs +++ b/polars/polars-core/src/series/implementations/struct_.rs @@ -46,13 +46,7 @@ impl private::PrivateSeries for SeriesWrap { } fn agg_list(&self, groups: &GroupsProxy) -> Series { - let fields = self - .0 - .fields() - .iter() - .map(|s| s.agg_list(groups)) - .collect::>(); - StructChunked::new_unchecked(self.name(), &fields).into_series() + self.0.agg_list(groups) } fn group_tuples(&self, multithreaded: bool, sorted: bool) -> GroupsProxy { diff --git a/polars/polars-core/src/series/ops/to_list.rs 
b/polars/polars-core/src/series/ops/to_list.rs index 7fa223f7bbe4..09f4eb954d13 100644 --- a/polars/polars-core/src/series/ops/to_list.rs +++ b/polars/polars-core/src/series/ops/to_list.rs @@ -4,11 +4,17 @@ use polars_arrow::kernels::list::array_to_unit_list; use std::borrow::Cow; fn reshape_fast_path(name: &str, s: &Series) -> Series { - let chunks = s - .chunks() - .iter() - .map(|arr| Arc::new(array_to_unit_list(arr.clone())) as ArrayRef) - .collect::>(); + let chunks = match s.dtype() { + #[cfg(feature = "dtype-struct")] + DataType::Struct(_) => { + vec![Arc::new(array_to_unit_list(s.array_ref(0).clone())) as ArrayRef] + } + _ => s + .chunks() + .iter() + .map(|arr| Arc::new(array_to_unit_list(arr.clone())) as ArrayRef) + .collect::>(), + }; let mut ca = ListChunked::from_chunks(name, chunks); ca.set_fast_explode(); diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml index a4d17a096683..cf753db8d6f1 100644 --- a/polars/polars-io/Cargo.toml +++ b/polars/polars-io/Cargo.toml @@ -35,9 +35,10 @@ private = ["polars-time/private"] [dependencies] ahash = "0.7" anyhow = "1.0" -arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", default-features = false } +arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "aafba7b4eb4991e016638cbc1d4df676912e8236", default-features = false } # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false } # arrow = { package = "arrow2", version = "0.11", default-features = false } +# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false } csv-core = { version = "0.1.10", optional = true } dirs = "4.0" flate2 = { version = "1", optional = true, default-features = false } diff --git a/polars/polars-lazy/Cargo.toml b/polars/polars-lazy/Cargo.toml index d9b4865dbc6f..7de55448d4c1 100644 --- a/polars/polars-lazy/Cargo.toml +++ b/polars/polars-lazy/Cargo.toml @@ -50,10 +50,10 @@ cum_agg = ["polars-core/cum_agg"] interpolate = ["polars-core/interpolate"] rolling_window = ["polars-core/rolling_window"] rank = ["polars-core/rank"] -diff = ["polars-core/diff"] +diff = ["polars-core/diff", "polars-ops/diff"] pct_change = ["polars-core/pct_change"] moment = ["polars-core/moment"] -list = ["polars-core/list"] +list = ["polars-ops/list"] abs = ["polars-core/abs"] random = ["polars-core/random"] dynamic_groupby = ["polars-core/dynamic_groupby"] diff --git a/polars/polars-lazy/src/dsl/functions.rs b/polars/polars-lazy/src/dsl/functions.rs index 7351714d902d..ed1b8dd704ec 100644 --- a/polars/polars-lazy/src/dsl/functions.rs +++ b/polars/polars-lazy/src/dsl/functions.rs @@ -8,6 +8,8 @@ use polars_core::export::arrow::temporal_conversions::NANOSECONDS; use polars_core::prelude::*; use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY; use polars_core::utils::get_supertype; +#[cfg(feature = "list")] +use polars_ops::prelude::ListNameSpaceImpl; use rayon::prelude::*; use std::ops::{BitAnd, BitOr}; @@ -273,7 +275,7 @@ pub fn arange(low: Expr, high: Expr, step: usize) -> Expr { let sb = sb.cast(&DataType::Int64)?; let low = sa.i64()?; let high = sb.i64()?; - let mut builder = ListPrimitiveChunkedBuilder::::new( + let mut builder = ListPrimitiveChunkedBuilder::::new( "arange", low.len(), low.len() * 3, diff --git a/polars/polars-lazy/src/tests/aggregations.rs b/polars/polars-lazy/src/tests/aggregations.rs index af78130db13f..aca575fbf3e2 100644 --- 
a/polars/polars-lazy/src/tests/aggregations.rs +++ b/polars/polars-lazy/src/tests/aggregations.rs @@ -1,4 +1,5 @@ use super::*; +use polars_ops::prelude::ListNameSpaceImpl; #[test] fn test_agg_exprs() -> Result<()> { diff --git a/polars/polars-lazy/src/tests/queries.rs b/polars/polars-lazy/src/tests/queries.rs index 62e0fa568ee9..36440bd4838c 100644 --- a/polars/polars-lazy/src/tests/queries.rs +++ b/polars/polars-lazy/src/tests/queries.rs @@ -1797,7 +1797,8 @@ fn test_groupby_on_lists() -> Result<()> { let s0 = Series::new("", [1i32, 2, 3]); let s1 = Series::new("groups", [4i32, 5]); - let mut builder = ListPrimitiveChunkedBuilder::::new("arrays", 10, 10, DataType::Int32); + let mut builder = + ListPrimitiveChunkedBuilder::::new("arrays", 10, 10, DataType::Int32); builder.append_series(&s0); builder.append_series(&s1); let s2 = builder.finish().into_series(); diff --git a/polars/polars-ops/Cargo.toml b/polars/polars-ops/Cargo.toml index 2496b285bdaa..f4befc74d376 100644 --- a/polars/polars-ops/Cargo.toml +++ b/polars/polars-ops/Cargo.toml @@ -10,6 +10,7 @@ description = "More operations on polars data structures" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +polars-arrow = { version = "0.21.1", path = "../polars-arrow", default-features = false } polars-core = { version = "0.21.1", path = "../polars-core", features = ["private"], default-features = false } [features] @@ -22,4 +23,6 @@ dtype-struct = ["polars-core/dtype-struct"] dtype-u8 = ["polars-core/dtype-u8"] object = ["polars-core/object"] to_dummies = [] -list_to_struct = ["polars-core/list", "polars-core/dtype-struct"] +list_to_struct = ["polars-core/dtype-struct", "list"] +list = [] +diff = [] diff --git a/polars/polars-ops/src/chunked_array/list/mod.rs b/polars/polars-ops/src/chunked_array/list/mod.rs index 9b5219947506..0cc5ce8d4ccd 100644 --- a/polars/polars-ops/src/chunked_array/list/mod.rs +++ b/polars/polars-ops/src/chunked_array/list/mod.rs @@ -1,8 +1,13 @@ use polars_core::prelude::*; +#[cfg(feature = "list")] +#[cfg_attr(docsrs, doc(cfg(feature = "list")))] +mod namespace; #[cfg(feature = "list_to_struct")] mod to_struct; +#[cfg(feature = "list")] +pub use namespace::*; #[cfg(feature = "list_to_struct")] pub use to_struct::*; diff --git a/polars/polars-core/src/chunked_array/list/namespace.rs b/polars/polars-ops/src/chunked_array/list/namespace.rs similarity index 68% rename from polars/polars-core/src/chunked_array/list/namespace.rs rename to polars/polars-ops/src/chunked_array/list/namespace.rs index f68bb7c523a9..ed63876690d4 100644 --- a/polars/polars-core/src/chunked_array/list/namespace.rs +++ b/polars/polars-ops/src/chunked_array/list/namespace.rs @@ -1,8 +1,9 @@ -use crate::chunked_array::builder::get_list_builder; -use crate::prelude::*; -use crate::series::ops::NullBehavior; +use super::*; use polars_arrow::kernels::list::sublist_get; use polars_arrow::prelude::ValueSize; +use polars_core::chunked_array::builder::get_list_builder; +use polars_core::series::ops::NullBehavior; +use polars_core::utils::CustomIterTools; use std::convert::TryFrom; use std::fmt::Write; @@ -52,22 +53,23 @@ fn cast_rhs( Ok(()) } -impl ListChunked { +pub trait ListNameSpaceImpl: AsList { /// In case the inner dtype [`DataType::Utf8`], the individual items will be joined into a /// single string separated by `separator`. 
- pub fn lst_join(&self, separator: &str) -> Result { - match self.inner_dtype() { + fn lst_join(&self, separator: &str) -> Result { + let ca = self.as_list(); + match ca.inner_dtype() { DataType::Utf8 => { // used to amortize heap allocs let mut buf = String::with_capacity(128); let mut builder = Utf8ChunkedBuilder::new( - self.name(), - self.len(), - self.get_values_size() + separator.len() * self.len(), + ca.name(), + ca.len(), + ca.get_values_size() + separator.len() * ca.len(), ); - self.amortized_iter().for_each(|opt_s| { + ca.amortized_iter().for_each(|opt_s| { let opt_val = opt_s.map(|s| { // make sure that we don't write values of previous iteration buf.clear(); @@ -97,82 +99,95 @@ impl ListChunked { } } - pub fn lst_max(&self) -> Series { - self.apply_amortized(|s| s.as_ref().max_as_series()) + fn lst_max(&self) -> Series { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().max_as_series()) .explode() .unwrap() .into_series() } - pub fn lst_min(&self) -> Series { - self.apply_amortized(|s| s.as_ref().min_as_series()) + fn lst_min(&self) -> Series { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().min_as_series()) .explode() .unwrap() .into_series() } - pub fn lst_sum(&self) -> Series { - self.apply_amortized(|s| s.as_ref().sum_as_series()) + fn lst_sum(&self) -> Series { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().sum_as_series()) .explode() .unwrap() .into_series() } - pub fn lst_mean(&self) -> Float64Chunked { - self.amortized_iter() + fn lst_mean(&self) -> Float64Chunked { + let ca = self.as_list(); + ca.amortized_iter() .map(|s| s.and_then(|s| s.as_ref().mean())) .collect() } #[must_use] - pub fn lst_sort(&self, reverse: bool) -> ListChunked { - self.apply_amortized(|s| s.as_ref().sort(reverse)) + fn lst_sort(&self, reverse: bool) -> ListChunked { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().sort(reverse)) } #[must_use] - pub fn lst_reverse(&self) -> ListChunked { - self.apply_amortized(|s| s.as_ref().reverse()) + fn lst_reverse(&self) -> ListChunked { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().reverse()) } - pub fn lst_unique(&self) -> Result { - self.try_apply_amortized(|s| s.as_ref().unique()) + fn lst_unique(&self) -> Result { + let ca = self.as_list(); + ca.try_apply_amortized(|s| s.as_ref().unique()) } - pub fn lst_arg_min(&self) -> IdxCa { - let mut out: IdxCa = self + fn lst_arg_min(&self) -> IdxCa { + let ca = self.as_list(); + let mut out: IdxCa = ca .amortized_iter() .map(|opt_s| opt_s.and_then(|s| s.as_ref().arg_min().map(|idx| idx as IdxSize))) .collect_trusted(); - out.rename(self.name()); + out.rename(ca.name()); out } - pub fn lst_arg_max(&self) -> IdxCa { - let mut out: IdxCa = self + fn lst_arg_max(&self) -> IdxCa { + let ca = self.as_list(); + let mut out: IdxCa = ca .amortized_iter() .map(|opt_s| opt_s.and_then(|s| s.as_ref().arg_max().map(|idx| idx as IdxSize))) .collect_trusted(); - out.rename(self.name()); + out.rename(ca.name()); out } #[cfg(feature = "diff")] #[cfg_attr(docsrs, doc(cfg(feature = "diff")))] - pub fn lst_diff(&self, n: usize, null_behavior: NullBehavior) -> ListChunked { - self.apply_amortized(|s| s.as_ref().diff(n, null_behavior)) + fn lst_diff(&self, n: usize, null_behavior: NullBehavior) -> ListChunked { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().diff(n, null_behavior)) } - pub fn lst_shift(&self, periods: i64) -> ListChunked { - self.apply_amortized(|s| s.as_ref().shift(periods)) + fn lst_shift(&self, periods: i64) -> ListChunked { 
+ let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().shift(periods)) } - pub fn lst_slice(&self, offset: i64, length: usize) -> ListChunked { - self.apply_amortized(|s| s.as_ref().slice(offset, length)) + fn lst_slice(&self, offset: i64, length: usize) -> ListChunked { + let ca = self.as_list(); + ca.apply_amortized(|s| s.as_ref().slice(offset, length)) } - pub fn lst_lengths(&self) -> IdxCa { - let mut lengths = Vec::with_capacity(self.len()); - self.downcast_iter().for_each(|arr| { + fn lst_lengths(&self) -> IdxCa { + let ca = self.as_list(); + let mut lengths = Vec::with_capacity(ca.len()); + ca.downcast_iter().for_each(|arr| { let offsets = arr.offsets().as_slice(); let mut last = offsets[0]; for o in &offsets[1..] { @@ -180,31 +195,34 @@ impl ListChunked { last = *o; } }); - IdxCa::from_vec(self.name(), lengths) + IdxCa::from_vec(ca.name(), lengths) } /// Get the value by index in the sublists. /// So index `0` would return the first item of every sublist /// and index `-1` would return the last item of every sublist /// if an index is out of bounds, it will return a `None`. - pub fn lst_get(&self, idx: i64) -> Result { - let chunks = self + fn lst_get(&self, idx: i64) -> Result { + let ca = self.as_list(); + let chunks = ca .downcast_iter() .map(|arr| sublist_get(arr, idx)) .collect::>(); - Series::try_from((self.name(), chunks)) + Series::try_from((ca.name(), chunks)) } - pub fn lst_concat(&self, other: &[Series]) -> Result { + fn lst_concat(&self, other: &[Series]) -> Result { + let ca = self.as_list(); let other_len = other.len(); - let length = self.len(); + let length = ca.len(); let mut other = other.to_vec(); - let dtype = self.dtype(); - let inner_type = self.inner_dtype(); + let dtype = ca.dtype(); + dbg!(&ca, ca.dtype()); + let inner_type = ca.inner_dtype(); // broadcasting path in case all unit length // this path will not expand the series, so saves memory - if other.iter().all(|s| s.len() == 1) && self.len() != 1 { + if other.iter().all(|s| s.len() == 1) && ca.len() != 1 { cast_rhs(&mut other, &inner_type, dtype, length, false)?; let to_append = other .iter() @@ -215,7 +233,11 @@ impl ListChunked { .collect::>(); // there was a None, so all values will be None if to_append.len() != other_len { - return Ok(Self::full_null_with_dtype(self.name(), length, &inner_type)); + return Ok(ListChunked::full_null_with_dtype( + ca.name(), + length, + &inner_type, + )); } let vals_size_other = other @@ -225,11 +247,11 @@ impl ListChunked { let mut builder = get_list_builder( &inner_type, - self.get_values_size() + vals_size_other + 1, + ca.get_values_size() + vals_size_other + 1, length, - self.name(), + ca.name(), )?; - self.into_iter().for_each(|opt_s| { + ca.into_iter().for_each(|opt_s| { let opt_s = opt_s.map(|mut s| { for append in &to_append { s.append(append).unwrap(); @@ -252,15 +274,15 @@ impl ListChunked { for s in other.iter_mut() { iters.push(s.list()?.amortized_iter()) } - let mut first_iter = self.into_iter(); + let mut first_iter = ca.into_iter(); let mut builder = get_list_builder( &inner_type, - self.get_values_size() + vals_size_other + 1, + ca.get_values_size() + vals_size_other + 1, length, - self.name(), + ca.name(), )?; - for _ in 0..self.len() { + for _ in 0..ca.len() { let mut acc = match first_iter.next().unwrap() { Some(s) => s, None => { @@ -294,3 +316,5 @@ impl ListChunked { } } } + +impl ListNameSpaceImpl for ListChunked {} diff --git a/polars/polars-time/Cargo.toml b/polars/polars-time/Cargo.toml index ad9fe725f334..cf586d2068ed 100644 --- 
a/polars/polars-time/Cargo.toml +++ b/polars/polars-time/Cargo.toml @@ -9,16 +9,15 @@ description = "Time related code for the polars dataframe library" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "826a2b8ed8598a614c5df9115ea657d1e3c40184", default-features = false } chrono = "0.4" lexical = { version = "6", default-features = false, features = ["std", "parse-floats", "parse-integers"] } -polars-arrow = { version = "0.21.1", path = "../polars-arrow", features = ["compute"] } +polars-arrow = { version = "0.21.1", path = "../polars-arrow", features = ["compute", "temporal"] } polars-core = { version = "0.21.1", path = "../polars-core", default-features = false, features = ["private", "dtype-datetime", "dtype-duration", "dtype-time", "dtype-date"] } serde = { version = "1", features = ["derive"], optional = true } [features] dtype-date = ["polars-core/dtype-date", "polars-core/temporal"] -dtype-datetime = ["polars-core/dtype-date", "polars-core/temporal", "arrow/compute_temporal"] +dtype-datetime = ["polars-core/dtype-date", "polars-core/temporal"] dtype-time = ["polars-core/dtype-time", "polars-core/temporal"] dtype-duration = ["polars-core/dtype-duration", "polars-core/temporal"] private = [] diff --git a/polars/polars-time/src/chunkedarray/datetime.rs b/polars/polars-time/src/chunkedarray/datetime.rs index b7ce12e4f4b5..e3bc7db0486e 100644 --- a/polars/polars-time/src/chunkedarray/datetime.rs +++ b/polars/polars-time/src/chunkedarray/datetime.rs @@ -3,6 +3,7 @@ use arrow::array::{Array, ArrayRef, PrimitiveArray}; use arrow::compute::cast::CastOptions; use arrow::compute::{cast::cast, temporal}; use arrow::error::Result as ArrowResult; +use polars_arrow::export::arrow; use polars_core::prelude::*; fn cast_and_apply< diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock index 96b0648e3e63..4579df434d1b 100644 --- a/py-polars/Cargo.lock +++ b/py-polars/Cargo.lock @@ -74,7 +74,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.11.2" -source = "git+https://github.com/jorgecarleitao/arrow2?rev=826a2b8ed8598a614c5df9115ea657d1e3c40184#826a2b8ed8598a614c5df9115ea657d1e3c40184" +source = "git+https://github.com/jorgecarleitao/arrow2?rev=aafba7b4eb4991e016638cbc1d4df676912e8236#aafba7b4eb4991e016638cbc1d4df676912e8236" dependencies = [ "arrow-format", "avro-schema", @@ -1218,6 +1218,7 @@ dependencies = [ name = "polars-ops" version = "0.21.1" dependencies = [ + "polars-arrow", "polars-core", ] @@ -1225,7 +1226,6 @@ dependencies = [ name = "polars-time" version = "0.21.1" dependencies = [ - "arrow2", "chrono", "lexical", "polars-arrow", diff --git a/py-polars/polars/internals/construction.py b/py-polars/polars/internals/construction.py index dd512befe1dc..fa34317f8227 100644 --- a/py-polars/polars/internals/construction.py +++ b/py-polars/polars/internals/construction.py @@ -131,6 +131,18 @@ def _get_first_non_none(values: Sequence[Optional[Any]]) -> Any: return next((v for v in values if v is not None), None) +def sequence_from_anyvalue_or_object(name: str, values: Sequence[Any]) -> "PySeries": + """ + Last resort conversion. 
AnyValues are most flexible and if they fail we go for object types + """ + + try: + return PySeries.new_from_anyvalues(name, values) + # raised if we cannot convert to Wrap + except RuntimeError: + return PySeries.new_object(name, values, False) + + def sequence_to_pyseries( name: str, values: Sequence[Any], @@ -208,11 +220,8 @@ def sequence_to_pyseries( else: try: nested_arrow_dtype = py_type_to_arrow_type(nested_dtype) - except ValueError as e: # pragma: no cover - raise ValueError( - f"Cannot construct Series from sequence of {nested_dtype}." - ) from e - + except ValueError: # pragma: no cover + return sequence_from_anyvalue_or_object(name, values) try: arrow_values = pa.array(values, pa.large_list(nested_arrow_dtype)) return arrow_to_pyseries(name, arrow_values) @@ -226,15 +235,15 @@ def sequence_to_pyseries( return PySeries.new_series_list(name, [v.inner() for v in values], strict) elif dtype_ == PySeries: return PySeries.new_series_list(name, values, strict) - else: constructor = py_type_to_constructor(dtype_) if constructor == PySeries.new_object: - np_constructor = numpy_type_to_constructor(dtype_) - if np_constructor is not None: - values = np.array(values) # type: ignore - constructor = np_constructor + try: + return PySeries.new_from_anyvalues(name, values) + # raised if we cannot convert to Wrap + except RuntimeError: + return sequence_from_anyvalue_or_object(name, values) return constructor(name, values, strict) diff --git a/py-polars/src/apply/series.rs b/py-polars/src/apply/series.rs index f20511b00b22..241d850f7985 100644 --- a/py-polars/src/apply/series.rs +++ b/py-polars/src/apply/series.rs @@ -1,5 +1,5 @@ use super::*; -use crate::conversion::to_wrapped; +use crate::conversion::slice_to_wrapped; use crate::series::PySeries; use crate::{PyPolarsErr, Wrap}; use polars::chunked_array::builder::get_list_builder; @@ -1902,7 +1902,7 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { fn make_dict_arg(py: Python, names: &[&str], vals: &[AnyValue]) -> Py { let dict = PyDict::new(py); - for (name, val) in names.iter().zip(to_wrapped(vals)) { + for (name, val) in names.iter().zip(slice_to_wrapped(vals)) { dict.set_item(name, val).unwrap() } dict.into_py(py) diff --git a/py-polars/src/conversion.rs b/py-polars/src/conversion.rs index dc040da1062a..f91ec3ad9818 100644 --- a/py-polars/src/conversion.rs +++ b/py-polars/src/conversion.rs @@ -19,7 +19,13 @@ use pyo3::{PyAny, PyResult}; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; -pub(crate) fn to_wrapped(slice: &[T]) -> &[Wrap] { +pub(crate) fn slice_to_wrapped(slice: &[T]) -> &[Wrap] { + // Safety: + // Wrap is transparent. + unsafe { std::mem::transmute(slice) } +} + +pub(crate) fn slice_extract_wrapped(slice: &[Wrap]) -> &[T] { // Safety: // Wrap is transparent. unsafe { std::mem::transmute(slice) } diff --git a/py-polars/src/list_construction.rs b/py-polars/src/list_construction.rs index 53f0129f932a..24d80021db92 100644 --- a/py-polars/src/list_construction.rs +++ b/py-polars/src/list_construction.rs @@ -8,7 +8,7 @@ pub fn py_seq_to_list(name: &str, seq: &PyAny, dtype: &DataType) -> PyResult { let mut builder = - ListPrimitiveChunkedBuilder::::new(name, len, len * 5, DataType::Int64); + ListPrimitiveChunkedBuilder::::new(name, len, len * 5, DataType::Int64); for sub_seq in seq.iter()? 
{ let sub_seq = sub_seq?; let (sub_seq, len) = get_pyseq(sub_seq)?; @@ -32,8 +32,12 @@ pub fn py_seq_to_list(name: &str, seq: &PyAny, dtype: &DataType) -> PyResult { - let mut builder = - ListPrimitiveChunkedBuilder::::new(name, len, len * 5, DataType::Float64); + let mut builder = ListPrimitiveChunkedBuilder::::new( + name, + len, + len * 5, + DataType::Float64, + ); for sub_seq in seq.iter()? { let sub_seq = sub_seq?; let (sub_seq, len) = get_pyseq(sub_seq)?; diff --git a/py-polars/src/series.rs b/py-polars/src/series.rs index ea9525146731..3c73f1c60b9d 100644 --- a/py-polars/src/series.rs +++ b/py-polars/src/series.rs @@ -197,6 +197,12 @@ impl From for PySeries { clippy::len_without_is_empty )] impl PySeries { + #[staticmethod] + pub fn new_from_anyvalues(name: &str, val: Vec>>) -> PySeries { + let avs = slice_extract_wrapped(&val); + Series::new(name, avs).into() + } + #[staticmethod] pub fn new_str(name: &str, val: Wrap, _strict: bool) -> Self { let mut s = val.0.into_series(); diff --git a/py-polars/tests/test_apply.py b/py-polars/tests/test_apply.py index c4bd4f52c2cf..52aeb250f546 100644 --- a/py-polars/tests/test_apply.py +++ b/py-polars/tests/test_apply.py @@ -1,3 +1,4 @@ +import typing from datetime import date, datetime, timedelta from functools import reduce from typing import List, Optional @@ -55,6 +56,7 @@ def test_apply_return_py_object() -> None: assert out.shape == (1, 2) +@typing.no_type_check def test_agg_objects() -> None: df = pl.DataFrame( { @@ -64,8 +66,16 @@ def test_agg_objects() -> None: } ) + class Foo: + def __init__(self, payload): + self.payload = payload + out = df.groupby("groups").agg( - [pl.apply([pl.col("dates"), pl.col("names")], lambda s: dict(zip(s[0], s[1])))] + [ + pl.apply( + [pl.col("dates"), pl.col("names")], lambda s: Foo(dict(zip(s[0], s[1]))) + ) + ] ) assert out.dtypes == [pl.Utf8, pl.Object] diff --git a/py-polars/tests/test_struct.py b/py-polars/tests/test_struct.py index df306fa5402d..10d040feb348 100644 --- a/py-polars/tests/test_struct.py +++ b/py-polars/tests/test_struct.py @@ -278,3 +278,51 @@ def test_sort_df_with_list_struct() -> None: "a": [1], "b": [[{"c": 1}]], } + + +def test_struct_list_head_tail() -> None: + assert pl.DataFrame( + { + "list_of_struct": [ + [{"a": 1, "b": 4}, {"a": 3, "b": 6}], + [{"a": 10, "b": 40}, {"a": 20, "b": 50}, {"a": 30, "b": 60}], + ] + } + ).with_columns( + [ + pl.col("list_of_struct").arr.head(1).alias("head"), + pl.col("list_of_struct").arr.tail(1).alias("tail"), + ] + ).to_dict( + False + ) == { + "list_of_struct": [ + [{"a": 1, "b": 4}, {"a": 3, "b": 6}], + [{"a": 10, "b": 40}, {"a": 20, "b": 50}, {"a": 30, "b": 60}], + ], + "head": [[{"a": 1, "b": 4}], [{"a": 10, "b": 40}]], + "tail": [[{"a": 3, "b": 6}], [{"a": 30, "b": 60}]], + } + + +def test_struct_agg_list() -> None: + df = pl.DataFrame( + { + "group": ["a", "a", "b", "b", "b"], + "col1": [ + {"x": 1, "y": 100}, + {"x": 2, "y": 200}, + {"x": 3, "y": 300}, + {"x": 4, "y": 400}, + {"x": 5, "y": 500}, + ], + } + ) + + assert df.groupby("group", maintain_order=True).agg_list().to_dict(False) == { + "group": ["a", "b"], + "col1": [ + [{"x": 1, "y": 100}, {"x": 2, "y": 200}], + [{"x": 3, "y": 300}, {"x": 4, "y": 400}, {"x": 5, "y": 500}], + ], + }
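
Usage note (a minimal sketch, not part of the diff): after this change `ListPrimitiveChunkedBuilder` is parameterized by a Polars logical type such as `Int32Type` rather than a native type such as `i32`, while `append_slice` still takes native values. The sketch below mirrors the updated tests; it assumes the builder is reachable through `polars_core::prelude` as in 0.21, and the column name and capacities are arbitrary:

```rust
use polars_core::prelude::*;

fn build_int_list() -> ListChunked {
    // the type parameter is now a PolarsNumericType (Int32Type), not i32
    let mut builder =
        ListPrimitiveChunkedBuilder::<Int32Type>::new("a", 10, 5, DataType::Int32);
    builder.append_slice(Some(&[1i32, 2, 3])); // one sublist
    builder.append_slice(None); // a null entry
    builder.finish()
}
```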
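The list `lst_*` methods likewise move behind the `ListNameSpaceImpl` trait in `polars-ops` (hence the added `use polars_ops::prelude::ListNameSpaceImpl;` imports in the lazy tests), so downstream code must bring the trait into scope. A small sketch, assuming the `list` feature of `polars-ops` is enabled; only methods whose signatures appear in this diff are used:

```rust
use polars_core::prelude::*;
use polars_ops::prelude::ListNameSpaceImpl;

fn inspect_lists(ca: &ListChunked) -> Result<Series> {
    // length of every sublist
    let lengths: IdxCa = ca.lst_lengths();
    println!("{:?}", lengths);
    // first element of every sublist; out-of-bounds indices become null
    ca.lst_get(0)
}
```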