Skip to content

Commit

Permalink
perf: Don't rechunk in parallel collection (#15907)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 26, 2024
1 parent 131354c commit a9c8b59
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 236 deletions.
4 changes: 4 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
}
}

#[inline]
pub fn push_value_ignore_validity<V: AsRef<T>>(&mut self, value: V) {
let value = value.as_ref();
let bytes = value.to_bytes();
Expand Down Expand Up @@ -187,13 +188,15 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.views.push(value);
}

/// Appends a non-null `value` to the array.
///
/// If a validity bitmap is present, the new slot is flagged as valid first;
/// the value itself is then handed to [`Self::push_value_ignore_validity`].
#[inline]
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
    if let Some(bitmap) = self.validity.as_mut() {
        bitmap.push(true);
    }
    self.push_value_ignore_validity(value);
}

#[inline]
pub fn push<V: AsRef<T>>(&mut self, value: Option<V>) {
if let Some(value) = value {
self.push_value(value)
Expand All @@ -202,6 +205,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
}
}

#[inline]
pub fn push_null(&mut self) {
self.views.push(View::default());
match &mut self.validity {
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-arrow/src/array/boolean/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,30 @@ impl MutableBooleanArray {
}
}

/// Appends a non-null boolean `value`.
///
/// The validity bitmap is only updated when it already exists; when it is
/// absent nothing is recorded for validity.
#[inline]
pub fn push_value(&mut self, value: bool) {
    self.values.push(value);
    if let Some(bitmap) = self.validity.as_mut() {
        bitmap.push(true);
    }
}

/// Appends a null slot.
///
/// A placeholder `false` is written to the values so that values and validity
/// stay the same length. If a validity bitmap exists the slot is flagged as
/// null; otherwise `init_validity` is called (defined outside this view —
/// presumably it allocates the bitmap with this last slot unset).
#[inline]
pub fn push_null(&mut self) {
    self.values.push(false);
    if let Some(bitmap) = self.validity.as_mut() {
        bitmap.push(false);
    } else {
        self.init_validity();
    }
}

/// Pushes a new entry to [`MutableBooleanArray`].
///
/// `Some(value)` is forwarded to [`Self::push_value`] and `None` to
/// [`Self::push_null`], so the value/validity bookkeeping lives in exactly
/// one place instead of being duplicated in this method.
#[inline]
pub fn push(&mut self, value: Option<bool>) {
    match value {
        Some(value) => self.push_value(value),
        None => self.push_null(),
    }
}

Expand Down Expand Up @@ -232,6 +239,10 @@ impl MutableBooleanArray {
let a: BooleanArray = self.into();
Arc::new(a)
}

/// Consumes this mutable array and converts it into a [`BooleanArray`]
/// through its `Into<BooleanArray>` conversion.
pub fn freeze(self) -> BooleanArray {
self.into()
}
}

/// Getters
Expand Down
176 changes: 170 additions & 6 deletions crates/polars-arrow/src/pushable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use crate::array::{MutableBinaryViewArray, MutablePrimitiveArray, ViewType};
use crate::bitmap::MutableBitmap;
use crate::offset::{Offset, Offsets};
use crate::array::{
BinaryViewArrayGeneric, BooleanArray, MutableBinaryViewArray, MutableBooleanArray,
MutablePrimitiveArray, PrimitiveArray, ViewType,
};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::offset::{Offset, Offsets, OffsetsBuffer};
use crate::types::NativeType;

/// A private trait representing structs that can receive elements.
pub trait Pushable<T>: Sized + Default {
type Freeze;

fn with_capacity(capacity: usize) -> Self {
let mut new = Self::default();
new.reserve(capacity);
Expand All @@ -16,9 +21,12 @@ pub trait Pushable<T>: Sized + Default {
fn push_null(&mut self);
fn extend_constant(&mut self, additional: usize, value: T);
fn extend_null_constant(&mut self, additional: usize);
fn freeze(self) -> Self::Freeze;
}

impl Pushable<bool> for MutableBitmap {
type Freeze = Bitmap;

#[inline]
fn reserve(&mut self, additional: usize) {
MutableBitmap::reserve(self, additional)
Expand Down Expand Up @@ -47,9 +55,14 @@ impl Pushable<bool> for MutableBitmap {
fn extend_null_constant(&mut self, additional: usize) {
self.extend_constant(additional, false)
}

/// Consumes the mutable bitmap and converts it into `Self::Freeze`
/// (declared as `Bitmap` for this impl) through its `Into` conversion.
fn freeze(self) -> Self::Freeze {
self.into()
}
}

impl<T: Copy + Default> Pushable<T> for Vec<T> {
type Freeze = Vec<T>;
#[inline]
fn reserve(&mut self, additional: usize) {
Vec::reserve(self, additional)
Expand Down Expand Up @@ -78,8 +91,12 @@ impl<T: Copy + Default> Pushable<T> for Vec<T> {
fn extend_null_constant(&mut self, additional: usize) {
self.extend_constant(additional, T::default())
}
/// A `Vec<T>` is its own frozen form (`Self::Freeze = Vec<T>`), so the
/// vector is returned unchanged.
fn freeze(self) -> Self::Freeze {
self
}
}
impl<O: Offset> Pushable<usize> for Offsets<O> {
type Freeze = OffsetsBuffer<O>;
fn reserve(&mut self, additional: usize) {
self.reserve(additional)
}
Expand Down Expand Up @@ -107,9 +124,14 @@ impl<O: Offset> Pushable<usize> for Offsets<O> {
fn extend_null_constant(&mut self, additional: usize) {
self.extend_constant(additional)
}
/// Consumes the offsets and converts them into `Self::Freeze`
/// (declared as `OffsetsBuffer<O>` for this impl) through its `Into` conversion.
fn freeze(self) -> Self::Freeze {
self.into()
}
}

impl<T: NativeType> Pushable<Option<T>> for MutablePrimitiveArray<T> {
type Freeze = PrimitiveArray<T>;

#[inline]
fn reserve(&mut self, additional: usize) {
MutablePrimitiveArray::reserve(self, additional)
Expand Down Expand Up @@ -139,16 +161,29 @@ impl<T: NativeType> Pushable<Option<T>> for MutablePrimitiveArray<T> {
fn extend_null_constant(&mut self, additional: usize) {
MutablePrimitiveArray::extend_constant(self, additional, None)
}
/// Consumes the mutable primitive array and converts it into `Self::Freeze`
/// (declared as `PrimitiveArray<T>` for this impl) through its `Into` conversion.
fn freeze(self) -> Self::Freeze {
self.into()
}
}

impl<T: ViewType + ?Sized> Pushable<&T> for MutableBinaryViewArray<T> {
pub trait NoOption {}
impl NoOption for &str {}
impl NoOption for &[u8] {}

impl<T, K> Pushable<T> for MutableBinaryViewArray<K>
where
T: AsRef<K> + NoOption,
K: ViewType + ?Sized,
{
type Freeze = BinaryViewArrayGeneric<K>;

#[inline]
fn reserve(&mut self, additional: usize) {
MutableBinaryViewArray::reserve(self, additional)
}

#[inline]
fn push(&mut self, value: &T) {
fn push(&mut self, value: T) {
MutableBinaryViewArray::push_value(self, value)
}

Expand All @@ -161,7 +196,8 @@ impl<T: ViewType + ?Sized> Pushable<&T> for MutableBinaryViewArray<T> {
MutableBinaryViewArray::push_null(self)
}

fn extend_constant(&mut self, additional: usize, value: &T) {
fn extend_constant(&mut self, additional: usize, value: T) {
let value = value.as_ref();
// First push a value to get the View
MutableBinaryViewArray::push_value(self, value);

Expand All @@ -183,4 +219,132 @@ impl<T: ViewType + ?Sized> Pushable<&T> for MutableBinaryViewArray<T> {
fn extend_null_constant(&mut self, additional: usize) {
self.extend_null(additional);
}
fn freeze(self) -> Self::Freeze {
self.into()
}
}

/// [`Pushable`] implementation that accepts optional values: `Some(v)` is stored
/// as a valid entry and `None` as a null.
impl<T, K> Pushable<Option<T>> for MutableBinaryViewArray<K>
where
    T: AsRef<K>,
    K: ViewType + ?Sized,
{
    type Freeze = BinaryViewArrayGeneric<K>;

    /// Forwards to the inherent `reserve`.
    #[inline]
    fn reserve(&mut self, additional: usize) {
        MutableBinaryViewArray::reserve(self, additional)
    }

    /// Pushes one optional value; the owned `T` is only borrowed for the call.
    #[inline]
    fn push(&mut self, value: Option<T>) {
        MutableBinaryViewArray::push(self, value.as_ref())
    }

    /// Forwards to the inherent `len`.
    #[inline]
    fn len(&self) -> usize {
        MutableBinaryViewArray::len(self)
    }

    /// Appends a single null entry.
    fn push_null(&mut self) {
        MutableBinaryViewArray::push_null(self)
    }

    /// Appends `value` exactly `additional` times.
    ///
    /// * `additional == 0` is a no-op (previously `additional - 1` underflowed
    ///   and one element was pushed regardless).
    /// * `None` appends `additional` nulls (previously only the first slot was
    ///   null and the remaining ones were flagged as valid).
    /// * `Some(v)` pushes `v` once and then replicates the resulting view.
    fn extend_constant(&mut self, additional: usize, value: Option<T>) {
        if additional == 0 {
            return;
        }

        let value = match value.as_ref() {
            Some(value) => value,
            None => {
                self.extend_null(additional);
                return;
            },
        };

        // First push the value once to obtain its view.
        MutableBinaryViewArray::push_value(self, value);

        // Then reuse that view for the remaining repetitions.
        let views = self.views_mut();
        let view = *views.last().unwrap();
        let remaining = additional - 1;
        views.extend(std::iter::repeat(view).take(remaining));

        // `push_value` already recorded validity for the first slot; the copies
        // only need their bits set when a bitmap is present.
        if let Some(bitmap) = self.validity() {
            bitmap.extend_constant(remaining, true)
        }
    }

    /// Appends `additional` null entries.
    #[inline]
    fn extend_null_constant(&mut self, additional: usize) {
        self.extend_null(additional);
    }

    /// Consumes the builder and converts it into `BinaryViewArrayGeneric<K>`.
    fn freeze(self) -> Self::Freeze {
        self.into()
    }
}

/// [`Pushable`] implementation that accepts plain `bool` values.
///
/// Null handling is forwarded to the array's own validity tracking instead of
/// panicking, matching the behaviour of the `Pushable<Option<bool>>` impl.
impl Pushable<bool> for MutableBooleanArray {
    type Freeze = BooleanArray;

    /// Forwards to the inherent `reserve`.
    #[inline]
    fn reserve(&mut self, additional: usize) {
        MutableBooleanArray::reserve(self, additional)
    }

    /// Appends a non-null boolean.
    #[inline]
    fn push(&mut self, value: bool) {
        MutableBooleanArray::push_value(self, value)
    }

    /// Number of slots, taken from the values bitmap.
    #[inline]
    fn len(&self) -> usize {
        self.values().len()
    }

    /// Appends a single null slot (previously `unimplemented!()`).
    #[inline]
    fn push_null(&mut self) {
        MutableBooleanArray::push_null(self)
    }

    /// Appends `value` `additional` times as non-null entries.
    #[inline]
    fn extend_constant(&mut self, additional: usize, value: bool) {
        MutableBooleanArray::extend_constant(self, additional, Some(value))
    }

    /// Appends `additional` null slots (previously `unimplemented!()`).
    #[inline]
    fn extend_null_constant(&mut self, additional: usize) {
        MutableBooleanArray::extend_constant(self, additional, None)
    }

    /// Consumes the builder and converts it into a [`BooleanArray`].
    fn freeze(self) -> Self::Freeze {
        self.into()
    }
}

/// [`Pushable`] implementation that accepts optional booleans; every method is a
/// thin forwarder to the inherent [`MutableBooleanArray`] API.
impl Pushable<Option<bool>> for MutableBooleanArray {
    type Freeze = BooleanArray;

    /// Forwards to the inherent `reserve`.
    #[inline]
    fn reserve(&mut self, additional: usize) {
        // Inherent methods take precedence over trait methods, so this does not recurse.
        self.reserve(additional)
    }

    /// Appends `Some(value)` as a valid entry or `None` as a null.
    #[inline]
    fn push(&mut self, value: Option<bool>) {
        self.push(value)
    }

    /// Number of slots, taken from the values bitmap.
    #[inline]
    fn len(&self) -> usize {
        let values = self.values();
        values.len()
    }

    /// Appends a single null slot.
    #[inline]
    fn push_null(&mut self) {
        self.push_null()
    }

    /// Appends `value` `additional` times.
    #[inline]
    fn extend_constant(&mut self, additional: usize, value: Option<bool>) {
        self.extend_constant(additional, value)
    }

    /// Appends `additional` null slots.
    #[inline]
    fn extend_null_constant(&mut self, additional: usize) {
        self.extend_constant(additional, None)
    }

    /// Consumes the builder and converts it into a [`BooleanArray`].
    fn freeze(self) -> Self::Freeze {
        self.into()
    }
}
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/builder/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ impl ChunkedBuilder<bool, BooleanType> for BooleanChunkedBuilder {
/// Appends a value of type `T` into the builder
#[inline]
fn append_value(&mut self, v: bool) {
self.array_builder.push(Some(v));
self.array_builder.push_value(v);
}

/// Appends a null slot into the builder
#[inline]
fn append_null(&mut self) {
self.array_builder.push(None);
self.array_builder.push_null();
}

fn finish(mut self) -> BooleanChunked {
Expand Down
28 changes: 28 additions & 0 deletions crates/polars-core/src/chunked_array/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,31 @@ impl BooleanChunked {
Self::with_chunk(name, arr)
}
}

/// Collects every element of a borrowed [`ChunkedArray`] into a single `Vec`,
/// chunk by chunk, yielding whatever each chunk's `iter()` produces.
impl<'a, T> From<&'a ChunkedArray<T>> for Vec<Option<T::Physical<'a>>>
where
    T: PolarsDataType,
{
    fn from(ca: &'a ChunkedArray<T>) -> Self {
        // Allocate once for the total length across all chunks.
        let mut values = Self::with_capacity(ca.len());
        for chunk in ca.downcast_iter() {
            values.extend(chunk.iter());
        }
        values
    }
}
/// Converts a [`StringChunked`] into owned optional strings, copying each
/// present value into a new `String`.
impl From<StringChunked> for Vec<Option<String>> {
    fn from(ca: StringChunked) -> Self {
        let owned = ca.iter().map(|maybe_str| maybe_str.map(str::to_string));
        owned.collect()
    }
}

/// Collects every element of a [`BooleanChunked`] into a single
/// `Vec<Option<bool>>`, chunk by chunk.
impl From<BooleanChunked> for Vec<Option<bool>> {
    fn from(ca: BooleanChunked) -> Self {
        // Allocate once for the total length across all chunks.
        let mut values = Self::with_capacity(ca.len());
        ca.downcast_iter()
            .for_each(|chunk| values.extend(chunk.iter()));
        values
    }
}

0 comments on commit a9c8b59

Please sign in to comment.