From 8d4f7414cbfca6c1a4876934b6a0ec4ab914f5b4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 31 Jul 2022 07:33:43 +0000 Subject: [PATCH 1/8] Added first impl of mmap --- src/ffi/array.rs | 2 +- src/ffi/mmap.rs | 154 +++++++++++++++++++++++++++++++++++ src/ffi/mod.rs | 9 +- src/io/ipc/mmap.rs | 72 ++++++++++++++++ src/io/ipc/mod.rs | 1 + src/io/ipc/read/reader.rs | 17 ++-- tests/it/io/ipc/read/mmap.rs | 40 +++++++++ tests/it/io/ipc/read/mod.rs | 1 + 8 files changed, 283 insertions(+), 13 deletions(-) create mode 100644 src/ffi/mmap.rs create mode 100644 src/io/ipc/mmap.rs create mode 100644 tests/it/io/ipc/read/mmap.rs diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 46858deff73..b50fdaa7259 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -493,7 +493,7 @@ impl InternalArrowArray { } } -impl ArrowArrayRef for Box { +impl ArrowArrayRef for InternalArrowArray { /// the data_type as declared in the schema fn data_type(&self) -> &DataType { &self.data_type diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs new file mode 100644 index 00000000000..2ad8d8b1607 --- /dev/null +++ b/src/ffi/mmap.rs @@ -0,0 +1,154 @@ +use std::collections::VecDeque; + +use crate::array::{Array, BooleanArray, FromFfi}; +use crate::datatypes::DataType; +use crate::error::Error; + +use crate::io::ipc::read::OutOfSpecKind; +use crate::io::ipc::read::{IpcBuffer, Node}; + +use super::{ArrowArray, InternalArrowArray}; + +#[allow(dead_code)] +struct PrivateData { + // the owner of the pointers' regions + data: T, + buffers_ptr: Box<[*const std::os::raw::c_void]>, + //children_ptr: Box<[*mut ArrowArray]>, + dictionary_ptr: Option<*mut ArrowArray>, +} + +fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { + let buffer = buffers + .pop_front() + .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + + let offset: usize = buffer + .offset() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let length: usize = buffer + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok((offset, length)) +} + +// callback used to drop [ArrowArray] when it is exported +unsafe extern "C" fn release(array: *mut ArrowArray) { + if array.is_null() { + return; + } + let array = &mut *array; + + // take ownership of `private_data`, therefore dropping it + let private = Box::from_raw(array.private_data as *mut PrivateData); + /*for child in private.children_ptr.iter() { + let _ = Box::from_raw(*child); + }*/ + + if let Some(ptr) = private.dictionary_ptr { + let _ = Box::from_raw(ptr); + } + + array.release = None; +} + +fn mmap_boolean>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref(); + + let validity = get_buffer(buffers)?; + let (offset, length) = validity; + + let validity = if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) + } else { + None + }; + + let values = get_buffer(buffers)?; + let (offset, length) = values; + + // verify that they are in-bounds and get its pointer + let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + + // NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants + // or mark this as unsafe + + let buffers_ptr = [validity, Some(values)] + .iter() + .map(|maybe_buffer| match maybe_buffer { + Some(b) => *b as *const std::os::raw::c_void, + None => std::ptr::null(), + }) + .collect::>(); + let n_buffers = buffers.len() as i64; + + let mut private_data = Box::new(PrivateData:: { + data: data.clone(), + buffers_ptr, + dictionary_ptr: None, + }); + + Ok(ArrowArray { + length: num_rows as i64, + null_count: null_count as i64, + offset: 0, + n_buffers, + n_children: 0, + buffers: private_data.buffers_ptr.as_mut_ptr(), + children: std::ptr::null_mut(), + dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), + release: Some(release::), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + }) +} + +fn boolean>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result { + let array = mmap_boolean(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is safe because we just (correctly) constructed `ArrowArray` + unsafe { BooleanArray::try_from_ffi(array) } +} + +/// Maps a memory region to an [`Array`]. +pub fn mmap>( + data: T, + block_offset: usize, + data_type: DataType, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result, Error> { + use crate::datatypes::PhysicalType::*; + let node = field_nodes + .pop_front() + .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + match data_type.to_physical_type() { + Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + _ => todo!(), + } +} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 10fc2fb994a..d3766254794 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,9 +3,16 @@ mod array; mod bridge; mod generated; +#[cfg(feature = "io_ipc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] +mod mmap; mod schema; mod stream; +#[cfg(feature = "io_ipc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] +pub use mmap::mmap; + pub(crate) use array::try_from; pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; @@ -44,5 +51,5 @@ pub unsafe fn import_array_from_c( array: ArrowArray, data_type: DataType, ) -> Result> { - try_from(Box::new(InternalArrowArray::new(array, data_type))) + try_from(InternalArrowArray::new(array, data_type)) } diff --git a/src/io/ipc/mmap.rs b/src/io/ipc/mmap.rs new file mode 100644 index 00000000000..2228472d159 --- /dev/null +++ b/src/io/ipc/mmap.rs @@ -0,0 +1,72 @@ +//! Memory maps regions defined on the IPC format into [`Array`]. +use std::collections::VecDeque; + +use crate::array::Array; +use crate::error::Error; +use crate::ffi::mmap; + +use super::read::read_file_metadata; +use super::read::reader::get_serialized_batch; +use super::read::OutOfSpecKind; +use super::CONTINUATION_MARKER; + +use arrow_format::ipc::planus::ReadAsRoot; + +/// something +pub fn map_chunk>(data: T, index: usize) -> Result, Error> { + let mut bytes = data.as_ref(); + let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?; + + let block = metadata.blocks[index]; + + let offset: usize = block + .offset + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let meta_data_length: usize = block + .meta_data_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + bytes = &bytes[offset..]; + let mut message_length = bytes[..4].try_into().unwrap(); + bytes = &bytes[4..]; + + if message_length == CONTINUATION_MARKER { + // continuation marker encountered, read message next + message_length = bytes[..4].try_into().unwrap(); + bytes = &bytes[4..]; + }; + + let message_length: usize = i32::from_le_bytes(message_length) + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length]) + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; + + let batch = get_serialized_batch(&message)?; + + let buffers = batch + .buffers() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?; + let mut buffers = buffers.iter().collect::>(); + + let field_nodes = batch + .nodes() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?; + let mut field_nodes = field_nodes.iter().collect::>(); + + let data_type = metadata.schema.fields[0].data_type.clone(); + + mmap( + data.clone(), + offset + meta_data_length, + data_type, + &mut field_nodes, + &mut buffers, + ) +} diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index 3d8a8ccb994..b2273a3f607 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -77,6 +77,7 @@ mod compression; mod endianess; pub mod append; +pub mod mmap; pub mod read; pub mod write; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index ab72adec96e..d46090bac43 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -230,7 +230,7 @@ pub fn read_file_metadata(reader: &mut R) -> Result( +pub(crate) fn get_serialized_batch<'a>( message: &'a arrow_format::ipc::MessageRef, ) -> Result> { let header = message @@ -268,6 +268,11 @@ pub fn read_batch( .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + let length: u64 = block + .meta_data_length + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + // read length reader.seek(SeekFrom::Start(offset))?; let mut meta_buf = [0; 4]; @@ -292,16 +297,6 @@ pub fn read_batch( let batch = get_serialized_batch(&message)?; - let offset: u64 = block - .offset - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - - let length: u64 = block - .meta_data_length - .try_into() - .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - read_record_batch( batch, &metadata.schema.fields, diff --git a/tests/it/io/ipc/read/mmap.rs b/tests/it/io/ipc/read/mmap.rs new file mode 100644 index 00000000000..076b6be5484 --- /dev/null +++ b/tests/it/io/ipc/read/mmap.rs @@ -0,0 +1,40 @@ +use arrow2::error::Result; +use arrow2::io::ipc::mmap::map_chunk; + +use super::super::common::read_gzip_json; + +#[derive(Clone)] +struct Mmap(pub std::sync::Arc>); + +impl AsRef<[u8]> for Mmap { + #[inline] + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +fn test_file(version: &str, file_name: &str) -> Result<()> { + let testdata = crate::test_util::arrow_test_data(); + + let arrow_file = format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, file_name + ); + + let data = std::fs::read(arrow_file).unwrap(); + + let data = Mmap(std::sync::Arc::new(data)); + + // read expected JSON output + let (_schema, _, batches) = read_gzip_json(version, file_name)?; + + let array = map_chunk(data, 0)?; + + assert_eq!(batches[0].arrays()[0], array); + Ok(()) +} + +#[test] +fn read_generated_100_primitive() -> Result<()> { + test_file("1.0.0-littleendian", "generated_primitive") +} diff --git a/tests/it/io/ipc/read/mod.rs b/tests/it/io/ipc/read/mod.rs index 6d1510b36c4..c3602cecc7a 100644 --- a/tests/it/io/ipc/read/mod.rs +++ b/tests/it/io/ipc/read/mod.rs @@ -1,2 +1,3 @@ mod file; +mod mmap; mod stream; From 741e53c0ddfd35e13bf3d9a929a3b32f9ff5115c Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 1 Aug 2022 17:46:54 +0000 Subject: [PATCH 2/8] Added mmap for utf8 --- src/ffi/mmap.rs | 142 ++++++++++++++++++++++------ src/ffi/mod.rs | 6 +- src/io/ipc/mod.rs | 3 +- src/lib.rs | 5 +- src/{io/ipc/mmap.rs => mmap/mod.rs} | 22 +++-- tests/it/io/ipc/read/mmap.rs | 4 +- tests/it/io/ipc/write/file.rs | 14 +++ 7 files changed, 150 insertions(+), 46 deletions(-) rename src/{io/ipc/mmap.rs => mmap/mod.rs} (77%) diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 2ad8d8b1607..d3f2944765c 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use crate::array::{Array, BooleanArray, FromFfi}; +use crate::array::{Array, BooleanArray, FromFfi, Offset, Utf8Array}; use crate::datatypes::DataType; use crate::error::Error; @@ -36,6 +36,41 @@ fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error Ok((offset, length)) } +fn create_array, I: Iterator>>( + data: T, + num_rows: usize, + null_count: usize, + buffers: I, +) -> ArrowArray { + let n_buffers = buffers.size_hint().0 as i64; + + let buffers_ptr = buffers + .map(|maybe_buffer| match maybe_buffer { + Some(b) => b as *const std::os::raw::c_void, + None => std::ptr::null(), + }) + .collect::>(); + + let mut private_data = Box::new(PrivateData:: { + data, + buffers_ptr, + dictionary_ptr: None, + }); + + ArrowArray { + length: num_rows as i64, + null_count: null_count as i64, + offset: 0, + n_buffers, + n_children: 0, + buffers: private_data.buffers_ptr.as_mut_ptr(), + children: std::ptr::null_mut(), + dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), + release: Some(release::), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + } +} + // callback used to drop [ArrowArray] when it is exported unsafe extern "C" fn release(array: *mut ArrowArray) { if array.is_null() { @@ -56,7 +91,7 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { array.release = None; } -fn mmap_boolean>( +fn mmap_utf8>( data: T, node: &Node, block_offset: usize, @@ -84,42 +119,72 @@ fn mmap_boolean>( None }; + let offsets = get_buffer(buffers)?; + let (offset, length) = offsets; + + // verify that they are in-bounds and get its pointer + let offsets = &data_ref[block_offset + offset..block_offset + offset + length]; + + // validate alignment + let _: &[O] = bytemuck::cast_slice(offsets); + + let offsets = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + let values = get_buffer(buffers)?; let (offset, length) = values; // verify that they are in-bounds and get its pointer let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); - // NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants - // or mark this as unsafe + // NOTE: offsets and values invariants are _not_ validated + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(offsets), Some(values)].into_iter(), + )) +} - let buffers_ptr = [validity, Some(values)] - .iter() - .map(|maybe_buffer| match maybe_buffer { - Some(b) => *b as *const std::os::raw::c_void, - None => std::ptr::null(), - }) - .collect::>(); - let n_buffers = buffers.len() as i64; +fn mmap_boolean>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let mut private_data = Box::new(PrivateData:: { - data: data.clone(), - buffers_ptr, - dictionary_ptr: None, - }); + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - Ok(ArrowArray { - length: num_rows as i64, - null_count: null_count as i64, - offset: 0, - n_buffers, - n_children: 0, - buffers: private_data.buffers_ptr.as_mut_ptr(), - children: std::ptr::null_mut(), - dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), - release: Some(release::), - private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, - }) + let data_ref = data.as_ref(); + + let validity = get_buffer(buffers)?; + let (offset, length) = validity; + + let validity = if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) + } else { + None + }; + + let values = get_buffer(buffers)?; + let (offset, length) = values; + + // verify that they are in-bounds and get its pointer + let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + )) } fn boolean>( @@ -135,8 +200,21 @@ fn boolean>( unsafe { BooleanArray::try_from_ffi(array) } } +unsafe fn utf8>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_utf8::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is unsafe because `mmap_utf8` does not validate invariants + unsafe { Utf8Array::::try_from_ffi(array) } +} + /// Maps a memory region to an [`Array`]. -pub fn mmap>( +pub(crate) unsafe fn mmap>( data: T, block_offset: usize, data_type: DataType, @@ -149,6 +227,10 @@ pub fn mmap>( .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; match data_type.to_physical_type() { Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + Utf8 => utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + LargeUtf8 => { + utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + } _ => todo!(), } } diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index d3766254794..b416d1c4648 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -5,14 +5,10 @@ mod bridge; mod generated; #[cfg(feature = "io_ipc")] #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] -mod mmap; +pub(crate) mod mmap; mod schema; mod stream; -#[cfg(feature = "io_ipc")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] -pub use mmap::mmap; - pub(crate) use array::try_from; pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index b2273a3f607..1f8476cef70 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -77,12 +77,11 @@ mod compression; mod endianess; pub mod append; -pub mod mmap; pub mod read; pub mod write; const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; -const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; +pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; /// Struct containing `dictionary_id` and nested `IpcField`, allowing users /// to specify the dictionary ids of the IPC fields when writing to IPC. diff --git a/src/lib.rs b/src/lib.rs index 01fe444d5cf..16bd56ed04c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,13 +17,16 @@ pub mod bitmap; pub mod buffer; pub mod chunk; pub mod error; +#[cfg(feature = "io_ipc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] +pub mod mmap; + pub mod scalar; pub mod trusted_len; pub mod types; pub mod compute; pub mod io; -//pub mod record_batch; pub mod temporal_conversions; pub mod datatypes; diff --git a/src/io/ipc/mmap.rs b/src/mmap/mod.rs similarity index 77% rename from src/io/ipc/mmap.rs rename to src/mmap/mod.rs index 2228472d159..60b152e5c61 100644 --- a/src/io/ipc/mmap.rs +++ b/src/mmap/mod.rs @@ -5,15 +5,23 @@ use crate::array::Array; use crate::error::Error; use crate::ffi::mmap; -use super::read::read_file_metadata; -use super::read::reader::get_serialized_batch; -use super::read::OutOfSpecKind; -use super::CONTINUATION_MARKER; +use crate::io::ipc::read::read_file_metadata; +use crate::io::ipc::read::reader::get_serialized_batch; +use crate::io::ipc::read::OutOfSpecKind; +use crate::io::ipc::CONTINUATION_MARKER; use arrow_format::ipc::planus::ReadAsRoot; /// something -pub fn map_chunk>(data: T, index: usize) -> Result, Error> { +/// # Safety +/// This operation is innerently unsafe as it assumes that `T` contains valid Arrow data +/// In particular: +/// * Offsets in variable-sized containers are valid; +/// * Utf8 is valid +pub unsafe fn map_chunk_unchecked>( + data: T, + index: usize, +) -> Result, Error> { let mut bytes = data.as_ref(); let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?; @@ -60,9 +68,11 @@ pub fn map_chunk>(data: T, index: usize) -> Result>(); + println!("{:#?}", metadata.schema.fields); let data_type = metadata.schema.fields[0].data_type.clone(); + println!("{:#?}", data_type); - mmap( + mmap::mmap( data.clone(), offset + meta_data_length, data_type, diff --git a/tests/it/io/ipc/read/mmap.rs b/tests/it/io/ipc/read/mmap.rs index 076b6be5484..232568e056c 100644 --- a/tests/it/io/ipc/read/mmap.rs +++ b/tests/it/io/ipc/read/mmap.rs @@ -1,5 +1,5 @@ use arrow2::error::Result; -use arrow2::io::ipc::mmap::map_chunk; +use arrow2::mmap::map_chunk_unchecked; use super::super::common::read_gzip_json; @@ -28,7 +28,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { // read expected JSON output let (_schema, _, batches) = read_gzip_json(version, file_name)?; - let array = map_chunk(data, 0)?; + let array = unsafe { map_chunk_unchecked(data, 0)? }; assert_eq!(batches[0].arrays()[0], array); Ok(()) diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index a899aa459d0..98084e16f28 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -373,3 +373,17 @@ fn write_months_days_ns() -> Result<()> { let columns = Chunk::try_new(vec![array])?; round_trip(columns, schema, None, None) } + +#[test] +fn bla() -> Result<()> { + let array = Utf8Array::::from_slice(["aa", "bb"]) + .slice(1, 1) + .boxed(); + let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Chunk::try_new(vec![array.clone()])?; + + let data = write(&[columns], &schema, None, None)?; + let new_array = unsafe { arrow2::mmap::map_chunk_unchecked(data, 0)? }; + assert_eq!(new_array, array); + Ok(()) +} From a69347bba2c7d8ebfbf26e3761e7b40c087aaa68 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 3 Aug 2022 04:02:13 +0000 Subject: [PATCH 3/8] More types --- src/ffi/mmap.rs | 185 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 146 insertions(+), 39 deletions(-) diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index d3f2944765c..fb1da4fcd23 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,11 +1,12 @@ use std::collections::VecDeque; -use crate::array::{Array, BooleanArray, FromFfi, Offset, Utf8Array}; +use crate::array::{Array, BinaryArray, BooleanArray, FromFfi, Offset, PrimitiveArray, Utf8Array}; use crate::datatypes::DataType; use crate::error::Error; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::read::{IpcBuffer, Node}; +use crate::types::NativeType; use super::{ArrowArray, InternalArrowArray}; @@ -18,7 +19,7 @@ struct PrivateData { dictionary_ptr: Option<*mut ArrowArray>, } -fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { +fn get_buffer_bounds(buffers: &mut VecDeque) -> Result<(usize, usize), Error> { let buffer = buffers .pop_front() .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; @@ -36,6 +37,52 @@ fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error Ok((offset, length)) } +fn get_buffer<'a, T: NativeType>( + data: &'a [u8], + block_offset: usize, + buffers: &mut VecDeque, + num_rows: usize, +) -> Result<&'a [u8], Error> { + let (offset, length) = get_buffer_bounds(buffers)?; + + // verify that they are in-bounds + let values = data + .get(block_offset + offset..block_offset + offset + length) + .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?; + + // validate alignment + let v: &[T] = bytemuck::try_cast_slice(values) + .map_err(|_| Error::OutOfSpec("buffer not aligned for mmap".to_string()))?; + + if v.len() < num_rows { + return Err(Error::OutOfSpec( + "buffer's length is too small in mmap".to_string(), + )); + } + + Ok(values) +} + +fn get_validity<'a>( + data: &'a [u8], + block_offset: usize, + buffers: &mut VecDeque, + null_count: usize, +) -> Result, Error> { + let validity = get_buffer_bounds(buffers)?; + let (offset, length) = validity; + + Ok(if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some( + data.get(block_offset + offset..block_offset + offset + length) + .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?, + ) + } else { + None + }) +} + fn create_array, I: Iterator>>( data: T, num_rows: usize, @@ -91,7 +138,7 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { array.release = None; } -fn mmap_utf8>( +fn mmap_binary>( data: T, node: &Node, block_offset: usize, @@ -109,32 +156,10 @@ fn mmap_utf8>( let data_ref = data.as_ref(); - let validity = get_buffer(buffers)?; - let (offset, length) = validity; - - let validity = if null_count > 0 { - // verify that they are in-bounds and get its pointer - Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) - } else { - None - }; + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - let offsets = get_buffer(buffers)?; - let (offset, length) = offsets; - - // verify that they are in-bounds and get its pointer - let offsets = &data_ref[block_offset + offset..block_offset + offset + length]; - - // validate alignment - let _: &[O] = bytemuck::cast_slice(offsets); - - let offsets = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); - - let values = get_buffer(buffers)?; - let (offset, length) = values; - - // verify that they are in-bounds and get its pointer - let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + let values = get_buffer::(data_ref, block_offset, buffers, 0)?.as_ptr(); // NOTE: offsets and values invariants are _not_ validated Ok(create_array( @@ -163,17 +188,9 @@ fn mmap_boolean>( let data_ref = data.as_ref(); - let validity = get_buffer(buffers)?; - let (offset, length) = validity; - - let validity = if null_count > 0 { - // verify that they are in-bounds and get its pointer - Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) - } else { - None - }; + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - let values = get_buffer(buffers)?; + let values = get_buffer_bounds(buffers)?; let (offset, length) = values; // verify that they are in-bounds and get its pointer @@ -200,6 +217,49 @@ fn boolean>( unsafe { BooleanArray::try_from_ffi(array) } } +fn mmap_primitive>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let data_ref = data.as_ref(); + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::

(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + )) +} + +fn primitive>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_primitive::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is safe because we just (correctly) constructed `ArrowArray` + unsafe { PrimitiveArray::try_from_ffi(array) } +} + unsafe fn utf8>( data: T, node: &Node, @@ -207,12 +267,48 @@ unsafe fn utf8>( buffers: &mut VecDeque, data_type: DataType, ) -> Result, Error> { - let array = mmap_utf8::(data, node, block_offset, buffers)?; + let array = mmap_binary::(data, node, block_offset, buffers)?; let array = InternalArrowArray::new(array, data_type); // this is unsafe because `mmap_utf8` does not validate invariants unsafe { Utf8Array::::try_from_ffi(array) } } +unsafe fn binary>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_binary::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is unsafe because `mmap_utf8` does not validate invariants + unsafe { BinaryArray::::try_from_ffi(array) } +} + +fn mmap_list>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + todo!() +} + +unsafe fn list>( + data: T, + node: &Node, + block_offset: usize, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_list::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is unsafe because `mmap_utf8` does not validate invariants + unsafe { BinaryArray::::try_from_ffi(array) } +} + /// Maps a memory region to an [`Array`]. pub(crate) unsafe fn mmap>( data: T, @@ -227,10 +323,21 @@ pub(crate) unsafe fn mmap>( .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; match data_type.to_physical_type() { Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + Primitive(p) => with_match_primitive_type!(p, |$T| { + primitive::<$T, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + }), Utf8 => utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), LargeUtf8 => { utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) } + Binary => { + binary::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + } + LargeBinary => { + binary::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + } + List => list::(data, &node, block_offset, field_nodes, buffers, data_type) + .map(|x| x.boxed()), _ => todo!(), } } From 1aba896dd95f2fc31afe0ae2ba397916e8c108fb Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 3 Aug 2022 05:12:23 +0000 Subject: [PATCH 4/8] Improved --- src/array/list/mod.rs | 2 +- src/ffi/mmap.rs | 168 +++++++++++++++------------------- src/mmap/mod.rs | 102 ++++++++++++++------- tests/it/io/ipc/mmap.rs | 58 ++++++++++++ tests/it/io/ipc/mod.rs | 2 + tests/it/io/ipc/read/mmap.rs | 40 -------- tests/it/io/ipc/read/mod.rs | 1 - tests/it/io/ipc/write/file.rs | 14 --- tests/it/io/ipc/write/mod.rs | 2 +- 9 files changed, 207 insertions(+), 182 deletions(-) create mode 100644 tests/it/io/ipc/mmap.rs delete mode 100644 tests/it/io/ipc/read/mmap.rs diff --git a/src/array/list/mod.rs b/src/array/list/mod.rs index 88ab6513234..13f781f680f 100644 --- a/src/array/list/mod.rs +++ b/src/array/list/mod.rs @@ -324,7 +324,7 @@ impl ListArray { /// Returns a the inner [`Field`] /// # Errors /// Panics iff the logical type is not consistent with this struct. - fn try_get_child(data_type: &DataType) -> Result<&Field, Error> { + pub fn try_get_child(data_type: &DataType) -> Result<&Field, Error> { if O::IS_LARGE { match data_type.to_logical_type() { DataType::LargeList(child) => Ok(child.as_ref()), diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index fb1da4fcd23..5f49411eedb 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use crate::array::{Array, BinaryArray, BooleanArray, FromFfi, Offset, PrimitiveArray, Utf8Array}; +use crate::array::{Array, ListArray, Offset}; use crate::datatypes::DataType; use crate::error::Error; @@ -8,14 +8,14 @@ use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::read::{IpcBuffer, Node}; use crate::types::NativeType; -use super::{ArrowArray, InternalArrowArray}; +use super::{try_from, ArrowArray, InternalArrowArray}; #[allow(dead_code)] struct PrivateData { // the owner of the pointers' regions data: T, buffers_ptr: Box<[*const std::os::raw::c_void]>, - //children_ptr: Box<[*mut ArrowArray]>, + children_ptr: Box<[*mut ArrowArray]>, dictionary_ptr: Option<*mut ArrowArray>, } @@ -83,42 +83,52 @@ fn get_validity<'a>( }) } -fn create_array, I: Iterator>>( +fn create_array< + T: Clone + AsRef<[u8]>, + I: Iterator>, + II: Iterator, +>( data: T, num_rows: usize, null_count: usize, buffers: I, + children: II, ) -> ArrowArray { - let n_buffers = buffers.size_hint().0 as i64; - let buffers_ptr = buffers .map(|maybe_buffer| match maybe_buffer { Some(b) => b as *const std::os::raw::c_void, None => std::ptr::null(), }) .collect::>(); + let n_buffers = buffers_ptr.len() as i64; + + let children_ptr = children + .map(|child| Box::into_raw(Box::new(child))) + .collect::>(); + let n_children = children_ptr.len() as i64; let mut private_data = Box::new(PrivateData:: { data, buffers_ptr, + children_ptr, dictionary_ptr: None, }); ArrowArray { length: num_rows as i64, null_count: null_count as i64, - offset: 0, + offset: 0, // IPC files are by definition not offset n_buffers, - n_children: 0, + n_children, buffers: private_data.buffers_ptr.as_mut_ptr(), - children: std::ptr::null_mut(), + children: private_data.children_ptr.as_mut_ptr(), dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), release: Some(release::), private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, } } -// callback used to drop [ArrowArray] when it is exported +/// callback used to drop [`ArrowArray`] when it is exported unsafe extern "C" fn release(array: *mut ArrowArray) { if array.is_null() { return; @@ -167,6 +177,7 @@ fn mmap_binary>( num_rows, null_count, [validity, Some(offsets), Some(values)].into_iter(), + [].into_iter(), )) } @@ -201,22 +212,10 @@ fn mmap_boolean>( num_rows, null_count, [validity, Some(values)].into_iter(), + [].into_iter(), )) } -fn boolean>( - data: T, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, - data_type: DataType, -) -> Result { - let array = mmap_boolean(data, node, block_offset, buffers)?; - let array = InternalArrowArray::new(array, data_type); - // this is safe because we just (correctly) constructed `ArrowArray` - unsafe { BooleanArray::try_from_ffi(array) } -} - fn mmap_primitive>( data: T, node: &Node, @@ -244,100 +243,85 @@ fn mmap_primitive>( num_rows, null_count, [validity, Some(values)].into_iter(), + [].into_iter(), )) } -fn primitive>( +fn mmap_list>( data: T, node: &Node, block_offset: usize, + data_type: &DataType, + field_nodes: &mut VecDeque, buffers: &mut VecDeque, - data_type: DataType, -) -> Result, Error> { - let array = mmap_primitive::(data, node, block_offset, buffers)?; - let array = InternalArrowArray::new(array, data_type); - // this is safe because we just (correctly) constructed `ArrowArray` - unsafe { PrimitiveArray::try_from_ffi(array) } -} +) -> Result { + let child = ListArray::::try_get_child(data_type)?.data_type(); -unsafe fn utf8>( - data: T, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, - data_type: DataType, -) -> Result, Error> { - let array = mmap_binary::(data, node, block_offset, buffers)?; - let array = InternalArrowArray::new(array, data_type); - // this is unsafe because `mmap_utf8` does not validate invariants - unsafe { Utf8Array::::try_from_ffi(array) } -} + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; -unsafe fn binary>( - data: T, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, - data_type: DataType, -) -> Result, Error> { - let array = mmap_binary::(data, node, block_offset, buffers)?; - let array = InternalArrowArray::new(array, data_type); - // this is unsafe because `mmap_utf8` does not validate invariants - unsafe { BinaryArray::::try_from_ffi(array) } -} + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; -fn mmap_list>( - data: T, - node: &Node, - block_offset: usize, - buffers: &mut VecDeque, -) -> Result { - todo!() -} + let data_ref = data.as_ref(); -unsafe fn list>( - data: T, - node: &Node, - block_offset: usize, - field_nodes: &mut VecDeque, - buffers: &mut VecDeque, - data_type: DataType, -) -> Result, Error> { - let array = mmap_list::(data, node, block_offset, buffers)?; - let array = InternalArrowArray::new(array, data_type); - // this is unsafe because `mmap_utf8` does not validate invariants - unsafe { BinaryArray::::try_from_ffi(array) } + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + + let values = get_array(data.clone(), block_offset, child, field_nodes, buffers)?; + + // NOTE: offsets and values invariants are _not_ validated + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(offsets)].into_iter(), + [values].into_iter(), + )) } -/// Maps a memory region to an [`Array`]. -pub(crate) unsafe fn mmap>( +fn get_array>( data: T, block_offset: usize, - data_type: DataType, + data_type: &DataType, field_nodes: &mut VecDeque, buffers: &mut VecDeque, -) -> Result, Error> { +) -> Result { use crate::datatypes::PhysicalType::*; let node = field_nodes .pop_front() .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; + match data_type.to_physical_type() { - Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + Boolean => mmap_boolean(data, &node, block_offset, buffers), Primitive(p) => with_match_primitive_type!(p, |$T| { - primitive::<$T, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + mmap_primitive::<$T, _>(data, &node, block_offset, buffers) }), - Utf8 => utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), - LargeUtf8 => { - utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) - } - Binary => { - binary::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + Utf8 | Binary => mmap_binary::(data, &node, block_offset, buffers), + LargeBinary | LargeUtf8 => mmap_binary::(data, &node, block_offset, buffers), + List => mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers), + LargeList => { + mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers) } - LargeBinary => { - binary::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) - } - List => list::(data, &node, block_offset, field_nodes, buffers, data_type) - .map(|x| x.boxed()), _ => todo!(), } } + +/// Maps a memory region to an [`Array`]. +pub(crate) unsafe fn mmap>( + data: T, + block_offset: usize, + data_type: DataType, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result, Error> { + let array = get_array(data, block_offset, &data_type, field_nodes, buffers)?; + // The unsafety comes from the fact that `array` is not necessarily valid - + // the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data) + unsafe { try_from(InternalArrowArray::new(array, data_type)) } +} diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs index 60b152e5c61..3ea255df0aa 100644 --- a/src/mmap/mod.rs +++ b/src/mmap/mod.rs @@ -2,37 +2,28 @@ use std::collections::VecDeque; use crate::array::Array; +use crate::chunk::Chunk; use crate::error::Error; use crate::ffi::mmap; -use crate::io::ipc::read::read_file_metadata; use crate::io::ipc::read::reader::get_serialized_batch; -use crate::io::ipc::read::OutOfSpecKind; +use crate::io::ipc::read::FileMetadata; +use crate::io::ipc::read::{IpcBuffer, Node, OutOfSpecKind}; use crate::io::ipc::CONTINUATION_MARKER; use arrow_format::ipc::planus::ReadAsRoot; +use arrow_format::ipc::MessageRef; -/// something -/// # Safety -/// This operation is innerently unsafe as it assumes that `T` contains valid Arrow data -/// In particular: -/// * Offsets in variable-sized containers are valid; -/// * Utf8 is valid -pub unsafe fn map_chunk_unchecked>( - data: T, - index: usize, -) -> Result, Error> { - let mut bytes = data.as_ref(); - let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?; - - let block = metadata.blocks[index]; - +fn read_message( + mut bytes: &[u8], + block: arrow_format::ipc::Block, +) -> Result<(MessageRef, usize), Error> { let offset: usize = block .offset .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let meta_data_length: usize = block + let block_length: usize = block .meta_data_length .try_into() .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; @@ -54,29 +45,74 @@ pub unsafe fn map_chunk_unchecked>( let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length]) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - let batch = get_serialized_batch(&message)?; + Ok((message, offset + block_length)) +} + +fn read_batch<'a>( + message: &'a MessageRef, +) -> Result<(VecDeque>, VecDeque>), Error> { + let batch = get_serialized_batch(message)?; + + let compression = batch.compression()?; + if compression.is_some() { + return Err(Error::nyi( + "mmap can only be done on uncompressed IPC files", + )); + } let buffers = batch .buffers() .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))? .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?; - let mut buffers = buffers.iter().collect::>(); + let buffers = buffers.iter().collect::>(); let field_nodes = batch .nodes() .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))? .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?; - let mut field_nodes = field_nodes.iter().collect::>(); - - println!("{:#?}", metadata.schema.fields); - let data_type = metadata.schema.fields[0].data_type.clone(); - println!("{:#?}", data_type); - - mmap::mmap( - data.clone(), - offset + meta_data_length, - data_type, - &mut field_nodes, - &mut buffers, - ) + let field_nodes = field_nodes.iter().collect::>(); + + Ok((buffers, field_nodes)) +} + +/// +/// # Errors +/// This function errors when: +/// * The IPC file is not valid +/// * the buffers on the file are un-aligned with their corresponding data. This can happen when: +/// * the file was written with 8-bit alignment +/// * the file contains type decimal 128 or 256 +/// # Safety +/// The caller must ensure that `data` contains a valid Arrow IPC file. +/// This operation is inerently unsafe as it assumes that `data` contains valid Arrow IPC file +/// For example: +/// * Offsets in variable-sized containers must be valid +/// * Utf8 is valid +pub unsafe fn mmap_unchecked>( + metadata: &FileMetadata, + data: T, + chunk: usize, +) -> Result>, Error> { + let block = metadata.blocks[chunk]; + let (message, offset) = read_message(data.as_ref(), block)?; + let (mut buffers, mut field_nodes) = read_batch(&message)?; + + let arrays = metadata + .schema + .fields + .iter() + .map(|f| &f.data_type) + .cloned() + .map(|data_type| { + mmap::mmap( + data.clone(), + offset, + data_type, + &mut field_nodes, + &mut buffers, + ) + }) + .collect::>()?; + + Chunk::try_new(arrays) } diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs new file mode 100644 index 00000000000..ac4bd27b49a --- /dev/null +++ b/tests/it/io/ipc/mmap.rs @@ -0,0 +1,58 @@ +use arrow2::array::*; +use arrow2::chunk::Chunk; +use arrow2::datatypes::{Field, Schema}; +use arrow2::error::Result; +use arrow2::io::ipc::read::read_file_metadata; + +use super::write::file::write; + +fn round_trip(array: Box) -> Result<()> { + let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Chunk::try_new(vec![array.clone()])?; + + let data = write(&[columns], &schema, None, None)?; + + let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?; + + let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, data, 0)? }; + assert_eq!(new_array.into_arrays()[0], array); + Ok(()) +} + +#[test] +fn utf8() -> Result<()> { + let array = Utf8Array::::from([None, None, Some("bb")]) + .slice(1, 2) + .boxed(); + round_trip(array) +} + +#[test] +fn primitive() -> Result<()> { + let array = PrimitiveArray::::from([None, None, Some(3)]) + .slice(1, 2) + .boxed(); + round_trip(array) +} + +#[test] +fn boolean() -> Result<()> { + let array = BooleanArray::from([None, None, Some(true)]) + .slice(1, 2) + .boxed(); + round_trip(array) +} + +#[test] +fn list() -> Result<()> { + let data = vec![ + Some(vec![Some(1i32), Some(2), Some(3)]), + None, + Some(vec![Some(4), None, Some(6)]), + ]; + + let mut array = MutableListArray::>::new(); + array.try_extend(data).unwrap(); + let array = array.into_box().slice(1, 2); + round_trip(array) +} diff --git a/tests/it/io/ipc/mod.rs b/tests/it/io/ipc/mod.rs index 63cce5b2e8f..6d3e71c5db4 100644 --- a/tests/it/io/ipc/mod.rs +++ b/tests/it/io/ipc/mod.rs @@ -15,3 +15,5 @@ mod read_stream_async; #[cfg(feature = "io_ipc_read_async")] mod read_file_async; + +mod mmap; diff --git a/tests/it/io/ipc/read/mmap.rs b/tests/it/io/ipc/read/mmap.rs deleted file mode 100644 index 232568e056c..00000000000 --- a/tests/it/io/ipc/read/mmap.rs +++ /dev/null @@ -1,40 +0,0 @@ -use arrow2::error::Result; -use arrow2::mmap::map_chunk_unchecked; - -use super::super::common::read_gzip_json; - -#[derive(Clone)] -struct Mmap(pub std::sync::Arc>); - -impl AsRef<[u8]> for Mmap { - #[inline] - fn as_ref(&self) -> &[u8] { - self.0.as_ref() - } -} - -fn test_file(version: &str, file_name: &str) -> Result<()> { - let testdata = crate::test_util::arrow_test_data(); - - let arrow_file = format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, file_name - ); - - let data = std::fs::read(arrow_file).unwrap(); - - let data = Mmap(std::sync::Arc::new(data)); - - // read expected JSON output - let (_schema, _, batches) = read_gzip_json(version, file_name)?; - - let array = unsafe { map_chunk_unchecked(data, 0)? }; - - assert_eq!(batches[0].arrays()[0], array); - Ok(()) -} - -#[test] -fn read_generated_100_primitive() -> Result<()> { - test_file("1.0.0-littleendian", "generated_primitive") -} diff --git a/tests/it/io/ipc/read/mod.rs b/tests/it/io/ipc/read/mod.rs index c3602cecc7a..6d1510b36c4 100644 --- a/tests/it/io/ipc/read/mod.rs +++ b/tests/it/io/ipc/read/mod.rs @@ -1,3 +1,2 @@ mod file; -mod mmap; mod stream; diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 98084e16f28..a899aa459d0 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -373,17 +373,3 @@ fn write_months_days_ns() -> Result<()> { let columns = Chunk::try_new(vec![array])?; round_trip(columns, schema, None, None) } - -#[test] -fn bla() -> Result<()> { - let array = Utf8Array::::from_slice(["aa", "bb"]) - .slice(1, 1) - .boxed(); - let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); - let columns = Chunk::try_new(vec![array.clone()])?; - - let data = write(&[columns], &schema, None, None)?; - let new_array = unsafe { arrow2::mmap::map_chunk_unchecked(data, 0)? }; - assert_eq!(new_array, array); - Ok(()) -} diff --git a/tests/it/io/ipc/write/mod.rs b/tests/it/io/ipc/write/mod.rs index f3a4fbfe57c..48ca5ea6cd4 100644 --- a/tests/it/io/ipc/write/mod.rs +++ b/tests/it/io/ipc/write/mod.rs @@ -1,3 +1,3 @@ -mod file; +pub mod file; mod file_append; mod stream; From ec6165f5461133f34b89929d94d9a73534a20c77 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 3 Aug 2022 05:21:16 +0000 Subject: [PATCH 5/8] Added fixed size binary --- src/ffi/mmap.rs | 33 +++++++++++++++++++++++++++++++++ tests/it/io/ipc/mmap.rs | 8 ++++++++ 2 files changed, 41 insertions(+) diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 5f49411eedb..3fa9cf91748 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -181,6 +181,37 @@ fn mmap_binary>( )) } +fn mmap_fixed_size_binary>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); + + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + [].into_iter(), + )) +} + fn mmap_boolean>( data: T, node: &Node, @@ -303,11 +334,13 @@ fn get_array>( mmap_primitive::<$T, _>(data, &node, block_offset, buffers) }), Utf8 | Binary => mmap_binary::(data, &node, block_offset, buffers), + FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers), LargeBinary | LargeUtf8 => mmap_binary::(data, &node, block_offset, buffers), List => mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers), LargeList => { mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers) } + _ => todo!(), } } diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs index ac4bd27b49a..7393eb6ea6f 100644 --- a/tests/it/io/ipc/mmap.rs +++ b/tests/it/io/ipc/mmap.rs @@ -27,6 +27,14 @@ fn utf8() -> Result<()> { round_trip(array) } +#[test] +fn fixed_size_binary() -> Result<()> { + let array = FixedSizeBinaryArray::from([None, None, Some([1, 2])]) + .slice(1, 2) + .boxed(); + round_trip(array) +} + #[test] fn primitive() -> Result<()> { let array = PrimitiveArray::::from([None, None, Some(3)]) From 7c2249a2054f85027222fbd027cabb245e391ad6 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 3 Aug 2022 05:35:30 +0000 Subject: [PATCH 6/8] Struct and fixed size --- src/ffi/mmap.rs | 108 +++++++++++++++++++++++++++++++++++++++- tests/it/io/ipc/mmap.rs | 38 +++++++++++++- 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 3fa9cf91748..8d006d0a442 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use crate::array::{Array, ListArray, Offset}; +use crate::array::{Array, FixedSizeListArray, ListArray, Offset, StructArray}; use crate::datatypes::DataType; use crate::error::Error; @@ -212,6 +212,31 @@ fn mmap_fixed_size_binary>( )) } +fn mmap_null>( + data: T, + node: &Node, + _block_offset: usize, + _buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + Ok(create_array( + data, + num_rows, + null_count, + [].into_iter(), + [].into_iter(), + )) +} + fn mmap_boolean>( data: T, node: &Node, @@ -316,6 +341,82 @@ fn mmap_list>( )) } +fn mmap_fixed_size_list>( + data: T, + node: &Node, + block_offset: usize, + data_type: &DataType, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let child = FixedSizeListArray::try_child_and_size(data_type)? + .0 + .data_type(); + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_array(data.clone(), block_offset, child, field_nodes, buffers)?; + + Ok(create_array( + data, + num_rows, + null_count, + [validity].into_iter(), + [values].into_iter(), + )) +} + +fn mmap_struct>( + data: T, + node: &Node, + block_offset: usize, + data_type: &DataType, + field_nodes: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let children = StructArray::try_get_fields(data_type)?; + + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = children + .iter() + .map(|f| &f.data_type) + .map(|child| get_array(data.clone(), block_offset, child, field_nodes, buffers)) + .collect::, Error>>()?; + + Ok(create_array( + data, + num_rows, + null_count, + [validity].into_iter(), + values.into_iter(), + )) +} + fn get_array>( data: T, block_offset: usize, @@ -329,6 +430,7 @@ fn get_array>( .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; match data_type.to_physical_type() { + Null => mmap_null(data, &node, block_offset, buffers), Boolean => mmap_boolean(data, &node, block_offset, buffers), Primitive(p) => with_match_primitive_type!(p, |$T| { mmap_primitive::<$T, _>(data, &node, block_offset, buffers) @@ -340,6 +442,10 @@ fn get_array>( LargeList => { mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers) } + FixedSizeList => { + mmap_fixed_size_list(data, &node, block_offset, data_type, field_nodes, buffers) + } + Struct => mmap_struct(data, &node, block_offset, data_type, field_nodes, buffers), _ => todo!(), } diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs index 7393eb6ea6f..df2d49c692d 100644 --- a/tests/it/io/ipc/mmap.rs +++ b/tests/it/io/ipc/mmap.rs @@ -1,6 +1,6 @@ use arrow2::array::*; use arrow2::chunk::Chunk; -use arrow2::datatypes::{Field, Schema}; +use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::read::read_file_metadata; @@ -51,6 +51,27 @@ fn boolean() -> Result<()> { round_trip(array) } +#[test] +fn null() -> Result<()> { + let array = NullArray::new(DataType::Null, 10).boxed(); + round_trip(array) +} + +#[test] +fn fixed_size_list() -> Result<()> { + let data = vec![ + Some(vec![Some(1i32), Some(2), Some(3)]), + None, + Some(vec![Some(4), None, Some(6)]), + ]; + + let mut array = MutableFixedSizeListArray::new(MutablePrimitiveArray::::new(), 3); + array.try_extend(data)?; + + let array: FixedSizeListArray = array.into(); + round_trip(array.slice(1, 2).boxed()) +} + #[test] fn list() -> Result<()> { let data = vec![ @@ -64,3 +85,18 @@ fn list() -> Result<()> { let array = array.into_box().slice(1, 2); round_trip(array) } + +#[test] +fn struct_() -> Result<()> { + let array = PrimitiveArray::::from([None, None, None, Some(3), Some(4)]).boxed(); + + let array = StructArray::new( + DataType::Struct(vec![Field::new("f1", array.data_type().clone(), true)]), + vec![array], + Some([true, true, false, true, false].into()), + ) + .slice(1, 4) + .boxed(); + + round_trip(array) +} From f0e3e02c5188eb7d10792b11cde0ab30f447d789 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 3 Aug 2022 16:35:22 +0000 Subject: [PATCH 7/8] Docs --- examples/ipc_file_mmap.rs | 31 +++++++++++++++++++++++++++++++ guide/src/SUMMARY.md | 1 + guide/src/io/ipc_mmap.md | 10 ++++++++++ src/ffi/mmap.rs | 4 ++-- src/mmap/mod.rs | 17 +++++++---------- 5 files changed, 51 insertions(+), 12 deletions(-) create mode 100644 examples/ipc_file_mmap.rs create mode 100644 guide/src/io/ipc_mmap.md diff --git a/examples/ipc_file_mmap.rs b/examples/ipc_file_mmap.rs new file mode 100644 index 00000000000..8c4dc7123ab --- /dev/null +++ b/examples/ipc_file_mmap.rs @@ -0,0 +1,31 @@ +//! Example showing how to memory map an Arrow IPC file into a [`Chunk`]. +use std::sync::Arc; + +use arrow2::error::Result; +use arrow2::io::ipc::read; +use arrow2::mmap::mmap_unchecked; + +// Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which +// usually `Arc` supports. This is how it could look like +#[derive(Clone)] +struct Mmap(Arc>); + +impl AsRef<[u8]> for Mmap { + #[inline] + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +fn main() -> Result<()> { + // given a mmap + let mmap = Mmap(Arc::new(vec![])); + + // we read the metadata + let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?; + + let chunk = unsafe { mmap_unchecked(&metadata, mmap, 0) }?; + + println!("{chunk:?}"); + Ok(()) +} diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md index 85ffbce583e..f79cced198e 100644 --- a/guide/src/SUMMARY.md +++ b/guide/src/SUMMARY.md @@ -14,6 +14,7 @@ - [Read Parquet](./io/parquet_read.md) - [Write Parquet](./io/parquet_write.md) - [Read Arrow](./io/ipc_read.md) + - [Memory map Arrow](./io/ipc_mmap.md) - [Read Arrow stream](./io/ipc_stream_read.md) - [Write Arrow](./io/ipc_write.md) - [Read Avro](./io/avro_read.md) diff --git a/guide/src/io/ipc_mmap.md b/guide/src/io/ipc_mmap.md new file mode 100644 index 00000000000..be5bcbe7c65 --- /dev/null +++ b/guide/src/io/ipc_mmap.md @@ -0,0 +1,10 @@ +# Read Arrow + +When compiled with feature `io_ipc`, this crate can be used to memory map IPC Arrow files +into arrays. + +The example below shows how to memory map an IPC Arrow file into `Chunk`es: + +```rust +{{#include ../../../examples/ipc_file_mmap.rs}} +``` diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 8d006d0a442..9027bf9fc30 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -137,9 +137,9 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { // take ownership of `private_data`, therefore dropping it let private = Box::from_raw(array.private_data as *mut PrivateData); - /*for child in private.children_ptr.iter() { + for child in private.children_ptr.iter() { let _ = Box::from_raw(*child); - }*/ + } if let Some(ptr) = private.dictionary_ptr { let _ = Box::from_raw(ptr); diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs index 3ea255df0aa..2a75d67f4ec 100644 --- a/src/mmap/mod.rs +++ b/src/mmap/mod.rs @@ -75,7 +75,7 @@ fn read_batch<'a>( Ok((buffers, field_nodes)) } -/// +/// Memory maps an record batch from an IPC file into a [`Chunk`]. /// # Errors /// This function errors when: /// * The IPC file is not valid @@ -83,11 +83,9 @@ fn read_batch<'a>( /// * the file was written with 8-bit alignment /// * the file contains type decimal 128 or 256 /// # Safety -/// The caller must ensure that `data` contains a valid Arrow IPC file. -/// This operation is inerently unsafe as it assumes that `data` contains valid Arrow IPC file -/// For example: -/// * Offsets in variable-sized containers must be valid -/// * Utf8 is valid +/// The caller must ensure that `data` contains a valid buffers, for example: +/// * Offsets in variable-sized containers must be in-bounds and increasing +/// * Utf8 data is valid pub unsafe fn mmap_unchecked>( metadata: &FileMetadata, data: T, @@ -97,7 +95,7 @@ pub unsafe fn mmap_unchecked>( let (message, offset) = read_message(data.as_ref(), block)?; let (mut buffers, mut field_nodes) = read_batch(&message)?; - let arrays = metadata + metadata .schema .fields .iter() @@ -112,7 +110,6 @@ pub unsafe fn mmap_unchecked>( &mut buffers, ) }) - .collect::>()?; - - Chunk::try_new(arrays) + .collect::>() + .and_then(Chunk::try_new) } From f263be27dfacc4aa293eb44f515692e3982258bd Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 3 Aug 2022 22:04:21 +0000 Subject: [PATCH 8/8] Dictionaries and MIRI --- .github/workflows/test.yml | 19 ++++ examples/ipc_file_mmap.rs | 12 ++- src/ffi/mmap.rs | 179 ++++++++++++++++++++++++++++++---- src/io/ipc/read/common.rs | 70 +++++++------ src/io/ipc/read/file_async.rs | 4 +- src/io/ipc/read/mod.rs | 1 + src/io/ipc/read/reader.rs | 54 +++++----- src/mmap/mod.rs | 160 +++++++++++++++++++++++++----- tests/it/io/ipc/mmap.rs | 18 +++- 9 files changed, 406 insertions(+), 111 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c9b25238f80..74ebe22eed5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -114,6 +114,25 @@ jobs: - name: Run run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_json_integration io::ipc::write::write_sliced_list + miri-checks-mmap: + name: MIRI on IO IPC mmaping + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly-2022-07-12 + override: true + - uses: Swatinem/rust-cache@v1 + with: + key: key1 + - name: Install Miri + run: | + rustup component add miri + cargo miri setup + - name: Run + run: cargo miri test --tests --features io_ipc io::ipc::mmap + feature-compilation: name: Feature coverage runs-on: ubuntu-latest diff --git a/examples/ipc_file_mmap.rs b/examples/ipc_file_mmap.rs index 8c4dc7123ab..038a8eb8226 100644 --- a/examples/ipc_file_mmap.rs +++ b/examples/ipc_file_mmap.rs @@ -3,10 +3,10 @@ use std::sync::Arc; use arrow2::error::Result; use arrow2::io::ipc::read; -use arrow2::mmap::mmap_unchecked; +use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked}; // Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which -// usually `Arc` supports. This is how it could look like +// usually `Arc` supports. Here we mock it #[derive(Clone)] struct Mmap(Arc>); @@ -21,10 +21,14 @@ fn main() -> Result<()> { // given a mmap let mmap = Mmap(Arc::new(vec![])); - // we read the metadata + // read the metadata let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?; - let chunk = unsafe { mmap_unchecked(&metadata, mmap, 0) }?; + // mmap the dictionaries + let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? }; + + // and finally mmap a chunk (0 in this case). + let chunk = unsafe { mmap_unchecked(&metadata, &dictionaries, mmap, 0) }?; println!("{chunk:?}"); Ok(()) diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 9027bf9fc30..4a84d9804b0 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,14 +1,15 @@ use std::collections::VecDeque; -use crate::array::{Array, FixedSizeListArray, ListArray, Offset, StructArray}; +use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, Offset, StructArray}; use crate::datatypes::DataType; use crate::error::Error; -use crate::io::ipc::read::OutOfSpecKind; +use crate::io::ipc::read::{Dictionaries, OutOfSpecKind}; use crate::io::ipc::read::{IpcBuffer, Node}; +use crate::io::ipc::IpcField; use crate::types::NativeType; -use super::{try_from, ArrowArray, InternalArrowArray}; +use super::{export_array_to_c, try_from, ArrowArray, InternalArrowArray}; #[allow(dead_code)] struct PrivateData { @@ -93,6 +94,7 @@ fn create_array< null_count: usize, buffers: I, children: II, + dictionary: Option, ) -> ArrowArray { let buffers_ptr = buffers .map(|maybe_buffer| match maybe_buffer { @@ -107,11 +109,13 @@ fn create_array< .collect::>(); let n_children = children_ptr.len() as i64; + let dictionary_ptr = dictionary.map(|array| Box::into_raw(Box::new(array))); + let mut private_data = Box::new(PrivateData:: { data, buffers_ptr, children_ptr, - dictionary_ptr: None, + dictionary_ptr, }); ArrowArray { @@ -178,6 +182,7 @@ fn mmap_binary>( null_count, [validity, Some(offsets), Some(values)].into_iter(), [].into_iter(), + None, )) } @@ -209,6 +214,7 @@ fn mmap_fixed_size_binary>( null_count, [validity, Some(values)].into_iter(), [].into_iter(), + None, )) } @@ -234,6 +240,7 @@ fn mmap_null>( null_count, [].into_iter(), [].into_iter(), + None, )) } @@ -269,6 +276,7 @@ fn mmap_boolean>( null_count, [validity, Some(values)].into_iter(), [].into_iter(), + None, )) } @@ -300,14 +308,18 @@ fn mmap_primitive>( null_count, [validity, Some(values)].into_iter(), [].into_iter(), + None, )) } +#[allow(clippy::too_many_arguments)] fn mmap_list>( data: T, node: &Node, block_offset: usize, data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, field_nodes: &mut VecDeque, buffers: &mut VecDeque, ) -> Result { @@ -329,7 +341,15 @@ fn mmap_list>( let offsets = get_buffer::(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr(); - let values = get_array(data.clone(), block_offset, child, field_nodes, buffers)?; + let values = get_array( + data.clone(), + block_offset, + child, + &ipc_field.fields[0], + dictionaries, + field_nodes, + buffers, + )?; // NOTE: offsets and values invariants are _not_ validated Ok(create_array( @@ -338,14 +358,18 @@ fn mmap_list>( null_count, [validity, Some(offsets)].into_iter(), [values].into_iter(), + None, )) } +#[allow(clippy::too_many_arguments)] fn mmap_fixed_size_list>( data: T, node: &Node, block_offset: usize, data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, field_nodes: &mut VecDeque, buffers: &mut VecDeque, ) -> Result { @@ -367,7 +391,15 @@ fn mmap_fixed_size_list>( let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); - let values = get_array(data.clone(), block_offset, child, field_nodes, buffers)?; + let values = get_array( + data.clone(), + block_offset, + child, + &ipc_field.fields[0], + dictionaries, + field_nodes, + buffers, + )?; Ok(create_array( data, @@ -375,14 +407,18 @@ fn mmap_fixed_size_list>( null_count, [validity].into_iter(), [values].into_iter(), + None, )) } +#[allow(clippy::too_many_arguments)] fn mmap_struct>( data: T, node: &Node, block_offset: usize, data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, field_nodes: &mut VecDeque, buffers: &mut VecDeque, ) -> Result { @@ -405,7 +441,18 @@ fn mmap_struct>( let values = children .iter() .map(|f| &f.data_type) - .map(|child| get_array(data.clone(), block_offset, child, field_nodes, buffers)) + .zip(ipc_field.fields.iter()) + .map(|(child, ipc)| { + get_array( + data.clone(), + block_offset, + child, + ipc, + dictionaries, + field_nodes, + buffers, + ) + }) .collect::, Error>>()?; Ok(create_array( @@ -414,6 +461,49 @@ fn mmap_struct>( null_count, [validity].into_iter(), values.into_iter(), + None, + )) +} + +#[allow(clippy::too_many_arguments)] +fn mmap_dict>( + data: T, + node: &Node, + block_offset: usize, + _: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, + _: &mut VecDeque, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; + + let data_ref = data.as_ref(); + + let dictionary = dictionaries + .get(&ipc_field.dictionary_id.unwrap()) + .ok_or_else(|| Error::oos("Missing dictionary"))? + .clone(); + + let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr()); + + let values = get_buffer::(data_ref, block_offset, buffers, num_rows)?.as_ptr(); + + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + [].into_iter(), + Some(export_array_to_c(dictionary)), )) } @@ -421,6 +511,8 @@ fn get_array>( data: T, block_offset: usize, data_type: &DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, field_nodes: &mut VecDeque, buffers: &mut VecDeque, ) -> Result { @@ -438,15 +530,58 @@ fn get_array>( Utf8 | Binary => mmap_binary::(data, &node, block_offset, buffers), FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers), LargeBinary | LargeUtf8 => mmap_binary::(data, &node, block_offset, buffers), - List => mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers), - LargeList => { - mmap_list::(data, &node, block_offset, data_type, field_nodes, buffers) - } - FixedSizeList => { - mmap_fixed_size_list(data, &node, block_offset, data_type, field_nodes, buffers) - } - Struct => mmap_struct(data, &node, block_offset, data_type, field_nodes, buffers), - + List => mmap_list::( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + LargeList => mmap_list::( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + FixedSizeList => mmap_fixed_size_list( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + Struct => mmap_struct( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ), + Dictionary(key_type) => match_integer_type!(key_type, |$T| { + mmap_dict::<$T, _>( + data, + &node, + block_offset, + data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + ) + }), _ => todo!(), } } @@ -456,10 +591,20 @@ pub(crate) unsafe fn mmap>( data: T, block_offset: usize, data_type: DataType, + ipc_field: &IpcField, + dictionaries: &Dictionaries, field_nodes: &mut VecDeque, buffers: &mut VecDeque, ) -> Result, Error> { - let array = get_array(data, block_offset, &data_type, field_nodes, buffers)?; + let array = get_array( + data, + block_offset, + &data_type, + ipc_field, + dictionaries, + field_nodes, + buffers, + )?; // The unsafety comes from the fact that `array` is not necessarily valid - // the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data) unsafe { try_from(InternalArrowArray::new(array, data_type)) } diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index 306df0061cf..9a1ea3ce1c3 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -212,7 +212,7 @@ fn find_first_dict_field<'a>( find_first_dict_field_d(id, &field.data_type, ipc_field) } -fn first_dict_field<'a>( +pub(crate) fn first_dict_field<'a>( id: i64, fields: &'a [Field], ipc_fields: &'a [IpcField], @@ -253,45 +253,41 @@ pub fn read_dictionary( .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?; let (first_field, first_ipc_field) = first_dict_field(id, fields, &ipc_schema.fields)?; - // As the dictionary batch does not contain the type of the - // values array, we need to retrieve this from the schema. - // Get an array representing this dictionary's values. - let dictionary_values: Box = match first_field.data_type.to_logical_type() { - DataType::Dictionary(_, ref value_type, _) => { - let batch = batch - .data() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))? - .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?; - - // Make a fake schema for the dictionary batch. - let fields = vec![Field::new("", value_type.as_ref().clone(), false)]; - let ipc_schema = IpcSchema { - fields: vec![first_ipc_field.clone()], - is_little_endian: ipc_schema.is_little_endian, - }; - let chunk = read_record_batch( - batch, - &fields, - &ipc_schema, - None, - None, // we must read the whole dictionary - dictionaries, - arrow_format::ipc::MetadataVersion::V5, - reader, - block_offset, - file_size, - scratch, - )?; - chunk.into_arrays().pop().unwrap() - } - _ => { + let batch = batch + .data() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?; + + let value_type = + if let DataType::Dictionary(_, value_type, _) = first_field.data_type.to_logical_type() { + value_type.as_ref() + } else { return Err(Error::from(OutOfSpecKind::InvalidIdDataType { requested_id: id, - })) - } + })); + }; + + // Make a fake schema for the dictionary batch. + let fields = vec![Field::new("", value_type.clone(), false)]; + let ipc_schema = IpcSchema { + fields: vec![first_ipc_field.clone()], + is_little_endian: ipc_schema.is_little_endian, }; - - dictionaries.insert(id, dictionary_values); + let chunk = read_record_batch( + batch, + &fields, + &ipc_schema, + None, + None, // we must read the whole dictionary + dictionaries, + arrow_format::ipc::MetadataVersion::V5, + reader, + block_offset, + file_size, + scratch, + )?; + + dictionaries.insert(id, chunk.into_arrays().pop().unwrap()); Ok(()) } diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 4b36a44026e..82999c28ea0 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -14,7 +14,7 @@ use crate::error::{Error, Result}; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; -use super::reader::{deserialize_footer, get_serialized_batch}; +use super::reader::{deserialize_footer, get_record_batch}; use super::Dictionaries; use super::FileMetadata; use super::OutOfSpecKind; @@ -205,7 +205,7 @@ where let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - let batch = get_serialized_batch(&message)?; + let batch = get_record_batch(message)?; let block_length: usize = message .body_length() diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index f623e0aef56..2645d91440f 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -27,6 +27,7 @@ pub mod stream_async; #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] pub mod file_async; +pub(crate) use common::first_dict_field; pub use common::{read_dictionary, read_record_batch}; pub use reader::{ read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index d46090bac43..9fb428eced5 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -63,6 +63,19 @@ fn read_dictionary_message( Ok(()) } +pub(crate) fn get_dictionary_batch<'a>( + message: &'a arrow_format::ipc::MessageRef, +) -> Result> { + let header = message + .header() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + match header { + arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch), + _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), + } +} + fn read_dictionary_block( reader: &mut R, metadata: &FileMetadata, @@ -84,27 +97,18 @@ fn read_dictionary_block( let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - let header = message - .header() - .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? - .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?; + let batch = get_dictionary_batch(&message)?; - match header { - arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - let block_offset = offset + length; - read_dictionary( - batch, - &metadata.schema.fields, - &metadata.ipc_schema, - dictionaries, - reader, - block_offset, - metadata.size, - dictionary_scratch, - ) - } - _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)), - } + read_dictionary( + batch, + &metadata.schema.fields, + &metadata.ipc_schema, + dictionaries, + reader, + offset + length, + metadata.size, + dictionary_scratch, + ) } /// Reads all file's dictionaries, if any @@ -116,7 +120,7 @@ pub fn read_file_dictionaries( ) -> Result { let mut dictionaries = Default::default(); - let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() { + let blocks = if let Some(blocks) = &metadata.dictionaries { blocks } else { return Ok(AHashMap::new()); @@ -230,9 +234,9 @@ pub fn read_file_metadata(reader: &mut R) -> Result( - message: &'a arrow_format::ipc::MessageRef, -) -> Result> { +pub(crate) fn get_record_batch( + message: arrow_format::ipc::MessageRef, +) -> Result { let header = message .header() .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))? @@ -295,7 +299,7 @@ pub fn read_batch( let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref()) .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - let batch = get_serialized_batch(&message)?; + let batch = get_record_batch(message)?; read_record_batch( batch, diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs index 2a75d67f4ec..b7822b5f4cd 100644 --- a/src/mmap/mod.rs +++ b/src/mmap/mod.rs @@ -3,16 +3,17 @@ use std::collections::VecDeque; use crate::array::Array; use crate::chunk::Chunk; +use crate::datatypes::{DataType, Field}; use crate::error::Error; use crate::ffi::mmap; -use crate::io::ipc::read::reader::get_serialized_batch; -use crate::io::ipc::read::FileMetadata; +use crate::io::ipc::read::reader::{get_dictionary_batch, get_record_batch}; +use crate::io::ipc::read::{first_dict_field, Dictionaries, FileMetadata}; use crate::io::ipc::read::{IpcBuffer, Node, OutOfSpecKind}; -use crate::io::ipc::CONTINUATION_MARKER; +use crate::io::ipc::{IpcField, CONTINUATION_MARKER}; use arrow_format::ipc::planus::ReadAsRoot; -use arrow_format::ipc::MessageRef; +use arrow_format::ipc::{Block, MessageRef, RecordBatchRef}; fn read_message( mut bytes: &[u8], @@ -48,11 +49,9 @@ fn read_message( Ok((message, offset + block_length)) } -fn read_batch<'a>( - message: &'a MessageRef, -) -> Result<(VecDeque>, VecDeque>), Error> { - let batch = get_serialized_batch(message)?; - +fn get_buffers_nodes( + batch: RecordBatchRef, +) -> Result<(VecDeque, VecDeque), Error> { let compression = batch.compression()?; if compression.is_some() { return Err(Error::nyi( @@ -75,6 +74,55 @@ fn read_batch<'a>( Ok((buffers, field_nodes)) } +unsafe fn _mmap_record>( + fields: &[Field], + ipc_fields: &[IpcField], + data: T, + batch: RecordBatchRef, + offset: usize, + dictionaries: &Dictionaries, +) -> Result>, Error> { + let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?; + + fields + .iter() + .map(|f| &f.data_type) + .cloned() + .zip(ipc_fields) + .map(|(data_type, ipc_field)| { + mmap::mmap( + data.clone(), + offset, + data_type, + ipc_field, + dictionaries, + &mut field_nodes, + &mut buffers, + ) + }) + .collect::>() + .and_then(Chunk::try_new) +} + +unsafe fn _mmap_unchecked>( + fields: &[Field], + ipc_fields: &[IpcField], + data: T, + block: Block, + dictionaries: &Dictionaries, +) -> Result>, Error> { + let (message, offset) = read_message(data.as_ref(), block)?; + let batch = get_record_batch(message)?; + _mmap_record( + fields, + ipc_fields, + data.clone(), + batch, + offset, + dictionaries, + ) +} + /// Memory maps an record batch from an IPC file into a [`Chunk`]. /// # Errors /// This function errors when: @@ -88,28 +136,90 @@ fn read_batch<'a>( /// * Utf8 data is valid pub unsafe fn mmap_unchecked>( metadata: &FileMetadata, + dictionaries: &Dictionaries, data: T, chunk: usize, ) -> Result>, Error> { let block = metadata.blocks[chunk]; + let (message, offset) = read_message(data.as_ref(), block)?; - let (mut buffers, mut field_nodes) = read_batch(&message)?; + let batch = get_record_batch(message)?; + _mmap_record( + &metadata.schema.fields, + &metadata.ipc_schema.fields, + data.clone(), + batch, + offset, + dictionaries, + ) +} - metadata - .schema - .fields +unsafe fn mmap_dictionary>( + metadata: &FileMetadata, + data: T, + block: Block, + dictionaries: &mut Dictionaries, +) -> Result<(), Error> { + let (message, offset) = read_message(data.as_ref(), block)?; + let batch = get_dictionary_batch(&message)?; + + let id = batch + .id() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?; + let (first_field, first_ipc_field) = + first_dict_field(id, &metadata.schema.fields, &metadata.ipc_schema.fields)?; + + let batch = batch + .data() + .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))? + .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?; + + let value_type = + if let DataType::Dictionary(_, value_type, _) = first_field.data_type.to_logical_type() { + value_type.as_ref() + } else { + return Err(Error::from(OutOfSpecKind::InvalidIdDataType { + requested_id: id, + })); + }; + + // Make a fake schema for the dictionary batch. + let field = Field::new("", value_type.clone(), false); + + let chunk = _mmap_record( + &[field], + &[first_ipc_field.clone()], + data.clone(), + batch, + offset, + dictionaries, + )?; + + dictionaries.insert(id, chunk.into_arrays().pop().unwrap()); + + Ok(()) +} + +/// Memory maps dictionaries from an IPC file into +/// # Safety +/// The caller must ensure that `data` contains a valid buffers, for example: +/// * Offsets in variable-sized containers must be in-bounds and increasing +/// * Utf8 data is valid +pub unsafe fn mmap_dictionaries_unchecked>( + metadata: &FileMetadata, + data: T, +) -> Result { + let blocks = if let Some(blocks) = &metadata.dictionaries { + blocks + } else { + return Ok(Default::default()); + }; + + let mut dictionaries = Default::default(); + + blocks .iter() - .map(|f| &f.data_type) .cloned() - .map(|data_type| { - mmap::mmap( - data.clone(), - offset, - data_type, - &mut field_nodes, - &mut buffers, - ) - }) - .collect::>() - .and_then(Chunk::try_new) + .try_for_each(|block| mmap_dictionary(metadata, data.clone(), block, &mut dictionaries))?; + Ok(dictionaries) } diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs index df2d49c692d..9142227b1fc 100644 --- a/tests/it/io/ipc/mmap.rs +++ b/tests/it/io/ipc/mmap.rs @@ -14,7 +14,10 @@ fn round_trip(array: Box) -> Result<()> { let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?; - let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, data, 0)? }; + let dictionaries = + unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? }; + + let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, &dictionaries, data, 0)? }; assert_eq!(new_array.into_arrays()[0], array); Ok(()) } @@ -100,3 +103,16 @@ fn struct_() -> Result<()> { round_trip(array) } + +#[test] +fn dict() -> Result<()> { + let keys = PrimitiveArray::::from([None, None, None, Some(3), Some(4)]); + + let values = PrimitiveArray::::from_slice([0, 1, 2, 3, 4, 5]).boxed(); + + let array = DictionaryArray::try_from_keys(keys, values)? + .slice(1, 4) + .boxed(); + + round_trip(array) +}