Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added ability for random access reads to IPC (#1034)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 31, 2022
1 parent e4a70f8 commit 47d5bd2
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 87 deletions.
37 changes: 34 additions & 3 deletions examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,51 @@ use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};
use arrow2::io::ipc::read;
use arrow2::io::print;

/// Simplest way: read all record batches from the file. This can be used e.g. for random access.
fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
let mut file = File::open(path)?;

// read the file's metadata. At this point, we can distribute the read however we like.
let metadata = read_file_metadata(&mut file)?;
let metadata = read::read_file_metadata(&mut file)?;

let schema = metadata.schema.clone();

// Simplest way: use the reader, an iterator over batches.
let reader = FileReader::new(file, metadata, None);
let reader = read::FileReader::new(file, metadata, None);

let columns = reader.collect::<Result<Vec<_>>>()?;
Ok((schema, columns))
}

/// Random-access way: reads a single record batch from the file instead of
/// iterating over all of them. Returns the file's schema together with the
/// batch at position `chunk_index` (here, the first one).
fn read_batch(path: &str) -> Result<(Schema, Chunk<Arc<dyn Array>>)> {
    let mut file = File::open(path)?;

    // Read the file's metadata first; every other read below is driven by it.
    let metadata = read::read_file_metadata(&mut file)?;

    // `read::read_batch` takes the file's dictionaries as an argument, so load them up front.
    let dictionaries = read::read_file_dictionaries(&mut file, &metadata)?;

    // Position of the record batch to fetch.
    let chunk_index = 0;
    // No projection is passed to the reader.
    let projection = None;
    // Scratch buffer handed to the reader.
    let mut scratch = vec![];

    let chunk = read::read_batch(
        &mut file,
        &dictionaries,
        &metadata,
        projection,
        chunk_index,
        &mut scratch,
    )?;

    Ok((metadata.schema.clone(), chunk))
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();
Expand All @@ -32,5 +59,9 @@ fn main() -> Result<()> {
let (schema, batches) = read_batches(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&batches, &names));

let (schema, batch) = read_batch(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&[batch], &names));
Ok(())
}
11 changes: 1 addition & 10 deletions src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,7 @@ impl<R: Read + Seek + Write> FileWriter<R> {
));
}

let dictionaries = if let Some(blocks) = &metadata.dictionaries {
read::reader::read_dictionaries(
&mut writer,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};
let dictionaries = read::read_file_dictionaries(&mut writer, &metadata)?;

let last_block = metadata.blocks.last().ok_or_else(|| {
Error::oos("An Arrow IPC file must have at least 1 message (the schema message)")
Expand Down
4 changes: 3 additions & 1 deletion src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub mod stream_async;
pub mod file_async;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
pub use reader::{
read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader,
};
pub use schema::deserialize_schema;
pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};

Expand Down
141 changes: 68 additions & 73 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::io::ipc::IpcSchema;

Expand Down Expand Up @@ -65,46 +65,63 @@ fn read_dictionary_message<R: Read + Seek>(
Ok(())
}

pub(crate) fn read_dictionaries<R: Read + Seek>(
fn read_dictionary_block<R: Read + Seek>(
reader: &mut R,
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: &[arrow_format::ipc::Block],
metadata: &FileMetadata,
block: &arrow_format::ipc::Block,
dictionaries: &mut Dictionaries,
scratch: &mut Vec<u8>,
) -> Result<()> {
let offset = block.offset as u64;
let length = block.meta_data_length as u64;
read_dictionary_message(reader, offset, scratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(scratch)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let header = message
.header()?
.ok_or_else(|| Error::oos("Message must have an header"))?;

match header {
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
let block_offset = offset + length;
read_dictionary(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(Error::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
Ok(())
}

/// Reads all of the file's dictionaries, if any.
/// This function is IO-bound.
pub fn read_file_dictionaries<R: Read + Seek>(
reader: &mut R,
metadata: &FileMetadata,
) -> Result<Dictionaries> {
let mut dictionaries = Default::default();
let mut data = vec![];

let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() {
blocks
} else {
return Ok(HashMap::new());
};

for block in blocks {
let offset = block.offset as u64;
let length = block.meta_data_length as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = arrow_format::ipc::MessageRef::read_as_root(&data)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let header = message
.header()?
.ok_or_else(|| Error::oos("Message must have an header"))?;

match header {
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
let block_offset = offset + length;
read_dictionary(
batch,
fields,
ipc_schema,
&mut dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(Error::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
read_dictionary_block(reader, metadata, block, &mut dictionaries, &mut data)?;
}
Ok(dictionaries)
}
Expand Down Expand Up @@ -183,19 +200,6 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
reader.read_exact(&mut footer_data)?;

deserialize_footer(&footer_data)

/*
// read dictionaries
metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};*/
}

pub(super) fn get_serialized_batch<'a>(
Expand All @@ -216,16 +220,22 @@ pub(super) fn get_serialized_batch<'a>(
}
}

/// Read a batch from the reader.
/// Reads the record batch at position `index` from the reader.
///
/// This function is useful for random access to the file. For example, if
/// you have indexed the file somewhere else, this allows pruning
/// certain parts of the file.
/// # Panics
/// This function panics iff `index >= metadata.blocks.len()`
pub fn read_batch<R: Read + Seek>(
reader: &mut R,
dictionaries: &Dictionaries,
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
block_data: &mut Vec<u8>,
index: usize,
stratch: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>> {
let block = metadata.blocks[block];
let block = metadata.blocks[index];

// read length
reader.seek(SeekFrom::Start(block.offset as u64))?;
Expand All @@ -237,11 +247,11 @@ pub fn read_batch<R: Read + Seek>(
}
let meta_len = i32::from_le_bytes(meta_buf) as usize;

block_data.clear();
block_data.resize(meta_len, 0);
reader.read_exact(block_data)?;
stratch.clear();
stratch.resize(meta_len, 0);
reader.read_exact(stratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(&block_data[..])
let message = arrow_format::ipc::MessageRef::read_as_root(stratch)
.map_err(|err| Error::oos(format!("Unable parse message: {:?}", err)))?;

let batch = get_serialized_batch(&message)?;
Expand Down Expand Up @@ -300,23 +310,8 @@ impl<R: Read + Seek> FileReader<R> {
}

fn read_dictionaries(&mut self) -> Result<()> {
match (
&mut self.dictionaries,
self.metadata.dictionaries.as_deref(),
) {
(None, Some(blocks)) => {
let dictionaries = read_dictionaries(
&mut self.reader,
&self.metadata.schema.fields,
&self.metadata.ipc_schema,
blocks,
)?;
self.dictionaries = Some(dictionaries);
}
(None, None) => {
self.dictionaries = Some(Default::default());
}
_ => {}
if self.dictionaries.is_none() {
self.dictionaries = Some(read_file_dictionaries(&mut self.reader, &self.metadata)?);
};
Ok(())
}
Expand Down

0 comments on commit 47d5bd2

Please sign in to comment.