Skip to content

Commit

Permalink
Fixed panics
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 10, 2022
1 parent 1291b1e commit df20b3b
Show file tree
Hide file tree
Showing 37 changed files with 196 additions and 133 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -15,7 +15,7 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-async-temp = "0.3.1"
parquet-format-safe = "0.1"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
streaming-decompression = "0.1"

Expand Down
2 changes: 1 addition & 1 deletion examples/read_metadata.rs
Expand Up @@ -118,7 +118,7 @@ fn main() -> Result<()> {

// ANCHOR: pages
use parquet2::read::get_page_iterator;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![])?;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![], 1024 * 1024)?;
// ANCHOR_END: pages

// ANCHOR: decompress
Expand Down
4 changes: 2 additions & 2 deletions src/bloom_filter/read.rs
@@ -1,6 +1,6 @@
use std::io::{Read, Seek, SeekFrom};

use parquet_format_async_temp::{
use parquet_format_safe::{
thrift::protocol::TCompactInputProtocol, BloomFilterAlgorithm, BloomFilterCompression,
BloomFilterHeader, SplitBlockAlgorithm, Uncompressed,
};
Expand All @@ -27,7 +27,7 @@ pub fn read<R: Read + Seek>(
reader.seek(SeekFrom::Start(offset))?;

// deserialize header
let mut prot = TCompactInputProtocol::new(&mut reader);
let mut prot = TCompactInputProtocol::new(&mut reader, usize::MAX); // max is ok since `BloomFilterHeader` never allocates
let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;

if header.algorithm != BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}) {
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Expand Up @@ -78,8 +78,8 @@ impl From<lz4_flex::block::CompressError> for Error {
}
}

impl From<parquet_format_async_temp::thrift::Error> for Error {
fn from(e: parquet_format_async_temp::thrift::Error) -> Error {
impl From<parquet_format_safe::thrift::Error> for Error {
fn from(e: parquet_format_safe::thrift::Error) -> Error {
Error::General(format!("underlying thrift error: {}", e))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/indexes/index.rs
@@ -1,6 +1,6 @@
use std::any::Any;

use parquet_format_async_temp::ColumnIndex;
use parquet_format_safe::ColumnIndex;

use crate::parquet_bridge::BoundaryOrder;
use crate::schema::types::PrimitiveType;
Expand Down
2 changes: 1 addition & 1 deletion src/indexes/intervals.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::PageLocation;
use parquet_format_safe::PageLocation;

use crate::error::Error;

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -18,7 +18,7 @@ pub mod statistics;
pub mod types;
pub mod write;

use parquet_format_async_temp as thrift_format;
use parquet_format_safe as thrift_format;

pub use streaming_decompression::fallible_streaming_iterator;
pub use streaming_decompression::FallibleStreamingIterator;
Expand Down
23 changes: 10 additions & 13 deletions src/metadata/column_chunk_metadata.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;

use parquet_format_async_temp::{ColumnChunk, ColumnMetaData, Encoding};
use parquet_format_safe::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::compression::Compression;
Expand Down Expand Up @@ -117,17 +117,14 @@ impl ColumnChunkMetaData {

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
let col_start = if let Some(dict_page_offset) = self.dictionary_page_offset() {
dict_page_offset
let start = if let Some(dict_page_offset) = self.dictionary_page_offset() {
dict_page_offset as u64
} else {
self.data_page_offset()
self.data_page_offset() as u64
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
let length = self.compressed_size() as u64;
// this has been validated in [`try_from_thrift`]
(start, length)
}

/// Method to convert from Thrift.
Expand All @@ -137,12 +134,12 @@ impl ColumnChunkMetaData {
) -> Result<Self> {
// validate metadata
if let Some(meta) = &column_chunk.meta_data {
let _: usize = meta.total_compressed_size.try_into()?;
let _: u64 = meta.total_compressed_size.try_into()?;

if let Some(offset) = meta.dictionary_page_offset {
let _: usize = offset.try_into()?;
let _: u64 = offset.try_into()?;
}
let _: usize = meta.data_page_offset.try_into()?;
let _: u64 = meta.data_page_offset.try_into()?;

let _: Compression = meta.codec.try_into()?;
} else {
Expand Down
14 changes: 6 additions & 8 deletions src/metadata/file_metadata.rs
@@ -1,12 +1,12 @@
use crate::{error::Error, metadata::get_sort_order};

use super::{column_order::ColumnOrder, schema_descriptor::SchemaDescriptor, RowGroupMetaData};
use parquet_format_async_temp::ColumnOrder as TColumnOrder;
use parquet_format_safe::ColumnOrder as TColumnOrder;

pub use crate::thrift_format::KeyValue;

/// Metadata for a Parquet file.
// This is almost equal to [`parquet_format_async_temp::FileMetaData`] but contains the descriptors,
// This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors,
// which are crucial to deserialize pages.
#[derive(Debug, Clone)]
pub struct FileMetaData {
Expand Down Expand Up @@ -60,9 +60,7 @@ impl FileMetaData {
}

/// Deserializes [`crate::thrift_format::FileMetaData`] into this struct
pub fn try_from_thrift(
metadata: parquet_format_async_temp::FileMetaData,
) -> Result<Self, Error> {
pub fn try_from_thrift(metadata: parquet_format_safe::FileMetaData) -> Result<Self, Error> {
let schema_descr = SchemaDescriptor::try_from_thrift(&metadata.schema)?;

let row_groups = metadata
Expand All @@ -86,9 +84,9 @@ impl FileMetaData {
})
}

/// Serializes itself to thrift's [`parquet_format_async_temp::FileMetaData`].
pub fn into_thrift(self) -> parquet_format_async_temp::FileMetaData {
parquet_format_async_temp::FileMetaData {
/// Serializes itself to thrift's [`parquet_format_safe::FileMetaData`].
pub fn into_thrift(self) -> parquet_format_safe::FileMetaData {
parquet_format_safe::FileMetaData {
version: self.version,
schema: self.schema_descr.into_thrift(),
num_rows: self.num_rows as i64,
Expand Down
2 changes: 1 addition & 1 deletion src/metadata/row_metadata.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::RowGroup;
use parquet_format_safe::RowGroup;

use super::{column_chunk_metadata::ColumnChunkMetaData, schema_descriptor::SchemaDescriptor};
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion src/metadata/schema_descriptor.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::SchemaElement;
use parquet_format_safe::SchemaElement;

use crate::{
error::Error,
Expand Down
2 changes: 1 addition & 1 deletion src/read/compression.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::DataPageHeaderV2;
use parquet_format_safe::DataPageHeaderV2;
use streaming_decompression;

use crate::compression::{self, Compression};
Expand Down
7 changes: 2 additions & 5 deletions src/read/indexes/deserialize.rs
@@ -1,15 +1,12 @@
use std::io::Cursor;

use parquet_format_async_temp::{thrift::protocol::TCompactInputProtocol, ColumnIndex};
use parquet_format_safe::{thrift::protocol::TCompactInputProtocol, ColumnIndex};

use crate::error::Error;
use crate::schema::types::{PhysicalType, PrimitiveType};

use crate::indexes::{BooleanIndex, ByteIndex, FixedLenByteIndex, Index, NativeIndex};

pub fn deserialize(data: &[u8], primitive_type: PrimitiveType) -> Result<Box<dyn Index>, Error> {
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);
let mut prot = TCompactInputProtocol::new(data, data.len() * 2 + 1024);

let index = ColumnIndex::read_from_in_protocol(&mut prot)?;

Expand Down
11 changes: 5 additions & 6 deletions src/read/indexes/read.rs
@@ -1,10 +1,8 @@
use std::convert::TryInto;
use std::io::{Cursor, Read, Seek, SeekFrom};

use parquet_format_async_temp::ColumnChunk;
use parquet_format_async_temp::{
thrift::protocol::TCompactInputProtocol, OffsetIndex, PageLocation,
};
use parquet_format_safe::ColumnChunk;
use parquet_format_safe::{thrift::protocol::TCompactInputProtocol, OffsetIndex, PageLocation};

use crate::error::Error;
use crate::indexes::Index;
Expand Down Expand Up @@ -102,11 +100,12 @@ fn deserialize_page_locations(
data: &[u8],
column_number: usize,
) -> Result<Vec<Vec<PageLocation>>, Error> {
let mut d = Cursor::new(data);
let len = data.len() * 2 + 1024;
let mut reader = Cursor::new(data);

(0..column_number)
.map(|_| {
let mut prot = TCompactInputProtocol::new(&mut d);
let mut prot = TCompactInputProtocol::new(&mut reader, len);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
Ok(offset.page_locations)
})
Expand Down
21 changes: 11 additions & 10 deletions src/read/metadata.rs
Expand Up @@ -4,8 +4,8 @@ use std::{
io::{Read, Seek, SeekFrom},
};

use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol;
use parquet_format_async_temp::FileMetaData as TFileMetaData;
use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use parquet_format_safe::FileMetaData as TFileMetaData;

use super::super::{
metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
Expand All @@ -31,7 +31,7 @@ fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error>
Ok(len)
}

/// Reads from the end of the reader a [`FileMetaData`].
/// Reads a [`FileMetaData`] from the reader, located at the end of the file.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
// check file is large enough to hold footer
let file_size = stream_len(reader)?;
Expand All @@ -53,9 +53,7 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {

// check this is indeed a parquet file
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
return Err(Error::OutOfSpec(
"Invalid Parquet file. Corrupt footer".to_string(),
));
return Err(Error::OutOfSpec("The file must end with PAR1".to_string()));
}

let metadata_len = metadata_len(&buffer, default_end_len);
Expand All @@ -69,7 +67,7 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
));
}

let reader = if (footer_len as usize) < buffer.len() {
let reader: &[u8] = if (footer_len as usize) < buffer.len() {
// the whole metadata is in the bytes we already read
let remaining = buffer.len() - footer_len as usize;
&buffer[remaining..]
Expand All @@ -84,12 +82,15 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
&buffer
};

deserialize_metadata(reader)
// a highly nested but sparse struct could result in many allocations
let max_size = reader.len() * 2 + 1024;

deserialize_metadata(reader, max_size)
}

/// Parse loaded metadata bytes
pub fn deserialize_metadata<R: Read>(reader: R) -> Result<FileMetaData> {
let mut prot = TCompactInputProtocol::new(reader);
pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> Result<FileMetaData> {
let mut prot = TCompactInputProtocol::new(reader, max_size);
let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;

FileMetaData::try_from_thrift(metadata)
Expand Down

0 comments on commit df20b3b

Please sign in to comment.