Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified code
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 29, 2022
1 parent 9ecb8ef commit ba04c64
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 102 deletions.
80 changes: 34 additions & 46 deletions src/io/ipc/read/file_async.rs
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use arrow_format::ipc::{
planus::{ReadAsRoot, Vector},
BlockRef, FooterRef, MessageHeaderRef, MessageRef,
BlockRef, MessageHeaderRef, MessageRef,
};
use futures::{
stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt,
Expand All @@ -18,8 +18,7 @@ use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::fb_to_schema;
use super::reader::{deserialize_footer, get_serialized_batch};
use super::Dictionaries;
use super::FileMetadata;

Expand Down Expand Up @@ -114,62 +113,51 @@ impl<'a> Stream for FileStream<'a> {
}
}

/// Reads the footer's length and checks the magic number at the end of the file.
///
/// The last 10 bytes of `reader` are read: the first 4 are interpreted as a
/// little-endian `i32` holding the footer's length, the remaining 6 are compared
/// against `ARROW_MAGIC`.
///
/// # Errors
/// Errors if seeking or reading fails, if the trailing bytes are not
/// `ARROW_MAGIC`, or if the declared length does not fit in a `usize`
/// (i.e. it is negative).
async fn read_footer_len<R: AsyncRead + AsyncSeek + Unpin>(reader: &mut R) -> Result<usize> {
    // read footer length and magic number in footer
    reader.seek(SeekFrom::End(-10)).await?;
    let mut footer: [u8; 10] = [0; 10];

    reader.read_exact(&mut footer).await?;
    // infallible: the slice is exactly 4 bytes long
    let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

    if footer[4..] != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }
    footer_len
        .try_into()
        .map_err(|_| ArrowError::oos("The footer's length must be a positive number"))
}

/// Read the metadata from an IPC file.
pub async fn read_file_metadata_async<R>(mut reader: R) -> Result<FileMetadata>
pub async fn read_file_metadata_async<R>(reader: &mut R) -> Result<FileMetadata>
where
R: AsyncRead + AsyncSeek + Unpin,
{
// Check header
let mut magic = [0; 6];
reader.read_exact(&mut magic).await?;
if magic != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"file does not contain correct Arrow header".to_string(),
));
}
// Check footer
reader.seek(SeekFrom::End(-6)).await?;
reader.read_exact(&mut magic).await?;
if magic != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"file does not contain correct Arrow footer".to_string(),
));
}
// Get footer size
let mut footer_size = [0; 4];
reader.seek(SeekFrom::End(-10)).await?;
reader.read_exact(&mut footer_size).await?;
let footer_size = i32::from_le_bytes(footer_size);
let footer_size = read_footer_len(reader).await?;
// Read footer
let mut footer = vec![0; footer_size as usize];
reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
reader.read_exact(&mut footer).await?;
let footer = FooterRef::read_as_root(&footer[..])
.map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?;

let blocks = footer.record_batches()?.ok_or_else(|| {
ArrowError::OutOfSpec("unable to get record batches from footer".to_string())
})?;
let schema = footer
.schema()?
.ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?;
let (schema, ipc_schema) = fb_to_schema(schema)?;
let dictionary_blocks = footer.dictionaries()?;
let dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(reader, &schema.fields[..], &ipc_schema, blocks).await?
let (mut metadata, dictionary_blocks) = deserialize_footer(&footer)?;

metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)
.await?
} else {
Default::default()
};

Ok(FileMetadata {
schema,
ipc_schema,
blocks: blocks
.iter()
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?,
dictionaries,
})
Ok(metadata)
}

async fn read_dictionaries<R>(
Expand Down
139 changes: 83 additions & 56 deletions src/io/ipc/read/reader.rs
Expand Up @@ -108,63 +108,89 @@ fn read_dictionaries<R: Read + Seek>(
Ok(dictionaries)
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
/// Reads the footer's length and checks the magic number at the end of the file.
///
/// The last 10 bytes of `reader` are read: the first 4 are interpreted as a
/// little-endian `i32` holding the footer's length, the remaining 6 are compared
/// against `ARROW_MAGIC`.
///
/// # Errors
/// Errors if seeking or reading fails, if the trailing bytes are not
/// `ARROW_MAGIC`, or if the declared length does not fit in a `usize`
/// (i.e. it is negative).
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<usize> {
    // read footer length and magic number in footer
    reader.seek(SeekFrom::End(-10))?;
    let mut footer: [u8; 10] = [0; 10];

    reader.read_exact(&mut footer)?;
    // infallible: the slice is exactly 4 bytes long
    let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

    if footer[4..] != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }
    footer_len
        .try_into()
        .map_err(|_| ArrowError::oos("The footer's length must be a positive number"))
}

let footer = arrow_format::ipc::FooterRef::read_as_root(&footer_data)
/// Deserializes the footer's bytes into a [`FileMetadata`] and the (optional)
/// dictionary blocks declared in the footer.
///
/// The returned metadata has its `dictionaries` set to the default value;
/// the caller is expected to read the returned blocks and fill them in.
///
/// # Errors
/// Errors if `footer_data` cannot be read as a footer, if the footer has no
/// record batches or no schema, or if a block or the schema fails to convert.
pub(super) fn deserialize_footer(
    footer_data: &[u8],
) -> Result<(FileMetadata, Option<Vector<arrow_format::ipc::BlockRef>>)> {
    let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data)
        .map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

    let blocks = footer.record_batches()?.ok_or_else(|| {
        ArrowError::OutOfSpec("Unable to get record batches from footer".to_string())
    })?;

    // convert every block reference to its owned counterpart, stopping at the first error
    let blocks = blocks
        .iter()
        .map(|block| Ok(block.try_into()?))
        .collect::<Result<Vec<_>>>()?;

    let ipc_schema = footer
        .schema()?
        .ok_or_else(|| ArrowError::OutOfSpec("Unable to get the schema from footer".to_string()))?;
    let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;

    Ok((
        FileMetadata {
            schema,
            ipc_schema,
            blocks,
            dictionaries: Default::default(),
        },
        footer.dictionaries()?,
    ))
}

/// Read the IPC file's metadata
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
// check if header contain the correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
return Err(ArrowError::OutOfSpec(
"Arrow file does not contain correct header".to_string(),
));
}

let footer_len = read_footer_len(reader)?;

// read footer
let mut footer_data = vec![0; footer_len as usize];
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;

let dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(reader, &schema.fields, &ipc_schema, blocks)?
let (mut metadata, dictionary_blocks) = deserialize_footer(&footer_data)?;

// read dictionaries
metadata.dictionaries = if let Some(blocks) = dictionary_blocks {
read_dictionaries(
reader,
&metadata.schema.fields,
&metadata.ipc_schema,
blocks,
)?
} else {
Default::default()
};

Ok(FileMetadata {
schema,
ipc_schema,
blocks: blocks
.iter()
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?,
dictionaries,
})
Ok(metadata)
}

pub(super) fn get_serialized_batch<'a>(
Expand Down Expand Up @@ -272,26 +298,27 @@ impl<R: Read + Seek> Iterator for FileReader<R> {

fn next(&mut self) -> Option<Self::Item> {
// get current block
if self.current_block < self.metadata.blocks.len() {
let block = self.current_block;
self.current_block += 1;
let chunk = read_batch(
&mut self.reader,
&self.metadata,
self.projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut self.buffer,
);

let chunk = if let Some((projection, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
} else {
chunk
};
Some(chunk)
} else {
None
if self.current_block == self.metadata.blocks.len() {
return None;
}

let block = self.current_block;
self.current_block += 1;

let chunk = read_batch(
&mut self.reader,
&self.metadata,
self.projection.as_ref().map(|x| x.0.as_ref()),
block,
&mut self.buffer,
);

let chunk = if let Some((projection, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
} else {
chunk
};
Some(chunk)
}
}

0 comments on commit ba04c64

Please sign in to comment.