From ec383adf2edb68dee30babb1880da04c527f1d99 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 31 May 2022 06:28:59 +0000 Subject: [PATCH] Added ability for random access reads to IPC --- examples/ipc_file_read.rs | 37 +++++++++- src/io/ipc/append/mod.rs | 11 +-- src/io/ipc/read/mod.rs | 4 +- src/io/ipc/read/reader.rs | 141 ++++++++++++++++++-------------------- 4 files changed, 106 insertions(+), 87 deletions(-) diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 7fafb5726c9..cdaba030d8c 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -5,24 +5,51 @@ use arrow2::array::Array; use arrow2::chunk::Chunk; use arrow2::datatypes::Schema; use arrow2::error::Result; -use arrow2::io::ipc::read::{read_file_metadata, FileReader}; +use arrow2::io::ipc::read; use arrow2::io::print; +/// Simplest way: read all record batches from the file. This can be used e.g. for random access. fn read_batches(path: &str) -> Result<(Schema, Vec>>)> { let mut file = File::open(path)?; // read the files' metadata. At this point, we can distribute the read whatever we like. - let metadata = read_file_metadata(&mut file)?; + let metadata = read::read_file_metadata(&mut file)?; let schema = metadata.schema.clone(); // Simplest way: use the reader, an iterator over batches. - let reader = FileReader::new(file, metadata, None); + let reader = read::FileReader::new(file, metadata, None); let columns = reader.collect::>>()?; Ok((schema, columns)) } +/// Random access way: read a single record batch from the file. This can be used e.g. for random access. +fn read_batch(path: &str) -> Result<(Schema, Chunk>)> { + let mut file = File::open(path)?; + + // read the files' metadata. At this point, we can distribute the read whatever we like. + let metadata = read::read_file_metadata(&mut file)?; + + let schema = metadata.schema.clone(); + + // advanced way: read the dictionary + let dictionaries = read::read_file_dictionaries(&mut file, &metadata)?; + + let chunk_index = 0; + + let chunk = read::read_batch( + &mut file, + &dictionaries, + &metadata, + None, + chunk_index, + &mut vec![], + )?; + + Ok((schema, chunk)) +} + fn main() -> Result<()> { use std::env; let args: Vec = env::args().collect(); @@ -32,5 +59,9 @@ fn main() -> Result<()> { let (schema, batches) = read_batches(file_path)?; let names = schema.fields.iter().map(|f| &f.name).collect::>(); println!("{}", print::write(&batches, &names)); + + let (schema, batch) = read_batch(file_path)?; + let names = schema.fields.iter().map(|f| &f.name).collect::>(); + println!("{}", print::write(&[batch], &names)); Ok(()) } diff --git a/src/io/ipc/append/mod.rs b/src/io/ipc/append/mod.rs index 693d5ec5c60..4a225c4db61 100644 --- a/src/io/ipc/append/mod.rs +++ b/src/io/ipc/append/mod.rs @@ -32,16 +32,7 @@ impl FileWriter { )); } - let dictionaries = if let Some(blocks) = &metadata.dictionaries { - read::reader::read_dictionaries( - &mut writer, - &metadata.schema.fields, - &metadata.ipc_schema, - blocks, - )? - } else { - Default::default() - }; + let dictionaries = read::read_file_dictionaries(&mut writer, &metadata)?; let last_block = metadata.blocks.last().ok_or_else(|| { Error::oos("An Arrow IPC file must have at least 1 message (the schema message)") diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index 83e41a79187..fa220226b0c 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -25,7 +25,9 @@ pub mod stream_async; pub mod file_async; pub use common::{read_dictionary, read_record_batch}; -pub use reader::{read_file_metadata, FileMetadata, FileReader}; +pub use reader::{ + read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader, +}; pub use schema::deserialize_schema; pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState}; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index e0be7880efd..173e060e15a 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use crate::array::Array; use crate::chunk::Chunk; -use crate::datatypes::{Field, Schema}; +use crate::datatypes::Schema; use crate::error::{Error, Result}; use crate::io::ipc::IpcSchema; @@ -65,46 +65,63 @@ fn read_dictionary_message( Ok(()) } -pub(crate) fn read_dictionaries( +fn read_dictionary_block( reader: &mut R, - fields: &[Field], - ipc_schema: &IpcSchema, - blocks: &[arrow_format::ipc::Block], + metadata: &FileMetadata, + block: &arrow_format::ipc::Block, + dictionaries: &mut Dictionaries, + scratch: &mut Vec, +) -> Result<()> { + let offset = block.offset as u64; + let length = block.meta_data_length as u64; + read_dictionary_message(reader, offset, scratch)?; + + let message = arrow_format::ipc::MessageRef::read_as_root(scratch) + .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; + + let header = message + .header()? + .ok_or_else(|| Error::oos("Message must have an header"))?; + + match header { + arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { + let block_offset = offset + length; + read_dictionary( + batch, + &metadata.schema.fields, + &metadata.ipc_schema, + dictionaries, + reader, + block_offset, + )?; + } + t => { + return Err(Error::OutOfSpec(format!( + "Expecting DictionaryBatch in dictionary blocks, found {:?}.", + t + ))); + } + }; + Ok(()) +} + +/// Reads all file's dictionaries, if any +/// This function is IO-bounded +pub fn read_file_dictionaries( + reader: &mut R, + metadata: &FileMetadata, ) -> Result { let mut dictionaries = Default::default(); let mut data = vec![]; + let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() { + blocks + } else { + return Ok(HashMap::new()); + }; + for block in blocks { - let offset = block.offset as u64; - let length = block.meta_data_length as u64; - read_dictionary_message(reader, offset, &mut data)?; - - let message = arrow_format::ipc::MessageRef::read_as_root(&data) - .map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?; - - let header = message - .header()? - .ok_or_else(|| Error::oos("Message must have an header"))?; - - match header { - arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - let block_offset = offset + length; - read_dictionary( - batch, - fields, - ipc_schema, - &mut dictionaries, - reader, - block_offset, - )?; - } - t => { - return Err(Error::OutOfSpec(format!( - "Expecting DictionaryBatch in dictionary blocks, found {:?}.", - t - ))); - } - }; + read_dictionary_block(reader, metadata, block, &mut dictionaries, &mut data)?; } Ok(dictionaries) } @@ -183,19 +200,6 @@ pub fn read_file_metadata(reader: &mut R) -> Result( @@ -216,16 +220,22 @@ pub(super) fn get_serialized_batch<'a>( } } -/// Read a batch from the reader. +/// Reads the record batch at position `index` from the reader. +/// +/// This function is useful for random access to the file. For example, if +/// you have indexed the file somewhere else, this allows pruning +/// certain parts of the file. +/// # Panics +/// This function panics iff `index >= metadata.blocks.len()` pub fn read_batch( reader: &mut R, dictionaries: &Dictionaries, metadata: &FileMetadata, projection: Option<&[usize]>, - block: usize, - block_data: &mut Vec, + index: usize, + stratch: &mut Vec, ) -> Result>> { - let block = metadata.blocks[block]; + let block = metadata.blocks[index]; // read length reader.seek(SeekFrom::Start(block.offset as u64))?; @@ -237,11 +247,11 @@ pub fn read_batch( } let meta_len = i32::from_le_bytes(meta_buf) as usize; - block_data.clear(); - block_data.resize(meta_len, 0); - reader.read_exact(block_data)?; + stratch.clear(); + stratch.resize(meta_len, 0); + reader.read_exact(stratch)?; - let message = arrow_format::ipc::MessageRef::read_as_root(&block_data[..]) + let message = arrow_format::ipc::MessageRef::read_as_root(stratch) .map_err(|err| Error::oos(format!("Unable parse message: {:?}", err)))?; let batch = get_serialized_batch(&message)?; @@ -300,23 +310,8 @@ impl FileReader { } fn read_dictionaries(&mut self) -> Result<()> { - match ( - &mut self.dictionaries, - self.metadata.dictionaries.as_deref(), - ) { - (None, Some(blocks)) => { - let dictionaries = read_dictionaries( - &mut self.reader, - &self.metadata.schema.fields, - &self.metadata.ipc_schema, - blocks, - )?; - self.dictionaries = Some(dictionaries); - } - (None, None) => { - self.dictionaries = Some(Default::default()); - } - _ => {} + if self.dictionaries.is_none() { + self.dictionaries = Some(read_file_dictionaries(&mut self.reader, &self.metadata)?); }; Ok(()) }