Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved reading of bitmaps from IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 29, 2022
1 parent 339610e commit 5b3e897
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/io/ipc/read/array/binary.rs
Expand Up @@ -34,6 +34,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
4 changes: 4 additions & 0 deletions src/io/ipc/read/array/boolean.rs
Expand Up @@ -8,6 +8,7 @@ use crate::error::{Error, Result};
use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

#[allow(clippy::too_many_arguments)]
pub fn read_boolean<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
Expand All @@ -16,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut Vec<u8>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Error::oos(format!(
Expand All @@ -36,6 +38,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let values = read_bitmap(
Expand All @@ -45,6 +48,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;
BooleanArray::try_new(data_type, values, validity)
}
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/fixed_size_binary.rs
Expand Up @@ -33,6 +33,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/fixed_size_list.rs
Expand Up @@ -38,6 +38,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let (field, _) = FixedSizeListArray::get_child_and_size(&data_type);
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/list.rs
Expand Up @@ -43,6 +43,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/map.rs
Expand Up @@ -39,6 +39,7 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/primitive.rs
Expand Up @@ -36,6 +36,7 @@ where
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/struct_.rs
Expand Up @@ -38,6 +38,7 @@ pub fn read_struct<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let fields = StructArray::get_fields(&data_type);
Expand Down
1 change: 1 addition & 0 deletions src/io/ipc/read/array/utf8.rs
Expand Up @@ -34,6 +34,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?;

let length: usize = field_node
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/read/deserialize.rs
Expand Up @@ -39,6 +39,7 @@ pub fn read<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)
.map(|x| x.boxed()),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Expand All @@ -50,7 +51,7 @@ pub fn read<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch
scratch,
)
.map(|x| x.boxed())
}),
Expand Down
28 changes: 17 additions & 11 deletions src/io/ipc/read/read_basic.rs
Expand Up @@ -181,10 +181,13 @@ fn read_uncompressed_bitmap<R: Read + Seek>(
number_of_bits: bytes * 8,
}));
}
// it is undefined behavior to call read_exact on un-initialized, https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
// see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
let mut buffer = vec![0; bytes];
reader.read_exact(buffer.as_mut_slice())?;

let mut buffer = vec![];
buffer.try_reserve(bytes)?;
reader
.by_ref()
.take(bytes as u64)
.read_to_end(&mut buffer)?;

Ok(buffer)
}
Expand All @@ -194,24 +197,24 @@ fn read_compressed_bitmap<R: Read + Seek>(
bytes: usize,
compression: Compression,
reader: &mut R,
scratch: &mut Vec<u8>,
) -> Result<Vec<u8>> {
let mut buffer = vec![0; (length + 7) / 8];

// read all first
// todo: move this allocation to an external buffer for re-use
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;
scratch.clear();
scratch.try_reserve(bytes)?;
reader.by_ref().take(bytes as u64).read_to_end(scratch)?;

let compression = compression
.codec()
.map_err(|err| Error::from(OutOfSpecKind::InvalidFlatbufferCompression(err)))?;

match compression {
arrow_format::ipc::CompressionType::Lz4Frame => {
compression::decompress_lz4(&slice[8..], &mut buffer)?;
compression::decompress_lz4(&scratch[8..], &mut buffer)?;
}
arrow_format::ipc::CompressionType::Zstd => {
compression::decompress_zstd(&slice[8..], &mut buffer)?;
compression::decompress_zstd(&scratch[8..], &mut buffer)?;
}
}
Ok(buffer)
Expand All @@ -224,6 +227,7 @@ pub fn read_bitmap<R: Read + Seek>(
block_offset: u64,
_: bool,
compression: Option<Compression>,
scratch: &mut Vec<u8>,
) -> Result<Bitmap> {
let buf = buf
.pop_front()
Expand All @@ -242,7 +246,7 @@ pub fn read_bitmap<R: Read + Seek>(
reader.seek(SeekFrom::Start(block_offset + offset))?;

let buffer = if let Some(compression) = compression {
read_compressed_bitmap(length, bytes, compression, reader)
read_compressed_bitmap(length, bytes, compression, reader, scratch)
} else {
read_uncompressed_bitmap(length, bytes, reader)
}?;
Expand All @@ -257,6 +261,7 @@ pub fn read_validity<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
scratch: &mut Vec<u8>,
) -> Result<Option<Bitmap>> {
let length: usize = field_node
.length()
Expand All @@ -271,6 +276,7 @@ pub fn read_validity<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
scratch,
)?)
} else {
let _ = buffers
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/reader.rs
Expand Up @@ -230,9 +230,14 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let (end, footer_len) = read_footer_len(reader)?;

// read footer
let mut serialized_footer = vec![0; footer_len];
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut serialized_footer)?;

let mut serialized_footer = vec![];
serialized_footer.try_reserve(footer_len)?;
reader
.by_ref()
.take(footer_len as u64)
.read_to_end(&mut serialized_footer)?;

deserialize_footer(&serialized_footer, end - start)
}
Expand Down

0 comments on commit 5b3e897

Please sign in to comment.