Skip to content

Commit

Permalink
implement iterators for ObjectChunked
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 18, 2021
1 parent d2871e6 commit d705d43
Show file tree
Hide file tree
Showing 15 changed files with 164 additions and 38 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/test-windows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ jobs:
toolchain: nightly-2021-03-25
override: true
- name: Run tests
shell: bash
run: |
# do not produce debug symbols to keep memory usage down
export RUSTFLAGS="-C debuginfo=0"
cd polars && make test
cd ../py-polars && ./tasks.sh build-run-tests
29 changes: 26 additions & 3 deletions polars/polars-core/src/chunked_array/iterator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::datatypes::CategoricalChunked;
use crate::prelude::{
BooleanChunked, ChunkedArray, ListChunked, PolarsNumericType, Series, Utf8Chunked,
};
use crate::prelude::*;
use crate::utils::CustomIterTools;
use arrow::array::{Array, BooleanArray, LargeListArray, LargeStringArray};
use std::convert::TryFrom;
Expand Down Expand Up @@ -267,6 +265,31 @@ impl ListChunked {
}
}

#[cfg(feature = "object")]
impl<'a, T> IntoIterator for &'a ObjectChunked<T>
where
T: PolarsObject,
{
type Item = Option<&'a T>;
type IntoIter = Box<dyn PolarsIterator<Item = Self::Item> + 'a>;
fn into_iter(self) -> Self::IntoIter {
Box::new(self.downcast_iter().flatten().trust_my_length(self.len()))
}
}

#[cfg(feature = "object")]
impl<T: PolarsObject> ObjectChunked<T> {
#[allow(clippy::wrong_self_convention)]
pub fn into_no_null_iter(
&self,
) -> impl Iterator<Item = &T> + '_ + Send + Sync + ExactSizeIterator + DoubleEndedIterator {
self.downcast_iter()
.map(|arr| arr.values().iter())
.flatten()
.trust_my_length(self.len())
}
}

/// Trait for ChunkedArrays that don't have null values.
/// The result is the most efficient implementation `Iterator`, according to the number of chunks.
pub trait IntoNoNullIterator {
Expand Down
12 changes: 8 additions & 4 deletions polars/polars-core/src/chunked_array/object/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct ObjectChunkedBuilder<T> {

impl<T> ObjectChunkedBuilder<T>
where
T: Any + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
pub fn new(name: &str, capacity: usize) -> Self {
ObjectChunkedBuilder {
Expand All @@ -24,17 +24,20 @@ where
}

/// Appends a value of type `T` into the builder
#[inline]
pub fn append_value(&mut self, v: T) {
self.values.push(v);
self.bitmask_builder.append(true);
}

/// Appends a null slot into the builder
#[inline]
pub fn append_null(&mut self) {
self.values.push(T::default());
self.bitmask_builder.append(false);
}

#[inline]
pub fn append_value_from_any(&mut self, v: &dyn Any) -> Result<()> {
match v.downcast_ref::<T>() {
None => Err(PolarsError::DataTypeMisMatch(
Expand All @@ -47,6 +50,7 @@ where
}
}

#[inline]
pub fn append_option(&mut self, opt: Option<T>) {
match opt {
Some(s) => self.append_value(s),
Expand Down Expand Up @@ -84,7 +88,7 @@ where

impl<T> Default for ObjectChunkedBuilder<T>
where
T: Any + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
fn default() -> Self {
ObjectChunkedBuilder::new("", 0)
Expand All @@ -93,7 +97,7 @@ where

impl<T> NewChunkedArray<ObjectType<T>, T> for ObjectChunked<T>
where
T: Any + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
fn new_from_slice(name: &str, v: &[T]) -> Self {
Self::new_from_iter(name, v.iter().cloned())
Expand Down Expand Up @@ -124,7 +128,7 @@ where

impl<T> ObjectChunked<T>
where
T: Any + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
pub fn new_from_vec(name: &str, v: Vec<T>) -> Self {
let field = Arc::new(Field::new(name, DataType::Object));
Expand Down
84 changes: 84 additions & 0 deletions polars/polars-core/src/chunked_array/object/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use crate::chunked_array::object::{ObjectArray, PolarsObject};
use arrow::array::Array;

/// An iterator that returns Some(T) or None, that can be used on any ObjectArray
// Note: This implementation is based on std's [Vec]s' [IntoIter].
#[derive(Debug)]
pub struct ObjectIter<'a, T: PolarsObject> {
array: &'a ObjectArray<T>,
current: usize,
current_end: usize,
}

impl<'a, T: PolarsObject> ObjectIter<'a, T> {
/// create a new iterator
pub fn new(array: &'a ObjectArray<T>) -> Self {
ObjectIter::<T> {
array,
current: 0,
current_end: array.len(),
}
}
}

impl<'a, T: PolarsObject> std::iter::Iterator for ObjectIter<'a, T> {
type Item = Option<&'a T>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.current == self.current_end {
None
} else if self.array.is_null(self.current) {
self.current += 1;
Some(None)
} else {
let old = self.current;
self.current += 1;
// Safety:
// we just checked bounds in `self.current_end == self.current`
// this is safe on the premise that this struct is initialized with
// current = array.len()
// and that current_end is ever only decremented
unsafe { Some(Some(self.array.value_unchecked(old))) }
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(
self.array.len() - self.current,
Some(self.array.len() - self.current),
)
}
}

impl<'a, T: PolarsObject> std::iter::DoubleEndedIterator for ObjectIter<'a, T> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.current_end == self.current {
None
} else {
self.current_end -= 1;
Some(if self.array.is_null(self.current_end) {
None
} else {
// Safety:
// we just checked bounds in `self.current_end == self.current`
// this is safe on the premise that this struct is initialized with
// current = array.len()
// and that current_end is ever only decremented
unsafe { Some(self.array.value_unchecked(self.current_end)) }
})
}
}
}

/// all arrays have known size.
impl<'a, T: PolarsObject> std::iter::ExactSizeIterator for ObjectIter<'a, T> {}

impl<'a, T: PolarsObject> IntoIterator for &'a ObjectArray<T> {
type Item = Option<&'a T>;
type IntoIter = ObjectIter<'a, T>;

fn into_iter(self) -> Self::IntoIter {
ObjectIter::<'a, T>::new(self)
}
}
30 changes: 25 additions & 5 deletions polars/polars-core/src/chunked_array/object/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod builder;
mod iterator;

pub use crate::prelude::*;
use crate::utils::arrow::array::ArrayData;
use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, JsonEqual};
Expand All @@ -11,7 +13,7 @@ use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct ObjectArray<T>
where
T: Any + Debug + Clone + Send + Sync,
T: PolarsObject,
{
values: Arc<Vec<T>>,
null_bitmap: Option<Arc<Bitmap>>,
Expand All @@ -20,18 +22,36 @@ where
len: usize,
}

pub trait PolarsObject: Any + Debug + Clone + Send + Sync + Default {}

impl<T> ObjectArray<T>
where
T: Any + Debug + Clone + Send + Sync,
T: PolarsObject,
{
/// Get a reference to the underlying data
pub fn values(&self) -> &Arc<Vec<T>> {
&self.values
}

/// Get a value at a certain index location
pub fn value(&self, index: usize) -> &T {
&self.values[self.offset + index]
}

/// Get a value at a certain index location
///
/// # Safety
///
/// This does not any bound checks. The caller needs to ensure the index is within
/// the size of the array.
pub unsafe fn value_unchecked(&self, index: usize) -> &T {
&self.values.get_unchecked(index)
}
}

impl<T> JsonEqual for ObjectArray<T>
where
T: Any + Debug + Clone + Send + Sync,
T: PolarsObject,
{
fn equals_json(&self, _json: &[&Value]) -> bool {
false
Expand All @@ -40,7 +60,7 @@ where

impl<T> Array for ObjectArray<T>
where
T: Any + Debug + Clone + Send + Sync,
T: PolarsObject,
{
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -115,7 +135,7 @@ where

impl<T> ObjectChunked<T>
where
T: Any + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
///
/// # Safety
Expand Down
6 changes: 1 addition & 5 deletions polars/polars-core/src/chunked_array/ops/chunkops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ use arrow::array::Array;
use arrow::compute::concat;
use itertools::Itertools;
use polars_arrow::prelude::*;
#[cfg(feature = "object")]
use std::any::Any;
#[cfg(feature = "object")]
use std::fmt::Debug;

pub trait ChunkOps {
/// Aggregate to contiguous memory.
Expand Down Expand Up @@ -93,7 +89,7 @@ impl ChunkOps for ListChunked {
#[cfg(feature = "object")]
impl<T> ChunkOps for ObjectChunked<T>
where
T: Any + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
fn rechunk(&self) -> Self
where
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-core/src/chunked_array/ops/downcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ impl ListChunked {
}

#[cfg(feature = "object")]
impl<T> ObjectChunked<T>
impl<'a, T> ObjectChunked<T>
where
T: 'static + std::fmt::Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
pub fn downcast_iter(&self) -> impl Iterator<Item = &ObjectArray<T>> + DoubleEndedIterator {
self.chunks.iter().map(|arr| {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ChunkFilter<ListType> for ListChunked {
#[cfg(feature = "object")]
impl<T> ChunkFilter<ObjectType<T>> for ObjectChunked<T>
where
T: 'static + std::fmt::Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
fn filter(&self, filter: &BooleanChunked) -> Result<ChunkedArray<ObjectType<T>>>
where
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ pub use std::sync::Arc;

#[cfg(feature = "temporal")]
pub use crate::chunked_array::temporal::conversion::*;

#[cfg(feature = "object")]
pub use crate::chunked_array::object::PolarsObject;
10 changes: 3 additions & 7 deletions polars/polars-core/src/series/implementations/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ use crate::series::private::PrivateSeries;
use arrow::array::{ArrayData, ArrayRef};
use arrow::buffer::Buffer;
use std::any::Any;
use std::fmt::Debug;

#[cfg(feature = "object")]
impl<T> IntoSeries for ObjectChunked<T>
where
T: 'static + std::fmt::Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
fn into_series(self) -> Series {
Series(Arc::new(SeriesWrap(self)))
Expand All @@ -20,15 +19,12 @@ where

#[cfg(feature = "object")]
#[cfg_attr(docsrs, doc(cfg(feature = "object")))]
impl<T> PrivateSeries for SeriesWrap<ObjectChunked<T>> where
T: 'static + Debug + Clone + Send + Sync + Default
{
}
impl<T> PrivateSeries for SeriesWrap<ObjectChunked<T>> where T: PolarsObject {}
#[cfg(feature = "object")]
#[cfg_attr(docsrs, doc(cfg(feature = "object")))]
impl<T> SeriesTrait for SeriesWrap<ObjectChunked<T>>
where
T: 'static + Debug + Clone + Send + Sync + Default,
T: PolarsObject,
{
fn rename(&mut self, name: &str) {
ObjectChunked::rename(&mut self.0, name)
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl PyDataFrame {
mut n_threads: Option<usize>,
path: Option<String>,
overwrite_dtype: Option<Vec<(&str, &PyAny)>>,
low_memory: bool
low_memory: bool,
) -> PyResult<Self> {
let encoding = match encoding {
"utf8" => CsvEncoding::Utf8,
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl PyLazyFrame {
stop_after_n_rows: Option<usize>,
cache: bool,
overwrite_dtype: Option<Vec<(&str, &PyAny)>>,
low_memory: bool
low_memory: bool,
) -> Self {
let delimiter = sep.as_bytes()[0];

Expand Down
1 change: 0 additions & 1 deletion py-polars/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ impl PyExpr {
self.clone().inner.arg_sort(reverse).into()
}


pub fn take(&self, idx: PyExpr) -> PyExpr {
self.clone().inner.take(idx.inner).into()
}
Expand Down

0 comments on commit d705d43

Please sign in to comment.