Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed panics on read #150

Merged
Merged 2 commits into the base branch on Jun 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -15,7 +15,7 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-async-temp = "0.3.0"
parquet-format-async-temp = "0.3.1"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
streaming-decompression = "0.1"

Expand All @@ -34,6 +34,7 @@ xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt"] }
criterion = "0.3"
rand = "0.8"

[features]
default = ["snappy", "gzip", "lz4", "zstd", "brotli", "bloom_filter"]
Expand Down
4 changes: 2 additions & 2 deletions examples/read_metadata.rs
Expand Up @@ -6,9 +6,9 @@ use parquet2::encoding::Encoding;
use parquet2::page::{split_buffer, DataPage};
use parquet2::schema::types::PhysicalType;

fn deserialize(page: &DataPage) {
fn deserialize(page: &DataPage) -> Result<()> {
// split the data buffer in repetition levels, definition levels and values
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page);
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page)?;

// decode and deserialize.
match (
Expand Down
18 changes: 9 additions & 9 deletions src/deserialize/binary.rs
Expand Up @@ -14,10 +14,10 @@ pub struct Dictionary<'a> {
}

impl<'a> Dictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let indexes = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;

Self { indexes, dict }
Ok(Self { indexes, dict })
}

#[inline]
Expand All @@ -41,26 +41,26 @@ impl<'a> BinaryPageState<'a> {
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(Self::RequiredDictionary(Dictionary::new(page, dict)))
Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::new(page),
Dictionary::new(page, dict),
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;

let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = BinaryIter::new(values, None);

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = BinaryIter::new(values, Some(page.num_values()));

Ok(Self::Required(values))
Expand Down
6 changes: 3 additions & 3 deletions src/deserialize/boolean.rs
Expand Up @@ -22,15 +22,15 @@ impl<'a> BooleanPageState<'a> {

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, _, true) => {
let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;

let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
Ok(Self::Required(values, page.num_values()))
}
_ => Err(Error::General(format!(
Expand Down
18 changes: 9 additions & 9 deletions src/deserialize/fixed_len.rs
Expand Up @@ -41,10 +41,10 @@ pub struct Dictionary<'a> {
}

impl<'a> Dictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Self {
let indexes = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a FixedLenByteArrayPageDict) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;

Self { indexes, dict }
Ok(Self { indexes, dict })
}

#[inline]
Expand Down Expand Up @@ -79,26 +79,26 @@ impl<'a> FixedLenBinaryPageState<'a> {
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(Self::RequiredDictionary(Dictionary::new(page, dict)))
Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::new(page),
Dictionary::new(page, dict),
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;

let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = FixexBinaryIter::new(values, size);

Ok(Self::Optional(validity, values))
}
(Encoding::Plain, _, false) => {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
let values = FixexBinaryIter::new(values, size);

Ok(Self::Required(values))
Expand Down
21 changes: 9 additions & 12 deletions src/deserialize/native.rs
Expand Up @@ -13,7 +13,7 @@ pub type Casted<'a, T> = std::iter::Map<std::slice::ChunksExact<'a, u8>, fn(&'a

/// Views the values of the data page as [`Casted`] to [`NativeType`].
pub fn native_cast<T: NativeType>(page: &DataPage) -> Result<Casted<T>, Error> {
let (_, _, values) = split_buffer(page);
let (_, _, values) = split_buffer(page)?;
if values.len() % std::mem::size_of::<T>() != 0 {
return Err(Error::OutOfSpec(
"A primitive page data's len must be a multiple of the type".to_string(),
Expand All @@ -31,20 +31,17 @@ where
T: NativeType,
{
pub indexes: hybrid_rle::HybridRleDecoder<'a>,
pub values: &'a [T],
pub dict: &'a PrimitivePageDict<T>,
}

impl<'a, T> Dictionary<'a, T>
where
T: NativeType,
{
pub fn new(page: &'a DataPage, dict: &'a PrimitivePageDict<T>) -> Self {
let indexes = utils::dict_indices_decoder(page);
pub fn try_new(page: &'a DataPage, dict: &'a PrimitivePageDict<T>) -> Result<Self, Error> {
let indexes = utils::dict_indices_decoder(page)?;

Self {
values: dict.values(),
indexes,
}
Ok(Self { dict, indexes })
}

pub fn len(&self) -> usize {
Expand Down Expand Up @@ -84,18 +81,18 @@ impl<'a, T: NativeType> NativePageState<'a, T> {
match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(Self::RequiredDictionary(Dictionary::new(page, dict)))
Ok(Self::RequiredDictionary(Dictionary::try_new(page, dict)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();

Ok(Self::OptionalDictionary(
utils::DefLevelsDecoder::new(page),
Dictionary::new(page, dict),
utils::DefLevelsDecoder::try_new(page)?,
Dictionary::try_new(page, dict)?,
))
}
(Encoding::Plain, _, true) => {
let validity = utils::DefLevelsDecoder::new(page);
let validity = utils::DefLevelsDecoder::try_new(page)?;
let values = native_cast(page)?;

Ok(Self::Optional(validity, values))
Expand Down
24 changes: 17 additions & 7 deletions src/deserialize/utils.rs
Expand Up @@ -2,22 +2,32 @@ use std::collections::VecDeque;

use crate::{
encoding::hybrid_rle::{self, HybridRleDecoder},
error::{Error, Result},
indexes::Interval,
page::{split_buffer, DataPage},
read::levels::get_bit_width,
};

use super::hybrid_rle::{HybridDecoderBitmapIter, HybridRleIter};

pub(super) fn dict_indices_decoder(page: &DataPage) -> hybrid_rle::HybridRleDecoder {
let (_, _, indices_buffer) = split_buffer(page);
pub(super) fn dict_indices_decoder(page: &DataPage) -> Result<hybrid_rle::HybridRleDecoder> {
let (_, _, indices_buffer) = split_buffer(page)?;

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
if bit_width > 32 {
return Err(Error::OutOfSpec(
"Bit width of dictionary pages cannot be larger than 32".to_string(),
));
}
let indices_buffer = &indices_buffer[1..];

hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, page.num_values())
Ok(hybrid_rle::HybridRleDecoder::new(
indices_buffer,
bit_width as u32,
page.num_values(),
))
}

/// Decoder of definition levels.
Expand All @@ -32,19 +42,19 @@ pub enum DefLevelsDecoder<'a> {
}

impl<'a> DefLevelsDecoder<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, def_levels, _) = split_buffer(page);
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, def_levels, _) = split_buffer(page)?;

let max_def_level = page.descriptor.max_def_level;
if max_def_level == 1 {
Ok(if max_def_level == 1 {
let iter = hybrid_rle::Decoder::new(def_levels, 1);
let iter = HybridRleIter::new(iter, page.num_values());
Self::Bitmap(iter)
} else {
let iter =
HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values());
Self::Levels(iter, max_def_level as u32)
}
})
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/encoding/bitpacking.rs
Expand Up @@ -69,16 +69,16 @@ fn decode_pack(compressed: &[u8], num_bits: u8, pack: &mut [u32; BitPacker1x::BL
}

impl<'a> Decoder<'a> {
pub fn new(compressed: &'a [u8], num_bits: u8, length: usize) -> Self {
pub fn new(compressed: &'a [u8], num_bits: u8, mut length: usize) -> Self {
let compressed_block_size = BitPacker1x::BLOCK_LEN * num_bits as usize / 8;

let mut compressed_chunks = compressed.chunks(compressed_block_size);
let mut current_pack = [0; BitPacker1x::BLOCK_LEN];
decode_pack(
compressed_chunks.next().unwrap(),
num_bits,
&mut current_pack,
);
if let Some(chunk) = compressed_chunks.next() {
decode_pack(chunk, num_bits, &mut current_pack);
} else {
length = 0
};

Self {
remaining: length,
Expand Down
13 changes: 9 additions & 4 deletions src/encoding/hybrid_rle/decoder.rs
Expand Up @@ -30,20 +30,25 @@ impl<'a> Iterator for Decoder<'a> {
}
let (indicator, consumed) = uleb128::decode(self.values);
self.values = &self.values[consumed..];
if self.values.is_empty() {
return None;
};
if indicator & 1 == 1 {
// is bitpacking
let bytes = (indicator as usize >> 1) * self.num_bits as usize;
let bytes = std::cmp::min(bytes, self.values.len());
let result = Some(HybridEncoded::Bitpacked(&self.values[..bytes]));
self.values = &self.values[bytes..];
let (result, remaining) = self.values.split_at(bytes);
let result = Some(HybridEncoded::Bitpacked(result));
self.values = remaining;
result
} else {
// is rle
let run_length = indicator as usize >> 1;
// repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)
let rle_bytes = ceil8(self.num_bits as usize);
let result = Some(HybridEncoded::Rle(&self.values[..rle_bytes], run_length));
self.values = &self.values[rle_bytes..];
let (result, remaining) = self.values.split_at(rle_bytes);
let result = Some(HybridEncoded::Rle(result, run_length));
self.values = remaining;
result
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/encoding/mod.rs
Expand Up @@ -14,8 +14,10 @@ pub use crate::parquet_bridge::Encoding;
/// # Panics
/// This function panics iff `values.len() < 4`.
#[inline]
pub fn get_length(values: &[u8]) -> u32 {
u32::from_le_bytes(values[0..4].try_into().unwrap())
pub fn get_length(values: &[u8]) -> Option<usize> {
values
.get(0..4)
.map(|x| u32::from_le_bytes(x.try_into().unwrap()) as usize)
}

/// Returns the ceil of value/divisor
Expand Down
14 changes: 10 additions & 4 deletions src/encoding/plain_byte_array.rs
Expand Up @@ -2,6 +2,7 @@
/// prefixes, lengths and values
/// # Implementation
/// This struct does not allocate on the heap.
use crate::error::Error;

#[derive(Debug)]
pub struct BinaryIter<'a> {
Expand All @@ -16,7 +17,7 @@ impl<'a> BinaryIter<'a> {
}

impl<'a> Iterator for BinaryIter<'a> {
type Item = &'a [u8];
type Item = Result<&'a [u8], Error>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -28,9 +29,14 @@ impl<'a> Iterator for BinaryIter<'a> {
}
let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
self.values = &self.values[4..];
let result = &self.values[..length];
self.values = &self.values[length..];
Some(result)
if length > self.values.len() {
return Some(Err(Error::OutOfSpec(
"A string in plain encoding declares a length that is out of range".to_string(),
)));
}
let (result, remaining) = self.values.split_at(length);
self.values = remaining;
Some(Ok(result))
}

#[inline]
Expand Down
18 changes: 17 additions & 1 deletion src/metadata/column_chunk_metadata.rs
Expand Up @@ -4,7 +4,7 @@ use parquet_format_async_temp::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::compression::Compression;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::schema::types::PhysicalType;
use crate::statistics::{deserialize_statistics, Statistics};

Expand Down Expand Up @@ -135,6 +135,22 @@ impl ColumnChunkMetaData {
column_descr: ColumnDescriptor,
column_chunk: ColumnChunk,
) -> Result<Self> {
// validate metadata
if let Some(meta) = &column_chunk.meta_data {
let _: usize = meta.total_compressed_size.try_into()?;

if let Some(offset) = meta.dictionary_page_offset {
let _: usize = offset.try_into()?;
}
let _: usize = meta.data_page_offset.try_into()?;

let _: Compression = meta.codec.try_into()?;
} else {
return Err(Error::OutOfSpec(
"Column chunk requires metadata".to_string(),
));
}

Ok(Self {
column_chunk,
column_descr,
Expand Down