Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 3, 2022
1 parent a69347b commit 9e7e98d
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 182 deletions.
2 changes: 1 addition & 1 deletion src/array/list/mod.rs
Expand Up @@ -324,7 +324,7 @@ impl<O: Offset> ListArray<O> {
/// Returns the inner [`Field`]
/// # Errors
/// Returns an error iff the logical type is not consistent with this struct.
fn try_get_child(data_type: &DataType) -> Result<&Field, Error> {
pub fn try_get_child(data_type: &DataType) -> Result<&Field, Error> {
if O::IS_LARGE {
match data_type.to_logical_type() {
DataType::LargeList(child) => Ok(child.as_ref()),
Expand Down
168 changes: 76 additions & 92 deletions src/ffi/mmap.rs
@@ -1,21 +1,21 @@
use std::collections::VecDeque;

use crate::array::{Array, BinaryArray, BooleanArray, FromFfi, Offset, PrimitiveArray, Utf8Array};
use crate::array::{Array, ListArray, Offset};
use crate::datatypes::DataType;
use crate::error::Error;

use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::read::{IpcBuffer, Node};
use crate::types::NativeType;

use super::{ArrowArray, InternalArrowArray};
use super::{try_from, ArrowArray, InternalArrowArray};

/// State that `create_array` boxes and stores behind `ArrowArray::private_data`.
///
/// The pointer arrays exposed through the `buffers` and `children` fields of
/// the exported [`ArrowArray`] point into the boxed slices held here.
#[allow(dead_code)]
struct PrivateData<T> {
// the owner of the pointers' regions
data: T,
// backing storage for the `buffers` pointer array of the exported `ArrowArray`
buffers_ptr: Box<[*const std::os::raw::c_void]>,
//children_ptr: Box<[*mut ArrowArray]>,
// backing storage for the `children` pointer array; `create_array` fills it
// with pointers obtained from `Box::into_raw`
children_ptr: Box<[*mut ArrowArray]>,
// exposed as `dictionary` (null when `None`); `create_array` always sets `None`
dictionary_ptr: Option<*mut ArrowArray>,
}

Expand Down Expand Up @@ -83,42 +83,52 @@ fn get_validity<'a>(
})
}

fn create_array<T: Clone + AsRef<[u8]>, I: Iterator<Item = Option<*const u8>>>(
fn create_array<
T: Clone + AsRef<[u8]>,
I: Iterator<Item = Option<*const u8>>,
II: Iterator<Item = ArrowArray>,
>(
data: T,
num_rows: usize,
null_count: usize,
buffers: I,
children: II,
) -> ArrowArray {
let n_buffers = buffers.size_hint().0 as i64;

let buffers_ptr = buffers
.map(|maybe_buffer| match maybe_buffer {
Some(b) => b as *const std::os::raw::c_void,
None => std::ptr::null(),
})
.collect::<Box<[_]>>();
let n_buffers = buffers_ptr.len() as i64;

let children_ptr = children
.map(|child| Box::into_raw(Box::new(child)))
.collect::<Box<_>>();
let n_children = children_ptr.len() as i64;

let mut private_data = Box::new(PrivateData::<T> {
data,
buffers_ptr,
children_ptr,
dictionary_ptr: None,
});

ArrowArray {
length: num_rows as i64,
null_count: null_count as i64,
offset: 0,
offset: 0, // IPC files are by definition not offset
n_buffers,
n_children: 0,
n_children,
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: std::ptr::null_mut(),
children: private_data.children_ptr.as_mut_ptr(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
}

// callback used to drop [ArrowArray] when it is exported
/// callback used to drop [`ArrowArray`] when it is exported
unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
if array.is_null() {
return;
Expand Down Expand Up @@ -167,6 +177,7 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
num_rows,
null_count,
[validity, Some(offsets), Some(values)].into_iter(),
[].into_iter(),
))
}

Expand Down Expand Up @@ -201,22 +212,10 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
num_rows,
null_count,
[validity, Some(values)].into_iter(),
[].into_iter(),
))
}

fn boolean<T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<BooleanArray, Error> {
let array = mmap_boolean(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is safe because we just (correctly) constructed `ArrowArray`
unsafe { BooleanArray::try_from_ffi(array) }
}

fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
Expand Down Expand Up @@ -244,100 +243,85 @@ fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
num_rows,
null_count,
[validity, Some(values)].into_iter(),
[].into_iter(),
))
}

fn primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
data_type: &DataType,
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<PrimitiveArray<P>, Error> {
let array = mmap_primitive::<P, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is safe because we just (correctly) constructed `ArrowArray`
unsafe { PrimitiveArray::try_from_ffi(array) }
}
) -> Result<ArrowArray, Error> {
let child = ListArray::<O>::try_get_child(data_type)?.data_type();

unsafe fn utf8<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<Utf8Array<O>, Error> {
let array = mmap_binary::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { Utf8Array::<O>::try_from_ffi(array) }
}
let num_rows: usize = node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

unsafe fn binary<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<BinaryArray<O>, Error> {
let array = mmap_binary::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { BinaryArray::<O>::try_from_ffi(array) }
}
let null_count: usize = node
.null_count()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
todo!()
}
let data_ref = data.as_ref();

unsafe fn list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<BinaryArray<O>, Error> {
let array = mmap_list::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { BinaryArray::<O>::try_from_ffi(array) }
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();

let values = get_array(data.clone(), block_offset, child, field_nodes, buffers)?;

// NOTE: offsets and values invariants are _not_ validated
Ok(create_array(
data,
num_rows,
null_count,
[validity, Some(offsets)].into_iter(),
[values].into_iter(),
))
}

/// Maps a memory region to an [`Array`].
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
fn get_array<T: Clone + AsRef<[u8]>>(
data: T,
block_offset: usize,
data_type: DataType,
data_type: &DataType,
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<Box<dyn Array>, Error> {
) -> Result<ArrowArray, Error> {
use crate::datatypes::PhysicalType::*;
let node = field_nodes
.pop_front()
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;

match data_type.to_physical_type() {
Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
Boolean => mmap_boolean(data, &node, block_offset, buffers),
Primitive(p) => with_match_primitive_type!(p, |$T| {
primitive::<$T, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
}),
Utf8 => utf8::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
LargeUtf8 => {
utf8::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}
Binary => {
binary::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
LargeBinary | LargeUtf8 => mmap_binary::<i64, _>(data, &node, block_offset, buffers),
List => mmap_list::<i32, _>(data, &node, block_offset, data_type, field_nodes, buffers),
LargeList => {
mmap_list::<i64, _>(data, &node, block_offset, data_type, field_nodes, buffers)
}
LargeBinary => {
binary::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}
List => list::<i32, _>(data, &node, block_offset, field_nodes, buffers, data_type)
.map(|x| x.boxed()),
_ => todo!(),
}
}

/// Maps a memory region to an [`Array`].
///
/// Builds an FFI [`ArrowArray`] over `data` with `get_array`, pairs it with
/// `data_type` in an [`InternalArrowArray`] and imports it via `try_from`.
///
/// # Errors
/// Propagates any error returned by `get_array`, and returns any error
/// reported by `try_from` for the constructed array.
///
/// # Safety
/// The contents of `data` are not validated: the IPC file may be corrupted
/// (e.g. invalid offsets or non-utf8 data), in which case the resulting array
/// is not necessarily valid. NOTE(review): callers presumably must only map
/// regions they trust to be well-formed for `data_type` - confirm.
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
block_offset: usize,
data_type: DataType,
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<Box<dyn Array>, Error> {
// `data_type` is borrowed here and moved into `InternalArrowArray` below
let array = get_array(data, block_offset, &data_type, field_nodes, buffers)?;
// The unsafety comes from the fact that `array` is not necessarily valid -
// the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data)
unsafe { try_from(InternalArrowArray::new(array, data_type)) }
}

0 comments on commit 9e7e98d

Please sign in to comment.