Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed OOM on malicious/malformed thrift #172

Merged
merged 1 commit on Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -15,7 +15,7 @@ name = "parquet2"
bench = false

[dependencies]
parquet-format-async-temp = "0.3.1"
parquet-format-safe = "0.1"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
streaming-decompression = "0.1"

Expand Down
2 changes: 1 addition & 1 deletion examples/read_metadata.rs
Expand Up @@ -118,7 +118,7 @@ fn main() -> Result<()> {

// ANCHOR: pages
use parquet2::read::get_page_iterator;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![])?;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![], 1024 * 1024)?;
// ANCHOR_END: pages

// ANCHOR: decompress
Expand Down
4 changes: 2 additions & 2 deletions src/bloom_filter/read.rs
@@ -1,6 +1,6 @@
use std::io::{Read, Seek, SeekFrom};

use parquet_format_async_temp::{
use parquet_format_safe::{
thrift::protocol::TCompactInputProtocol, BloomFilterAlgorithm, BloomFilterCompression,
BloomFilterHeader, SplitBlockAlgorithm, Uncompressed,
};
Expand All @@ -27,7 +27,7 @@ pub fn read<R: Read + Seek>(
reader.seek(SeekFrom::Start(offset))?;

// deserialize header
let mut prot = TCompactInputProtocol::new(&mut reader);
let mut prot = TCompactInputProtocol::new(&mut reader, usize::MAX); // max is ok since `BloomFilterHeader` never allocates
let header = BloomFilterHeader::read_from_in_protocol(&mut prot)?;

if header.algorithm != BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}) {
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Expand Up @@ -78,8 +78,8 @@ impl From<lz4_flex::block::CompressError> for Error {
}
}

impl From<parquet_format_async_temp::thrift::Error> for Error {
fn from(e: parquet_format_async_temp::thrift::Error) -> Error {
impl From<parquet_format_safe::thrift::Error> for Error {
fn from(e: parquet_format_safe::thrift::Error) -> Error {
Error::General(format!("underlying thrift error: {}", e))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/indexes/index.rs
@@ -1,6 +1,6 @@
use std::any::Any;

use parquet_format_async_temp::ColumnIndex;
use parquet_format_safe::ColumnIndex;

use crate::parquet_bridge::BoundaryOrder;
use crate::schema::types::PrimitiveType;
Expand Down
2 changes: 1 addition & 1 deletion src/indexes/intervals.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::PageLocation;
use parquet_format_safe::PageLocation;

use crate::error::Error;

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -18,7 +18,7 @@ pub mod statistics;
pub mod types;
pub mod write;

use parquet_format_async_temp as thrift_format;
use parquet_format_safe as thrift_format;

pub use streaming_decompression::fallible_streaming_iterator;
pub use streaming_decompression::FallibleStreamingIterator;
Expand Down
23 changes: 10 additions & 13 deletions src/metadata/column_chunk_metadata.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;

use parquet_format_async_temp::{ColumnChunk, ColumnMetaData, Encoding};
use parquet_format_safe::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::compression::Compression;
Expand Down Expand Up @@ -117,17 +117,14 @@ impl ColumnChunkMetaData {

/// Returns the offset and length in bytes of the column chunk within the file
pub fn byte_range(&self) -> (u64, u64) {
let col_start = if let Some(dict_page_offset) = self.dictionary_page_offset() {
dict_page_offset
let start = if let Some(dict_page_offset) = self.dictionary_page_offset() {
dict_page_offset as u64
} else {
self.data_page_offset()
self.data_page_offset() as u64
};
let col_len = self.compressed_size();
assert!(
col_start >= 0 && col_len >= 0,
"column start and length should not be negative"
);
(col_start as u64, col_len as u64)
let length = self.compressed_size() as u64;
// this has been validated in [`try_from_thrift`]
(start, length)
}

/// Method to convert from Thrift.
Expand All @@ -137,12 +134,12 @@ impl ColumnChunkMetaData {
) -> Result<Self> {
// validate metadata
if let Some(meta) = &column_chunk.meta_data {
let _: usize = meta.total_compressed_size.try_into()?;
let _: u64 = meta.total_compressed_size.try_into()?;

if let Some(offset) = meta.dictionary_page_offset {
let _: usize = offset.try_into()?;
let _: u64 = offset.try_into()?;
}
let _: usize = meta.data_page_offset.try_into()?;
let _: u64 = meta.data_page_offset.try_into()?;

let _: Compression = meta.codec.try_into()?;
} else {
Expand Down
14 changes: 6 additions & 8 deletions src/metadata/file_metadata.rs
@@ -1,12 +1,12 @@
use crate::{error::Error, metadata::get_sort_order};

use super::{column_order::ColumnOrder, schema_descriptor::SchemaDescriptor, RowGroupMetaData};
use parquet_format_async_temp::ColumnOrder as TColumnOrder;
use parquet_format_safe::ColumnOrder as TColumnOrder;

pub use crate::thrift_format::KeyValue;

/// Metadata for a Parquet file.
// This is almost equal to [`parquet_format_async_temp::FileMetaData`] but contains the descriptors,
// This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors,
// which are crucial to deserialize pages.
#[derive(Debug, Clone)]
pub struct FileMetaData {
Expand Down Expand Up @@ -60,9 +60,7 @@ impl FileMetaData {
}

/// Deserializes [`crate::thrift_format::FileMetaData`] into this struct
pub fn try_from_thrift(
metadata: parquet_format_async_temp::FileMetaData,
) -> Result<Self, Error> {
pub fn try_from_thrift(metadata: parquet_format_safe::FileMetaData) -> Result<Self, Error> {
let schema_descr = SchemaDescriptor::try_from_thrift(&metadata.schema)?;

let row_groups = metadata
Expand All @@ -86,9 +84,9 @@ impl FileMetaData {
})
}

/// Serializes itself to thrift's [`parquet_format_async_temp::FileMetaData`].
pub fn into_thrift(self) -> parquet_format_async_temp::FileMetaData {
parquet_format_async_temp::FileMetaData {
/// Serializes itself to thrift's [`parquet_format_safe::FileMetaData`].
pub fn into_thrift(self) -> parquet_format_safe::FileMetaData {
parquet_format_safe::FileMetaData {
version: self.version,
schema: self.schema_descr.into_thrift(),
num_rows: self.num_rows as i64,
Expand Down
2 changes: 1 addition & 1 deletion src/metadata/row_metadata.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::RowGroup;
use parquet_format_safe::RowGroup;

use super::{column_chunk_metadata::ColumnChunkMetaData, schema_descriptor::SchemaDescriptor};
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion src/metadata/schema_descriptor.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::SchemaElement;
use parquet_format_safe::SchemaElement;

use crate::{
error::Error,
Expand Down
2 changes: 1 addition & 1 deletion src/read/compression.rs
@@ -1,4 +1,4 @@
use parquet_format_async_temp::DataPageHeaderV2;
use parquet_format_safe::DataPageHeaderV2;
use streaming_decompression;

use crate::compression::{self, Compression};
Expand Down
7 changes: 2 additions & 5 deletions src/read/indexes/deserialize.rs
@@ -1,15 +1,12 @@
use std::io::Cursor;

use parquet_format_async_temp::{thrift::protocol::TCompactInputProtocol, ColumnIndex};
use parquet_format_safe::{thrift::protocol::TCompactInputProtocol, ColumnIndex};

use crate::error::Error;
use crate::schema::types::{PhysicalType, PrimitiveType};

use crate::indexes::{BooleanIndex, ByteIndex, FixedLenByteIndex, Index, NativeIndex};

pub fn deserialize(data: &[u8], primitive_type: PrimitiveType) -> Result<Box<dyn Index>, Error> {
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);
let mut prot = TCompactInputProtocol::new(data, data.len() * 2 + 1024);

let index = ColumnIndex::read_from_in_protocol(&mut prot)?;

Expand Down
11 changes: 5 additions & 6 deletions src/read/indexes/read.rs
@@ -1,10 +1,8 @@
use std::convert::TryInto;
use std::io::{Cursor, Read, Seek, SeekFrom};

use parquet_format_async_temp::ColumnChunk;
use parquet_format_async_temp::{
thrift::protocol::TCompactInputProtocol, OffsetIndex, PageLocation,
};
use parquet_format_safe::ColumnChunk;
use parquet_format_safe::{thrift::protocol::TCompactInputProtocol, OffsetIndex, PageLocation};

use crate::error::Error;
use crate::indexes::Index;
Expand Down Expand Up @@ -102,11 +100,12 @@ fn deserialize_page_locations(
data: &[u8],
column_number: usize,
) -> Result<Vec<Vec<PageLocation>>, Error> {
let mut d = Cursor::new(data);
let len = data.len() * 2 + 1024;
let mut reader = Cursor::new(data);

(0..column_number)
.map(|_| {
let mut prot = TCompactInputProtocol::new(&mut d);
let mut prot = TCompactInputProtocol::new(&mut reader, len);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
Ok(offset.page_locations)
})
Expand Down
21 changes: 11 additions & 10 deletions src/read/metadata.rs
Expand Up @@ -4,8 +4,8 @@ use std::{
io::{Read, Seek, SeekFrom},
};

use parquet_format_async_temp::thrift::protocol::TCompactInputProtocol;
use parquet_format_async_temp::FileMetaData as TFileMetaData;
use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
use parquet_format_safe::FileMetaData as TFileMetaData;

use super::super::{
metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
Expand All @@ -31,7 +31,7 @@ fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error>
Ok(len)
}

/// Reads from the end of the reader a [`FileMetaData`].
/// Reads a [`FileMetaData`] from the reader, located at the end of the file.
pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
// check file is large enough to hold footer
let file_size = stream_len(reader)?;
Expand All @@ -53,9 +53,7 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {

// check this is indeed a parquet file
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
return Err(Error::OutOfSpec(
"Invalid Parquet file. Corrupt footer".to_string(),
));
return Err(Error::OutOfSpec("The file must end with PAR1".to_string()));
}

let metadata_len = metadata_len(&buffer, default_end_len);
Expand All @@ -69,7 +67,7 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
));
}

let reader = if (footer_len as usize) < buffer.len() {
let reader: &[u8] = if (footer_len as usize) < buffer.len() {
// the whole metadata is in the bytes we already read
let remaining = buffer.len() - footer_len as usize;
&buffer[remaining..]
Expand All @@ -84,12 +82,15 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
&buffer
};

deserialize_metadata(reader)
// a highly nested but sparse struct could result in many allocations
let max_size = reader.len() * 2 + 1024;

deserialize_metadata(reader, max_size)
}

/// Parse loaded metadata bytes
pub fn deserialize_metadata<R: Read>(reader: R) -> Result<FileMetaData> {
let mut prot = TCompactInputProtocol::new(reader);
pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> Result<FileMetaData> {
let mut prot = TCompactInputProtocol::new(reader, max_size);
let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;

FileMetaData::try_from_thrift(metadata)
Expand Down