diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index c9b25238f80..74ebe22eed5 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -114,6 +114,25 @@ jobs:
       - name: Run
         run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_json_integration io::ipc::write::write_sliced_list
 
+  miri-checks-mmap:
+    name: MIRI on IO IPC mmapping
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - uses: actions-rs/toolchain@v1
+        with:
+          toolchain: nightly-2022-07-12
+          override: true
+      - uses: Swatinem/rust-cache@v1
+        with:
+          key: key1
+      - name: Install Miri
+        run: |
+          rustup component add miri
+          cargo miri setup
+      - name: Run
+        run: cargo miri test --tests --features io_ipc io::ipc::mmap
+
   feature-compilation:
     name: Feature coverage
     runs-on: ubuntu-latest
diff --git a/examples/ipc_file_mmap.rs b/examples/ipc_file_mmap.rs
new file mode 100644
index 00000000000..038a8eb8226
--- /dev/null
+++ b/examples/ipc_file_mmap.rs
@@ -0,0 +1,35 @@
+//! Example showing how to memory map an Arrow IPC file into a [`Chunk`].
+use std::sync::Arc;
+
+use arrow2::error::Result;
+use arrow2::io::ipc::read;
+use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
+
+// Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which
+// `Arc<Vec<u8>>` supports; here we mock a memory-mapped region with it.
+#[derive(Clone)]
+struct Mmap(Arc<Vec<u8>>);
+
+impl AsRef<[u8]> for Mmap {
+    #[inline]
+    fn as_ref(&self) -> &[u8] {
+        self.0.as_ref()
+    }
+}
+
+fn main() -> Result<()> {
+    // given a mmap
+    let mmap = Mmap(Arc::new(vec![]));
+
+    // read the metadata
+    let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
+
+    // mmap the dictionaries
+    let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
+
+    // and finally mmap a chunk (0 in this case).
+    let chunk = unsafe { mmap_unchecked(&metadata, &dictionaries, mmap, 0) }?;
+
+    println!("{chunk:?}");
+    Ok(())
+}
diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md
index 85ffbce583e..f79cced198e 100644
--- a/guide/src/SUMMARY.md
+++ b/guide/src/SUMMARY.md
@@ -14,6 +14,7 @@
   - [Read Parquet](./io/parquet_read.md)
   - [Write Parquet](./io/parquet_write.md)
   - [Read Arrow](./io/ipc_read.md)
+  - [Memory map Arrow](./io/ipc_mmap.md)
   - [Read Arrow stream](./io/ipc_stream_read.md)
   - [Write Arrow](./io/ipc_write.md)
   - [Read Avro](./io/avro_read.md)
diff --git a/guide/src/io/ipc_mmap.md b/guide/src/io/ipc_mmap.md
new file mode 100644
index 00000000000..be5bcbe7c65
--- /dev/null
+++ b/guide/src/io/ipc_mmap.md
@@ -0,0 +1,10 @@
+# Memory map Arrow
+
+When compiled with feature `io_ipc`, this crate can be used to memory map IPC Arrow files
+into arrays.
+
+The example below shows how to memory map an IPC Arrow file into a `Chunk`:
+
+```rust
+{{#include ../../../examples/ipc_file_mmap.rs}}
+```
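The example above mocks the memory-mapped region with an `Arc<Vec<u8>>`. With an actual file-backed map, any type satisfying `Clone + AsRef<[u8]>` works; a minimal sketch of an adapter, assuming the `memmap2` crate (not a dependency of this PR):

```rust
use std::fs::File;
use std::sync::Arc;

// Hypothetical adapter: an `Arc`-shared `memmap2::Mmap` satisfies the
// `Clone + AsRef<[u8]>` bound required by `mmap_unchecked`.
#[derive(Clone)]
struct SharedMmap(Arc<memmap2::Mmap>);

impl AsRef<[u8]> for SharedMmap {
    fn as_ref(&self) -> &[u8] {
        // deref through `Arc` and `Mmap` to the underlying bytes
        &self.0[..]
    }
}

fn open(path: &str) -> std::io::Result<SharedMmap> {
    let file = File::open(path)?;
    // Safety: the file must not be truncated or modified while mapped.
    let map = unsafe { memmap2::Mmap::map(&file)? };
    Ok(SharedMmap(Arc::new(map)))
}
```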
diff --git a/src/array/list/mod.rs b/src/array/list/mod.rs
index 88ab6513234..13f781f680f 100644
--- a/src/array/list/mod.rs
+++ b/src/array/list/mod.rs
@@ -324,7 +324,7 @@ impl<O: Offset> ListArray<O> {
     /// Returns the inner [`Field`]
     /// # Errors
     /// Errors iff the logical type is not consistent with this struct.
-    fn try_get_child(data_type: &DataType) -> Result<&Field, Error> {
+    pub fn try_get_child(data_type: &DataType) -> Result<&Field, Error> {
         if O::IS_LARGE {
             match data_type.to_logical_type() {
                 DataType::LargeList(child) => Ok(child.as_ref()),
diff --git a/src/ffi/array.rs b/src/ffi/array.rs
index 46858deff73..b50fdaa7259 100644
--- a/src/ffi/array.rs
+++ b/src/ffi/array.rs
@@ -493,7 +493,7 @@ impl InternalArrowArray {
     }
 }
 
-impl ArrowArrayRef for Box<InternalArrowArray> {
+impl ArrowArrayRef for InternalArrowArray {
     /// the data_type as declared in the schema
     fn data_type(&self) -> &DataType {
         &self.data_type
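The new `src/ffi/mmap.rs` module below never copies buffer bytes: `get_buffer` reinterprets a sub-slice of the mapped region in place, which is only sound when that sub-slice is aligned for the target type. The `bytemuck::try_cast_slice` call is what enforces this; a standalone illustration of the check (hypothetical values, not crate API):

```rust
fn main() {
    // twelve bytes guaranteed to start at a 4-byte boundary
    let words = [0u32; 3];
    let bytes: &[u8] = bytemuck::cast_slice(&words);

    // an aligned prefix can be reinterpreted as `&[i32]`
    let ok: Result<&[i32], _> = bytemuck::try_cast_slice(&bytes[..8]);
    assert!(ok.is_ok());

    // shifting by one byte breaks the alignment; `get_buffer` surfaces
    // this as the "buffer not aligned for mmap" out-of-spec error
    let err: Result<&[i32], _> = bytemuck::try_cast_slice(&bytes[1..9]);
    assert!(err.is_err());
}
```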
diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs
new file mode 100644
index 00000000000..4a84d9804b0
--- /dev/null
+++ b/src/ffi/mmap.rs
@@ -0,0 +1,611 @@
+use std::collections::VecDeque;
+
+use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, Offset, StructArray};
+use crate::datatypes::DataType;
+use crate::error::Error;
+
+use crate::io::ipc::read::{Dictionaries, OutOfSpecKind};
+use crate::io::ipc::read::{IpcBuffer, Node};
+use crate::io::ipc::IpcField;
+use crate::types::NativeType;
+
+use super::{export_array_to_c, try_from, ArrowArray, InternalArrowArray};
+
+#[allow(dead_code)]
+struct PrivateData<T> {
+    // the owner of the pointers' regions
+    data: T,
+    buffers_ptr: Box<[*const std::os::raw::c_void]>,
+    children_ptr: Box<[*mut ArrowArray]>,
+    dictionary_ptr: Option<*mut ArrowArray>,
+}
+
+fn get_buffer_bounds(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error> {
+    let buffer = buffers
+        .pop_front()
+        .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
+
+    let offset: usize = buffer
+        .offset()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let length: usize = buffer
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    Ok((offset, length))
+}
+
+fn get_buffer<'a, T: NativeType>(
+    data: &'a [u8],
+    block_offset: usize,
+    buffers: &mut VecDeque<IpcBuffer>,
+    num_rows: usize,
+) -> Result<&'a [u8], Error> {
+    let (offset, length) = get_buffer_bounds(buffers)?;
+
+    // verify that they are in-bounds
+    let values = data
+        .get(block_offset + offset..block_offset + offset + length)
+        .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?;
+
+    // validate alignment
+    let v: &[T] = bytemuck::try_cast_slice(values)
+        .map_err(|_| Error::OutOfSpec("buffer not aligned for mmap".to_string()))?;
+
+    if v.len() < num_rows {
+        return Err(Error::OutOfSpec(
+            "buffer's length is too small in mmap".to_string(),
+        ));
+    }
+
+    Ok(values)
+}
+
+fn get_validity<'a>(
+    data: &'a [u8],
+    block_offset: usize,
+    buffers: &mut VecDeque<IpcBuffer>,
+    null_count: usize,
+) -> Result<Option<&'a [u8]>, Error> {
+    let validity = get_buffer_bounds(buffers)?;
+    let (offset, length) = validity;
+
+    Ok(if null_count > 0 {
+        // verify that they are in-bounds and get its pointer
+        Some(
+            data.get(block_offset + offset..block_offset + offset + length)
+                .ok_or_else(|| Error::OutOfSpec("buffer out of bounds".to_string()))?,
+        )
+    } else {
+        None
+    })
+}
+
+fn create_array<
+    T: Clone + AsRef<[u8]>,
+    I: Iterator<Item = Option<*const u8>>,
+    II: Iterator<Item = ArrowArray>,
+>(
+    data: T,
+    num_rows: usize,
+    null_count: usize,
+    buffers: I,
+    children: II,
+    dictionary: Option<ArrowArray>,
+) -> ArrowArray {
+    let buffers_ptr = buffers
+        .map(|maybe_buffer| match maybe_buffer {
+            Some(b) => b as *const std::os::raw::c_void,
+            None => std::ptr::null(),
+        })
+        .collect::<Box<[_]>>();
+    let n_buffers = buffers_ptr.len() as i64;
+
+    let children_ptr = children
+        .map(|child| Box::into_raw(Box::new(child)))
+        .collect::<Box<[_]>>();
+    let n_children = children_ptr.len() as i64;
+
+    let dictionary_ptr = dictionary.map(|array| Box::into_raw(Box::new(array)));
+
+    let mut private_data = Box::new(PrivateData::<T> {
+        data,
+        buffers_ptr,
+        children_ptr,
+        dictionary_ptr,
+    });
+
+    ArrowArray {
+        length: num_rows as i64,
+        null_count: null_count as i64,
+        offset: 0, // IPC files are by definition not offset
+        n_buffers,
+        n_children,
+        buffers: private_data.buffers_ptr.as_mut_ptr(),
+        children: private_data.children_ptr.as_mut_ptr(),
+        dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
+        release: Some(release::<T>),
+        private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
+    }
+}
+
+/// callback used to drop [`ArrowArray`] when it is exported
+unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
+    if array.is_null() {
+        return;
+    }
+    let array = &mut *array;
+
+    // take ownership of `private_data`, therefore dropping it
+    let private = Box::from_raw(array.private_data as *mut PrivateData<T>);
+    for child in private.children_ptr.iter() {
+        let _ = Box::from_raw(*child);
+    }
+
+    if let Some(ptr) = private.dictionary_ptr {
+        let _ = Box::from_raw(ptr);
+    }
+
+    array.release = None;
+}
+
+fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
+    let values = get_buffer::<u8>(data_ref, block_offset, buffers, 0)?.as_ptr();
+
+    // NOTE: offsets and values invariants are _not_ validated
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(offsets), Some(values)].into_iter(),
+        [].into_iter(),
+        None,
+    ))
+}
+
+fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let values = get_buffer::<u8>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(values)].into_iter(),
+        [].into_iter(),
+        None,
+    ))
+}
+
+fn mmap_null<T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    _block_offset: usize,
+    _buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [].into_iter(),
+        [].into_iter(),
+        None,
+    ))
+}
+
+fn mmap_boolean<T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let values = get_buffer_bounds(buffers)?;
+    let (offset, length) = values;
+
+    // verify that they are in-bounds and get its pointer
+    let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(values)].into_iter(),
+        [].into_iter(),
+        None,
+    ))
+}
+
+fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let data_ref = data.as_ref();
+
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let values = get_buffer::<P>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(values)].into_iter(),
+        [].into_iter(),
+        None,
+    ))
+}
+
+#[allow(clippy::too_many_arguments)]
+fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    data_type: &DataType,
+    ipc_field: &IpcField,
+    dictionaries: &Dictionaries,
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let child = ListArray::<O>::try_get_child(data_type)?.data_type();
+
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let offsets = get_buffer::<O>(data_ref, block_offset, buffers, num_rows + 1)?.as_ptr();
+
+    let values = get_array(
+        data.clone(),
+        block_offset,
+        child,
+        &ipc_field.fields[0],
+        dictionaries,
+        field_nodes,
+        buffers,
+    )?;
+
+    // NOTE: offsets and values invariants are _not_ validated
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(offsets)].into_iter(),
+        [values].into_iter(),
+        None,
+    ))
+}
+
+#[allow(clippy::too_many_arguments)]
+fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    data_type: &DataType,
+    ipc_field: &IpcField,
+    dictionaries: &Dictionaries,
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let child = FixedSizeListArray::try_child_and_size(data_type)?
+        .0
+        .data_type();
+
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let values = get_array(
+        data.clone(),
+        block_offset,
+        child,
+        &ipc_field.fields[0],
+        dictionaries,
+        field_nodes,
+        buffers,
+    )?;
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity].into_iter(),
+        [values].into_iter(),
+        None,
+    ))
+}
+
+#[allow(clippy::too_many_arguments)]
+fn mmap_struct<T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    data_type: &DataType,
+    ipc_field: &IpcField,
+    dictionaries: &Dictionaries,
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let children = StructArray::try_get_fields(data_type)?;
+
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let values = children
+        .iter()
+        .map(|f| &f.data_type)
+        .zip(ipc_field.fields.iter())
+        .map(|(child, ipc)| {
+            get_array(
+                data.clone(),
+                block_offset,
+                child,
+                ipc,
+                dictionaries,
+                field_nodes,
+                buffers,
+            )
+        })
+        .collect::<Result<Vec<_>, Error>>()?;
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity].into_iter(),
+        values.into_iter(),
+        None,
+    ))
+}
+
+#[allow(clippy::too_many_arguments)]
+fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
+    data: T,
+    node: &Node,
+    block_offset: usize,
+    _: &DataType,
+    ipc_field: &IpcField,
+    dictionaries: &Dictionaries,
+    _: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    let num_rows: usize = node
+        .length()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let null_count: usize = node
+        .null_count()
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let data_ref = data.as_ref();
+
+    let dictionary = dictionaries
+        .get(&ipc_field.dictionary_id.unwrap())
+        .ok_or_else(|| Error::oos("Missing dictionary"))?
+        .clone();
+
+    let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());
+
+    let values = get_buffer::<K>(data_ref, block_offset, buffers, num_rows)?.as_ptr();
+
+    Ok(create_array(
+        data,
+        num_rows,
+        null_count,
+        [validity, Some(values)].into_iter(),
+        [].into_iter(),
+        Some(export_array_to_c(dictionary)),
+    ))
+}
+
+fn get_array<T: Clone + AsRef<[u8]>>(
+    data: T,
+    block_offset: usize,
+    data_type: &DataType,
+    ipc_field: &IpcField,
+    dictionaries: &Dictionaries,
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<ArrowArray, Error> {
+    use crate::datatypes::PhysicalType::*;
+    let node = field_nodes
+        .pop_front()
+        .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
+
+    match data_type.to_physical_type() {
+        Null => mmap_null(data, &node, block_offset, buffers),
+        Boolean => mmap_boolean(data, &node, block_offset, buffers),
+        Primitive(p) => with_match_primitive_type!(p, |$T| {
+            mmap_primitive::<$T, _>(data, &node, block_offset, buffers)
+        }),
+        Utf8 | Binary => mmap_binary::<i32, _>(data, &node, block_offset, buffers),
+        FixedSizeBinary => mmap_fixed_size_binary(data, &node, block_offset, buffers),
+        LargeBinary | LargeUtf8 => mmap_binary::<i64, _>(data, &node, block_offset, buffers),
+        List => mmap_list::<i32, _>(
+            data,
+            &node,
+            block_offset,
+            data_type,
+            ipc_field,
+            dictionaries,
+            field_nodes,
+            buffers,
+        ),
+        LargeList => mmap_list::<i64, _>(
+            data,
+            &node,
+            block_offset,
+            data_type,
+            ipc_field,
+            dictionaries,
+            field_nodes,
+            buffers,
+        ),
+        FixedSizeList => mmap_fixed_size_list(
+            data,
+            &node,
+            block_offset,
+            data_type,
+            ipc_field,
+            dictionaries,
+            field_nodes,
+            buffers,
+        ),
+        Struct => mmap_struct(
+            data,
+            &node,
+            block_offset,
+            data_type,
+            ipc_field,
+            dictionaries,
+            field_nodes,
+            buffers,
+        ),
+        Dictionary(key_type) => match_integer_type!(key_type, |$T| {
+            mmap_dict::<$T, _>(
+                data,
+                &node,
+                block_offset,
+                data_type,
+                ipc_field,
+                dictionaries,
+                field_nodes,
+                buffers,
+            )
+        }),
+        _ => todo!(),
+    }
+}
+
+/// Maps a memory region to an [`Array`].
+pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
+    data: T,
+    block_offset: usize,
+    data_type: DataType,
+    ipc_field: &IpcField,
+    dictionaries: &Dictionaries,
+    field_nodes: &mut VecDeque<Node>,
+    buffers: &mut VecDeque<IpcBuffer>,
+) -> Result<Box<dyn Array>, Error> {
+    let array = get_array(
+        data,
+        block_offset,
+        &data_type,
+        ipc_field,
+        dictionaries,
+        field_nodes,
+        buffers,
+    )?;
+    // The unsafety comes from the fact that `array` is not necessarily valid -
+    // the IPC file may be corrupted (e.g. invalid offsets or non-utf8 data)
+    unsafe { try_from(InternalArrowArray::new(array, data_type)) }
+}
diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs
index 10fc2fb994a..b416d1c4648 100644
--- a/src/ffi/mod.rs
+++ b/src/ffi/mod.rs
@@ -3,6 +3,9 @@ mod array;
 mod bridge;
 mod generated;
+#[cfg(feature = "io_ipc")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
+pub(crate) mod mmap;
 mod schema;
 mod stream;
 
@@ -44,5 +47,5 @@ pub unsafe fn import_array_from_c(
     array: ArrowArray,
     data_type: DataType,
 ) -> Result<Box<dyn Array>> {
-    try_from(Box::new(InternalArrowArray::new(array, data_type)))
+    try_from(InternalArrowArray::new(array, data_type))
 }
diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs
index 3d8a8ccb994..1f8476cef70 100644
--- a/src/io/ipc/mod.rs
+++ b/src/io/ipc/mod.rs
@@ -81,7 +81,7 @@ pub mod read;
 pub mod write;
 
 const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
 
 /// Struct containing `dictionary_id` and nested `IpcField`, allowing users
 /// to specify the dictionary ids of the IPC fields when writing to IPC.
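`CONTINUATION_MARKER` becomes `pub(crate)` because the new `read_message` in `src/mmap/mod.rs` (below) parses the IPC framing directly from the mapped bytes: each encapsulated message is an optional `0xFFFFFFFF` continuation marker, a little-endian `i32` metadata length, then the flatbuffer metadata itself. A hypothetical, self-contained sketch of that walk (not crate API):

```rust
/// Returns the byte range of the flatbuffer metadata within `bytes`,
/// assuming `bytes` starts at a message boundary.
fn metadata_range(bytes: &[u8]) -> Option<std::ops::Range<usize>> {
    let mut start = 4;
    let mut prefix: [u8; 4] = bytes.get(..4)?.try_into().ok()?;
    if prefix == [0xff; 4] {
        // continuation marker: the length sits in the next four bytes
        prefix = bytes.get(4..8)?.try_into().ok()?;
        start = 8;
    }
    let length: usize = i32::from_le_bytes(prefix).try_into().ok()?;
    (bytes.len() >= start + length).then(|| start..start + length)
}
```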
diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs
index 306df0061cf..9a1ea3ce1c3 100644
--- a/src/io/ipc/read/common.rs
+++ b/src/io/ipc/read/common.rs
@@ -212,7 +212,7 @@ fn find_first_dict_field<'a>(
     find_first_dict_field_d(id, &field.data_type, ipc_field)
 }
 
-fn first_dict_field<'a>(
+pub(crate) fn first_dict_field<'a>(
     id: i64,
     fields: &'a [Field],
     ipc_fields: &'a [IpcField],
@@ -253,45 +253,41 @@ pub fn read_dictionary(
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?;
     let (first_field, first_ipc_field) = first_dict_field(id, fields, &ipc_schema.fields)?;
 
-    // As the dictionary batch does not contain the type of the
-    // values array, we need to retrieve this from the schema.
-    // Get an array representing this dictionary's values.
-    let dictionary_values: Box<dyn Array> = match first_field.data_type.to_logical_type() {
-        DataType::Dictionary(_, ref value_type, _) => {
-            let batch = batch
-                .data()
-                .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))?
-                .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?;
-
-            // Make a fake schema for the dictionary batch.
-            let fields = vec![Field::new("", value_type.as_ref().clone(), false)];
-            let ipc_schema = IpcSchema {
-                fields: vec![first_ipc_field.clone()],
-                is_little_endian: ipc_schema.is_little_endian,
-            };
-            let chunk = read_record_batch(
-                batch,
-                &fields,
-                &ipc_schema,
-                None,
-                None, // we must read the whole dictionary
-                dictionaries,
-                arrow_format::ipc::MetadataVersion::V5,
-                reader,
-                block_offset,
-                file_size,
-                scratch,
-            )?;
-            chunk.into_arrays().pop().unwrap()
-        }
-        _ => {
+    let batch = batch
+        .data()
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))?
+        .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?;
+
+    let value_type =
+        if let DataType::Dictionary(_, value_type, _) = first_field.data_type.to_logical_type() {
+            value_type.as_ref()
+        } else {
             return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
                 requested_id: id,
-            }))
-        }
+            }));
+        };
+
+    // Make a fake schema for the dictionary batch.
+    let fields = vec![Field::new("", value_type.clone(), false)];
+    let ipc_schema = IpcSchema {
+        fields: vec![first_ipc_field.clone()],
+        is_little_endian: ipc_schema.is_little_endian,
     };
-
-    dictionaries.insert(id, dictionary_values);
+    let chunk = read_record_batch(
+        batch,
+        &fields,
+        &ipc_schema,
+        None,
+        None, // we must read the whole dictionary
+        dictionaries,
+        arrow_format::ipc::MetadataVersion::V5,
+        reader,
+        block_offset,
+        file_size,
+        scratch,
+    )?;
+
+    dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
 
     Ok(())
 }
diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs
index 4b36a44026e..82999c28ea0 100644
--- a/src/io/ipc/read/file_async.rs
+++ b/src/io/ipc/read/file_async.rs
@@ -14,7 +14,7 @@ use crate::error::{Error, Result};
 use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};
 
 use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
-use super::reader::{deserialize_footer, get_serialized_batch};
+use super::reader::{deserialize_footer, get_record_batch};
 use super::Dictionaries;
 use super::FileMetadata;
 use super::OutOfSpecKind;
@@ -205,7 +205,7 @@ where
     let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer)
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
 
-    let batch = get_serialized_batch(&message)?;
+    let batch = get_record_batch(message)?;
 
     let block_length: usize = message
         .body_length()
diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs
index f623e0aef56..2645d91440f 100644
--- a/src/io/ipc/read/mod.rs
+++ b/src/io/ipc/read/mod.rs
@@ -27,6 +27,7 @@ pub mod stream_async;
 #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
 pub mod file_async;
 
+pub(crate) use common::first_dict_field;
 pub use common::{read_dictionary, read_record_batch};
 pub use reader::{
     read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader,
diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs
index ab72adec96e..9fb428eced5 100644
--- a/src/io/ipc/read/reader.rs
+++ b/src/io/ipc/read/reader.rs
@@ -63,6 +63,19 @@ fn read_dictionary_message<R: Read + Seek>(
     Ok(())
 }
 
+pub(crate) fn get_dictionary_batch<'a>(
+    message: &'a arrow_format::ipc::MessageRef,
+) -> Result<arrow_format::ipc::DictionaryBatchRef<'a>, Error> {
+    let header = message
+        .header()
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))?
+        .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?;
+    match header {
+        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => Ok(batch),
+        _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)),
+    }
+}
+
 fn read_dictionary_block<R: Read + Seek>(
     reader: &mut R,
     metadata: &FileMetadata,
@@ -84,27 +97,18 @@ fn read_dictionary_block<R: Read + Seek>(
     let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref())
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
 
-    let header = message
-        .header()
-        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))?
-        .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageHeader))?;
+    let batch = get_dictionary_batch(&message)?;
 
-    match header {
-        arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
-            let block_offset = offset + length;
-            read_dictionary(
-                batch,
-                &metadata.schema.fields,
-                &metadata.ipc_schema,
-                dictionaries,
-                reader,
-                block_offset,
-                metadata.size,
-                dictionary_scratch,
-            )
-        }
-        _ => Err(Error::from(OutOfSpecKind::UnexpectedMessageType)),
-    }
+    read_dictionary(
+        batch,
+        &metadata.schema.fields,
+        &metadata.ipc_schema,
+        dictionaries,
+        reader,
+        offset + length,
+        metadata.size,
+        dictionary_scratch,
+    )
 }
 
 /// Reads all file's dictionaries, if any
@@ -116,7 +120,7 @@ pub fn read_file_dictionaries<R: Read + Seek>(
 ) -> Result<Dictionaries> {
     let mut dictionaries = Default::default();
 
-    let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() {
+    let blocks = if let Some(blocks) = &metadata.dictionaries {
         blocks
     } else {
         return Ok(AHashMap::new());
@@ -230,9 +234,9 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
-fn get_serialized_batch<'a>(
-    message: &'a arrow_format::ipc::MessageRef,
-) -> Result<arrow_format::ipc::RecordBatchRef<'a>, Error> {
+pub(crate) fn get_record_batch(
+    message: arrow_format::ipc::MessageRef,
+) -> Result<arrow_format::ipc::RecordBatchRef, Error> {
     let header = message
         .header()
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferHeader(err)))?
@@ -268,6 +272,11 @@ pub fn read_batch(
         .try_into()
         .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
 
+    let length: u64 = block
+        .meta_data_length
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
     // read length
     reader.seek(SeekFrom::Start(offset))?;
     let mut meta_buf = [0; 4];
@@ -290,17 +299,7 @@ pub fn read_batch(
     let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref())
         .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
 
-    let batch = get_serialized_batch(&message)?;
-
-    let offset: u64 = block
-        .offset
-        .try_into()
-        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
-
-    let length: u64 = block
-        .meta_data_length
-        .try_into()
-        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+    let batch = get_record_batch(message)?;
 
     read_record_batch(
         batch,
diff --git a/src/lib.rs b/src/lib.rs
index 01fe444d5cf..16bd56ed04c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,13 +17,16 @@ pub mod bitmap;
 pub mod buffer;
 pub mod chunk;
 pub mod error;
+#[cfg(feature = "io_ipc")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
+pub mod mmap;
+
 pub mod scalar;
 pub mod trusted_len;
 pub mod types;
 
 pub mod compute;
 pub mod io;
-//pub mod record_batch;
 pub mod temporal_conversions;
 
 pub mod datatypes;
diff --git a/src/mmap/mod.rs b/src/mmap/mod.rs
new file mode 100644
index 00000000000..b7822b5f4cd
--- /dev/null
+++ b/src/mmap/mod.rs
@@ -0,0 +1,225 @@
+//! Memory maps regions defined on the IPC format into [`Array`].
+use std::collections::VecDeque;
+
+use crate::array::Array;
+use crate::chunk::Chunk;
+use crate::datatypes::{DataType, Field};
+use crate::error::Error;
+use crate::ffi::mmap;
+
+use crate::io::ipc::read::reader::{get_dictionary_batch, get_record_batch};
+use crate::io::ipc::read::{first_dict_field, Dictionaries, FileMetadata};
+use crate::io::ipc::read::{IpcBuffer, Node, OutOfSpecKind};
+use crate::io::ipc::{IpcField, CONTINUATION_MARKER};
+
+use arrow_format::ipc::planus::ReadAsRoot;
+use arrow_format::ipc::{Block, MessageRef, RecordBatchRef};
+
+fn read_message(
+    mut bytes: &[u8],
+    block: arrow_format::ipc::Block,
+) -> Result<(MessageRef, usize), Error> {
+    let offset: usize = block
+        .offset
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let block_length: usize = block
+        .meta_data_length
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    bytes = &bytes[offset..];
+    let mut message_length = bytes[..4].try_into().unwrap();
+    bytes = &bytes[4..];
+
+    if message_length == CONTINUATION_MARKER {
+        // continuation marker encountered, read message next
+        message_length = bytes[..4].try_into().unwrap();
+        bytes = &bytes[4..];
+    };
+
+    let message_length: usize = i32::from_le_bytes(message_length)
+        .try_into()
+        .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+    let message = arrow_format::ipc::MessageRef::read_as_root(&bytes[..message_length])
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferMessage(err)))?;
+
+    Ok((message, offset + block_length))
+}
+
+fn get_buffers_nodes(
+    batch: RecordBatchRef,
+) -> Result<(VecDeque<IpcBuffer>, VecDeque<Node>), Error> {
+    let compression = batch.compression()?;
+    if compression.is_some() {
+        return Err(Error::nyi(
+            "mmap can only be done on uncompressed IPC files",
+        ));
+    }
+
+    let buffers = batch
+        .buffers()
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferBuffers(err)))?
+        .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageBuffers))?;
+    let buffers = buffers.iter().collect::<VecDeque<_>>();
+
+    let field_nodes = batch
+        .nodes()
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferNodes(err)))?
+        .ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
+    let field_nodes = field_nodes.iter().collect::<VecDeque<_>>();
+
+    Ok((buffers, field_nodes))
+}
+
+unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
+    fields: &[Field],
+    ipc_fields: &[IpcField],
+    data: T,
+    batch: RecordBatchRef,
+    offset: usize,
+    dictionaries: &Dictionaries,
+) -> Result<Chunk<Box<dyn Array>>, Error> {
+    let (mut buffers, mut field_nodes) = get_buffers_nodes(batch)?;
+
+    fields
+        .iter()
+        .map(|f| &f.data_type)
+        .cloned()
+        .zip(ipc_fields)
+        .map(|(data_type, ipc_field)| {
+            mmap::mmap(
+                data.clone(),
+                offset,
+                data_type,
+                ipc_field,
+                dictionaries,
+                &mut field_nodes,
+                &mut buffers,
+            )
+        })
+        .collect::<Result<Vec<_>, Error>>()
+        .and_then(Chunk::try_new)
+}
+
+unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
+    fields: &[Field],
+    ipc_fields: &[IpcField],
+    data: T,
+    block: Block,
+    dictionaries: &Dictionaries,
+) -> Result<Chunk<Box<dyn Array>>, Error> {
+    let (message, offset) = read_message(data.as_ref(), block)?;
+    let batch = get_record_batch(message)?;
+    _mmap_record(
+        fields,
+        ipc_fields,
+        data.clone(),
+        batch,
+        offset,
+        dictionaries,
+    )
+}
+
+/// Memory maps a record batch from an IPC file into a [`Chunk`].
+/// # Errors
+/// This function errors when:
+/// * The IPC file is not valid
+/// * the buffers on the file are un-aligned with their corresponding data. This can happen when:
+///   * the file was written with 8-bit alignment
+///   * the file contains type decimal 128 or 256
+/// # Safety
+/// The caller must ensure that `data` contains valid buffers, for example:
+/// * Offsets in variable-sized containers must be in-bounds and increasing
+/// * Utf8 data is valid
+pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
+    metadata: &FileMetadata,
+    dictionaries: &Dictionaries,
+    data: T,
+    chunk: usize,
+) -> Result<Chunk<Box<dyn Array>>, Error> {
+    let block = metadata.blocks[chunk];
+
+    let (message, offset) = read_message(data.as_ref(), block)?;
+    let batch = get_record_batch(message)?;
+    _mmap_record(
+        &metadata.schema.fields,
+        &metadata.ipc_schema.fields,
+        data.clone(),
+        batch,
+        offset,
+        dictionaries,
+    )
+}
+
+unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
+    metadata: &FileMetadata,
+    data: T,
+    block: Block,
+    dictionaries: &mut Dictionaries,
+) -> Result<(), Error> {
+    let (message, offset) = read_message(data.as_ref(), block)?;
+    let batch = get_dictionary_batch(&message)?;
+
+    let id = batch
+        .id()
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferId(err)))?;
+    let (first_field, first_ipc_field) =
+        first_dict_field(id, &metadata.schema.fields, &metadata.ipc_schema.fields)?;
+
+    let batch = batch
+        .data()
+        .map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferData(err)))?
+        .ok_or_else(|| Error::from(OutOfSpecKind::MissingData))?;
+
+    let value_type =
+        if let DataType::Dictionary(_, value_type, _) = first_field.data_type.to_logical_type() {
+            value_type.as_ref()
+        } else {
+            return Err(Error::from(OutOfSpecKind::InvalidIdDataType {
+                requested_id: id,
+            }));
+        };
+
+    // Make a fake schema for the dictionary batch.
+    let field = Field::new("", value_type.clone(), false);
+
+    let chunk = _mmap_record(
+        &[field],
+        &[first_ipc_field.clone()],
+        data.clone(),
+        batch,
+        offset,
+        dictionaries,
+    )?;
+
+    dictionaries.insert(id, chunk.into_arrays().pop().unwrap());
+
+    Ok(())
+}
+
+/// Memory maps dictionaries from an IPC file into [`Dictionaries`].
+/// # Safety
+/// The caller must ensure that `data` contains valid buffers, for example:
+/// * Offsets in variable-sized containers must be in-bounds and increasing
+/// * Utf8 data is valid
+pub unsafe fn mmap_dictionaries_unchecked<T: Clone + AsRef<[u8]>>(
+    metadata: &FileMetadata,
+    data: T,
+) -> Result<Dictionaries, Error> {
+    let blocks = if let Some(blocks) = &metadata.dictionaries {
+        blocks
+    } else {
+        return Ok(Default::default());
+    };
+
+    let mut dictionaries = Default::default();
+
+    blocks
+        .iter()
+        .cloned()
+        .try_for_each(|block| mmap_dictionary(metadata, data.clone(), block, &mut dictionaries))?;
+    Ok(dictionaries)
+}
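Because the two `*_unchecked` entry points skip validation of offsets and utf8 data, they are only appropriate for trusted files; for untrusted input, the existing (validating, copying) reader remains the safe path. A sketch, assuming this version's `FileReader::new(reader, metadata, projection)` signature:

```rust
use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Safe alternative to `mmap_unchecked` for untrusted IPC files.
fn read_all(mut file: std::fs::File) -> Result<()> {
    let metadata = read::read_file_metadata(&mut file)?;
    let reader = read::FileReader::new(file, metadata, None);
    for chunk in reader {
        // each item is a validated, owned `Chunk`
        println!("{:?}", chunk?);
    }
    Ok(())
}
```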
diff --git a/tests/it/io/ipc/mmap.rs b/tests/it/io/ipc/mmap.rs
new file mode 100644
index 00000000000..9142227b1fc
--- /dev/null
+++ b/tests/it/io/ipc/mmap.rs
@@ -0,0 +1,118 @@
+use arrow2::array::*;
+use arrow2::chunk::Chunk;
+use arrow2::datatypes::{DataType, Field, Schema};
+use arrow2::error::Result;
+use arrow2::io::ipc::read::read_file_metadata;
+
+use super::write::file::write;
+
+fn round_trip(array: Box<dyn Array>) -> Result<()> {
+    let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
+    let columns = Chunk::try_new(vec![array.clone()])?;
+
+    let data = write(&[columns], &schema, None, None)?;
+
+    let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?;
+
+    let dictionaries =
+        unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };
+
+    let new_array = unsafe { arrow2::mmap::mmap_unchecked(&metadata, &dictionaries, data, 0)? };
+
+    assert_eq!(new_array.into_arrays()[0], array);
+    Ok(())
+}
+
+#[test]
+fn utf8() -> Result<()> {
+    let array = Utf8Array::<i32>::from([None, None, Some("bb")])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn fixed_size_binary() -> Result<()> {
+    let array = FixedSizeBinaryArray::from([None, None, Some([1, 2])])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn primitive() -> Result<()> {
+    let array = PrimitiveArray::<i32>::from([None, None, Some(3)])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn boolean() -> Result<()> {
+    let array = BooleanArray::from([None, None, Some(true)])
+        .slice(1, 2)
+        .boxed();
+    round_trip(array)
+}
+
+#[test]
+fn null() -> Result<()> {
+    let array = NullArray::new(DataType::Null, 10).boxed();
+    round_trip(array)
+}
+
+#[test]
+fn fixed_size_list() -> Result<()> {
+    let data = vec![
+        Some(vec![Some(1i32), Some(2), Some(3)]),
+        None,
+        Some(vec![Some(4), None, Some(6)]),
+    ];
+
+    let mut array = MutableFixedSizeListArray::new(MutablePrimitiveArray::<i32>::new(), 3);
+    array.try_extend(data)?;
+
+    let array: FixedSizeListArray = array.into();
+    round_trip(array.slice(1, 2).boxed())
+}
+
+#[test]
+fn list() -> Result<()> {
+    let data = vec![
+        Some(vec![Some(1i32), Some(2), Some(3)]),
+        None,
+        Some(vec![Some(4), None, Some(6)]),
+    ];
+
+    let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
+    array.try_extend(data).unwrap();
+    let array = array.into_box().slice(1, 2);
+    round_trip(array)
+}
+
+#[test]
+fn struct_() -> Result<()> {
+    let array = PrimitiveArray::<i32>::from([None, None, None, Some(3), Some(4)]).boxed();
+
+    let array = StructArray::new(
+        DataType::Struct(vec![Field::new("f1", array.data_type().clone(), true)]),
+        vec![array],
+        Some([true, true, false, true, false].into()),
+    )
+    .slice(1, 4)
+    .boxed();
+
+    round_trip(array)
+}
+
+#[test]
+fn dict() -> Result<()> {
+    let keys = PrimitiveArray::<i32>::from([None, None, None, Some(3), Some(4)]);
+
+    let values = PrimitiveArray::<i32>::from_slice([0, 1, 2, 3, 4, 5]).boxed();
+
+    let array = DictionaryArray::try_from_keys(keys, values)?
+        .slice(1, 4)
+        .boxed();
+
+    round_trip(array)
+}
diff --git a/tests/it/io/ipc/mod.rs b/tests/it/io/ipc/mod.rs
index 63cce5b2e8f..6d3e71c5db4 100644
--- a/tests/it/io/ipc/mod.rs
+++ b/tests/it/io/ipc/mod.rs
@@ -15,3 +15,5 @@ mod read_stream_async;
 
 #[cfg(feature = "io_ipc_read_async")]
 mod read_file_async;
+
+mod mmap;
diff --git a/tests/it/io/ipc/write/mod.rs b/tests/it/io/ipc/write/mod.rs
index f3a4fbfe57c..48ca5ea6cd4 100644
--- a/tests/it/io/ipc/write/mod.rs
+++ b/tests/it/io/ipc/write/mod.rs
@@ -1,3 +1,3 @@
-mod file;
+pub mod file;
 mod file_append;
 mod stream;
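The tests above map only chunk 0; mapping every record batch in a file is a loop over `metadata.blocks`. A sketch, assuming the `Mmap` newtype from `examples/ipc_file_mmap.rs` and that `FileMetadata::blocks` is publicly accessible, as `mmap_unchecked`'s indexing suggests:

```rust
use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::read::{Dictionaries, FileMetadata};

/// Maps every record batch in the file, not just chunk 0.
fn mmap_all(
    metadata: &FileMetadata,
    dictionaries: &Dictionaries,
    mmap: Mmap, // the `Clone + AsRef<[u8]>` newtype from the example
) -> Result<Vec<Chunk<Box<dyn Array>>>> {
    (0..metadata.blocks.len())
        .map(|chunk| unsafe {
            arrow2::mmap::mmap_unchecked(metadata, dictionaries, mmap.clone(), chunk)
        })
        .collect()
}
```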