Skip to content

Commit

Permalink
Use AlignedVec wrapper for aligned vector allocaton, such that the de…
Browse files Browse the repository at this point in the history
…structor is called with the proper alignment
  • Loading branch information
ritchie46 committed Oct 4, 2020
1 parent e80e1a4 commit 6a5d6a5
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 58 deletions.
115 changes: 79 additions & 36 deletions polars/src/chunked_array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use crate::prelude::*;
use crate::utils::get_iter_capacity;
use arrow::array::{ArrayBuilder, ArrayDataBuilder, ArrayRef};
use arrow::datatypes::{ArrowPrimitiveType, Field, ToByteSlice};
pub use arrow::memory;
use arrow::{
array::{Array, ArrayData, LargeListBuilder, PrimitiveArray, PrimitiveBuilder, StringBuilder},
buffer::Buffer,
memory,
util::bit_util,
};
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

Expand Down Expand Up @@ -240,10 +240,10 @@ pub fn aligned_vec_to_primitive_array<T: ArrowPrimitiveType>(
null_bit_buffer: Option<Buffer>,
null_count: usize,
) -> PrimitiveArray<T> {
let values = values.into_inner();
let values = unsafe { values.into_inner() };
let vec_len = values.len();

let me = ManuallyDrop::new(values);
let me = mem::ManuallyDrop::new(values);
let ptr = me.as_ptr() as *const u8;
let len = me.len() * std::mem::size_of::<T::Native>();
let capacity = me.capacity() * std::mem::size_of::<T::Native>();
Expand All @@ -261,54 +261,95 @@ pub fn aligned_vec_to_primitive_array<T: ArrowPrimitiveType>(
PrimitiveArray::<T>::from(data)
}

pub trait AlignedAlloc<T> {
fn with_capacity_aligned(size: usize) -> Vec<T>;
}

impl<T> AlignedAlloc<T> for Vec<T> {
/// Create a new Vec where first bytes memory address has an alignment of 64 bytes, as described
/// by arrow spec.
/// Read more:
/// https://github.com/rust-ndarray/ndarray/issues/771
fn with_capacity_aligned(size: usize) -> Vec<T> {
// Can only have a zero copy to arrow memory if address of first byte % 64 == 0
let t_size = std::mem::size_of::<T>();
let capacity = size * t_size;
let ptr = memory::allocate_aligned(capacity) as *mut T;
unsafe { Vec::from_raw_parts(ptr, 0, capacity) }
#[derive(Debug)]
pub struct AlignedVec<T> {
inner: Vec<T>,
capacity: usize,
// if into_inner is called, this will be true and we can use the default Vec's destructor
taken: bool,
}

impl<T> Drop for AlignedVec<T> {
fn drop(&mut self) {
if !self.taken {
let inner = mem::take(&mut self.inner);
let mut me = mem::ManuallyDrop::new(inner);
let ptr: *mut T = me.as_mut_ptr();
let ptr = ptr as *mut u8;
unsafe { memory::free_aligned(ptr, self.capacity) }
}
}
}

pub struct AlignedVec<T>(pub Vec<T>);

impl<T> FromIterator<T> for AlignedVec<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
let mut iter = iter.into_iter();
let sh = iter.size_hint();
let size = sh.1.unwrap_or(sh.0);

let mut inner = Vec::with_capacity_aligned(size);
let mut av = Self::with_capacity_aligned(size);

while let Some(v) = iter.next() {
inner.push(v)
unsafe { av.push(v) }
}

// Iterator size hint wasn't correct and reallocation has occurred
assert!(inner.len() <= size);
AlignedVec(inner)
assert!(av.len() <= size);
av
}
}

impl<T> AlignedVec<T> {
pub fn new(v: Vec<T>) -> Result<Self> {
if v.as_ptr() as usize % 64 != 0 {
Err(PolarsError::MemoryNotAligned)
} else {
Ok(AlignedVec(v))
/// Create a new Vec where first bytes memory address has an alignment of 64 bytes, as described
/// by arrow spec.
/// Read more:
/// https://github.com/rust-ndarray/ndarray/issues/771
pub fn with_capacity_aligned(size: usize) -> Self {
// Can only have a zero copy to arrow memory if address of first byte % 64 == 0
let t_size = std::mem::size_of::<T>();
let capacity = size * t_size;
let ptr = memory::allocate_aligned(capacity) as *mut T;
let v = unsafe { Vec::from_raw_parts(ptr, 0, capacity) };
AlignedVec {
inner: v,
capacity,
taken: false,
}
}
pub fn into_inner(self) -> Vec<T> {
self.0

pub fn len(&self) -> usize {
self.inner.len()
}

pub unsafe fn from_ptr(ptr: usize, len: usize, capacity: usize) -> Self {
assert_eq!((ptr as usize) % memory::ALIGNMENT, 0);
let ptr = ptr as *mut T;
let v = Vec::from_raw_parts(ptr, len, capacity);
Self {
inner: v,
capacity,
taken: false,
}
}

/// Take ownership of the Vec. This is UB because the destructor of Vec<T> probably has a different
/// alignment than what we allocated.
unsafe fn into_inner(mut self) -> Vec<T> {
let inner = mem::take(&mut self.inner);
self.taken = true;
inner
}

/// Push at the end of the Vec. This is unsafe because a push when the capacity of the
/// inner Vec is reached will reallocate the Vec without the alignment, leaving this destructor's
/// alignment incorrect
pub unsafe fn push(&mut self, value: T) {
debug_assert!(self.inner.len() < self.capacity);
self.inner.push(value)
}

pub fn as_ptr(&self) -> *const T {
self.inner.as_ptr()
}
}

Expand Down Expand Up @@ -606,13 +647,15 @@ mod test {
#[test]
fn from_vec() {
// Can only have a zero copy to arrow memory if address of first byte % 64 == 0
let mut v = Vec::with_capacity_aligned(2);
v.push(1);
v.push(2);
let mut v = AlignedVec::with_capacity_aligned(2);
unsafe {
v.push(1);
v.push(2);
}

let ptr = v.as_ptr();
assert_eq!((ptr as usize) % 64, 0);
let a = aligned_vec_to_primitive_array::<Int32Type>(AlignedVec::new(v).unwrap(), None, 0);
let a = aligned_vec_to_primitive_array::<Int32Type>(v, None, 0);
assert_eq!(a.value_slice(0, 2), &[1, 2])
}

Expand Down
2 changes: 1 addition & 1 deletion polars/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ where
buffer: Option<Buffer>,
null_count: usize,
) -> Self {
let len = values.0.len();
let len = values.len();
let arr = Arc::new(aligned_vec_to_primitive_array::<T>(
values, buffer, null_count,
));
Expand Down
9 changes: 3 additions & 6 deletions polars/src/chunked_array/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,13 @@ where
.enumerate()
.sorted_by(|(_idx_a, a), (_idx_b, b)| sort_partial(b, a))
.map(|(idx, _v)| idx)
.collect::<AlignedVec<usize>>()
.0
.collect()
} else {
self.into_iter()
.enumerate()
.sorted_by(|(_idx_a, a), (_idx_b, b)| sort_partial(a, b))
.map(|(idx, _v)| idx)
.collect::<AlignedVec<usize>>()
.0
.collect()
}
}
}
Expand All @@ -290,8 +288,7 @@ macro_rules! argsort {
.enumerate()
.sorted_by($closure)
.map(|(idx, _v)| idx)
.collect::<AlignedVec<usize>>()
.0
.collect()
}};
}

Expand Down
2 changes: 1 addition & 1 deletion polars/src/chunked_array/unique.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
T: Hash + Eq,
{
let mut set = HashSet::with_capacity_and_hasher(capacity, FnvBuildHasher::default());
let mut unique = Vec::with_capacity_aligned(capacity);
let mut unique = Vec::with_capacity(capacity);
a.enumerate().for_each(|(idx, val)| {
if set.insert(val) {
unique.push(idx)
Expand Down
10 changes: 5 additions & 5 deletions polars/src/chunked_array/upstream_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,15 @@ impl<'a> FromParallelIterator<&'a str> for Utf8Chunked {
// We cannot return a & str owned by this function.
impl<'a> From<&'a Utf8Chunked> for Vec<Option<&'a str>> {
fn from(ca: &'a Utf8Chunked) -> Self {
let mut vec = Vec::with_capacity_aligned(ca.len());
let mut vec = Vec::with_capacity(ca.len());
ca.into_iter().for_each(|opt| vec.push(opt));
vec
}
}

impl From<Utf8Chunked> for Vec<Option<String>> {
fn from(ca: Utf8Chunked) -> Self {
let mut vec = Vec::with_capacity_aligned(ca.len());
let mut vec = Vec::with_capacity(ca.len());
ca.into_iter()
.for_each(|opt| vec.push(opt.map(|s| s.to_string())));
vec
Expand All @@ -423,15 +423,15 @@ impl From<Utf8Chunked> for Vec<Option<String>> {

impl<'a> From<&'a BooleanChunked> for Vec<Option<bool>> {
fn from(ca: &'a BooleanChunked) -> Self {
let mut vec = Vec::with_capacity_aligned(ca.len());
let mut vec = Vec::with_capacity(ca.len());
ca.into_iter().for_each(|opt| vec.push(opt));
vec
}
}

impl From<BooleanChunked> for Vec<Option<bool>> {
fn from(ca: BooleanChunked) -> Self {
let mut vec = Vec::with_capacity_aligned(ca.len());
let mut vec = Vec::with_capacity(ca.len());
ca.into_iter().for_each(|opt| vec.push(opt));
vec
}
Expand All @@ -444,7 +444,7 @@ where
ChunkedArray<T>: ChunkOps,
{
fn from(ca: &'a ChunkedArray<T>) -> Self {
let mut vec = Vec::with_capacity_aligned(ca.len());
let mut vec = Vec::with_capacity(ca.len());
ca.into_iter().for_each(|opt| vec.push(opt));
vec
}
Expand Down
2 changes: 1 addition & 1 deletion polars/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use crate::{
chunked_array::{
arithmetic::Pow,
builder::{
AlignedAlloc, AlignedVec, BooleanChunkedBuilder, LargListBuilderTrait,
AlignedVec, BooleanChunkedBuilder, LargListBuilderTrait,
LargeListPrimitiveChunkedBuilder, LargeListUtf8ChunkedBuilder, NewChunkedArray,
PrimitiveChunkedBuilder, Utf8ChunkedBuilder,
},
Expand Down
7 changes: 5 additions & 2 deletions py-polars/src/npy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ use numpy::{
ToNpyDims, PY_ARRAY_API,
};
use numpy::{Element, PyArray1};
use polars::prelude::*;
use polars::chunked_array::builder::memory;
use pyo3::prelude::*;
use std::{mem, ptr};

/// Create an empty numpy array arrows 64 byte alignment
pub fn aligned_array<T: Element>(py: Python<'_>, size: usize) -> (&PyArray1<T>, *mut T) {
let mut buf: Vec<T> = Vec::with_capacity_aligned(size);
let t_size = std::mem::size_of::<T>();
let capacity = size * t_size;
let ptr = memory::allocate_aligned(capacity) as *mut T;
let mut buf = unsafe { Vec::from_raw_parts(ptr, 0, capacity) };
unsafe { buf.set_len(size) }
// modified from
// numpy-0.10.0/src/array.rs:375
Expand Down
8 changes: 2 additions & 6 deletions py-polars/src/series.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::datatypes::DataType;
use crate::error::PyPolarsEr;
use crate::{
dispatch::ApplyLambda,
npy::{self, aligned_array},
};
use crate::{dispatch::ApplyLambda, npy::aligned_array};
use numpy::PyArray1;
use polars::chunked_array::builder::get_bitmap;
use polars::prelude::*;
Expand Down Expand Up @@ -629,8 +626,7 @@ macro_rules! impl_unsafe_from_ptr {
($name:ident, $series_variant:ident) => {
impl PySeries {
fn $name(&self, ptr: usize, len: usize) -> Self {
let v = unsafe { npy::vec_from_ptr(ptr, len) };
let av = AlignedVec::new(v).unwrap();
let av = unsafe { AlignedVec::from_ptr(ptr, len, len) };
let (null_count, null_bitmap) = get_bitmap(self.series.chunks()[0].as_ref());
let ca = ChunkedArray::new_from_owned_with_null_bitmap(
self.name(),
Expand Down

0 comments on commit 6a5d6a5

Please sign in to comment.