
Commit

Added mmap for utf8

jorgecarleitao committed Aug 1, 2022
1 parent 35934f7 commit 807c98f
Showing 7 changed files with 150 additions and 48 deletions.
142 changes: 112 additions & 30 deletions src/ffi/mmap.rs
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

use crate::array::{Array, BooleanArray, FromFfi};
use crate::array::{Array, BooleanArray, FromFfi, Offset, Utf8Array};
use crate::datatypes::DataType;
use crate::error::Error;

@@ -36,6 +36,41 @@ fn get_buffer(buffers: &mut VecDeque<IpcBuffer>) -> Result<(usize, usize), Error
Ok((offset, length))
}

fn create_array<T: Clone + AsRef<[u8]>, I: Iterator<Item = Option<*const u8>>>(
data: T,
num_rows: usize,
null_count: usize,
buffers: I,
) -> ArrowArray {
let n_buffers = buffers.size_hint().0 as i64;

let buffers_ptr = buffers
.map(|maybe_buffer| match maybe_buffer {
Some(b) => b as *const std::os::raw::c_void,
None => std::ptr::null(),
})
.collect::<Box<[_]>>();

let mut private_data = Box::new(PrivateData::<T> {
data,
buffers_ptr,
dictionary_ptr: None,
});

ArrowArray {
length: num_rows as i64,
null_count: null_count as i64,
offset: 0,
n_buffers,
n_children: 0,
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: std::ptr::null_mut(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
}

// callback used to drop [ArrowArray] when it is exported
unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
if array.is_null() {
@@ -56,7 +91,7 @@ unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
array.release = None;
}

fn mmap_boolean<T: Clone + AsRef<[u8]>>(
fn mmap_utf8<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
@@ -84,42 +119,72 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
None
};

let offsets = get_buffer(buffers)?;
let (offset, length) = offsets;

// verify that they are in-bounds and get its pointer
let offsets = &data_ref[block_offset + offset..block_offset + offset + length];

// validate alignment
let _: &[O] = bytemuck::cast_slice(offsets);

let offsets = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();

let values = get_buffer(buffers)?;
let (offset, length) = values;

// verify that they are in-bounds and get its pointer
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();

// NOTE: this is valid for Boolean, but for others (e.g. Utf8), we need to validate other invariants
// or mark this as unsafe
// NOTE: offsets and values invariants are _not_ validated
Ok(create_array(
data,
num_rows,
null_count,
[validity, Some(offsets), Some(values)].into_iter(),
))
}

let buffers_ptr = [validity, Some(values)]
.iter()
.map(|maybe_buffer| match maybe_buffer {
Some(b) => *b as *const std::os::raw::c_void,
None => std::ptr::null(),
})
.collect::<Box<[_]>>();
let n_buffers = buffers.len() as i64;
fn mmap_boolean<T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
let num_rows: usize = node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let mut private_data = Box::new(PrivateData::<T> {
data: data.clone(),
buffers_ptr,
dictionary_ptr: None,
});
let null_count: usize = node
.null_count()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

Ok(ArrowArray {
length: num_rows as i64,
null_count: null_count as i64,
offset: 0,
n_buffers,
n_children: 0,
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: std::ptr::null_mut(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
})
let data_ref = data.as_ref();

let validity = get_buffer(buffers)?;
let (offset, length) = validity;

let validity = if null_count > 0 {
// verify that they are in-bounds and get its pointer
Some(data_ref[block_offset + offset..block_offset + offset + length].as_ptr())
} else {
None
};

let values = get_buffer(buffers)?;
let (offset, length) = values;

// verify that they are in-bounds and get its pointer
let values = data_ref[block_offset + offset..block_offset + offset + length].as_ptr();

Ok(create_array(
data,
num_rows,
null_count,
[validity, Some(values)].into_iter(),
))
}

fn boolean<T: Clone + AsRef<[u8]>>(
Expand All @@ -135,8 +200,21 @@ fn boolean<T: Clone + AsRef<[u8]>>(
unsafe { BooleanArray::try_from_ffi(array) }
}

unsafe fn utf8<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
data_type: DataType,
) -> Result<Utf8Array<O>, Error> {
let array = mmap_utf8::<O, _>(data, node, block_offset, buffers)?;
let array = InternalArrowArray::new(array, data_type);
// this is unsafe because `mmap_utf8` does not validate invariants
unsafe { Utf8Array::<O>::try_from_ffi(array) }
}

/// Maps a memory region to an [`Array`].
pub fn mmap<T: Clone + AsRef<[u8]>>(
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
block_offset: usize,
data_type: DataType,
Expand All @@ -149,6 +227,10 @@ pub fn mmap<T: Clone + AsRef<[u8]>>(
.ok_or_else(|| Error::from(OutOfSpecKind::ExpectedBuffer))?;
match data_type.to_physical_type() {
Boolean => boolean(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
Utf8 => utf8::<i32, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed()),
LargeUtf8 => {
utf8::<i64, _>(data, &node, block_offset, buffers, data_type).map(|x| x.boxed())
}
_ => todo!(),
}
}
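
The new `create_array` helper factors out the `ArrowArray` construction that `mmap_boolean` previously built inline, so `mmap_utf8` and `mmap_boolean` now differ only in which buffer pointers they collect: a Utf8 array exports three buffers (validity, offsets, values) in C-data-interface order, a Boolean array only two (validity, values). As a reference, a minimal sketch (not part of this diff) of those three Utf8 buffers, using only public arrow2 accessors:

use arrow2::array::Utf8Array;

fn main() {
    let array = Utf8Array::<i32>::from_slice(["aa", "bb"]);
    // buffer 0: validity bitmap (absent here, so a null pointer is exported)
    assert!(array.validity().is_none());
    // buffer 1: i32 offsets, one more entry than there are rows
    assert_eq!(array.offsets().as_slice(), &[0, 2, 4]);
    // buffer 2: concatenated UTF-8 bytes
    assert_eq!(array.values().as_slice(), b"aabb");
}

Because the offsets and values are taken straight from the mapped region without validation, the construction is only sound when the IPC data upholds these invariants, hence the `unsafe` markers added below.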
8 changes: 1 addition & 7 deletions src/ffi/mod.rs
@@ -3,16 +3,10 @@
mod array;
mod bridge;
mod generated;
#[cfg(feature = "io_ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
mod mmap;
pub(crate) mod mmap;
mod schema;
mod stream;

#[cfg(feature = "io_ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
pub use mmap::mmap;

pub(crate) use array::try_from;
pub(crate) use array::{ArrowArrayRef, InternalArrowArray};

3 changes: 1 addition & 2 deletions src/io/ipc/mod.rs
@@ -77,12 +77,11 @@ mod compression;
mod endianess;

pub mod append;
pub mod mmap;
pub mod read;
pub mod write;

const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Struct containing `dictionary_id` and nested `IpcField`, allowing users
/// to specify the dictionary ids of the IPC fields when writing to IPC.
5 changes: 4 additions & 1 deletion src/lib.rs
@@ -17,13 +17,16 @@ pub mod bitmap;
pub mod buffer;
pub mod chunk;
pub mod error;
#[cfg(feature = "io_ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc")))]
pub mod mmap;

pub mod scalar;
pub mod trusted_len;
pub mod types;

pub mod compute;
pub mod io;
//pub mod record_batch;
pub mod temporal_conversions;

pub mod datatypes;
22 changes: 16 additions & 6 deletions src/io/ipc/mmap.rs → src/mmap/mod.rs
@@ -5,15 +5,23 @@ use crate::array::Array;
use crate::error::Error;
use crate::ffi::mmap;

use super::read::read_file_metadata;
use super::read::reader::get_serialized_batch;
use super::read::OutOfSpecKind;
use super::CONTINUATION_MARKER;
use crate::io::ipc::read::read_file_metadata;
use crate::io::ipc::read::reader::get_serialized_batch;
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::CONTINUATION_MARKER;

use arrow_format::ipc::planus::ReadAsRoot;

/// something
pub fn map_chunk<T: Clone + AsRef<[u8]>>(data: T, index: usize) -> Result<Box<dyn Array>, Error> {
/// # Safety
/// This operation is inherently unsafe as it assumes that `T` contains valid Arrow data
/// In particular:
/// * Offsets in variable-sized containers are valid;
/// * Utf8 is valid
pub unsafe fn map_chunk_unchecked<T: Clone + AsRef<[u8]>>(
data: T,
index: usize,
) -> Result<Box<dyn Array>, Error> {
let mut bytes = data.as_ref();
let metadata = read_file_metadata(&mut std::io::Cursor::new(bytes))?;

@@ -60,9 +68,11 @@ pub fn map_chunk<T: Clone + AsRef<[u8]>>(data: T, index: usize) -> Result<Box<dy
.ok_or_else(|| Error::from(OutOfSpecKind::MissingMessageNodes))?;
let mut field_nodes = field_nodes.iter().collect::<VecDeque<_>>();

println!("{:#?}", metadata.schema.fields);
let data_type = metadata.schema.fields[0].data_type.clone();
println!("{:#?}", data_type);

mmap(
mmap::mmap(
data.clone(),
offset + meta_data_length,
data_type,
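
With this move, `map_chunk` becomes `map_chunk_unchecked` and is marked `unsafe`: the FFI import above takes buffer pointers straight from the mapped region without validating offsets or UTF-8. The `T: Clone + AsRef<[u8]>` bound means the mapped bytes must be cheaply cloneable, e.g. held behind an `Arc`. A hedged usage sketch follows (assuming the `memmap2` crate; the `MappedFile` wrapper and the `data.arrow` path are illustrative and not part of this commit):

use std::fs::File;
use std::sync::Arc;

// Hypothetical wrapper so an mmap-ed region satisfies `Clone + AsRef<[u8]>`.
#[derive(Clone)]
struct MappedFile(Arc<memmap2::Mmap>);

impl AsRef<[u8]> for MappedFile {
    fn as_ref(&self) -> &[u8] {
        &self.0[..]
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.arrow")?;
    // Safety: the file must not be modified or truncated while it is mapped.
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    let data = MappedFile(Arc::new(mmap));

    // Safety: the file must contain valid Arrow IPC data (in-bounds offsets, valid UTF-8),
    // since `map_chunk_unchecked` does not validate these invariants.
    let array = unsafe { arrow2::mmap::map_chunk_unchecked(data, 0)? };
    println!("{:?}", array.data_type());
    Ok(())
}

Cloning `MappedFile` only bumps the `Arc` reference count, which is what lets the imported array keep the mapped region alive through its `PrivateData`.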
4 changes: 2 additions & 2 deletions tests/it/io/ipc/read/mmap.rs
@@ -1,5 +1,5 @@
use arrow2::error::Result;
use arrow2::io::ipc::mmap::map_chunk;
use arrow2::mmap::map_chunk_unchecked;

use super::super::common::read_gzip_json;

@@ -28,7 +28,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
// read expected JSON output
let (_schema, _, batches) = read_gzip_json(version, file_name)?;

let array = map_chunk(data, 0)?;
let array = unsafe { map_chunk_unchecked(data, 0)? };

assert_eq!(batches[0].arrays()[0], array);
Ok(())
14 changes: 14 additions & 0 deletions tests/it/io/ipc/write/file.rs
@@ -373,3 +373,17 @@ fn write_months_days_ns() -> Result<()> {
let columns = Chunk::try_new(vec![array])?;
round_trip(columns, schema, None, None)
}

#[test]
fn bla() -> Result<()> {
let array = Utf8Array::<i32>::from_slice(["aa", "bb"])
.slice(1, 1)
.boxed();
let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array.clone()])?;

let data = write(&[columns], &schema, None, None)?;
let new_array = unsafe { arrow2::mmap::map_chunk_unchecked(data, 0)? };
assert_eq!(new_array, array);
Ok(())
}
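
The new test round-trips a sliced `Utf8Array` through the IPC writer and back through `map_chunk_unchecked`. For clarity, a small sketch (not part of the diff) of what that sliced array holds:

use arrow2::array::Utf8Array;

fn main() {
    // slicing keeps the backing buffers but narrows the view to row 1 only
    let array = Utf8Array::<i32>::from_slice(["aa", "bb"]).slice(1, 1);
    assert_eq!(array.len(), 1);
    assert_eq!(array.value(0), "bb");
}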
