This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit fed029b: ipc FileMetaData expose sizes
ritchie46 committed Aug 4, 2022 (1 parent: 5711cfc)

Showing 5 changed files with 56 additions and 53 deletions.
examples/ipc_file_mmap.rs (4 changes: 2 additions & 2 deletions)
@@ -8,7 +8,7 @@ use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
// Arrow2 requires a struct that implements `Clone + AsRef<[u8]>`, which
// usually `Arc<Mmap>` supports. Here we mock it
#[derive(Clone)]
struct Mmap(Arc<Vec<u8>>);
struct Mmap(Vec<u8>);

impl AsRef<[u8]> for Mmap {
#[inline]
@@ -19,7 +19,7 @@ impl AsRef<[u8]> for Mmap {

fn main() -> Result<()> {
// given a mmap
let mmap = Mmap(Arc::new(vec![]));
let mmap = Arc::new(Mmap(vec![]));

// read the metadata
let metadata = read::read_file_metadata(&mut std::io::Cursor::new(mmap.as_ref()))?;
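The example file is truncated here. Using the `mmap` and `metadata` bindings shown above, a plausible continuation of `fn main` under the new `Arc`-based API might look like the sketch below (illustrative only, not part of this diff; it relies on the `mmap_dictionaries_unchecked` and `mmap_unchecked` signatures changed in `src/mmap/mod.rs` further down):

// Sketch of a possible continuation; `mmap` and `metadata` come from the lines above.
// Ownership of the region is shared via cheap `Arc` clones instead of copying bytes.
let dictionaries = unsafe { mmap_dictionaries_unchecked(&metadata, mmap.clone())? };
let chunk = unsafe { mmap_unchecked(&metadata, &dictionaries, mmap.clone(), 0)? };
println!("{}", chunk.len());
Ok(())
}
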
src/ffi/mmap.rs (69 changes: 35 additions & 34 deletions)
@@ -1,4 +1,5 @@
use std::collections::VecDeque;
use std::sync::Arc;

use crate::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, Offset, StructArray};
use crate::datatypes::DataType;
@@ -85,11 +86,11 @@ fn get_validity<'a>(
}

fn create_array<
T: Clone + AsRef<[u8]>,
T: AsRef<[u8]>,
I: Iterator<Item = Option<*const u8>>,
II: Iterator<Item = ArrowArray>,
>(
data: T,
data: Arc<T>,
num_rows: usize,
null_count: usize,
buffers: I,
@@ -111,7 +112,7 @@ fn create_array<

let dictionary_ptr = dictionary.map(|array| Box::into_raw(Box::new(array)));

let mut private_data = Box::new(PrivateData::<T> {
let mut private_data = Box::new(PrivateData::<Arc<T>> {
data,
buffers_ptr,
children_ptr,
@@ -127,7 +128,7 @@ fn create_array<
buffers: private_data.buffers_ptr.as_mut_ptr(),
children: private_data.children_ptr.as_mut_ptr(),
dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()),
release: Some(release::<T>),
release: Some(release::<Arc<T>>),
private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
}
}
@@ -152,8 +153,8 @@ unsafe extern "C" fn release<T>(array: *mut ArrowArray) {
array.release = None;
}

fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_binary<O: Offset, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
@@ -168,7 +169,7 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

@@ -186,8 +187,8 @@ fn mmap_binary<O: Offset, T: Clone + AsRef<[u8]>>(
))
}

fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_fixed_size_binary<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
@@ -202,7 +203,7 @@ fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

@@ -218,8 +219,8 @@ fn mmap_fixed_size_binary<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_null<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_null<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
_block_offset: usize,
_buffers: &mut VecDeque<IpcBuffer>,
@@ -244,8 +245,8 @@ fn mmap_null<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_boolean<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_boolean<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
@@ -260,7 +261,7 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

@@ -280,13 +281,13 @@ fn mmap_boolean<T: Clone + AsRef<[u8]>>(
))
}

fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_primitive<P: NativeType, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<ArrowArray, Error> {
let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let num_rows: usize = node
.length()
@@ -313,8 +314,8 @@ fn mmap_primitive<P: NativeType, T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_list<O: Offset, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
@@ -335,7 +336,7 @@ fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

@@ -363,8 +364,8 @@ fn mmap_list<O: Offset, T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_fixed_size_list<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
@@ -387,7 +388,7 @@ fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

@@ -412,8 +413,8 @@ fn mmap_fixed_size_list<T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_struct<T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_struct<T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
data_type: &DataType,
@@ -434,7 +435,7 @@ fn mmap_struct<T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let validity = get_validity(data_ref, block_offset, buffers, null_count)?.map(|x| x.as_ptr());

@@ -466,8 +467,8 @@ fn mmap_struct<T: Clone + AsRef<[u8]>>(
}

#[allow(clippy::too_many_arguments)]
fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
data: T,
fn mmap_dict<K: DictionaryKey, T: AsRef<[u8]>>(
data: Arc<T>,
node: &Node,
block_offset: usize,
_: &DataType,
@@ -486,7 +487,7 @@ fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let data_ref = data.as_ref();
let data_ref = data.as_ref().as_ref();

let dictionary = dictionaries
.get(&ipc_field.dictionary_id.unwrap())
@@ -507,8 +508,8 @@ fn mmap_dict<K: DictionaryKey, T: Clone + AsRef<[u8]>>(
))
}

fn get_array<T: Clone + AsRef<[u8]>>(
data: T,
fn get_array<T: AsRef<[u8]>>(
data: Arc<T>,
block_offset: usize,
data_type: &DataType,
ipc_field: &IpcField,
@@ -587,8 +588,8 @@ fn get_array<T: Clone + AsRef<[u8]>>(
}

/// Maps a memory region to an [`Array`].
pub(crate) unsafe fn mmap<T: Clone + AsRef<[u8]>>(
data: T,
pub(crate) unsafe fn mmap<T: AsRef<[u8]>>(
data: Arc<T>,
block_offset: usize,
data_type: DataType,
ipc_field: &IpcField,
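Throughout this file the bound `T: Clone + AsRef<[u8]>` with `data: T` becomes `T: AsRef<[u8]>` with `data: Arc<T>`, and `data.as_ref()` becomes `data.as_ref().as_ref()`. A minimal, self-contained sketch of that pattern (illustrative only, not part of the commit): the outer `as_ref` goes from `Arc<T>` to `&T`, the inner one from `&T` to `&[u8]`, and cloning the `Arc` only bumps a reference count instead of copying the mapped bytes.

use std::sync::Arc;

// Illustrative only: the same double `as_ref` used by the mmap_* functions above.
fn first_byte<T: AsRef<[u8]>>(data: Arc<T>) -> Option<u8> {
    // outer `as_ref()`: Arc<T> -> &T; inner `as_ref()`: &T -> &[u8]
    let bytes: &[u8] = data.as_ref().as_ref();
    bytes.first().copied()
}

fn main() {
    let data = Arc::new(vec![1u8, 2, 3]);
    // cloning the Arc shares the buffer; the bytes themselves are not copied
    assert_eq!(first_byte(data.clone()), Some(1));
    assert_eq!(first_byte(data), Some(1));
}
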
src/io/ipc/read/reader.rs (4 changes: 2 additions & 2 deletions)
@@ -27,13 +27,13 @@ pub struct FileMetadata {
/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
pub(crate) blocks: Vec<arrow_format::ipc::Block>,
pub blocks: Vec<arrow_format::ipc::Block>,

/// Dictionaries associated to each dict_id
pub(crate) dictionaries: Option<Vec<arrow_format::ipc::Block>>,

/// The total size of the file in bytes
pub(crate) size: u64,
pub size: u64,
}

fn read_dictionary_message<R: Read + Seek>(
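This is the change the commit title refers to: the `blocks` and `size` fields of `FileMetadata` become `pub`, so callers can see how many record batches a file contains and how large it is without re-parsing the footer. A hedged sketch of how the newly public fields might be used (illustrative only; it assumes `FileMetadata` is re-exported at `arrow2::io::ipc::read` and that `arrow_format::ipc::Block` exposes `offset`, `meta_data_length`, and `body_length`, as in the Arrow IPC footer):

use arrow2::io::ipc::read::FileMetadata;

// Illustrative only: the byte range (start, length) of each record batch plus the
// total file size, computed from the now-public `blocks` and `size` fields.
fn batch_ranges(metadata: &FileMetadata) -> (Vec<(u64, u64)>, u64) {
    let ranges = metadata
        .blocks
        .iter()
        .map(|b| {
            let start = b.offset as u64;
            let length = b.meta_data_length as u64 + b.body_length as u64;
            (start, length)
        })
        .collect();
    (ranges, metadata.size)
}
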
src/mmap/mod.rs (27 changes: 14 additions & 13 deletions)
@@ -1,5 +1,6 @@
//! Memory maps regions defined on the IPC format into [`Array`].
use std::collections::VecDeque;
use std::sync::Arc;

use crate::array::Array;
use crate::chunk::Chunk;
@@ -74,10 +75,10 @@ fn get_buffers_nodes(
Ok((buffers, field_nodes))
}

unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
unsafe fn _mmap_record<T: AsRef<[u8]>>(
fields: &[Field],
ipc_fields: &[IpcField],
data: T,
data: Arc<T>,
batch: RecordBatchRef,
offset: usize,
dictionaries: &Dictionaries,
@@ -104,14 +105,14 @@ unsafe fn _mmap_record<T: Clone + AsRef<[u8]>>(
.and_then(Chunk::try_new)
}

unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
unsafe fn _mmap_unchecked<T: AsRef<[u8]>>(
fields: &[Field],
ipc_fields: &[IpcField],
data: T,
data: Arc<T>,
block: Block,
dictionaries: &Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Error> {
let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_record_batch(message)?;
_mmap_record(
fields,
@@ -134,15 +135,15 @@ unsafe fn _mmap_unchecked<T: Clone + AsRef<[u8]>>(
/// The caller must ensure that `data` contains valid buffers, for example:
/// * Offsets in variable-sized containers must be in-bounds and increasing
/// * Utf8 data is valid
pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
pub unsafe fn mmap_unchecked<T: AsRef<[u8]>>(
metadata: &FileMetadata,
dictionaries: &Dictionaries,
data: T,
data: Arc<T>,
chunk: usize,
) -> Result<Chunk<Box<dyn Array>>, Error> {
let block = metadata.blocks[chunk];

let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_record_batch(message)?;
_mmap_record(
&metadata.schema.fields,
@@ -154,13 +155,13 @@ pub unsafe fn mmap_unchecked<T: Clone + AsRef<[u8]>>(
)
}

unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
unsafe fn mmap_dictionary<T: AsRef<[u8]>>(
metadata: &FileMetadata,
data: T,
data: Arc<T>,
block: Block,
dictionaries: &mut Dictionaries,
) -> Result<(), Error> {
let (message, offset) = read_message(data.as_ref(), block)?;
let (message, offset) = read_message(data.as_ref().as_ref(), block)?;
let batch = get_dictionary_batch(&message)?;

let id = batch
@@ -205,9 +206,9 @@ unsafe fn mmap_dictionary<T: Clone + AsRef<[u8]>>(
/// The caller must ensure that `data` contains valid buffers, for example:
/// * Offsets in variable-sized containers must be in-bounds and increasing
/// * Utf8 data is valid
pub unsafe fn mmap_dictionaries_unchecked<T: Clone + AsRef<[u8]>>(
pub unsafe fn mmap_dictionaries_unchecked<T: AsRef<[u8]>>(
metadata: &FileMetadata,
data: T,
data: Arc<T>,
) -> Result<Dictionaries, Error> {
let blocks = if let Some(blocks) = &metadata.dictionaries {
blocks
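Since `FileMetadata::blocks` is now public (see `reader.rs` above), a caller also knows how many record batches can be mapped. A minimal sketch combining that with the `Arc`-based signatures in this module (illustrative only, not part of the commit):

use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::read::FileMetadata;
use arrow2::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};

// Illustrative only: map every record batch of an IPC region backed by `data`.
// `metadata.blocks.len()` is only usable here because `blocks` is now public.
//
// Safety: as with the functions above, `data` must contain valid buffers.
unsafe fn mmap_all<T: AsRef<[u8]>>(
    metadata: &FileMetadata,
    data: Arc<T>,
) -> Result<Vec<Chunk<Box<dyn Array>>>> {
    let dictionaries = unsafe { mmap_dictionaries_unchecked(metadata, data.clone())? };
    (0..metadata.blocks.len())
        .map(|chunk| unsafe { mmap_unchecked(metadata, &dictionaries, data.clone(), chunk) })
        .collect()
}
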
tests/it/io/ipc/mmap.rs (5 changes: 3 additions & 2 deletions)
@@ -3,16 +3,17 @@ use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::read::read_file_metadata;
use std::sync::Arc;

use super::write::file::write;

fn round_trip(array: Box<dyn Array>) -> Result<()> {
let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array.clone()])?;

let data = write(&[columns], &schema, None, None)?;
let data = Arc::new(write(&[columns], &schema, None, None)?);

let metadata = read_file_metadata(&mut std::io::Cursor::new(&data))?;
let metadata = read_file_metadata(&mut std::io::Cursor::new(data.as_ref()))?;

let dictionaries =
unsafe { arrow2::mmap::mmap_dictionaries_unchecked(&metadata, data.clone())? };
