This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved performance in cast Primitive to Binary/String again (4x) #651

Merged: 7 commits, Dec 5, 2021
Changes from 3 commits
30 changes: 27 additions & 3 deletions src/array/binary/mutable.rs
@@ -105,12 +105,13 @@ impl<O: Offset> MutableBinaryArray<O> {
}
}

/// Reserves `additional` slots.
pub fn reserve(&mut self, additional: usize) {
/// Reserves `additional` elements and `additional_values` on the values buffer.
pub fn reserve(&mut self, additional: usize, additional_values: usize) {
self.offsets.reserve(additional);
if let Some(x) = self.validity.as_mut() {
x.reserve(additional)
}
self.values.reserve(additional_values);
}

#[inline]
@@ -388,6 +389,29 @@ impl<O: Offset> MutableBinaryArray<O> {
let (offsets, values) = values_iter(iterator);
Self::from_data(Self::default_data_type(), offsets, values, None)
}

/// Write values directly into the underlying data.
/// # Safety
/// Caller must ensure that `len <= self.capacity()`
#[inline]
pub unsafe fn write_values<F>(&mut self, mut f: F)
where
F: FnMut(&mut [u8]) -> usize,
{
// ensure values has enough capacity and size to write
self.values.set_len(self.values.capacity());
let buffer = &mut self.values.as_mut_slice()[self.offsets.last().unwrap().to_usize()..];
let len = f(buffer);
let new_len = self.offsets.last().unwrap().to_usize() + len;
self.values.set_len(new_len);

let size = O::from_usize(new_len).ok_or(ArrowError::Overflow).unwrap();
self.offsets.push(size);
match &mut self.validity {
Some(validity) => validity.push(true),
None => {}
}
}
}

impl<O: Offset, T: AsRef<[u8]>> Extend<Option<T>> for MutableBinaryArray<O> {
@@ -399,7 +423,7 @@ impl<O: Offset, T: AsRef<[u8]>> Extend<Option<T>> for MutableBinaryArray<O> {
impl<O: Offset, T: AsRef<[u8]>> TryExtend<Option<T>> for MutableBinaryArray<O> {
fn try_extend<I: IntoIterator<Item = Option<T>>>(&mut self, iter: I) -> Result<()> {
let mut iter = iter.into_iter();
self.reserve(iter.size_hint().0);
self.reserve(iter.size_hint().0, 0);
iter.try_for_each(|x| self.try_push(x))
}
}
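
To illustrate the two-argument `reserve` above: a variable-length binary builder has two growable buffers, one entry in `offsets` per element and the raw bytes in `values`, so reserving element slots alone does not prevent `values` from reallocating. The sketch below is a minimal, std-only stand-in. `ToyBinaryBuilder` and its fields are hypothetical names, not arrow2 types, and validity is left out.

```rust
use std::convert::TryFrom;

/// Hypothetical, std-only stand-in for a variable-length binary builder.
/// This is NOT arrow2's `MutableBinaryArray`; it only mirrors its layout.
struct ToyBinaryBuilder {
    /// Starts with a single 0; gains one entry per pushed element.
    offsets: Vec<i32>,
    /// Concatenated bytes of all pushed elements.
    values: Vec<u8>,
}

impl ToyBinaryBuilder {
    fn new() -> Self {
        Self { offsets: vec![0], values: Vec::new() }
    }

    /// Reserves room for `additional` elements and `additional_values` bytes.
    fn reserve(&mut self, additional: usize, additional_values: usize) {
        self.offsets.reserve(additional);
        self.values.reserve(additional_values);
    }

    /// Appends one element and records where it ends.
    fn push(&mut self, bytes: &[u8]) {
        self.values.extend_from_slice(bytes);
        let end = i32::try_from(self.values.len()).expect("offset overflow");
        self.offsets.push(end);
    }
}
```

With this toy type, calling `reserve(3, 16)` and then pushing `b"1"`, `b"22"`, and `b"333"` leaves `offsets` as `[0, 1, 3, 6]`, and the `values` buffer does not need to grow during the pushes.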
23 changes: 23 additions & 0 deletions src/array/utf8/mutable.rs
@@ -425,6 +425,29 @@ impl<O: Offset> MutableUtf8Array<O> {
// soundness: T: AsRef<str>
unsafe { Self::from_data_unchecked(Self::default_data_type(), offsets, values, None) }
}

/// Write values directly into the underlying data.
/// # Safety
/// Caller must ensure that `len <= self.capacity()`
#[inline]
pub unsafe fn write_values<F>(&mut self, mut f: F)
where
F: FnMut(&mut [u8]) -> usize,
{
// ensure values has enough capacity and size to write
self.values.set_len(self.values.capacity());
let buffer = &mut self.values.as_mut_slice()[self.offsets.last().unwrap().to_usize()..];
Owner

I think this is unsound, even in unsafe code: a slice must always refer to initialized data. I propose a different implementation below that avoids introducing another API on MutableBuffer. Let me know what you think.

Collaborator Author

Yes, I agree. But right now MutableBuffer is the only way to construct a BinaryArray; maybe we should expose the values and offsets to callers.

We could keep temporary values and offsets vectors in the cast kernel and then construct the MutableBuffer from those two vectors.

For reference, ClickHouse uses this style:
https://github.com/ClickHouse/ClickHouse/blob/515cc74530d11e1b2b18a63141b66a15b94748ba/src/Columns/ColumnString.h

let len = f(buffer);
let new_len = self.offsets.last().unwrap().to_usize() + len;
self.values.set_len(new_len);

let size = O::from_usize(new_len).ok_or(ArrowError::Overflow).unwrap();
self.offsets.push(size);
match &mut self.validity {
Some(validity) => validity.push(true),
None => {}
}
}
}
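
The soundness concern raised in the review thread on `write_values` is that `set_len(capacity)` followed by slicing hands the callback a `&mut [u8]` over bytes that were never initialized. One way to keep the same callback shape without that problem, sketched here on plain `Vec`s, is to zero-fill a scratch region first and truncate afterwards. The helper name `write_value_initialized` and the `max_len` parameter are assumptions made for illustration; this is not the implementation the PR ended up with.

```rust
use std::convert::TryFrom;

/// Hypothetical helper: appends one element by letting `write` fill a
/// scratch region at the end of `values`, then records the new offset.
/// `max_len` is an upper bound on the number of bytes `write` may produce.
fn write_value_initialized<F>(
    values: &mut Vec<u8>,
    offsets: &mut Vec<i32>,
    max_len: usize,
    write: F,
) where
    F: FnOnce(&mut [u8]) -> usize,
{
    let start = values.len();
    // Zero-fill the scratch region so the slice passed to `write`
    // only ever covers initialized bytes.
    values.resize(start + max_len, 0);
    let written = write(&mut values[start..]);
    assert!(written <= max_len, "writer reported more bytes than were reserved");
    // Drop the unused tail of the scratch region.
    values.truncate(start + written);
    offsets.push(i32::try_from(values.len()).expect("offset overflow"));
}
```

The zero-fill costs an extra pass over `max_len` bytes per element, which is the trade-off against the `set_len` version. `Vec::spare_capacity_mut`, which exposes the tail as `MaybeUninit<u8>`, is another option if the writer can work with uninitialized memory.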

impl<O: Offset, T: AsRef<str>> Extend<Option<T>> for MutableUtf8Array<O> {
21 changes: 8 additions & 13 deletions src/compute/cast/primitive_to.rs
@@ -8,7 +8,6 @@ use crate::{
datatypes::{DataType, TimeUnit},
temporal_conversions::*,
types::NativeType,
util::lexical_to_bytes_mut,
};

use super::CastOptions;
@@ -17,15 +16,14 @@ use super::CastOptions;
pub fn primitive_to_binary<T: NativeType + lexical_core::ToLexical, O: Offset>(
from: &PrimitiveArray<T>,
) -> BinaryArray<O> {
let mut buffer = vec![];
let builder = from.iter().fold(
MutableBinaryArray::<O>::with_capacity(from.len()),
|mut builder, x| {
match x {
Some(x) => {
lexical_to_bytes_mut(*x, &mut buffer);
builder.push(Some(buffer.as_slice()));
}
Some(x) => unsafe {
builder.reserve(1, T::FORMATTED_SIZE_DECIMAL);
builder.write_values(|bytes| lexical_core::write(*x, bytes).len());
},
None => builder.push_null(),
Owner

This is a really cool idea!

I think we may go a step further, though: since the size is constant, the offsets will be [0, N, 2N, ..., M*N] and the values can be constructed directly from lexical_core::write, e.g. via extend. We also do not need to check for utf8 below because lexical_core guarantees this. We can even ignore the validity of the primitive array and continue writing whatever is in the null slot, and clone the validity.

I think this implementation is best done without a MutableBinaryArray, though: we benefit from operating on the buffers directly in this case.

Collaborator Author

since the size is constant, the offsets will be [0, N, 2N, ..., M*N]

It's not constant; T::FORMATTED_SIZE_DECIMAL is the maximum size to reserve.
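
A minimal sketch of the buffer-level approach discussed in this thread, under stated assumptions: it works on plain `Vec`s rather than arrow2 buffers, handles only `i64`, uses `std::io::Write` formatting as a stand-in for `lexical_core::write`, and models validity as an optional `&[bool]`. The function name `format_to_buffers` and the constant `MAX_I64_DECIMAL_LEN` are hypothetical. It shows both points made above: the per-element maximum is used only to reserve space, so offsets follow the actual written lengths, and null slots are formatted like any other slot while the validity is simply cloned.

```rust
use std::convert::TryFrom;
use std::io::Write;

/// Longest decimal rendering of an `i64`: "-9223372036854775808" (20 bytes).
const MAX_I64_DECIMAL_LEN: usize = 20;

/// Hypothetical helper: formats every slot of `from` as decimal text and
/// returns `(offsets, values, validity)` built directly, without a builder.
fn format_to_buffers(
    from: &[i64],
    validity: Option<&[bool]>,
) -> (Vec<i32>, Vec<u8>, Option<Vec<bool>>) {
    let mut offsets = Vec::with_capacity(from.len() + 1);
    // Reserve the worst case once; actual lengths are usually shorter.
    let mut values = Vec::with_capacity(from.len() * MAX_I64_DECIMAL_LEN);
    offsets.push(0i32);
    for x in from {
        // Null slots are formatted too; the validity decides how they are read.
        write!(values, "{}", x).expect("writing to a Vec<u8> does not fail");
        offsets.push(i32::try_from(values.len()).expect("offset overflow"));
    }
    // The validity is copied unchanged from the input.
    (offsets, values, validity.map(|v| v.to_vec()))
}
```

For the input `[7, -12, 300]` the offsets come out as `[0, 1, 4, 7]`, not multiples of a fixed `N`, which matches the point that `FORMATTED_SIZE_DECIMAL` is an upper bound and not the written length.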

}
builder
@@ -70,17 +68,14 @@ where
pub fn primitive_to_utf8<T: NativeType + lexical_core::ToLexical, O: Offset>(
from: &PrimitiveArray<T>,
) -> Utf8Array<O> {
let mut buffer = vec![];
let builder = from.iter().fold(
MutableUtf8Array::<O>::with_capacity(from.len()),
|mut builder, x| {
match x {
Some(x) => {
lexical_to_bytes_mut(*x, &mut buffer);
builder.push(Some(unsafe {
std::str::from_utf8_unchecked(buffer.as_slice())
}));
}
Some(x) => unsafe {
builder.reserve(1, T::FORMATTED_SIZE_DECIMAL);
builder.write_values(|bytes| lexical_core::write(*x, bytes).len());
},
None => builder.push_null(),
}
builder