Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
More types
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 3, 2022
1 parent 741e53c commit a69347b
Showing 1 changed file with 146 additions and 39 deletions.
185 changes: 146 additions & 39 deletions src/ffi/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::collections::VecDeque;

use crate::array::{Array, BooleanArray, FromFfi, Offset, Utf8Array};
use crate::array::{Array, BinaryArray, BooleanArray, FromFfi, Offset, PrimitiveArray, Utf8Array};
use crate::datatypes::DataType;
use crate::error::Error;

use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::read::{IpcBuffer, Node};
use crate::types::NativeType;

use super::{ArrowArray, InternalArrowArray};

Expand All @@ -18,7 +19,7 @@ struct PrivateData<T> {
dictionary_ptr: Option<*mut ArrowArray>,
}

fn get_buffer(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error> {
fn get_buffer_bounds(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error> {
let buffer = buffers
.pop_front()
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
Expand All @@ -36,6 +37,52 @@ fn get_buffer(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error
Ok((offset, length))
}

fn get_buffer<'a, T: NativeType>(
data: &'a [u8],
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
num_rows: usize,
) -> Result<&'a [u8], Error> {
let (offset, length) = get_buffer_bounds(buffers)?;

// verify that they are in-bounds
let values = data
.get(block_offset + offset..block_offset + offset + length)
.ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?;

// validate alignment
let v: &[T] = bytemuck::try_cast_slice(values)
.map_err(|_| Error::OutOfSpec("buffer not aligned for mmap".to_string()))?;

if v.len() < num_rows {
return Err(Error::OutOfSpec(
"buffer's length is too small in mmap".to_string(),
));
}

Ok(values)
}

fn get_validity<'a>(
data: &'a [u8],
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
null_count: usize,
) -> Result<Option<&'a [u8]>, Error> {
let validity = get_buffer_bounds(buffers)?;
let (offset, length) = validity;

Ok(if null_count > 0 {
// verify that they are in-bounds and get its pointer
Some(
data.get(block_offset + offset..block_offset + offset + length)
.ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?,
)
} else {
None
})
}

fn create_array<T: Clone + AsRef<[u8]>, I: Iterator<Item = Option<*const u8>>>(
data: T,
num_rows: usize,
Expand Down Expand Up @@ -91,7 +138,7 @@ unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
array.release = None;
}

fn mmap_utf8<O: Offset, T: Clone + AsRef<[u8]>>(
fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
Expand All @@ -109,32 +156,10 @@ fn mmap_utf8<O: Offset, T: Clone + AsRef<[u8]>>(

let data_ref = data.as_ref();

let validity = get_buffer(buffers)?;
let (offset, length) = validity;

let validity = if null_count > 0 {
// verify that they are in-bounds and get its pointer
Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr())
} else {
None
};
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

let offsets = get_buffer(buffers)?;
let (offset, length) = offsets;

// verify that they are in-bounds and get its pointer
let offsets = &data_ref[block_offset + offset..block_offset + offset + length];

// validate alignment
let _: &[O] = bytemuck::cast_slice(offsets);

let offsets = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();

let values = get_buffer(buffers)?;
let (offset, length) = values;

// verify that they are in-bounds and get its pointer
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();
let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
let values = get_buffer::<u8>(data_ref, block_offset, buffers, 0)?.as_ptr();

// NOTE: offsets and values invariants are _not_ validated
Ok(create_array(
Expand Down Expand Up @@ -163,17 +188,9 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(

let data_ref = data.as_ref();

let validity = get_buffer(buffers)?;
let (offset, length) = validity;

let validity = if null_count > 0 {
// verify that they are in-bounds and get its pointer
Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr())
} else {
None
};
let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

let values = get_buffer(buffers)?;
let values = get_buffer_bounds(buffers)?;
let (offset, length) = values;

// verify that they are in-bounds and get its pointer
Expand All @@ -200,19 +217,98 @@ fn boolean<T: Clone + AsRef<[u8]>>(
unsafe { BooleanArray::try_from_ffi(array) }
}

fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
let data_ref = data.as_ref();

let num_rows: usize = node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let null_count: usize = node
.null_count()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

let values = get_buffer::<P>(data_ref, block_offset, buffers, num_rows)?.as_ptr();

Ok(create_array(
data,
num_rows,
null_count,
[validity, Some(values)].into_iter(),
))
}

fn primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<PrimitiveArray<P>, Error> {
let array = mmap_primitive::<P, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is safe because we just (correctly) constructed `ArrowArray`
unsafe { PrimitiveArray::try_from_ffi(array) }
}

unsafe fn utf8<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<Utf8Array<O>, Error> {
let array = mmap_utf8::<O, _>(data, node, block_offset, buffers)?;
let array = mmap_binary::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { Utf8Array::<O>::try_from_ffi(array) }
}

unsafe fn binary<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<BinaryArray<O>, Error> {
let array = mmap_binary::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { BinaryArray::<O>::try_from_ffi(array) }
}

fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
todo!()
}

unsafe fn list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<BinaryArray<O>, Error> {
let array = mmap_list::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { BinaryArray::<O>::try_from_ffi(array) }
}

/// Maps a memory region to an [`Array`].
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
Expand All @@ -227,10 +323,21 @@ pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
match data_type.to_physical_type() {
Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
Primitive(p) => with_match_primitive_type!(p, |$T| {
primitive::<$T, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}),
Utf8 => utf8::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
LargeUtf8 => {
utf8::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}
Binary => {
binary::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}
LargeBinary => {
binary::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}
List => list::<i32, _>(data, &node, block_offset, field_nodes, buffers, data_type)
.map(|x| x.boxed()),
_ => todo!(),
}
}

0 comments on commit a69347b

Please sign in to comment.