Skip to content

Commit

Permalink
do not assume footer exists, fixes issue apache#1335
Browse files Browse the repository at this point in the history
  • Loading branch information
pcjentsch committed Apr 29, 2022
1 parent 7d00e3c commit b39ce9d
Showing 1 changed file with 41 additions and 36 deletions.
77 changes: 41 additions & 36 deletions arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,43 +651,48 @@ impl<R: Read + Seek> FileReader<R> {

// Create an array of optional dictionary value arrays, one per field.
let mut dictionaries_by_field = vec![None; schema.all_fields().len()];
for block in footer.dictionaries().unwrap() {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
}
let footer_len = i32::from_le_bytes(message_size);
let mut block_data = vec![0; footer_len as usize];

reader.read_exact(&mut block_data)?;

let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;

match message.header_type() {
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();

// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;

read_dictionary(&buf, batch, &schema, &mut dictionaries_by_field)?;
}
t => {
return Err(ArrowError::IoError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
match footer.dictionaries() {
Some(dictionaries) => {
for block in dictionaries {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
}
let footer_len = i32::from_le_bytes(message_size);
let mut block_data = vec![0; footer_len as usize];

reader.read_exact(&mut block_data)?;

let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;

match message.header_type() {
ipc::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();

// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; block.bodyLength() as usize];
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;

read_dictionary(&buf, batch, &schema, &mut dictionaries_by_field)?;
}
t => {
return Err(ArrowError::IoError(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
}
}
};
}
_ => (),
}
let projection = match projection {
Some(projection_indices) => {
Expand Down

0 comments on commit b39ce9d

Please sign in to comment.