Skip to content

Commit

Permalink
Chunk utility struct for random access
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 10, 2021
1 parent 3e6b79e commit 5e799cd
Show file tree
Hide file tree
Showing 20 changed files with 229 additions and 181 deletions.
4 changes: 2 additions & 2 deletions polars/polars-core/src/chunked_array/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ where
(a, b) if a == b => {
let (lhs, rhs) = align_chunks_binary(lhs, rhs);
let chunks = lhs
.downcast_chunks()
.zip(rhs.downcast_chunks())
.downcast_iter()
.zip(rhs.downcast_iter())
.map(|(lhs, rhs)| Arc::new(kernel(lhs, rhs).expect("output")) as ArrayRef)
.collect();
lhs.copy_with_chunks(chunks)
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
macro_rules! cast_from_dtype {
($self: expr, $kernel:expr, $dtype: expr) => {{
let chunks = $self
.downcast_chunks()
.downcast_iter()
.into_iter()
.map(|arr| $kernel(arr, $dtype))
.collect();
Expand Down
10 changes: 5 additions & 5 deletions polars/polars-core/src/chunked_array/comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ where
operator: impl Fn(&PrimitiveArray<T>, &PrimitiveArray<T>) -> arrow::error::Result<BooleanArray>,
) -> Result<BooleanChunked> {
let chunks = self
.downcast_chunks()
.zip(rhs.downcast_chunks())
.downcast_iter()
.zip(rhs.downcast_iter())
.map(|(left, right)| {
let arr_res = operator(left, right);
let arr = match arr_res {
Expand Down Expand Up @@ -570,8 +570,8 @@ impl BooleanChunked {
operator: impl Fn(&BooleanArray, &BooleanArray) -> arrow::error::Result<BooleanArray>,
) -> Result<BooleanChunked> {
let chunks = self
.downcast_chunks()
.zip(rhs.downcast_chunks())
.downcast_iter()
.zip(rhs.downcast_iter())
.map(|(left, right)| {
let arr_res = operator(left, right);
let arr = match arr_res {
Expand Down Expand Up @@ -643,7 +643,7 @@ impl Not for &BooleanChunked {

fn not(self) -> Self::Output {
let chunks = self
.downcast_chunks()
.downcast_iter()
.map(|a| {
let arr = compute::not(a).expect("should not fail");
Arc::new(arr) as ArrayRef
Expand Down
57 changes: 27 additions & 30 deletions polars/polars-core/src/chunked_array/iterator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::datatypes::CategoricalChunked;
use crate::chunked_array::ops::downcast::Chunks;
use crate::prelude::{
BooleanChunked, ChunkedArray, ListChunked, PolarsNumericType, Series, UnsafeValue, Utf8Chunked,
};
Expand Down Expand Up @@ -87,7 +88,7 @@ where
T: PolarsNumericType,
{
fn new(ca: &'a ChunkedArray<T>) -> Self {
let chunk = ca.downcast_chunks().next().unwrap();
let chunk = ca.downcast_iter().next().unwrap();
let slice = chunk.values();
let iter = slice.iter().copied();

Expand Down Expand Up @@ -137,7 +138,7 @@ where
T: PolarsNumericType,
{
fn new(ca: &'a ChunkedArray<T>) -> Self {
let mut chunks = ca.downcast_chunks();
let mut chunks = ca.downcast_iter();
let arr = chunks.next().unwrap();
let idx_left = 0;
let idx_right = arr.len();
Expand Down Expand Up @@ -206,7 +207,7 @@ where
T: PolarsNumericType,
{
ca: &'a ChunkedArray<T>,
chunks: Vec<&'a PrimitiveArray<T>>,
chunks: Chunks<'a, PrimitiveArray<T>>,
// current iterator if we iterate from the left
current_iter_left: Copied<Iter<'a, T::Native>>,
// If iter_left and iter_right are the same, this is None and we need to use iter left
Expand Down Expand Up @@ -249,9 +250,8 @@ where
}
}
fn new(ca: &'a ChunkedArray<T>) -> Self {
// TODO: traverse iterator without collect
let chunks: Vec<_> = ca.downcast_chunks().collect();
let current_iter_left = chunks[0].values().iter().copied();
let chunks = ca.downcast_chunks();
let current_iter_left = chunks.get(0).unwrap().values().iter().copied();

let idx_left = 0;
let chunk_idx_left = 0;
Expand All @@ -262,7 +262,7 @@ where
if chunk_idx_left == chunk_idx_right {
current_iter_right = None
} else {
let arr = chunks[chunk_idx_right];
let arr = chunks.get(chunk_idx_right).unwrap();
current_iter_right = Some(arr.values().iter().copied())
}

Expand Down Expand Up @@ -352,7 +352,7 @@ where
T: PolarsNumericType,
{
ca: &'a ChunkedArray<T>,
chunks: Vec<&'a PrimitiveArray<T>>,
chunks: Chunks<'a, PrimitiveArray<T>>,
// current iterator if we iterate from the left
current_iter_left: Copied<Iter<'a, T::Native>>,
current_data_left: &'a ArrayData,
Expand Down Expand Up @@ -406,9 +406,8 @@ where
}

fn new(ca: &'a ChunkedArray<T>) -> Self {
// TODO: traverse without collect
let chunks: Vec<_> = ca.downcast_chunks().collect();
let arr_left = chunks[0];
let chunks= ca.downcast_chunks();
let arr_left = chunks.get(0).unwrap();
let current_iter_left = arr_left.values().iter().copied();
let current_data_left = arr_left.data();

Expand All @@ -418,7 +417,7 @@ where

let chunk_idx_right = chunks.len() - 1;
let current_iter_right;
let arr = chunks[chunk_idx_right];
let arr = chunks.get(chunk_idx_right).unwrap();
let current_data_right = arr.data();
if chunk_idx_left == chunk_idx_right {
current_iter_right = None
Expand Down Expand Up @@ -623,7 +622,7 @@ macro_rules! impl_single_chunk_iterator {

impl<'a> $iterator_name<'a> {
fn new(ca: &'a $ca_type) -> Self {
let current_array = ca.downcast_chunks().next().unwrap();
let current_array = ca.downcast_iter().next().unwrap();
let idx_left = 0;
let idx_right = current_array.len();

Expand Down Expand Up @@ -751,7 +750,7 @@ macro_rules! impl_single_chunk_null_check_iterator {

impl<'a> $iterator_name<'a> {
fn new(ca: &'a $ca_type) -> Self {
let current_array = ca.downcast_chunks().next().unwrap();
let current_array = ca.downcast_iter().next().unwrap();
let current_data = current_array.data();
let idx_left = 0;
let idx_right = current_array.len();
Expand Down Expand Up @@ -877,7 +876,7 @@ macro_rules! impl_many_chunk_iterator {
/// The return type is `$iter_item`.
pub struct $iterator_name<'a> {
ca: &'a $ca_type,
chunks: Vec<&'a $arrow_array>,
chunks: Chunks<'a, $arrow_array>,
current_array_left: &'a $arrow_array,
current_array_right: &'a $arrow_array,
current_array_idx_left: usize,
Expand All @@ -891,13 +890,12 @@ macro_rules! impl_many_chunk_iterator {

impl<'a> $iterator_name<'a> {
fn new(ca: &'a $ca_type) -> Self {
// TODO: fix without collect
let chunks: Vec<_> = ca.downcast_chunks().collect();
let current_array_left = chunks[0];
let chunks = ca.downcast_chunks();
let current_array_left = chunks.get(0).unwrap();
let idx_left = 0;
let chunk_idx_left = 0;
let chunk_idx_right = chunks.len() - 1;
let current_array_right = chunks[chunk_idx_right];
let current_array_right = chunks.get(chunk_idx_right).unwrap();
let idx_right = ca.len();
let current_array_idx_left = 0;
let current_array_idx_right = current_array_right.len();
Expand All @@ -924,7 +922,7 @@ macro_rules! impl_many_chunk_iterator {
let (chunk_idx_left, current_array_idx_left) = self.ca.index_to_chunked_index(idx_left);
self.chunk_idx_left = chunk_idx_left;
self.current_array_idx_left = current_array_idx_left;
self.current_array_left = self.chunks[chunk_idx_left];
self.current_array_left = self.chunks.get(chunk_idx_left).unwrap();
self.current_array_left_len = self.current_array_left.len();
}

Expand Down Expand Up @@ -977,7 +975,7 @@ macro_rules! impl_many_chunk_iterator {
if self.chunk_idx_left < self.chunks.len() {
// reset to new array
self.current_array_idx_left = 0;
self.current_array_left = self.chunks[self.chunk_idx_left];
self.current_array_left = self.chunks.get(self.chunk_idx_left).unwrap();
self.current_array_left_len = self.current_array_left.len();
}
}
Expand Down Expand Up @@ -1023,7 +1021,7 @@ macro_rules! impl_many_chunk_iterator {
// set a new chunk as current data
self.chunk_idx_right -= 1;
// reset to new array
self.current_array_right = self.chunks[self.chunk_idx_right];
self.current_array_right = self.chunks.get(self.chunk_idx_right).unwrap();
self.current_array_idx_right = self.current_array_right.len();
}

Expand Down Expand Up @@ -1069,7 +1067,7 @@ macro_rules! impl_many_chunk_null_check_iterator {
/// The return type is `Option<$iter_item>`.
pub struct $iterator_name<'a> {
ca: &'a $ca_type,
chunks: Vec<&'a $arrow_array>,
chunks: Chunks<'a, $arrow_array>,
#[allow(dead_code)]
current_data_left: &'a ArrayData,
current_array_left: &'a $arrow_array,
Expand All @@ -1086,14 +1084,13 @@ macro_rules! impl_many_chunk_null_check_iterator {

impl<'a> $iterator_name<'a> {
fn new(ca: &'a $ca_type) -> Self {
// TODO: fix without collect
let chunks: Vec<_> = ca.downcast_chunks().collect();
let current_array_left = chunks[0];
let chunks = ca.downcast_chunks();
let current_array_left = chunks.get(0).unwrap();
let current_data_left = current_array_left.data();
let idx_left = 0;
let chunk_idx_left = 0;
let chunk_idx_right = chunks.len() - 1;
let current_array_right = chunks[chunk_idx_right];
let current_array_right = chunks.get(chunk_idx_right).unwrap();
let current_data_right = current_array_right.data();
let idx_right = ca.len();
let current_array_idx_left = 0;
Expand Down Expand Up @@ -1123,7 +1120,7 @@ macro_rules! impl_many_chunk_null_check_iterator {
let (chunk_idx_left, current_array_idx_left) = self.ca.index_to_chunked_index(idx_left);
self.chunk_idx_left = chunk_idx_left;
self.current_array_idx_left = current_array_idx_left;
self.current_array_left = self.chunks[chunk_idx_left];
self.current_array_left = self.chunks.get(chunk_idx_left).unwrap();
self.current_array_left_len = self.current_array_left.len();
self.current_data_left = self.current_array_left.data();
}
Expand Down Expand Up @@ -1190,7 +1187,7 @@ macro_rules! impl_many_chunk_null_check_iterator {
if self.chunk_idx_left < self.chunks.len() {
// reset to new array
self.current_array_idx_left = 0;
self.current_array_left = self.chunks[self.chunk_idx_left];
self.current_array_left = self.chunks.get(self.chunk_idx_left).unwrap();
self.current_data_left = self.current_array_left.data();
self.current_array_left_len = self.current_array_left.len();
}
Expand Down Expand Up @@ -1246,7 +1243,7 @@ macro_rules! impl_many_chunk_null_check_iterator {
// set a new chunk as current data
self.chunk_idx_right -= 1;
// reset to new array
self.current_array_right = self.chunks[self.chunk_idx_right];
self.current_array_right = self.chunks.get(self.chunk_idx_right).unwrap();
self.current_data_right = self.current_array_right.data();
self.current_array_idx_right = self.current_array_right.len();
}
Expand Down
12 changes: 6 additions & 6 deletions polars/polars-core/src/chunked_array/iterator/par/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ macro_rules! impl_all_parallel_iterators {
// The methods are the same for the `ReturnOption` and `ReturnUnwrap` variant.
impl<$( $lifetime )?> $seq_iter_single_chunk {
fn from_parts(ca: $ca_type, offset: usize, len: usize) -> Self {
let chunks = ca.downcast_chunks();
let chunks = ca.downcast_iter();
let current_array = chunks[0];
let idx_left = offset;
let idx_right = offset + len;
Expand All @@ -110,7 +110,7 @@ macro_rules! impl_all_parallel_iterators {
impl<$( $lifetime )?> $seq_iter_many_chunk {
fn from_parts(ca: $ca_type, offset: usize, len: usize) -> Self {
let ca = ca;
let chunks = ca.downcast_chunks();
let chunks = ca.downcast_iter();
let idx_left = offset;
let (chunk_idx_left, current_array_idx_left) = ca.index_to_chunked_index(idx_left);
let current_array_left = chunks[chunk_idx_left];
Expand Down Expand Up @@ -190,7 +190,7 @@ macro_rules! impl_all_parallel_iterators {
for $seq_iter_single_chunk_null_check
{
fn from(prod: $producer_single_chunk_null_check_return_option<$( $lifetime )?>) -> Self {
let chunks = prod.ca.downcast_chunks();
let chunks = prod.ca.downcast_iter();
let current_array = chunks[0];
let current_data = current_array.data();
let idx_left = prod.offset;
Expand Down Expand Up @@ -270,7 +270,7 @@ macro_rules! impl_all_parallel_iterators {
{
fn from(prod: $producer_many_chunk_null_check_return_option<$( $lifetime )?>) -> Self {
let ca = prod.ca;
let chunks = ca.downcast_chunks();
let chunks = ca.downcast_iter();

// Compute left chunk indexes.
let idx_left = prod.offset;
Expand Down Expand Up @@ -554,7 +554,7 @@ macro_rules! impl_into_par_iter {
type Item = Option<$iter_item>;

fn into_par_iter(self) -> Self::Iter {
let chunks = self.downcast_chunks();
let chunks = self.downcast_iter();
match chunks.len() {
1 => {
if self.null_count() == 0 {
Expand Down Expand Up @@ -688,7 +688,7 @@ macro_rules! impl_into_no_null_par_iter {

fn into_par_iter(self) -> Self::Iter {
let ca = self.0;
let chunks = ca.downcast_chunks();
let chunks = ca.downcast_iter();
match chunks.len() {
1 => $dispatcher::SingleChunk(
<$single_chunk_return_unwrapped>::new(ca),
Expand Down
12 changes: 6 additions & 6 deletions polars/polars-core/src/chunked_array/iterator/par/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where
T: PolarsNumericType + Send + Sync,
{
fn from_parts(ca: &'a ChunkedArray<T>, offset: usize, len: usize) -> Self {
let chunk = ca.downcast_chunks()[0];
let chunk = ca.downcast_iter()[0];
let slice = &chunk.values()[offset..len];
let iter = slice.iter().copied();

Expand All @@ -113,7 +113,7 @@ where
T: PolarsNumericType + Send + Sync,
{
fn from_parts(ca: &'a ChunkedArray<T>, offset: usize, len: usize) -> Self {
let chunks = ca.downcast_chunks();
let chunks = ca.downcast_iter();

// Compute left array indexes.
let idx_left = offset;
Expand Down Expand Up @@ -256,7 +256,7 @@ where
T: PolarsNumericType + Send + Sync,
{
fn from(prod: NumProducerSingleChunkNullCheckReturnOption<'a, T>) -> Self {
let chunks = prod.ca.downcast_chunks();
let chunks = prod.ca.downcast_iter();
let arr = chunks[0];
let idx_left = prod.offset;
let idx_right = prod.offset + prod.len;
Expand Down Expand Up @@ -393,7 +393,7 @@ where
{
fn from(prod: NumProducerManyChunkNullCheckReturnOption<'a, T>) -> Self {
let ca = prod.ca;
let chunks = prod.ca.downcast_chunks();
let chunks = prod.ca.downcast_iter();

// Compute left array indexes and data.
let idx_left = prod.offset;
Expand Down Expand Up @@ -632,7 +632,7 @@ where
type Item = Option<T::Native>;

fn into_par_iter(self) -> Self::Iter {
let chunks = self.downcast_chunks();
let chunks = self.downcast_iter();
match chunks.len() {
1 => {
if self.null_count() == 0 {
Expand Down Expand Up @@ -747,7 +747,7 @@ where

fn into_par_iter(self) -> Self::Iter {
let ca = self.0;
let chunks = ca.downcast_chunks();
let chunks = ca.downcast_iter();
match chunks.len() {
1 => NumNoNullParIterDispatcher::SingleChunk(
NumParIterSingleChunkReturnUnwrapped::new(ca),
Expand Down

0 comments on commit 5e799cd

Please sign in to comment.