diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index 99ab8817cbb..13530dc2809 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use arrow_format::ipc::{ planus::{ReadAsRoot, Vector}, - BlockRef, FooterRef, MessageHeaderRef, MessageRef, + BlockRef, MessageHeaderRef, MessageRef, }; use futures::{ stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt, @@ -18,8 +18,7 @@ use crate::error::{ArrowError, Result}; use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; -use super::reader::get_serialized_batch; -use super::schema::fb_to_schema; +use super::reader::{deserialize_footer, get_serialized_batch}; use super::Dictionaries; use super::FileMetadata; @@ -114,62 +113,51 @@ impl<'a> Stream for FileStream<'a> { } } +/// Reads the footer's length and magic number in footer +async fn read_footer_len(reader: &mut R) -> Result { + // read footer length and magic number in footer + reader.seek(SeekFrom::End(-10)).await?; + let mut footer: [u8; 10] = [0; 10]; + + reader.read_exact(&mut footer).await?; + let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap()); + + if footer[4..] != ARROW_MAGIC { + return Err(ArrowError::OutOfSpec( + "Arrow file does not contain correct footer".to_string(), + )); + } + footer_len + .try_into() + .map_err(|_| ArrowError::oos("The footer's lenght must be a positive number")) +} + /// Read the metadata from an IPC file. -pub async fn read_file_metadata_async(mut reader: R) -> Result +pub async fn read_file_metadata_async(reader: &mut R) -> Result where R: AsyncRead + AsyncSeek + Unpin, { - // Check header - let mut magic = [0; 6]; - reader.read_exact(&mut magic).await?; - if magic != ARROW_MAGIC { - return Err(ArrowError::OutOfSpec( - "file does not contain correct Arrow header".to_string(), - )); - } - // Check footer - reader.seek(SeekFrom::End(-6)).await?; - reader.read_exact(&mut magic).await?; - if magic != ARROW_MAGIC { - return Err(ArrowError::OutOfSpec( - "file does not contain correct Arrow footer".to_string(), - )); - } - // Get footer size - let mut footer_size = [0; 4]; - reader.seek(SeekFrom::End(-10)).await?; - reader.read_exact(&mut footer_size).await?; - let footer_size = i32::from_le_bytes(footer_size); + let footer_size = read_footer_len(reader).await?; // Read footer let mut footer = vec![0; footer_size as usize]; reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?; reader.read_exact(&mut footer).await?; - let footer = FooterRef::read_as_root(&footer[..]) - .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?; - let blocks = footer.record_batches()?.ok_or_else(|| { - ArrowError::OutOfSpec("unable to get record batches from footer".to_string()) - })?; - let schema = footer - .schema()? - .ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?; - let (schema, ipc_schema) = fb_to_schema(schema)?; - let dictionary_blocks = footer.dictionaries()?; - let dictionaries = if let Some(blocks) = dictionary_blocks { - read_dictionaries(reader, &schema.fields[..], &ipc_schema, blocks).await? + let (mut metadata, dictionary_blocks) = deserialize_footer(&footer)?; + + metadata.dictionaries = if let Some(blocks) = dictionary_blocks { + read_dictionaries( + reader, + &metadata.schema.fields, + &metadata.ipc_schema, + blocks, + ) + .await? } else { Default::default() }; - Ok(FileMetadata { - schema, - ipc_schema, - blocks: blocks - .iter() - .map(|block| Ok(block.try_into()?)) - .collect::>>()?, - dictionaries, - }) + Ok(metadata) } async fn read_dictionaries( diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 0ffab629793..307bfb6357d 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -108,63 +108,89 @@ fn read_dictionaries( Ok(dictionaries) } -/// Read the IPC file's metadata -pub fn read_file_metadata(reader: &mut R) -> Result { - // check if header and footer contain correct magic bytes - let mut magic_buffer: [u8; 6] = [0; 6]; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != ARROW_MAGIC { - return Err(ArrowError::OutOfSpec( - "Arrow file does not contain correct header".to_string(), - )); - } - reader.seek(SeekFrom::End(-6))?; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != ARROW_MAGIC { +/// Reads the footer's length and magic number in footer +fn read_footer_len(reader: &mut R) -> Result { + // read footer length and magic number in footer + reader.seek(SeekFrom::End(-10))?; + let mut footer: [u8; 10] = [0; 10]; + + reader.read_exact(&mut footer)?; + let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap()); + + if footer[4..] != ARROW_MAGIC { return Err(ArrowError::OutOfSpec( "Arrow file does not contain correct footer".to_string(), )); } - // read footer length - let mut footer_size: [u8; 4] = [0; 4]; - reader.seek(SeekFrom::End(-10))?; - reader.read_exact(&mut footer_size)?; - let footer_len = i32::from_le_bytes(footer_size); - - // read footer - let mut footer_data = vec![0; footer_len as usize]; - reader.seek(SeekFrom::End(-10 - footer_len as i64))?; - reader.read_exact(&mut footer_data)?; + footer_len + .try_into() + .map_err(|_| ArrowError::oos("The footer's lenght must be a positive number")) +} - let footer = arrow_format::ipc::FooterRef::read_as_root(&footer_data) +pub(super) fn deserialize_footer( + footer_data: &[u8], +) -> Result<(FileMetadata, Option>)> { + let footer = arrow_format::ipc::FooterRef::read_as_root(footer_data) .map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?; let blocks = footer.record_batches()?.ok_or_else(|| { ArrowError::OutOfSpec("Unable to get record batches from footer".to_string()) })?; + let blocks = blocks + .iter() + .map(|block| Ok(block.try_into()?)) + .collect::>>()?; + let ipc_schema = footer .schema()? .ok_or_else(|| ArrowError::OutOfSpec("Unable to get the schema from footer".to_string()))?; let (schema, ipc_schema) = fb_to_schema(ipc_schema)?; - let dictionary_blocks = footer.dictionaries()?; + Ok(( + FileMetadata { + schema, + ipc_schema, + blocks, + dictionaries: Default::default(), + }, + footer.dictionaries()?, + )) +} + +/// Read the IPC file's metadata +pub fn read_file_metadata(reader: &mut R) -> Result { + // check if header contain the correct magic bytes + let mut magic_buffer: [u8; 6] = [0; 6]; + reader.read_exact(&mut magic_buffer)?; + if magic_buffer != ARROW_MAGIC { + return Err(ArrowError::OutOfSpec( + "Arrow file does not contain correct header".to_string(), + )); + } + + let footer_len = read_footer_len(reader)?; + + // read footer + let mut footer_data = vec![0; footer_len as usize]; + reader.seek(SeekFrom::End(-10 - footer_len as i64))?; + reader.read_exact(&mut footer_data)?; - let dictionaries = if let Some(blocks) = dictionary_blocks { - read_dictionaries(reader, &schema.fields, &ipc_schema, blocks)? + let (mut metadata, dictionary_blocks) = deserialize_footer(&footer_data)?; + + // read dictionaries + metadata.dictionaries = if let Some(blocks) = dictionary_blocks { + read_dictionaries( + reader, + &metadata.schema.fields, + &metadata.ipc_schema, + blocks, + )? } else { Default::default() }; - Ok(FileMetadata { - schema, - ipc_schema, - blocks: blocks - .iter() - .map(|block| Ok(block.try_into()?)) - .collect::>>()?, - dictionaries, - }) + Ok(metadata) } pub(super) fn get_serialized_batch<'a>( @@ -272,26 +298,27 @@ impl Iterator for FileReader { fn next(&mut self) -> Option { // get current block - if self.current_block < self.metadata.blocks.len() { - let block = self.current_block; - self.current_block += 1; - let chunk = read_batch( - &mut self.reader, - &self.metadata, - self.projection.as_ref().map(|x| x.0.as_ref()), - block, - &mut self.buffer, - ); - - let chunk = if let Some((projection, map, _)) = &self.projection { - // re-order according to projection - chunk.map(|chunk| apply_projection(chunk, projection, map)) - } else { - chunk - }; - Some(chunk) - } else { - None + if self.current_block == self.metadata.blocks.len() { + return None; } + + let block = self.current_block; + self.current_block += 1; + + let chunk = read_batch( + &mut self.reader, + &self.metadata, + self.projection.as_ref().map(|x| x.0.as_ref()), + block, + &mut self.buffer, + ); + + let chunk = if let Some((projection, map, _)) = &self.projection { + // re-order according to projection + chunk.map(|chunk| apply_projection(chunk, projection, map)) + } else { + chunk + }; + Some(chunk) } }