Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added first impl of mmap
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 31, 2022
1 parent 9f6be52 commit c6879e2
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl InternalArrowArray {
}
}

impl ArrowArrayRef for Box<InternalArrowArray> {
impl ArrowArrayRef for InternalArrowArray {
/// the data_type as declared in the schema
fn data_type(&self) -> &DataType {
&self.data_type
Expand Down
154 changes: 154 additions & 0 deletions src/ffi/mmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::collections::VecDeque;

use crate::array::{Array, BooleanArray, FromFfi};
use crate::datatypes::DataType;
use crate::error::Error;

use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::read::{IpcBuffer, Node};

use super::{ArrowArray, InternalArrowArray};

#[allow(dead_code)]
struct PrivateData<T> {
// the owner of the pointers' regions
data: T,
buffers_ptr: Box<[*const std::os::raw::c_void]>,
//children_ptr: Box<[*mut ArrowArray]>,
dictionary_ptr: Option<*mut ArrowArray>,
}

fn get_buffer(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error> {
let buffer = buffers
.pop_front()
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;

let offset: usize = buffer
.offset()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let length: usize = buffer
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

Ok((offset, length))
}

// callback used to drop [ArrowArray] when it is exported
unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
if array.is_null() {
return;
}
let array = &mut *array;

// take ownership of `private_data`, therefore dropping it
let private = Box::from_raw(array.private_data as *mut PrivateData<T>);
/*for child in private.children_ptr.iter() {
let _ = Box::from_raw(*child);
}*/

if let Some(ptr) = private.dictionary_ptr {
let _ = Box::from_raw(ptr);
}

array.release = None;
}

fn mmap_boolean<T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
let num_rows: usize = node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let null_count: usize = node
.null_count()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();

let validity = get_buffer(buffers)?;
let (offset, length) = validity;

let validity = if null_count > 0 {
// verify that they are in-bounds and get its pointer
Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr())
} else {
None
};

let values = get_buffer(buffers)?;
let (offset, length) = values;

// verify that they are in-bounds and get its pointer
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();

// NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants
// or mark this as unsafe

let buffers_ptr = [validity, Some(values)]
.iter()
.map(|maybe_buffer| match maybe_buffer {
Some(b) => *b as *const std::os::raw::c_void,
None => std::ptr::null(),
})
.collect::<Box<[_]>>();
let n_buffers = buffers.len() as i64;

let mut private_data = Box::new(PrivateData::<T> {
data: data.clone(),
buffers_ptr,
dictionary_ptr: None,
});

Ok(ArrowArray {
length: num_rows as i64,
null_count: null_count as i64,
offset: 0,
n_buffers,
n_children: 0,
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: std::ptr::null_mut(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
})
}

fn boolean<T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<BooleanArray, Error> {
let array = mmap_boolean(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is safe because we just (correctly) constructed `ArrowArray`
unsafe { BooleanArray::try_from_ffi(array) }
}

/// Maps a memory region to an [`Array`].
pub fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
block_offset: usize,
data_type: DataType,
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<Box<dyn Array>, Error> {
use crate::datatypes::PhysicalType::*;
let node = field_nodes
.pop_front()
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
match data_type.to_physical_type() {
Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
_ => todo!(),
}
}
9 changes: 8 additions & 1 deletion src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@
mod array;
mod bridge;
mod generated;
#[cfg(feature = "io_ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
mod mmap;
mod schema;
mod stream;

#[cfg(feature = "io_ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
pub use mmap::mmap;

pub(crate) use array::try_from;
pub(crate) use array::{ArrowArrayRef, InternalArrowArray};

Expand Down Expand Up @@ -44,5 +51,5 @@ pub unsafe fn import_array_from_c(
array: ArrowArray,
data_type: DataType,
) -> Result<Box<dyn Array>> {
try_from(Box::new(InternalArrowArray::new(array, data_type)))
try_from(InternalArrowArray::new(array, data_type))
}
72 changes: 72 additions & 0 deletions src/io/ipc/mmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//! Memory maps regions defined on the IPC format into [`Array`].
use std::collections::VecDeque;

use crate::array::Array;
use crate::error::Error;
use crate::ffi::mmap;

use super::read::read_file_metadata;
use super::read::reader::get_serialized_batch;
use super::read::OutOfSpecKind;
use super::CONTINUATION_MARKER;

use arrow_format::ipc::planus::ReadAsRoot;

/// something
pub fn map_chunk<T: Clone + AsRef<[u8]>>(data: T, index: usize) -> Result<Box<dyn Array>, Error> {
let mut bytes = data.as_ref();
let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?;

let block = metadata.blocks[index];

let offset: usize = block
.offset
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let meta_data_length: usize = block
.meta_data_length
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

bytes = &bytes[offset..];
let mut message_length = bytes[..4].try_into().unwrap();
bytes = &bytes[4..];

if message_length == CONTINUATION_MARKER {
// continuation marker encountered, read message next
message_length = bytes[..4].try_into().unwrap();
bytes = &bytes[4..];
};

let message_length: usize = i32::from_le_bytes(message_length)
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_serialized_batch(&message)?;

let buffers = batch
.buffers()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
let mut buffers = buffers.iter().collect::<VecDeque<_>>();

let field_nodes = batch
.nodes()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))?
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();

let data_type = metadata.schema.fields[0].data_type.clone();

mmap(
data.clone(),
offset + meta_data_length,
data_type,
&mut field_nodes,
&mut buffers,
)
}
1 change: 1 addition & 0 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ mod compression;
mod endianess;

pub mod append;
pub mod mmap;
pub mod read;
pub mod write;

Expand Down
17 changes: 6 additions & 11 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
deserialize_footer(&serialized_footer, end - start)
}

pub(super) fn get_serialized_batch<'a>(
pub(crate) fn get_serialized_batch<'a>(
message: &'a arrow_format::ipc::MessageRef,
) -> Result<arrow_format::ipc::RecordBatchRef<'a>> {
let header = message
Expand Down Expand Up @@ -268,6 +268,11 @@ pub fn read_batch<R: Read + Seek>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let length: u64 = block
.meta_data_length
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

// read length
reader.seek(SeekFrom::Start(offset))?;
let mut meta_buf = [0; 4];
Expand All @@ -292,16 +297,6 @@ pub fn read_batch<R: Read + Seek>(

let batch = get_serialized_batch(&message)?;

let offset: u64 = block
.offset
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let length: u64 = block
.meta_data_length
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

read_record_batch(
batch,
&metadata.schema.fields,
Expand Down
44 changes: 44 additions & 0 deletions tests/it/io/ipc/read/mmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::fs::File;
use std::io::Read;

use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::mmap::map_chunk;

use super::super::common::read_gzip_json;

#[derive(Clone)]
struct Mmap(pub std::sync::Arc<Vec<u8>>);

impl AsRef<[u8]> for Mmap {
#[inline]
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

fn test_file(version: &str, file_name: &str) -> Result<()> {
let testdata = crate::test_util::arrow_test_data();

let arrow_file = format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, file_name
);

let data = std::fs::read(arrow_file).unwrap();

let data = Mmap(std::sync::Arc::new(data));

// read expected JSON output
let (_schema, _, batches) = read_gzip_json(version, file_name)?;

let array = unsafe { map_chunk(data, 0)? };

assert_eq!(batches[0].arrays()[0], array);
Ok(())
}

#[test]
fn read_generated_100_primitive() -> Result<()> {
test_file("1.0.0-littleendian", "generated_primitive")
}
1 change: 1 addition & 0 deletions tests/it/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
mod file;
mod mmap;
mod stream;

0 comments on commit c6879e2

Please sign in to comment.