Skip to content

Commit

Permalink
add cache optimal zip kernel
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 9, 2020
1 parent 061f1d5 commit 1d08785
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 55 deletions.
14 changes: 14 additions & 0 deletions polars/src/chunked_array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use arrow::{
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

Expand Down Expand Up @@ -356,6 +357,19 @@ impl<T> AlignedVec<T> {
/// Returns a raw const pointer to the stored elements, as reported by the
/// wrapped `inner` storage's own `as_ptr`.
pub fn as_ptr(&self) -> *const T {
self.inner.as_ptr()
}

/// Returns a raw mutable pointer to the stored elements, as reported by the
/// wrapped `inner` storage's own `as_mut_ptr`.
pub fn as_mut_ptr(&mut self) -> *mut T {
self.inner.as_mut_ptr()
}

/// Returns the capacity reported by the wrapped `inner` storage
/// (presumably counted in elements of `T`, not bytes; confirm against `inner`'s type).
pub fn capacity(&self) -> usize {
self.inner.capacity()
}

/// Consumes the vector and returns its `(pointer, length, capacity)` triple.
///
/// `self` is wrapped in `ManuallyDrop`, so its destructor is not run here;
/// whoever receives the returned parts becomes responsible for the allocation.
pub fn into_raw_parts(self) -> (*mut T, usize, usize) {
    let mut leaked = ManuallyDrop::new(self);
    let length = leaked.len();
    let capacity = leaked.capacity();
    let pointer = leaked.as_mut_ptr();
    (pointer, length, capacity)
}
}

pub trait NewChunkedArray<T, N> {
Expand Down
85 changes: 85 additions & 0 deletions polars/src/chunked_array/kernels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use crate::prelude::*;
use arrow::array::{Array, ArrayData, ArrayRef, BooleanArray, PrimitiveArray};
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
use arrow::datatypes::ArrowNumericType;
use arrow::error::Result as ArrowResult;
use std::ops::BitOr;
use std::sync::Arc;

/// Combines two optional bitmaps with the binary operation `op`.
///
/// * both present: the result of `op(left, right)`; an error from `op` is propagated.
/// * exactly one present: a clone of that bitmap.
/// * none present: `None`.
pub(super) fn apply_bin_op_to_option_bitmap<F>(
    left: &Option<Bitmap>,
    right: &Option<Bitmap>,
    op: F,
) -> Result<Option<Bitmap>>
where
    F: Fn(&Bitmap, &Bitmap) -> ArrowResult<Bitmap>,
{
    let combined = match (left.as_ref(), right.as_ref()) {
        (Some(lhs), Some(rhs)) => Some(op(lhs, rhs)?),
        (Some(only), None) | (None, Some(only)) => Some(only.clone()),
        (None, None) => None,
    };
    Ok(combined)
}

/// Cache optimal zip version
pub fn zip<T>(mask: &BooleanArray, a: &PrimitiveArray<T>, b: &PrimitiveArray<T>) -> Result<ArrayRef>
where
T: ArrowNumericType,
{
// get data buffers
let data_a = a.data();
let data_b = b.data();
let data_mask = mask.data();

// get null bitmasks
let mask_bitmap = data_mask.null_bitmap();
let a_bitmap = data_a.null_bitmap();
let b_bitmap = data_b.null_bitmap();

// Compute final null values by bitor ops
let bitmap = apply_bin_op_to_option_bitmap(mask_bitmap, a_bitmap, |a, b| a.bitor(b))?;
let bitmap = apply_bin_op_to_option_bitmap(&bitmap, b_bitmap, |a, b| a.bitor(b))?;
let null_bit_buffer = bitmap.map(|bitmap| bitmap.into_buffer());

// Create an aligned vector.
let mut values = AlignedVec::with_capacity_aligned(mask.len());

// Get a slice to the values in the arrow arrays with the right offset
let vals_a = a.value_slice(a.offset(), a.len());
let vals_b = a.value_slice(b.offset(), b.len());

// fill the aligned vector
for i in 0..mask.len() {
let take_a = mask.value(i);
if take_a {
unsafe {
values.push(vals_a.get_unchecked(i));
}
} else {
unsafe {
values.push(vals_b.get_unchecked(i));
}
}
}

// give ownership to apache arrow without copying
let (ptr, len, cap) = values.into_raw_parts();

let buf = unsafe { Buffer::from_raw_parts(ptr as *const u8, len, cap) };
let data = ArrayData::new(
T::get_data_type(),
a.len(),
None,
null_bit_buffer,
a.offset(),
vec![buf],
vec![],
);
Ok(Arc::new(PrimitiveArray::<T>::from(Arc::new(data))))
}
2 changes: 2 additions & 0 deletions polars/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod cast;
pub mod chunkops;
pub mod comparison;
pub mod iterator;
pub mod kernels;
#[cfg(feature = "ndarray")]
#[doc(cfg(feature = "ndarray"))]
mod ndarray;
Expand All @@ -43,6 +44,7 @@ pub mod take;
pub mod temporal;
pub mod unique;
pub mod upstream_traits;

use arrow::array::{
Array, ArrayDataRef, Date32Array, DurationMicrosecondArray, DurationMillisecondArray,
DurationNanosecondArray, DurationSecondArray, IntervalDayTimeArray, IntervalYearMonthArray,
Expand Down
141 changes: 86 additions & 55 deletions polars/src/chunked_array/ops.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Traits for miscellaneous operations on ChunkedArray
use crate::chunked_array::builder::get_large_list_builder;
use crate::chunked_array::kernels;
use crate::prelude::*;
use crate::utils::Xob;
use arrow::compute;
Expand Down Expand Up @@ -903,15 +904,14 @@ pub trait ChunkZip<T> {
fn zip_with_series(&self, mask: &BooleanChunked, other: &Series) -> Result<ChunkedArray<T>>;
}

// TODO! fast paths and check mask has no null values.
macro_rules! impl_ternary {
($mask:expr, $truthy:expr, $other:expr) => {{
($mask:expr, $self:expr, $other:expr, $ty:ty) => {{
if $mask.null_count() > 0 {
Err(PolarsError::HasNullValues)
} else {
let val = $mask
let mut val: ChunkedArray<$ty> = $mask
.into_no_null_iter()
.zip($truthy)
.zip($self)
.zip($other)
.map(
|((mask_val, true_val), false_val)| {
Expand All @@ -923,10 +923,48 @@ macro_rules! impl_ternary {
},
)
.collect();
val.rename($self.name());
Ok(val)
}
}};
}
// Broadcasting variant of the ternary zip: one (or both) of `$self` / `$other`
// holds a single value that is repeated for every element of `$mask`.
//
// Evaluates to a `Result<ChunkedArray<$ty>>` named after `$self`:
// * `Err(PolarsError::HasNullValues)` when the mask contains nulls, the same
//   contract as `impl_ternary!`, because the mask is read with `into_no_null_iter`.
// * `Err(PolarsError::ShapeMisMatch)` when neither side has length 1, or when the
//   side that is not broadcast does not match the mask length (zipping would
//   otherwise silently truncate to the shorter input).
macro_rules! impl_ternary_broadcast {
    ($self:ident, $self_len:ident, $other_len:expr, $other:expr, $mask:expr, $ty:ty) => {{
        if $mask.null_count() > 0 {
            Err(PolarsError::HasNullValues)
        } else {
            let mask_len = $mask.len();
            match ($self_len, $other_len) {
                // both sides are single values
                (1, 1) => {
                    let left = $self.get(0);
                    let right = $other.get(0);
                    let mut val: ChunkedArray<$ty> = $mask
                        .into_no_null_iter()
                        .map(|mask_val| if mask_val { left } else { right })
                        .collect();
                    val.rename($self.name());
                    Ok(val)
                }
                // only `$other` is broadcast; `$self` must line up with the mask
                (len, 1) if len == mask_len => {
                    let right = $other.get(0);
                    let mut val: ChunkedArray<$ty> = $mask
                        .into_no_null_iter()
                        .zip($self)
                        .map(|(mask_val, left)| if mask_val { left } else { right })
                        .collect();
                    val.rename($self.name());
                    Ok(val)
                }
                // only `$self` is broadcast; `$other` must line up with the mask
                (1, len) if len == mask_len => {
                    let left = $self.get(0);
                    let mut val: ChunkedArray<$ty> = $mask
                        .into_no_null_iter()
                        .zip($other)
                        .map(|(mask_val, right)| if mask_val { left } else { right })
                        .collect();
                    val.rename($self.name());
                    Ok(val)
                }
                (_, _) => Err(PolarsError::ShapeMisMatch),
            }
        }
    }};
}

impl<T> ChunkZip<T> for ChunkedArray<T>
where
Expand All @@ -937,57 +975,42 @@ where
let other_len = other.len();
let mask_len = mask.len();

// broadcasting path
if self_len != mask_len || other_len != mask_len {
match (self_len, other_len) {
(1, 1) => {
let left = self.get(0);
let right = other.get(0);
let val = mask
.into_no_null_iter()
.map(|mask_val| if mask_val { left } else { right })
.collect();
Ok(val)
}
(_, 1) => {
let right = other.get(0);
let val = mask
.into_no_null_iter()
.zip(self)
.map(|(mask_val, left)| if mask_val { left } else { right })
.collect();
Ok(val)
}
(1, _) => {
let left = self.get(0);
let val = mask
.into_no_null_iter()
.zip(other)
.map(|(mask_val, right)| if mask_val { left } else { right })
.collect();
Ok(val)
}
(_, _) => Err(PolarsError::ShapeMisMatch),
}
impl_ternary_broadcast!(self, self_len, other_len, other, mask, T)

// cache optimal path
} else if self.chunk_id == other.chunk_id && other.chunk_id == mask.chunk_id {
let chunks = self
.downcast_chunks()
.iter()
.zip(&other.downcast_chunks())
.zip(&mask.downcast_chunks())
.map(|((left_c, right_c), mask_c)| kernels::zip(mask_c, left_c, right_c))
.collect::<Result<Vec<_>>>()?;
Ok(ChunkedArray::new_from_chunks(self.name(), chunks))
// no null path
} else if self.null_count() == 0 && other.null_count() == 0 {
let val: Xob<ChunkedArray<_>> = mask
.into_no_null_iter()
.zip(self.into_no_null_iter())
.zip(other.into_no_null_iter())
.map(
|((mask_val, true_val), false_val)| {
if mask_val {
true_val
} else {
false_val
}
},
)
.collect();
let mut ca = val.into_inner();
ca.rename(self.name());
Ok(ca)
// slowest path
} else {
if self.null_count() == 0 && other.null_count() == 0 {
let val: Xob<ChunkedArray<_>> = mask
.into_no_null_iter()
.zip(self.into_no_null_iter())
.zip(other.into_no_null_iter())
.map(
|((mask_val, true_val), false_val)| {
if mask_val {
true_val
} else {
false_val
}
},
)
.collect();
Ok(val.into_inner())
} else {
impl_ternary!(mask, self, other)
}
impl_ternary!(mask, self, other, T)
}
}

Expand All @@ -999,7 +1022,7 @@ where

impl ChunkZip<BooleanType> for BooleanChunked {
fn zip_with(&self, mask: &BooleanChunked, other: &BooleanChunked) -> Result<BooleanChunked> {
impl_ternary!(mask, self, other)
impl_ternary!(mask, self, other, BooleanType)
}

fn zip_with_series(
Expand All @@ -1014,7 +1037,15 @@ impl ChunkZip<BooleanType> for BooleanChunked {

impl ChunkZip<Utf8Type> for Utf8Chunked {
fn zip_with(&self, mask: &BooleanChunked, other: &Utf8Chunked) -> Result<Utf8Chunked> {
impl_ternary!(mask, self, other)
let self_len = self.len();
let other_len = other.len();
let mask_len = mask.len();

if self_len != mask_len || other_len != mask_len {
impl_ternary_broadcast!(self, self_len, other_len, other, mask, Utf8Type)
} else {
impl_ternary!(mask, self, other, Utf8Type)
}
}

fn zip_with_series(
Expand Down

0 comments on commit 1d08785

Please sign in to comment.