From c6879e27a2d1cd8286fd8513920f16c0f07128b7 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 31 Jul 2022 07:33:43 +0000 Subject: [PATCH] Added first impl of mmap --- src/ffi/array.rs | 2 +- src/ffi/mmap.rs | 154 +++++++++++++++++++++++++++++++++++ src/ffi/mod.rs | 9 +- src/io/ipc/mmap.rs | 72 ++++++++++++++++ src/io/ipc/mod.rs | 1 + src/io/ipc/read/reader.rs | 17 ++-- tests/it/io/ipc/read/mmap.rs | 44 ++++++++++ tests/it/io/ipc/read/mod.rs | 1 + 8 files changed, 287 insertions(+), 13 deletions(-) create mode 100644 src/ffi/mmap.rs create mode 100644 src/io/ipc/mmap.rs create mode 100644 tests/it/io/ipc/read/mmap.rs diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 46858deff73..b50fdaa7259 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -493,7 +493,7 @@ impl InternalArrowArray { } } -impl ArrowArrayRef for Box { +impl ArrowArrayRef for InternalArrowArray { /// the data_type as declared in the schema fn data_type(&self) -> &DataType { &self.data_type diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs new file mode 100644 index 00000000000..2ad8d8b1607 --- /dev/null +++ b/src/ffi/mmap.rs @@ -0,0 +1,154 @@ +use std::collections::VecDeque; + +use crate::array::{Array, BooleanArray, FromFfi}; +use crate::datatypes::DataType; +use crate::error::Error; + +use crate::io::ipc::read::OutOfSpecKind; +use crate::io::ipc::read::{IpcBuffer, Node}; + +use super::{ArrowArray, InternalArrowArray}; + +#[allow(dead_code)] +struct PrivateData { + // the owner of the pointers' regions + data: T, + buffers_ptr: Box<[*const std::os::raw::c_void]>, + //children_ptr: Box<[*mut ArrowArray]>, + dictionary_ptr: Option<*mut ArrowArray>, +} + +fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { + let buffer = buffers + .pop_front() + .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + + let offset: usize = buffer + .offset() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let length: usize = buffer + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok((offset, length)) +} + +// callback used to drop [ArrowArray] when it is exported +unsafe extern "C" fn release(array: *mut ArrowArray) { + if array.is_null() { + return; + } + let array = &mut *array; + + // take ownership of `private_data`, therefore dropping it + let private = Box::from_raw(array.private_data as *mut PrivateData); + /*for child in private.children_ptr.iter() { + let _ = Box::from_raw(*child); + }*/ + + if let Some(ptr) = private.dictionary_ptr { + let _ = Box::from_raw(ptr); + } + + array.release = None; +} + +fn mmap_boolean>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref(); + + let validity = get_buffer(buffers)?; + let (offset, length) = validity; + + let validity = if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) + } else { + None + }; + + let values = get_buffer(buffers)?; + let (offset, length) = values; + + // verify that they are in-bounds and get its pointer + let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + + // NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants + // or mark this as unsafe + + let buffers_ptr = [validity, Some(values)] + .iter() + .map(|maybe_buffer| match maybe_buffer { + Some(b) => *b as *const std::os::raw::c_void, + None => std::ptr::null(), + }) + .collect::>(); + let n_buffers = buffers.len() as i64; + + let mut private_data = Box::new(PrivateData:: { + data: data.clone(), + buffers_ptr, + dictionary_ptr: None, + }); + + Ok(ArrowArray { + length: num_rows as i64, + null_count: null_count as i64, + offset: 0, + n_buffers, + n_children: 0, + buffers: private_data.buffers_ptr.as_mut_ptr(), + children: std::ptr::null_mut(), + dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), + release: Some(release::), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + }) +} + +fn boolean>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result { + let array = mmap_boolean(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is safe because we just (correctly) constructed `ArrowArray` + unsafe { BooleanArray::try_from_ffi(array) } +} + +/// Maps a memory region to an [`Array`]. +pub fn mmap>( + data: T, + block_offset: usize, + data_type: DataType, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result, Error> { + use crate::datatypes::PhysicalType::*; + let node = field_nodes + .pop_front() + .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + match data_type.to_physical_type() { + Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + _ => todo!(), + } +} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 10fc2fb994a..d3766254794 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,9 +3,16 @@ mod array; mod bridge; mod generated; +#[cfg(feature = "io_ipc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] +mod mmap; mod schema; mod stream; +#[cfg(feature = "io_ipc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] +pub use mmap::mmap; + pub(crate) use array::try_from; pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; @@ -44,5 +51,5 @@ pub unsafe fn import_array_from_c( array: ArrowArray, data_type: DataType, ) -> Result> { - try_from(Box::new(InternalArrowArray::new(array, data_type))) + try_from(InternalArrowArray::new(array, data_type)) } diff --git a/src/io/ipc/mmap.rs b/src/io/ipc/mmap.rs new file mode 100644 index 00000000000..2228472d159 --- /dev/null +++ b/src/io/ipc/mmap.rs @@ -0,0 +1,72 @@ +//! Memory maps regions defined on the IPC format into [`Array`]. +use std::collections::VecDeque; + +use crate::array::Array; +use crate::error::Error; +use crate::ffi::mmap; + +use super::read::read_file_metadata; +use super::read::reader::get_serialized_batch; +use super::read::OutOfSpecKind; +use super::CONTINUATION_MARKER; + +use arrow_format::ipc::planus::ReadAsRoot; + +/// something +pub fn map_chunk>(data: T, index: usize) -> Result, Error> { + let mut bytes = data.as_ref(); + let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?; + + let block = metadata.blocks[index]; + + let offset: usize = block + .offset + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let meta_data_length: usize = block + .meta_data_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + bytes = &bytes[offset..]; + let mut message_length = bytes[..4].try_into().unwrap(); + bytes = &bytes[4..]; + + if message_length == CONTINUATION_MARKER { + // continuation marker encountered, read message next + message_length = bytes[..4].try_into().unwrap(); + bytes = &bytes[4..]; + }; + + let message_length: usize = i32::from_le_bytes(message_length) + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length]) + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; + + let batch = get_serialized_batch(&message)?; + + let buffers = batch + .buffers() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?; + let mut buffers = buffers.iter().collect::>(); + + let field_nodes = batch + .nodes() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?; + let mut field_nodes = field_nodes.iter().collect::>(); + + let data_type = metadata.schema.fields[0].data_type.clone(); + + mmap( + data.clone(), + offset + meta_data_length, + data_type, + &mut field_nodes, + &mut buffers, + ) +} diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index 3d8a8ccb994..b2273a3f607 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -77,6 +77,7 @@ mod compression; mod endianess; pub mod append; +pub mod mmap; pub mod read; pub mod write; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index ab72adec96e..d46090bac43 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -230,7 +230,7 @@ pub fn read_file_metadata(reader: &mut R) -> Result( +pub(crate) fn get_serialized_batch<'a>( message: &'a arrow_format::ipc::MessageRef, ) -> Result> { let header = message @@ -268,6 +268,11 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length: u64 = block + .meta_data_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + // read length reader.seek(SeekFrom::Start(offset))?; let mut meta_buf = [0; 4]; @@ -292,16 +297,6 @@ pub fn read_batch( let batch = get_serialized_batch(&message)?; - let offset: u64 = block - .offset - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let length: u64 = block - .meta_data_length - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - read_record_batch( batch, &metadata.schema.fields, diff --git a/tests/it/io/ipc/read/mmap.rs b/tests/it/io/ipc/read/mmap.rs new file mode 100644 index 00000000000..78ac76bc6b2 --- /dev/null +++ b/tests/it/io/ipc/read/mmap.rs @@ -0,0 +1,44 @@ +use std::fs::File; +use std::io::Read; + +use arrow2::chunk::Chunk; +use arrow2::error::Result; +use arrow2::io::ipc::mmap::map_chunk; + +use super::super::common::read_gzip_json; + +#[derive(Clone)] +struct Mmap(pub std::sync::Arc>); + +impl AsRef<[u8]> for Mmap { + #[inline] + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +fn test_file(version: &str, file_name: &str) -> Result<()> { + let testdata = crate::test_util::arrow_test_data(); + + let arrow_file = format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, file_name + ); + + let data = std::fs::read(arrow_file).unwrap(); + + let data = Mmap(std::sync::Arc::new(data)); + + // read expected JSON output + let (_schema, _, batches) = read_gzip_json(version, file_name)?; + + let array = unsafe { map_chunk(data, 0)? }; + + assert_eq!(batches[0].arrays()[0], array); + Ok(()) +} + +#[test] +fn read_generated_100_primitive() -> Result<()> { + test_file("1.0.0-littleendian", "generated_primitive") +} diff --git a/tests/it/io/ipc/read/mod.rs b/tests/it/io/ipc/read/mod.rs index 6d1510b36c4..c3602cecc7a 100644 --- a/tests/it/io/ipc/read/mod.rs +++ b/tests/it/io/ipc/read/mod.rs @@ -1,2 +1,3 @@ mod file; +mod mmap; mod stream;