From 807c98fbf89f32b0e143ad3f44f904635179285a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 1 Aug 2022 17:46:54 +0000 Subject: [PATCH] Added mmap for utf8 --- src/ffi/mmap.rs | 142 ++++++++++++++++++++++------ src/ffi/mod.rs | 8 +- src/io/ipc/mod.rs | 3 +- src/lib.rs | 5 +- src/{io/ipc/mmap.rs => mmap/mod.rs} | 22 +++-- tests/it/io/ipc/read/mmap.rs | 4 +- tests/it/io/ipc/write/file.rs | 14 +++ 7 files changed, 150 insertions(+), 48 deletions(-) rename src/{io/ipc/mmap.rs => mmap/mod.rs} (77%) diff --git a/src/ffi/mmap.rs b/src/ffi/mmap.rs index 2ad8d8b1607..d3f2944765c 100644 --- a/src/ffi/mmap.rs +++ b/src/ffi/mmap.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use crate::array::{Array, BooleanArray, FromFfi}; +use crate::array::{Array, BooleanArray, FromFfi, Offset, Utf8Array}; use crate::datatypes::DataType; use crate::error::Error; @@ -36,6 +36,41 @@ fn get_buffer(buffers: &mut VecDeque) -> Result<(usize, usize), Error Ok((offset, length)) } +fn create_array, I: Iterator>>( + data: T, + num_rows: usize, + null_count: usize, + buffers: I, +) -> ArrowArray { + let n_buffers = buffers.size_hint().0 as i64; + + let buffers_ptr = buffers + .map(|maybe_buffer| match maybe_buffer { + Some(b) => b as *const std::os::raw::c_void, + None => std::ptr::null(), + }) + .collect::>(); + + let mut private_data = Box::new(PrivateData:: { + data, + buffers_ptr, + dictionary_ptr: None, + }); + + ArrowArray { + length: num_rows as i64, + null_count: null_count as i64, + offset: 0, + n_buffers, + n_children: 0, + buffers: private_data.buffers_ptr.as_mut_ptr(), + children: std::ptr::null_mut(), + dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), + release: Some(release::), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + } +} + // callback used to drop [ArrowArray] when it is exported unsafe extern "C" fn release(array: *mut ArrowArray) { if array.is_null() { @@ -56,7 +91,7 @@ unsafe extern "C" fn release(array: *mut ArrowArray) { array.release = None; } -fn mmap_boolean>( +fn mmap_utf8>( data: T, node: &Node, block_offset: usize, @@ -84,42 +119,72 @@ fn mmap_boolean>( None }; + let offsets = get_buffer(buffers)?; + let (offset, length) = offsets; + + // verify that they are in-bounds and get its pointer + let offsets = &data_ref[block_offset + offset..block_offset + offset + length]; + + // validate alignment + let _: &[O] = bytemuck::cast_slice(offsets); + + let offsets = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + let values = get_buffer(buffers)?; let (offset, length) = values; // verify that they are in-bounds and get its pointer let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); - // NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants - // or mark this as unsafe + // NOTE: offsets and values invariants are _not_ validated + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(offsets), Some(values)].into_iter(), + )) +} - let buffers_ptr = [validity, Some(values)] - .iter() - .map(|maybe_buffer| match maybe_buffer { - Some(b) => *b as *const std::os::raw::c_void, - None => std::ptr::null(), - }) - .collect::>(); - let n_buffers = buffers.len() as i64; +fn mmap_boolean>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, +) -> Result { + let num_rows: usize = node + .length() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - let mut private_data = Box::new(PrivateData:: { - data: data.clone(), - buffers_ptr, - dictionary_ptr: None, - }); + let null_count: usize = node + .null_count() + .try_into() + .map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?; - Ok(ArrowArray { - length: num_rows as i64, - null_count: null_count as i64, - offset: 0, - n_buffers, - n_children: 0, - buffers: private_data.buffers_ptr.as_mut_ptr(), - children: std::ptr::null_mut(), - dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), - release: Some(release::), - private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, - }) + let data_ref = data.as_ref(); + + let validity = get_buffer(buffers)?; + let (offset, length) = validity; + + let validity = if null_count > 0 { + // verify that they are in-bounds and get its pointer + Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr()) + } else { + None + }; + + let values = get_buffer(buffers)?; + let (offset, length) = values; + + // verify that they are in-bounds and get its pointer + let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr(); + + Ok(create_array( + data, + num_rows, + null_count, + [validity, Some(values)].into_iter(), + )) } fn boolean>( @@ -135,8 +200,21 @@ fn boolean>( unsafe { BooleanArray::try_from_ffi(array) } } +unsafe fn utf8>( + data: T, + node: &Node, + block_offset: usize, + buffers: &mut VecDeque, + data_type: DataType, +) -> Result, Error> { + let array = mmap_utf8::(data, node, block_offset, buffers)?; + let array = InternalArrowArray::new(array, data_type); + // this is unsafe because `mmap_utf8` does not validate invariants + unsafe { Utf8Array::::try_from_ffi(array) } +} + /// Maps a memory region to an [`Array`]. -pub fn mmap>( +pub(crate) unsafe fn mmap>( data: T, block_offset: usize, data_type: DataType, @@ -149,6 +227,10 @@ pub fn mmap>( .ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?; match data_type.to_physical_type() { Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + Utf8 => utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()), + LargeUtf8 => { + utf8::(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()) + } _ => todo!(), } } diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index d3766254794..05699e7a1b4 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -3,16 +3,10 @@ mod array; mod bridge; mod generated; -#[cfg(feature = "io_ipc")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] -mod mmap; +pub(crate) mod mmap; mod schema; mod stream; -#[cfg(feature = "io_ipc")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] -pub use mmap::mmap; - pub(crate) use array::try_from; pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index b2273a3f607..1f8476cef70 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -77,12 +77,11 @@ mod compression; mod endianess; pub mod append; -pub mod mmap; pub mod read; pub mod write; const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; -const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; +pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; /// Struct containing `dictionary_id` and nested `IpcField`, allowing users /// to specify the dictionary ids of the IPC fields when writing to IPC. diff --git a/src/lib.rs b/src/lib.rs index 01fe444d5cf..16bd56ed04c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,13 +17,16 @@ pub mod bitmap; pub mod buffer; pub mod chunk; pub mod error; +#[cfg(feature = "io_ipc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))] +pub mod mmap; + pub mod scalar; pub mod trusted_len; pub mod types; pub mod compute; pub mod io; -//pub mod record_batch; pub mod temporal_conversions; pub mod datatypes; diff --git a/src/io/ipc/mmap.rs b/src/mmap/mod.rs similarity index 77% rename from src/io/ipc/mmap.rs rename to src/mmap/mod.rs index 2228472d159..60b152e5c61 100644 --- a/src/io/ipc/mmap.rs +++ b/src/mmap/mod.rs @@ -5,15 +5,23 @@ use crate::array::Array; use crate::error::Error; use crate::ffi::mmap; -use super::read::read_file_metadata; -use super::read::reader::get_serialized_batch; -use super::read::OutOfSpecKind; -use super::CONTINUATION_MARKER; +use crate::io::ipc::read::read_file_metadata; +use crate::io::ipc::read::reader::get_serialized_batch; +use crate::io::ipc::read::OutOfSpecKind; +use crate::io::ipc::CONTINUATION_MARKER; use arrow_format::ipc::planus::ReadAsRoot; /// something -pub fn map_chunk>(data: T, index: usize) -> Result, Error> { +/// # Safety +/// This operation is innerently unsafe as it assumes that `T` contains valid Arrow data +/// In particular: +/// * Offsets in variable-sized containers are valid; +/// * Utf8 is valid +pub unsafe fn map_chunk_unchecked>( + data: T, + index: usize, +) -> Result, Error> { let mut bytes = data.as_ref(); let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?; @@ -60,9 +68,11 @@ pub fn map_chunk>(data: T, index: usize) -> Result>(); + println!("{:#?}", metadata.schema.fields); let data_type = metadata.schema.fields[0].data_type.clone(); + println!("{:#?}", data_type); - mmap( + mmap::mmap( data.clone(), offset + meta_data_length, data_type, diff --git a/tests/it/io/ipc/read/mmap.rs b/tests/it/io/ipc/read/mmap.rs index 076b6be5484..232568e056c 100644 --- a/tests/it/io/ipc/read/mmap.rs +++ b/tests/it/io/ipc/read/mmap.rs @@ -1,5 +1,5 @@ use arrow2::error::Result; -use arrow2::io::ipc::mmap::map_chunk; +use arrow2::mmap::map_chunk_unchecked; use super::super::common::read_gzip_json; @@ -28,7 +28,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { // read expected JSON output let (_schema, _, batches) = read_gzip_json(version, file_name)?; - let array = map_chunk(data, 0)?; + let array = unsafe { map_chunk_unchecked(data, 0)? }; assert_eq!(batches[0].arrays()[0], array); Ok(()) diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index a899aa459d0..98084e16f28 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -373,3 +373,17 @@ fn write_months_days_ns() -> Result<()> { let columns = Chunk::try_new(vec![array])?; round_trip(columns, schema, None, None) } + +#[test] +fn bla() -> Result<()> { + let array = Utf8Array::::from_slice(["aa", "bb"]) + .slice(1, 1) + .boxed(); + let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); + let columns = Chunk::try_new(vec![array.clone()])?; + + let data = write(&[columns], &schema, None, None)?; + let new_array = unsafe { arrow2::mmap::map_chunk_unchecked(data, 0)? }; + assert_eq!(new_array, array); + Ok(()) +}