diff --git a/src/array/list/mod.rs b/src/array/list/mod.rs
index 88ab6513234..13f781f680f 100644
--- a/src/array/list/mod.rs
+++ b/src/array/list/mod.rs
@@ -324,7 +324,7 @@ impl<O: Offset> ListArray<O> {
     /// Returns a the inner [`Field`]
     /// # Errors
     /// Panics iff the logical type is not consistent with this struct.
-    fn try_get_child(data_type: &DataType) -> Result<&Field, Error> {
+    pub fn try_get_child(data_type: &DataType) -> Result<&Field, Error> {
         if O::IS_LARGE {
             match data_type.to_logical_type() {
                 DataType::LargeList(child) => Ok(child.as_ref()),
diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs
index fb1da4fcd23..5f49411eedb 100644
--- a/src/ffi/mmap.rs
+++ b/src/ffi/mmap.rs
@@ -1,6 +1,6 @@
 use std::collections::VecDeque;
 
-use crate::array::{Array, BinaryArray, BooleanArray, FromFfi, Offset, PrimitiveArray, Utf8Array};
+use crate::array::{Array, ListArray, Offset};
 use crate::datatypes::DataType;
 use crate::error::Error;
 
@@ -8,14 +8,14 @@ use crate::io::ipc::read::OutOfSpecKind;
 use crate::io::ipc::read::{IpcBuffer, Node};
 use crate::types::NativeType;
 
-use super::{ArrowArray, InternalArrowArray};
+use super::{try_from, ArrowArray, InternalArrowArray};
 
 #[allow(dead_code)]
 struct PrivateData<T> {
     // the owner of the pointers' regions
     data: T,
     buffers_ptr: Box<[*const std::os::raw::c_void]>,
-    //children_ptr: Box<[*mut ArrowArray]>,
+    children_ptr: Box<[*mut ArrowArray]>,
     dictionary_ptr: Option<*mut ArrowArray>,
 }
 
@@ -83,42 +83,52 @@ fn get_validity<'a>(
     })
 }
 
-fn create_array<T: Clone + AsRef<[u8]>, I: Iterator<Item = Option<*const u8>>>(
+fn create_array<
+    T: Clone + AsRef<[u8]>,
+    I: Iterator<Item = Option<*const u8>>,
+    II: Iterator<Item = ArrowArray>,
+>(
     data: T,
     num_rows: usize,
     null_count: usize,
     buffers: I,
+    children: II,
 ) -> ArrowArray {
-    let n_buffers = buffers.size_hint().0 as i64;
-
     let buffers_ptr = buffers
         .map(|maybe_buffer| match maybe_buffer {
             Some(b) => b as *const std::os::raw::c_void,
             None => std::ptr::null(),
         })
         .collect::<Box<[_]>>();
+    let n_buffers = buffers_ptr.len() as i64;
+
+    let children_ptr = children
+        .map(|child| Box::into_raw(Box::new(child)))
+        .collect::<Box<[_]>>();
+    let n_children = children_ptr.len() as i64;
 
     let mut private_data = Box::new(PrivateData::<T> {
         data,
         buffers_ptr,
+        children_ptr,
         dictionary_ptr: None,
     });
 
     ArrowArray {
         length: num_rows as i64,
         null_count: null_count as i64,
-        offset: 0,
+        offset: 0, // IPC files are by definition not offset
         n_buffers,
-        n_children: 0,
+        n_children,
         buffers: private_data.buffers_ptr.as_mut_ptr(),
-        children: std::ptr::null_mut(),
+        children: private_data.children_ptr.as_mut_ptr(),
         dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
         release: Some(release::<T>),
         private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
     }
 }
 
-// callback used to drop [ArrowArray] when it is exported
+/// callback used to drop [`ArrowArray`] when it is exported
 unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
     if array.is_null() {
         return;
     }
@@ -167,6 +177,7 @@ fn mmap_binary<O: Offset, T: AsRef<[u8]>>(
         num_rows,
         null_count,
         [validity, Some(offsets), Some(values)].into_iter(),
+        [].into_iter(),
     ))
 }
 
@@ -201,22 +212,10 @@ fn mmap_boolean<T: AsRef<[u8]>>(
         num_rows,
         null_count,
         [validity, Some(values)].into_iter(),
+        [].into_iter(),
     ))
 }
 
-fn boolean<T: AsRef<[u8]>>(
-    data: T,
-    node: &Node,
-    block_offset: usize,
-    buffers: &mut VecDeque<IpcBuffer>,
-    data_type: DataType,
-) -> Result<BooleanArray, Error> {
-    let array = mmap_boolean(data, node, block_offset, buffers)?;
-    let array = InternalArrowArray::new(array, data_type);
-    // this is safe because we just (correctly) constructed `ArrowArray`
-    unsafe { BooleanArray::try_from_ffi(array) }
-}
-
 fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
     data: T,
     node: &Node,
     block_offset: usize,
     buffers: &mut VecDeque<IpcBuffer>,
@@ -244,100 +243,85 @@ fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
         num_rows,
         null_count,
         [validity, Some(values)].into_iter(),
+        [].into_iter(),
     ))
 }
 
-fn primitive<P: NativeType, T: AsRef<[u8]>>(
+fn mmap_list<O: Offset, T: AsRef<[u8]> + Clone>(
     data: T,
     node: &Node,
     block_offset: usize,
+    data_type: &DataType,
+    field_nodes: &mut VecDeque<Node>,
     buffers: &mut VecDeque<IpcBuffer>,
-    data_type: DataType,
-) -> Result<PrimitiveArray<P>, Error> {
-    let array = mmap_primitive::<P, _>(data, node, block_offset, buffers)?;
-    let array = InternalArrowArray::new(array, data_type);
-    // this is safe because we just (correctly) constructed `ArrowArray`
-    unsafe { PrimitiveArray::try_from_ffi(array) }
-}
+) -> Result<ArrowArray, Error> {
+    let child = ListArray::<O>::try_get_child(data_type)?.data_type();
 
-unsafe fn utf8<O: Offset, T: AsRef<[u8]>>(
-    data: T,
-    node: &Node,
-    block_offset: usize,
-    buffers: &mut VecDeque<IpcBuffer>,
-    data_type: DataType,
-) -> Result<Utf8Array<O>, Error> {
-    let array = mmap_binary::<O, _>(data, node, block_offset, buffers)?;
-    let array = InternalArrowArray::new(array, data_type);
-    // this is unsafe because `mmap_utf8` does not validate invariants
-    unsafe { Utf8Array::<O>::try_from_ffi(array) }
-}
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
 
-unsafe fn binary<O: Offset, T: AsRef<[u8]>>(
-    data: T,
-    node: &Node,
-    block_offset: usize,
-    buffers: &mut VecDeque<IpcBuffer>,
-    data_type: DataType,
-) -> Result<BinaryArray<O>, Error> {
-    let array = mmap_binary::<O, _>(data, node, block_offset, buffers)?;
-    let array = InternalArrowArray::new(array, data_type);
-    // this is unsafe because `mmap_utf8` does not validate invariants
-    unsafe { BinaryArray::<O>::try_from_ffi(array) }
-}
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
 
-fn mmap_list<O: Offset, T: AsRef<[u8]>>(
-    data: T,
-    node: &Node,
-    block_offset: usize,
-    buffers: &mut VecDeque<IpcBuffer>,
-) -> Result<ArrowArray, Error> {
-    todo!()
-}
+    let data_ref = data.as_ref();
 
-unsafe fn list<O: Offset, T: AsRef<[u8]> + Clone>(
-    data: T,
-    node: &Node,
-    block_offset: usize,
-    field_nodes: &mut VecDeque<Node>,
-    buffers: &mut VecDeque<IpcBuffer>,
-    data_type: DataType,
-) -> Result<BinaryArray<O>, Error> {
-    let array = mmap_list::<O, _>(data, node, block_offset, buffers)?;
-    let array = InternalArrowArray::new(array, data_type);
-    // this is unsafe because `mmap_utf8` does not validate invariants
-    unsafe { BinaryArray::<O>::try_from_ffi(array) }
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
+
+    let values = get_array(data.clone(), block_offset, child, field_nodes, buffers)?;
+
+    // NOTE: offsets and values invariants are _not_ validated
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(offsets)].into_iter(),
+        [values].into_iter(),
+    ))
 }
 
-/// Maps a memory region to an [`Array`].
-pub(crate) unsafe fn mmap<T: AsRef<[u8]> + Clone>(
+fn get_array<T: AsRef<[u8]> + Clone>(
     data: T,
     block_offset: usize,
-    data_type: DataType,
+    data_type: &DataType,
     field_nodes: &mut VecDeque<Node>,
     buffers: &mut VecDeque<IpcBuffer>,
-) -> Result<Box<dyn Array>, Error> {
+) -> Result<ArrowArray, Error> {
     use crate::datatypes::PhysicalType::*;
     let node = field_nodes
         .pop_front()
         .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
+
     match data_type.to_physical_type() {
-        Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
+        Boolean => mmap_boolean(data, &node, block_offset, buffers),
         Primitive(p) => with_match_primitive_type!(p, |$T| {
-            primitive::<$T, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
+            mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
        }),
-        Utf8 => utf8::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
-        LargeUtf8 => {
-            utf8::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
-        }
-        Binary => {
-            binary::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
+        Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
+        LargeBinary | LargeUtf8 => mmap_binary::<i64, _>(data, &node, block_offset, buffers),
+        List => mmap_list::<i32, _>(data, &node, block_offset, data_type, field_nodes, buffers),
+        LargeList => {
+            mmap_list::<i64, _>(data, &node, block_offset, data_type, field_nodes, buffers)
         }
-        LargeBinary => {
-            binary::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
-        }
-        List => list::<i32, _>(data, &node, block_offset, field_nodes, buffers, data_type)
-            .map(|x| x.boxed()),
         _ => todo!(),
     }
 }
+
+/// Maps a memory region to an [`Array`].
+pub(crate) unsafe fn mmap<T: AsRef<[u8]> + Clone>(
+    data: T,
+    block_offset: usize,
+    data_type: DataType,
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<Box<dyn Array>, Error> {
+    let array = get_array(data, block_offset, &data_type, field_nodes, buffers)?;
+    // The unsafety comes from the fact that `array` is not necessarily valid -
+    // the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data)
+    unsafe { try_from(InternalArrowArray::new(array, data_type)) }
+}
diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs
index 60b152e5c61..57bdf3ef1cf 100644
--- a/src/mmap/mod.rs
+++ b/src/mmap/mod.rs
@@ -2,37 +2,28 @@
 use std::collections::VecDeque;
 
 use crate::array::Array;
+use crate::chunk::Chunk;
 use crate::error::Error;
 use crate::ffi::mmap;
-use crate::io::ipc::read::read_file_metadata;
 use crate::io::ipc::read::reader::get_serialized_batch;
-use crate::io::ipc::read::OutOfSpecKind;
+use crate::io::ipc::read::FileMetadata;
+use crate::io::ipc::read::{IpcBuffer, Node, OutOfSpecKind};
 use crate::io::ipc::CONTINUATION_MARKER;
 
 use arrow_format::ipc::planus::ReadAsRoot;
+use arrow_format::ipc::MessageRef;
 
-/// something
-/// # Safety
-/// This operation is innerently unsafe as it assumes that `T` contains valid Arrow data
-/// In particular:
-/// * Offsets in variable-sized containers are valid;
-/// * Utf8 is valid
-pub unsafe fn map_chunk_unchecked<T: AsRef<[u8]> + Clone>(
-    data: T,
-    index: usize,
-) -> Result<Box<dyn Array>, Error> {
-    let mut bytes = data.as_ref();
-    let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?;
-
-    let block = metadata.blocks[index];
-
+fn read_message(
+    mut bytes: &[u8],
+    block: arrow_format::ipc::Block,
+) -> Result<(MessageRef, usize), Error> {
     let offset: usize = block
         .offset
         .try_into()
         .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
 
-    let meta_data_length: usize = block
+    let block_length: usize = block
         .meta_data_length
         .try_into()
         .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
@@ -54,29 +45,67 @@ pub unsafe fn map_chunk_unchecked<T: AsRef<[u8]> + Clone>(
     let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
 
-    let batch = get_serialized_batch(&message)?;
+    Ok((message, offset + block_length))
+}
+
+fn read_batch<'a>(
+    message: &'a MessageRef,
+) -> Result<(VecDeque<IpcBuffer<'a>>, VecDeque<Node<'a>>), Error> {
+    let batch = get_serialized_batch(message)?;
 
     let buffers = batch
         .buffers()
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
         .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
-    let mut buffers = buffers.iter().collect::<VecDeque<_>>();
+    let buffers = buffers.iter().collect::<VecDeque<_>>();
 
     let field_nodes = batch
         .nodes()
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))?
         .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
-    let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
-
-    println!("{:#?}", metadata.schema.fields);
-    let data_type = metadata.schema.fields[0].data_type.clone();
-    println!("{:#?}", data_type);
-
-    mmap::mmap(
-        data.clone(),
-        offset + meta_data_length,
-        data_type,
-        &mut field_nodes,
-        &mut buffers,
-    )
+    let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
+
+    Ok((buffers, field_nodes))
+}
+
+///
+/// # Errors
+/// This function errors when:
+/// * The IPC file is not valid
+/// * the buffers on the file are un-aligned with their corresponding data. This can happen when:
+///    * the file was written with 8-bit alignment
+///    * the file contains type decimal 128 or 256
+/// # Safety
+/// The caller must ensure that `data` contains a valid Arrow IPC file.
+/// This operation is inherently unsafe as it assumes that `data` contains a valid Arrow IPC file.
+/// For example:
+/// * Offsets in variable-sized containers must be valid
+/// * Utf8 is valid
+pub unsafe fn mmap_unchecked<T: AsRef<[u8]> + Clone>(
+    metadata: &FileMetadata,
+    data: T,
+    chunk: usize,
+) -> Result<Chunk<Box<dyn Array>>, Error> {
+    let block = metadata.blocks[chunk];
+    let (message, offset) = read_message(data.as_ref(), block)?;
+    let (mut buffers, mut field_nodes) = read_batch(&message)?;
+
+    let arrays = metadata
+        .schema
+        .fields
+        .iter()
+        .map(|f| &f.data_type)
+        .cloned()
+        .map(|data_type| {
+            mmap::mmap(
+                data.clone(),
+                offset,
+                data_type,
+                &mut field_nodes,
+                &mut buffers,
+            )
+        })
+        .collect::<Result<Vec<_>, Error>>()?;
+
+    Chunk::try_new(arrays)
 }
diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs
new file mode 100644
index 00000000000..ac4bd27b49a
--- /dev/null
+++ b/tests/it/io/ipc/mmap.rs
@@ -0,0 +1,58 @@
+use arrow2::array::*;
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{Field, Schema};
+use arrow2::error::Result;
+use arrow2::io::ipc::read::read_file_metadata;
+
+use super::write::file::write;
+
+fn round_trip(array: Box<dyn Array>) -> Result<()> {
+    let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
+    let columns = Chunk::try_new(vec![array.clone()])?;
+
+    let data = write(&[columns], &schema, None, None)?;
+
+    let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?;
+
+    let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, data, 0)? };
+    assert_eq!(new_array.into_arrays()[0], array);
+    Ok(())
+}
+
+#[test]
+fn utf8() -> Result<()> {
+    let array = Utf8Array::<i32>::from([None, None, Some("bb")])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn primitive() -> Result<()> {
+    let array = PrimitiveArray::<i32>::from([None, None, Some(3)])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn boolean() -> Result<()> {
+    let array = BooleanArray::from([None, None, Some(true)])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn list() -> Result<()> {
+    let data = vec![
+        Some(vec![Some(1i32), Some(2), Some(3)]),
+        None,
+        Some(vec![Some(4), None, Some(6)]),
+    ];
+
+    let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
+    array.try_extend(data).unwrap();
+    let array = array.into_box().slice(1, 2);
+    round_trip(array)
+}
diff --git a/tests/it/io/ipc/mod.rs b/tests/it/io/ipc/mod.rs
index 63cce5b2e8f..6d3e71c5db4 100644
--- a/tests/it/io/ipc/mod.rs
+++ b/tests/it/io/ipc/mod.rs
@@ -15,3 +15,5 @@ mod read_stream_async;
 
 #[cfg(feature = "io_ipc_read_async")]
 mod read_file_async;
+
+mod mmap;
diff --git a/tests/it/io/ipc/read/mmap.rs b/tests/it/io/ipc/read/mmap.rs
deleted file mode 100644
index 232568e056c..00000000000
--- a/tests/it/io/ipc/read/mmap.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use arrow2::error::Result;
-use arrow2::mmap::map_chunk_unchecked;
-
-use super::super::common::read_gzip_json;
-
-#[derive(Clone)]
-struct Mmap(pub std::sync::Arc<Vec<u8>>);
-
-impl AsRef<[u8]> for Mmap {
-    #[inline]
-    fn as_ref(&self) -> &[u8] {
-        self.0.as_ref()
-    }
-}
-
-fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let testdata = crate::test_util::arrow_test_data();
-
-    let arrow_file = format!(
-        "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
-        testdata, version, file_name
-    );
-
-    let data = std::fs::read(arrow_file).unwrap();
-
-    let data = Mmap(std::sync::Arc::new(data));
-
-    // read expected JSON output
-    let (_schema, _, batches) = read_gzip_json(version, file_name)?;
-
-    let array = unsafe { map_chunk_unchecked(data, 0)? };
-
-    assert_eq!(batches[0].arrays()[0], array);
-    Ok(())
-}
-
-#[test]
-fn read_generated_100_primitive() -> Result<()> {
-    test_file("1.0.0-littleendian", "generated_primitive")
-}
diff --git a/tests/it/io/ipc/read/mod.rs b/tests/it/io/ipc/read/mod.rs
index c3602cecc7a..6d1510b36c4 100644
--- a/tests/it/io/ipc/read/mod.rs
+++ b/tests/it/io/ipc/read/mod.rs
@@ -1,3 +1,2 @@
 mod file;
-mod mmap;
 mod stream;
diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs
index 98084e16f28..a899aa459d0 100644
--- a/tests/it/io/ipc/write/file.rs
+++ b/tests/it/io/ipc/write/file.rs
@@ -373,17 +373,3 @@ fn write_months_days_ns() -> Result<()> {
     let columns = Chunk::try_new(vec![array])?;
     round_trip(columns, schema, None, None)
 }
-
-#[test]
-fn bla() -> Result<()> {
-    let array = Utf8Array::<i32>::from_slice(["aa", "bb"])
-        .slice(1, 1)
-        .boxed();
-    let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
-    let columns = Chunk::try_new(vec![array.clone()])?;
-
-    let data = write(&[columns], &schema, None, None)?;
-    let new_array = unsafe { arrow2::mmap::map_chunk_unchecked(data, 0)? };
-    assert_eq!(new_array, array);
-    Ok(())
-}
diff --git a/tests/it/io/ipc/write/mod.rs b/tests/it/io/ipc/write/mod.rs
index f3a4fbfe57c..48ca5ea6cd4 100644
--- a/tests/it/io/ipc/write/mod.rs
+++ b/tests/it/io/ipc/write/mod.rs
@@ -1,3 +1,3 @@
-mod file;
+pub mod file;
 mod file_append;
 mod stream;
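Usage sketch, not part of the patch: the new `mmap_unchecked` entry point accepts any `T: AsRef<[u8]> + Clone` region plus the file's `FileMetadata`. Below is a minimal sketch of driving it from an OS-level memory map; it assumes the `memmap2` crate for the mapping itself, and the `MmapRegion` newtype is a hypothetical helper (mirroring the `Arc<Vec<u8>>` wrapper used by the integration test deleted above) whose only job is to provide the required bounds.

use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::read::read_file_metadata;

// Hypothetical helper: a cheaply cloneable view over a memory-mapped region.
#[derive(Clone)]
struct MmapRegion(Arc<memmap2::Mmap>);

impl AsRef<[u8]> for MmapRegion {
    fn as_ref(&self) -> &[u8] {
        &self.0[..]
    }
}

fn read_first_batch(path: &str) -> Result<Chunk<Box<dyn Array>>> {
    let file = std::fs::File::open(path)?;
    // Safety: the file must not be truncated or modified while the map is alive.
    let region = MmapRegion(Arc::new(unsafe { memmap2::Mmap::map(&file)? }));

    // Parse the footer/schema once; `mmap_unchecked` reuses it per chunk.
    let metadata = read_file_metadata(&mut std::io::Cursor::new(region.as_ref()))?;

    // Safety: we trust that `path` points at a valid, 64-bit-aligned Arrow IPC
    // file (valid offsets, valid utf8), as the safety docs require.
    unsafe { arrow2::mmap::mmap_unchecked(&metadata, region, 0) }
}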