Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for random access reads from IPC #1034

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,51 @@ use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};
use arrow2::io::ipc::read;
use arrow2::io::print;

/// Simplest way: read all record batches from the file. This can be used e.g. for random access.
fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
let mut file = File::open(path)?;

// read the files' metadata. At this point, we can distribute the read whatever we like.
let metadata = read_file_metadata(&mut file)?;
let metadata = read::read_file_metadata(&mut file)?;

let schema = metadata.schema.clone();

// Simplest way: use the reader, an iterator over batches.
let reader = FileReader::new(file, metadata, None);
let reader = read::FileReader::new(file, metadata, None);

let columns = reader.collect::<Result<Vec<_>>>()?;
Ok((schema, columns))
}

/// Random access way: read a single record batch from the file. This can be used e.g. for random access.
fn read_batch(path: &str) -> Result<(Schema, Chunk<Arc<dyn Array>>)> {
let mut file = File::open(path)?;

// read the files' metadata. At this point, we can distribute the read whatever we like.
let metadata = read::read_file_metadata(&mut file)?;

let schema = metadata.schema.clone();

// advanced way: read the dictionary
let dictionaries = read::read_file_dictionaries(&mut file, &metadata)?;

let chunk_index = 0;

let chunk = read::read_batch(
&mut file,
&dictionaries,
&metadata,
None,
chunk_index,
&mut vec![],
)?;

Ok((schema, chunk))
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();
Expand All @@ -32,5 +59,9 @@ fn main() -> Result<()> {
let (schema, batches) = read_batches(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&batches, &names));

let (schema, batch) = read_batch(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&[batch], &names));
Ok(())
}
11 changes: 1 addition & 10 deletions src/io/ipc/append/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,7 @@ impl<R: Read + Seek + Write> FileWriter<R> {
));
}

let dictionaries = if let Some(blocks) = &metadata.dictionaries {
read::reader::read_dictionaries(
&mut writer,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};
let dictionaries = read::read_file_dictionaries(&mut writer, &metadata)?;

let last_block = metadata.blocks.last().ok_or_else(|| {
Error::oos("An Arrow IPC file must have at least 1 message (the schema message)")
Expand Down
4 changes: 3 additions & 1 deletion src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ pub mod stream_async;
pub mod file_async;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
pub use reader::{
read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, FileReader,
};
pub use schema::deserialize_schema;
pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState};

Expand Down
141 changes: 68 additions & 73 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::io::ipc::IpcSchema;

Expand Down Expand Up @@ -65,46 +65,63 @@ fn read_dictionary_message<R: Read + Seek>(
Ok(())
}

pub(crate) fn read_dictionaries<R: Read + Seek>(
fn read_dictionary_block<R: Read + Seek>(
reader: &mut R,
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: &[arrow_format::ipc::Block],
metadata: &FileMetadata,
block: &arrow_format::ipc::Block,
dictionaries: &mut Dictionaries,
scratch: &mut Vec<u8>,
) -> Result<()> {
let offset = block.offset as u64;
let length = block.meta_data_length as u64;
read_dictionary_message(reader, offset, scratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(scratch)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let header = message
.header()?
.ok_or_else(|| Error::oos("Message must have an header"))?;

match header {
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
let block_offset = offset + length;
read_dictionary(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(Error::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
Ok(())
}

/// Reads all file's dictionaries, if any
/// This function is IO-bounded
pub fn read_file_dictionaries<R: Read + Seek>(
reader: &mut R,
metadata: &FileMetadata,
) -> Result<Dictionaries> {
let mut dictionaries = Default::default();
let mut data = vec![];

let blocks = if let Some(blocks) = metadata.dictionaries.as_deref() {
blocks
} else {
return Ok(HashMap::new());
};

for block in blocks {
let offset = block.offset as u64;
let length = block.meta_data_length as u64;
read_dictionary_message(reader, offset, &mut data)?;

let message = arrow_format::ipc::MessageRef::read_as_root(&data)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

let header = message
.header()?
.ok_or_else(|| Error::oos("Message must have an header"))?;

match header {
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
let block_offset = offset + length;
read_dictionary(
batch,
fields,
ipc_schema,
&mut dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(Error::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
read_dictionary_block(reader, metadata, block, &mut dictionaries, &mut data)?;
}
Ok(dictionaries)
}
Expand Down Expand Up @@ -183,19 +200,6 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
reader.read_exact(&mut footer_data)?;

deserialize_footer(&footer_data)

/*
// read dictionaries
metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};*/
}

pub(super) fn get_serialized_batch<'a>(
Expand All @@ -216,16 +220,22 @@ pub(super) fn get_serialized_batch<'a>(
}
}

/// Read a batch from the reader.
/// Reads the record batch at position `index` from the reader.
///
/// This function is useful for random access to the file. For example, if
/// you have indexed the file somewhere else, this allows pruning
/// certain parts of the file.
/// # Panics
/// This function panics iff `index >= metadata.blocks.len()`
pub fn read_batch<R: Read + Seek>(
reader: &mut R,
dictionaries: &Dictionaries,
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
block_data: &mut Vec<u8>,
index: usize,
stratch: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>> {
let block = metadata.blocks[block];
let block = metadata.blocks[index];

// read length
reader.seek(SeekFrom::Start(block.offset as u64))?;
Expand All @@ -237,11 +247,11 @@ pub fn read_batch<R: Read + Seek>(
}
let meta_len = i32::from_le_bytes(meta_buf) as usize;

block_data.clear();
block_data.resize(meta_len, 0);
reader.read_exact(block_data)?;
stratch.clear();
stratch.resize(meta_len, 0);
reader.read_exact(stratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(&block_data[..])
let message = arrow_format::ipc::MessageRef::read_as_root(stratch)
.map_err(|err| Error::oos(format!("Unable parse message: {:?}", err)))?;

let batch = get_serialized_batch(&message)?;
Expand Down Expand Up @@ -300,23 +310,8 @@ impl<R: Read + Seek> FileReader<R> {
}

fn read_dictionaries(&mut self) -> Result<()> {
match (
&mut self.dictionaries,
self.metadata.dictionaries.as_deref(),
) {
(None, Some(blocks)) => {
let dictionaries = read_dictionaries(
&mut self.reader,
&self.metadata.schema.fields,
&self.metadata.ipc_schema,
blocks,
)?;
self.dictionaries = Some(dictionaries);
}
(None, None) => {
self.dictionaries = Some(Default::default());
}
_ => {}
if self.dictionaries.is_none() {
self.dictionaries = Some(read_file_dictionaries(&mut self.reader, &self.metadata)?);
};
Ok(())
}
Expand Down