Commit
increase performance parallel joins
ritchie46 committed Sep 19, 2020
1 parent 0e6a71b commit 82cb23b
Showing 7 changed files with 419 additions and 347 deletions.
310 changes: 222 additions & 88 deletions examples/10_minutes_to_pypolars.ipynb

Large diffs are not rendered by default.

15 changes: 0 additions & 15 deletions polars/src/chunked_array/mod.rs
@@ -51,7 +51,6 @@ use arrow::array::{
TimestampSecondArray,
};
use std::mem;
use std::ops::{Deref, DerefMut};

/// Get a 'hash' of the chunks in order to compare chunk sizes quickly.
fn create_chunk_id(chunks: &Vec<ArrayRef>) -> Vec<usize> {
@@ -802,20 +801,6 @@ impl<T> AsRef<ChunkedArray<T>> for ChunkedArray<T> {

pub struct NoNull<T>(pub T);

impl<T> Deref for NoNull<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<T> DerefMut for NoNull<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

#[cfg(test)]
pub(crate) mod test {
use crate::prelude::*;
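
The `NoNull<T>` newtype itself stays; only the `Deref`/`DerefMut` conveniences are removed, so the wrapped value is now reached through the public `.0` field. In the parallel iterators in the next file, `NoNull` is a marker the caller uses to promise that a column holds no null values, which lets iteration yield `&str` directly instead of `Option<&str>`. A minimal sketch of that pattern follows; `Utf8Column`, `iter_options` and `iter_values` are illustrative stand-ins, not the crate's `Utf8Chunked` API.

pub struct NoNull<T>(pub T);

struct Utf8Column {
    values: Vec<Option<String>>,
}

impl Utf8Column {
    /// Nullable iteration: every element comes back as Option<&str>.
    fn iter_options(&self) -> impl Iterator<Item = Option<&str>> {
        self.values.iter().map(|v| v.as_deref())
    }
}

impl<'a> NoNull<&'a Utf8Column> {
    /// Null-free iteration: the marker wrapper lets callers skip the Option layer.
    fn iter_values(&self) -> impl Iterator<Item = &'a str> {
        let col: &'a Utf8Column = self.0;
        col.values
            .iter()
            .map(|v| v.as_deref().expect("caller guaranteed no nulls"))
    }
}

fn main() {
    let col = Utf8Column {
        values: vec![Some("foo".to_string()), Some("bar".to_string())],
    };
    let nullable: Vec<Option<&str>> = col.iter_options().collect();
    let dense: Vec<&str> = NoNull(&col).iter_values().collect();
    assert_eq!(nullable.len(), dense.len());
}
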
117 changes: 57 additions & 60 deletions polars/src/chunked_array/par/utf8.rs
@@ -1,8 +1,9 @@
use crate::prelude::*;
use arrow::array::StringArray;
use rayon::iter::plumbing::*;
use rayon::iter::plumbing::{Consumer, ProducerCallback};
use rayon::prelude::*;
use std::{marker::PhantomData, mem, ops::Range};
use std::{mem, ops::Range};

#[derive(Debug, Clone)]
pub struct Utf8IntoIter<'a> {
@@ -48,50 +49,46 @@ impl<'a> IndexedParallelIterator for Utf8IntoIter<'a> {
CB: ProducerCallback<Self::Item>,
{
callback.callback(Utf8Producer {
ca: self.ca.clone(),
phantom: &PhantomData,
ca: &self.ca,
offset: 0,
len: self.ca.len(),
})
}
}

struct Utf8Producer<'a> {
ca: Utf8Chunked,
phantom: &'a PhantomData<()>,
ca: &'a Utf8Chunked,
offset: usize,
len: usize,
}

impl<'a> Producer for Utf8Producer<'a> {
type Item = Option<&'a str>;
type IntoIter = Utf8Iter<'a>;

fn into_iter(self) -> Self::IntoIter {
let iter = (0..self.ca.len()).into_iter();
Utf8Iter {
ca: self.ca,
phantom: &PhantomData,
iter,
}
let iter = (0..self.len).into_iter();
Utf8Iter { ca: self.ca, iter }
}

fn split_at(self, index: usize) -> (Self, Self) {
let left = self.ca.slice(0, index).unwrap();
let right = self.ca.slice(index, self.ca.len() - index).unwrap();
debug_assert!(right.len() + left.len() == self.ca.len());
(
Utf8Producer {
ca: left,
phantom: &PhantomData,
ca: self.ca,
offset: self.offset,
len: index + 1,
},
Utf8Producer {
ca: right,
phantom: &PhantomData,
ca: self.ca,
offset: self.offset + index,
len: self.len - index,
},
)
}
}

struct Utf8Iter<'a> {
ca: Utf8Chunked,
phantom: &'a PhantomData<()>,
ca: &'a Utf8Chunked,
iter: Range<usize>,
}

@@ -123,19 +120,19 @@ impl<'a> ExactSizeIterator for Utf8Iter<'a> {}
/// No null Iterators

#[derive(Debug, Clone)]
pub struct Utf8IntoIterNoNull<'a> {
pub struct Utf8IntoIterCont<'a> {
ca: &'a Utf8Chunked,
}

impl<'a> IntoParallelIterator for NoNull<&'a Utf8Chunked> {
type Iter = Utf8IntoIterNoNull<'a>;
type Iter = Utf8IntoIterCont<'a>;
type Item = &'a str;

fn into_par_iter(self) -> Self::Iter {
Utf8IntoIterNoNull { ca: self.0 }
Utf8IntoIterCont { ca: self.0 }
}
}
impl<'a> ParallelIterator for Utf8IntoIterNoNull<'a> {
impl<'a> ParallelIterator for Utf8IntoIterCont<'a> {
type Item = &'a str;

fn drive_unindexed<C>(self, consumer: C) -> C::Result
@@ -149,7 +146,7 @@ impl<'a> ParallelIterator for Utf8IntoIterNoNull<'a> {
Some(self.ca.len())
}
}
impl<'a> IndexedParallelIterator for Utf8IntoIterNoNull<'a> {
impl<'a> IndexedParallelIterator for Utf8IntoIterCont<'a> {
fn len(&self) -> usize {
self.ca.len()
}
@@ -165,75 +162,75 @@ impl<'a> IndexedParallelIterator for Utf8IntoIterNoNull<'a> {
where
CB: ProducerCallback<Self::Item>,
{
callback.callback(Utf8ProducerNoNull {
ca: self.ca.clone(),
phantom: &PhantomData,
callback.callback(Utf8ProducerCont {
arr: self.ca.downcast_chunks()[0],
offset: 0,
len: self.ca.len(),
})
}
}

struct Utf8ProducerNoNull<'a> {
ca: Utf8Chunked,
phantom: &'a PhantomData<()>,
struct Utf8ProducerCont<'a> {
arr: &'a StringArray,
offset: usize,
len: usize,
}

impl<'a> Producer for Utf8ProducerNoNull<'a> {
impl<'a> Producer for Utf8ProducerCont<'a> {
type Item = &'a str;
type IntoIter = Utf8IterNoNull<'a>;
type IntoIter = Utf8IterCont<'a>;

fn into_iter(self) -> Self::IntoIter {
let iter = (0..self.ca.len()).into_iter();
Utf8IterNoNull {
ca: self.ca,
phantom: &PhantomData,
let iter = (0..self.len).into_iter();
Utf8IterCont {
arr: self.arr,
iter,
offset: self.offset,
}
}

fn split_at(self, index: usize) -> (Self, Self) {
let left = self.ca.slice(0, index).unwrap();
let right = self.ca.slice(index, self.ca.len() - index).unwrap();
debug_assert!(right.len() + left.len() == self.ca.len());
(
Utf8ProducerNoNull {
ca: left,
phantom: &PhantomData,
Utf8ProducerCont {
arr: self.arr,
offset: self.offset,
len: index + 1,
},
Utf8ProducerNoNull {
ca: right,
phantom: &PhantomData,
Utf8ProducerCont {
arr: self.arr,
offset: self.offset + index,
len: self.len - index,
},
)
}
}

struct Utf8IterNoNull<'a> {
ca: Utf8Chunked,
phantom: &'a PhantomData<()>,
struct Utf8IterCont<'a> {
arr: &'a StringArray,
iter: Range<usize>,
offset: usize,
}

impl<'a> Iterator for Utf8IterNoNull<'a> {
impl<'a> Iterator for Utf8IterCont<'a> {
type Item = &'a str;

fn next(&mut self) -> Option<Self::Item> {
self.iter
.next()
.map(|idx| unsafe { mem::transmute::<&'_ str, &'a str>(self.ca.get_unchecked(idx)) })
self.iter.next().map(|idx| unsafe {
mem::transmute::<&'_ str, &'a str>(self.arr.value(idx + self.offset))
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.ca.len();
(len, Some(len))
self.iter.size_hint()
}
}

impl<'a> DoubleEndedIterator for Utf8IterNoNull<'a> {
impl<'a> DoubleEndedIterator for Utf8IterCont<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
self.iter
.next_back()
.map(|idx| unsafe { mem::transmute::<&'_ str, &'a str>(self.ca.get_unchecked(idx)) })
self.iter.next_back().map(|idx| unsafe {
mem::transmute::<&'_ str, &'a str>(self.arr.value(idx + self.offset))
})
}
}

impl<'a> ExactSizeIterator for Utf8IterNoNull<'a> {}
impl<'a> ExactSizeIterator for Utf8IterCont<'a> {}
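
The refactor above is where the speed-up in this file comes from: the producers now borrow the column (or its backing `StringArray`) and carry an `offset`/`len` window, so `split_at` is plain index arithmetic instead of the old `slice` calls that materialised a new `Utf8Chunked` on every split. A minimal sketch of that borrow-plus-window approach with rayon's `Producer` machinery; `Utf8Col`, `Utf8ParIter` and `Utf8Producer` are illustrative stand-ins, and only the offset/len idea mirrors the diff.

// Sketch only: rayon = "1" is assumed; `Utf8Col` stands in for the real column type.
use rayon::iter::plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer};
use rayon::prelude::*;

struct Utf8Col {
    values: Vec<String>,
}

/// Parallel iterator that borrows the column instead of cloning it.
struct Utf8ParIter<'a> {
    col: &'a Utf8Col,
}

impl<'a> ParallelIterator for Utf8ParIter<'a> {
    type Item = &'a str;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        bridge(self, consumer)
    }

    fn opt_len(&self) -> Option<usize> {
        Some(self.col.values.len())
    }
}

impl<'a> IndexedParallelIterator for Utf8ParIter<'a> {
    fn len(&self) -> usize {
        self.col.values.len()
    }

    fn drive<C: Consumer<Self::Item>>(self, consumer: C) -> C::Result {
        bridge(self, consumer)
    }

    fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output {
        callback.callback(Utf8Producer {
            col: self.col,
            offset: 0,
            len: self.col.values.len(),
        })
    }
}

/// Covers the window [offset, offset + len) of the shared borrow.
struct Utf8Producer<'a> {
    col: &'a Utf8Col,
    offset: usize,
    len: usize,
}

impl<'a> Producer for Utf8Producer<'a> {
    type Item = &'a str;
    type IntoIter = std::iter::Map<std::slice::Iter<'a, String>, fn(&String) -> &str>;

    fn into_iter(self) -> Self::IntoIter {
        let col: &'a Utf8Col = self.col;
        col.values[self.offset..self.offset + self.len]
            .iter()
            .map(String::as_str as fn(&String) -> &str)
    }

    fn split_at(self, index: usize) -> (Self, Self) {
        // Splitting the work is pure index arithmetic on the same borrow:
        // no clone of the column, no re-slicing.
        (
            Utf8Producer { col: self.col, offset: self.offset, len: index },
            Utf8Producer { col: self.col, offset: self.offset + index, len: self.len - index },
        )
    }
}

fn main() {
    let col = Utf8Col {
        values: (0..1_000).map(|i| i.to_string()).collect(),
    };
    let total_bytes: usize = Utf8ParIter { col: &col }.map(str::len).sum();
    println!("total bytes: {}", total_bytes);
}

Because only the window bounds change, the producers stay cheap to split no matter how many times rayon divides the work.
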
2 changes: 2 additions & 0 deletions polars/src/doc/changelog/v0_6.rs
@@ -3,4 +3,6 @@
//! * Add more distributions for random sampling.
//! * Fix float aggregations with NaNs.
//! * Comparisons are more performant.
//! * Outer join is more performant.
//! * Start with parallel iterator support for ChunkedArrays.
//!

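Taken together with the changelog entries above, the intent is that join key columns can be traversed in parallel. As a rough illustration only, and not polars' actual join code, here is how parallel iteration over a nullable string key column could feed a hash join's build and probe phases with rayon; `build_table` and `probe` are hypothetical names.

// Illustrative only: a toy parallel hash join over nullable string keys.
// rayon = "1" is assumed; none of these names come from the polars crate.
use rayon::prelude::*;
use std::collections::HashMap;

/// Build phase: map every non-null key to the row indices where it occurs.
fn build_table<'a>(keys: &'a [Option<&'a str>]) -> HashMap<&'a str, Vec<usize>> {
    keys.par_iter()
        .enumerate()
        // Null keys never match in an inner join, so drop them up front.
        .filter_map(|(row, key)| key.map(|k| (k, row)))
        // Each rayon worker folds into its own table; the partial tables are merged after.
        .fold(HashMap::new, |mut table: HashMap<&str, Vec<usize>>, (k, row)| {
            table.entry(k).or_default().push(row);
            table
        })
        .reduce(HashMap::new, |mut left, right| {
            for (k, mut rows) in right {
                left.entry(k).or_default().append(&mut rows);
            }
            left
        })
}

/// Probe phase: emit (build_row, probe_row) pairs for every matching key.
fn probe<'a>(
    table: &HashMap<&'a str, Vec<usize>>,
    probe_keys: &[Option<&'a str>],
) -> Vec<(usize, usize)> {
    probe_keys
        .par_iter()
        .enumerate()
        .flat_map(|(probe_row, key)| {
            let matches: Vec<(usize, usize)> = match key.and_then(|k| table.get(k)) {
                Some(rows) => rows.iter().map(|&build_row| (build_row, probe_row)).collect(),
                None => Vec::new(),
            };
            matches
        })
        .collect()
}

fn main() {
    let left = vec![Some("a"), Some("b"), None, Some("a")];
    let right = vec![Some("a"), None, Some("c")];
    let pairs = probe(&build_table(&left), &right);
    // Left rows 0 and 3 both join with right row 0.
    println!("{:?}", pairs);
}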