Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Improved performance in cast Primitive to Binary/String again (4x) #651

Merged
merged 7 commits into from
Dec 5, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ multiversion = { version = "0.6.1", optional = true }

[dev-dependencies]
criterion = "0.3"
criterion-macro = "0.3"
pprof = { version = "0.6.1", features = ["flamegraph", "criterion"] }
flate2 = "1"
doc-comment = "0.3"
crossbeam-channel = "0.5.1"
Expand Down
7 changes: 6 additions & 1 deletion benches/cast_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use criterion::{criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use rand::distributions::Uniform;
use rand::Rng;

Expand Down Expand Up @@ -177,5 +178,9 @@ fn add_benchmark(c: &mut Criterion) {
});
}

criterion_group!(benches, add_benchmark);
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = add_benchmark
}
criterion_main!(benches);
7 changes: 4 additions & 3 deletions src/array/binary/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,13 @@ impl<O: Offset> MutableBinaryArray<O> {
}
}

/// Reserves `additional` slots.
pub fn reserve(&mut self, additional: usize) {
/// Reserves `additional` elements and `additional_values` on the values buffer.
pub fn reserve(&mut self, additional: usize, additional_values: usize) {
jorgecarleitao marked this conversation as resolved.
Show resolved Hide resolved
self.offsets.reserve(additional);
if let Some(x) = self.validity.as_mut() {
x.reserve(additional)
}
self.values.reserve(additional_values);
}

#[inline]
Expand Down Expand Up @@ -399,7 +400,7 @@ impl<O: Offset, T: AsRef<[u8]>> Extend<Option<T>> for MutableBinaryArray<O> {
impl<O: Offset, T: AsRef<[u8]>> TryExtend<Option<T>> for MutableBinaryArray<O> {
fn try_extend<I: IntoIterator<Item = Option<T>>>(&mut self, iter: I) -> Result<()> {
let mut iter = iter.into_iter();
self.reserve(iter.size_hint().0);
self.reserve(iter.size_hint().0, 0);
iter.try_for_each(|x| self.try_push(x))
}
}
Expand Down
92 changes: 59 additions & 33 deletions src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::hash::Hash;

use crate::buffer::MutableBuffer;
use crate::error::Result;
use crate::{
array::*,
Expand All @@ -8,7 +9,6 @@ use crate::{
datatypes::{DataType, TimeUnit},
temporal_conversions::*,
types::NativeType,
util::lexical_to_bytes_mut,
};

use super::CastOptions;
Expand All @@ -17,21 +17,35 @@ use super::CastOptions;
pub fn primitive_to_binary<T: NativeType + lexical_core::ToLexical, O: Offset>(
from: &PrimitiveArray<T>,
) -> BinaryArray<O> {
let mut buffer = vec![];
let builder = from.iter().fold(
MutableBinaryArray::<O>::with_capacity(from.len()),
|mut builder, x| {
match x {
Some(x) => {
lexical_to_bytes_mut(*x, &mut buffer);
builder.push(Some(buffer.as_slice()));
}
None => builder.push_null(),
}
builder
},
);
builder.into()
let mut values: MutableBuffer<u8> = MutableBuffer::with_capacity(from.len());
let mut offsets: MutableBuffer<O> = MutableBuffer::with_capacity(from.len() + 1);
offsets.push(O::default());

let mut offset: usize = 0;

unsafe {
for x in from.values().iter() {
values.reserve(offset + T::FORMATTED_SIZE_DECIMAL);

let bytes = std::slice::from_raw_parts_mut(
values.as_mut_ptr().add(offset),
values.capacity() - offset,
);
let len = lexical_core::write_unchecked(*x, bytes).len();

offset += len;
offsets.push(O::from_isize(offset as isize).unwrap());
}
values.set_len(offset);
values.shrink_to_fit();
}

BinaryArray::<O>::from_data(
jorgecarleitao marked this conversation as resolved.
Show resolved Hide resolved
BinaryArray::<O>::default_data_type(),
offsets.into(),
values.into(),
from.validity().cloned(),
)
}

pub(super) fn primitive_to_binary_dyn<T, O>(from: &dyn Array) -> Result<Box<dyn Array>>
Expand Down Expand Up @@ -70,23 +84,35 @@ where
pub fn primitive_to_utf8<T: NativeType + lexical_core::ToLexical, O: Offset>(
from: &PrimitiveArray<T>,
) -> Utf8Array<O> {
let mut buffer = vec![];
let builder = from.iter().fold(
MutableUtf8Array::<O>::with_capacity(from.len()),
|mut builder, x| {
match x {
Some(x) => {
lexical_to_bytes_mut(*x, &mut buffer);
builder.push(Some(unsafe {
std::str::from_utf8_unchecked(buffer.as_slice())
}));
}
None => builder.push_null(),
}
builder
},
);
builder.into()
let mut values: MutableBuffer<u8> = MutableBuffer::with_capacity(from.len());
let mut offsets: MutableBuffer<O> = MutableBuffer::with_capacity(from.len() + 1);
offsets.push(O::default());

let mut offset: usize = 0;

unsafe {
for x in from.values().iter() {
values.reserve(offset + T::FORMATTED_SIZE_DECIMAL);

let bytes = std::slice::from_raw_parts_mut(
values.as_mut_ptr().add(offset),
values.capacity() - offset,
);
let len = lexical_core::write_unchecked(*x, bytes).len();

offset += len;
offsets.push(O::from_isize(offset as isize).unwrap());
}
values.set_len(offset);
values.shrink_to_fit();
}

Utf8Array::<O>::from_data(
jorgecarleitao marked this conversation as resolved.
Show resolved Hide resolved
Utf8Array::<O>::default_data_type(),
offsets.into(),
values.into(),
from.validity().cloned(),
)
}

pub(super) fn primitive_to_utf8_dyn<T, O>(from: &dyn Array) -> Result<Box<dyn Array>>
Expand Down