Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Improved parquet stats deserialization (#962)
Browse files Browse the repository at this point in the history
* Fixed stats

* Fixed error in decimal stats

* Fixed struct stats
  • Loading branch information
jorgecarleitao committed Apr 25, 2022
1 parent 8fb3b8d commit bb4f7d8
Show file tree
Hide file tree
Showing 19 changed files with 1,182 additions and 729 deletions.
10 changes: 9 additions & 1 deletion examples/parquet_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ fn main() -> Result<()> {
let reader = File::open(file_path)?;
let reader = read::FileReader::try_new(reader, Some(&[8]), None, None, None)?;

println!("{:#?}", reader.metadata());
println!("{:#?}", reader.schema());

// say we want to evaluate if the we can skip some row groups based on a field's value
let field = &reader.schema().fields[0];

// we can deserialize the parquet statistics from this field
let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;

println!("{:#?}", statistics);

let start = SystemTime::now();
for maybe_chunk in reader {
Expand Down
19 changes: 9 additions & 10 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@


def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9]
int64 = [-256, -1, None, 3, None, 5, 6, 7, None, 9]
uint32 = [0, 1, None, 3, None, 5, 6, 7, None, 9]
float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
Expand All @@ -24,7 +25,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("float64", pa.float64()),
pa.field("string", pa.utf8()),
pa.field("bool", pa.bool_()),
pa.field("date", pa.timestamp("ms")),
pa.field("timestamp_ms", pa.timestamp("ms")),
pa.field("uint32", pa.uint32()),
pa.field("string_large", pa.utf8()),
# decimal testing
Expand All @@ -44,8 +45,8 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"float64": float64,
"string": string,
"bool": boolean,
"date": int64,
"uint32": int64,
"timestamp_ms": uint32,
"uint32": uint32,
"string_large": string_large,
"decimal_9": decimal,
"decimal_18": decimal,
Expand All @@ -61,7 +62,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:


def case_basic_required() -> Tuple[dict, pa.Schema, str]:
int64 = [-256, -1, 0, 1, 2, 3, 4, 5, 6, 7]
int64 = [-256, -1, 2, 3, 4, 5, 6, 7, 8, 9]
uint32 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
string = ["Hello", "bbb", "aa", "", "bbb", "abc", "bbb", "bbb", "def", "aaa"]
Expand All @@ -74,10 +75,8 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("string", pa.utf8(), nullable=False),
pa.field("bool", pa.bool_(), nullable=False),
pa.field(
"date",
pa.timestamp(
"ms",
),
"timestamp_ms",
pa.timestamp("ms"),
nullable=False,
),
pa.field("uint32", pa.uint32(), nullable=False),
Expand All @@ -93,7 +92,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"float64": float64,
"string": string,
"bool": boolean,
"date": int64,
"timestamp_ms": uint32,
"uint32": uint32,
"decimal_9": decimal,
"decimal_18": decimal,
Expand Down
134 changes: 22 additions & 112 deletions src/io/parquet/read/statistics/binary.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,23 @@
use std::any::Any;
use std::convert::TryFrom;

use crate::datatypes::DataType;
use parquet2::statistics::BinaryStatistics as ParquetByteArrayStatistics;

use super::Statistics;
use crate::error::{ArrowError, Result};

/// Statistics of a `Binary` or `LargeBinary` parquet column.
#[derive(Debug, Clone, PartialEq)]
pub struct BinaryStatistics {
    /// number of nulls
    pub null_count: Option<i64>,
    /// number of distinct values
    pub distinct_count: Option<i64>,
    /// Minimum value as raw bytes, if present in the parquet metadata
    pub min_value: Option<Vec<u8>>,
    /// Maximum value as raw bytes, if present in the parquet metadata
    pub max_value: Option<Vec<u8>>,
}

impl Statistics for BinaryStatistics {
    /// Binary statistics always map to the arrow `Binary` logical type.
    fn data_type(&self) -> &DataType {
        &DataType::Binary
    }

    /// Number of null values in the column chunk, when recorded.
    fn null_count(&self) -> Option<i64> {
        self.null_count
    }

    /// Allows callers to downcast back to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl From<&ParquetByteArrayStatistics> for BinaryStatistics {
    /// Copies the parquet byte-array statistics into the arrow representation.
    /// Counts are plain copies; the min/max bounds clone their byte buffers.
    fn from(stats: &ParquetByteArrayStatistics) -> Self {
        let min_value = stats.min_value.clone();
        let max_value = stats.max_value.clone();
        Self {
            null_count: stats.null_count,
            distinct_count: stats.distinct_count,
            min_value,
            max_value,
        }
    }
}

/// Statistics of a string parquet column
#[derive(Debug, Clone, PartialEq)]
pub struct Utf8Statistics {
    /// number of nulls
    pub null_count: Option<i64>,
    /// number of distinct values
    pub distinct_count: Option<i64>,
    /// Minimum value, if present in the parquet metadata
    pub min_value: Option<String>,
    /// Maximum value, if present in the parquet metadata
    pub max_value: Option<String>,
}

impl Statistics for Utf8Statistics {
    /// String statistics always map to the arrow `Utf8` logical type.
    fn data_type(&self) -> &DataType {
        &DataType::Utf8
    }

    /// Number of null values in the column chunk, when recorded.
    fn null_count(&self) -> Option<i64> {
        self.null_count
    }

    /// Allows callers to downcast back to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl TryFrom<&ParquetByteArrayStatistics> for Utf8Statistics {
    type Error = ArrowError;

    /// Converts parquet byte-array statistics into string statistics.
    ///
    /// Errors if either the minimum or maximum bound is not valid UTF-8.
    fn try_from(stats: &ParquetByteArrayStatistics) -> Result<Self> {
        // Validate a raw byte bound as UTF-8 and convert it into an owned `String`.
        let to_utf8 = |bytes: &Vec<u8>| simdutf8::basic::from_utf8(bytes).map(str::to_string);
        Ok(Self {
            null_count: stats.null_count,
            distinct_count: stats.distinct_count,
            min_value: stats.min_value.as_ref().map(to_utf8).transpose()?,
            max_value: stats.max_value.as_ref().map(to_utf8).transpose()?,
        })
    }
}

pub(super) fn statistics_from_byte_array(
stats: &ParquetByteArrayStatistics,
data_type: DataType,
) -> Result<Box<dyn Statistics>> {
use DataType::*;
Ok(match data_type {
Utf8 => Box::new(Utf8Statistics::try_from(stats)?),
LargeUtf8 => Box::new(Utf8Statistics::try_from(stats)?),
Binary => Box::new(BinaryStatistics::from(stats)),
LargeBinary => Box::new(BinaryStatistics::from(stats)),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Can't read {:?} from parquet",
other
)))
}
})
use crate::array::{MutableArray, MutableBinaryArray, Offset};
use parquet2::statistics::{BinaryStatistics, Statistics as ParquetStatistics};

use crate::error::Result;

/// Pushes the min/max bounds of one row group's binary statistics onto the
/// `min`/`max` accumulator arrays; a missing statistic (or missing bound)
/// pushes a null.
pub(super) fn push<O: Offset>(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    // Both accumulators are built by this module as binary arrays with
    // offset type `O`, so the downcasts cannot fail.
    let min_array = min
        .as_mut_any()
        .downcast_mut::<MutableBinaryArray<O>>()
        .unwrap();
    let max_array = max
        .as_mut_any()
        .downcast_mut::<MutableBinaryArray<O>>()
        .unwrap();
    let stats = from.map(|s| s.as_any().downcast_ref::<BinaryStatistics>().unwrap());
    min_array.push(stats.and_then(|s| s.min_value.as_ref()));
    max_array.push(stats.and_then(|s| s.max_value.as_ref()));
    Ok(())
}
60 changes: 20 additions & 40 deletions src/io/parquet/read/statistics/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,23 @@
use crate::datatypes::DataType;
use parquet2::statistics::BooleanStatistics as ParquetBooleanStatistics;
use std::any::Any;
use crate::array::{MutableArray, MutableBooleanArray};
use parquet2::statistics::{BooleanStatistics, Statistics as ParquetStatistics};

use super::Statistics;
use crate::error::Result;

/// Statistics of a boolean parquet column
#[derive(Debug, Clone, PartialEq)]
pub struct BooleanStatistics {
    /// number of nulls
    pub null_count: Option<i64>,
    /// number of distinct values
    pub distinct_count: Option<i64>,
    /// Minimum value, if present in the parquet metadata
    pub min_value: Option<bool>,
    /// Maximum value, if present in the parquet metadata
    pub max_value: Option<bool>,
}

impl Statistics for BooleanStatistics {
    /// Boolean statistics always map to the arrow `Boolean` logical type.
    fn data_type(&self) -> &DataType {
        &DataType::Boolean
    }

    /// Number of null values in the column chunk, when recorded.
    fn null_count(&self) -> Option<i64> {
        self.null_count
    }

    /// Allows callers to downcast back to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl From<&ParquetBooleanStatistics> for BooleanStatistics {
fn from(stats: &ParquetBooleanStatistics) -> Self {
Self {
null_count: stats.null_count,
distinct_count: stats.distinct_count,
min_value: stats.min_value,
max_value: stats.max_value,
}
}
/// Pushes the min/max bounds of one row group's boolean statistics onto the
/// `min`/`max` accumulator arrays; a missing statistic (or missing bound)
/// pushes a null.
pub(super) fn push(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    // Both accumulators are built by this module as boolean arrays,
    // so the downcasts cannot fail.
    let min_array = min
        .as_mut_any()
        .downcast_mut::<MutableBooleanArray>()
        .unwrap();
    let max_array = max
        .as_mut_any()
        .downcast_mut::<MutableBooleanArray>()
        .unwrap();
    let stats = from.map(|s| s.as_any().downcast_ref::<BooleanStatistics>().unwrap());
    min_array.push(stats.and_then(|s| s.min_value));
    max_array.push(stats.and_then(|s| s.max_value));
    Ok(())
}
65 changes: 65 additions & 0 deletions src/io/parquet/read/statistics/dictionary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::array::*;
use crate::datatypes::{DataType, PhysicalType};
use crate::error::Result;

use super::make_mutable;

/// A mutable array accumulating the statistics of a dictionary-encoded column.
/// Values are collected into `inner`; the dictionary keys are synthesized as
/// the identity mapping when the array is finalized (see `as_box`).
#[derive(Debug)]
pub struct DynMutableDictionary {
    // The logical type; expected to be `DataType::Dictionary` (enforced in `try_with_capacity`).
    data_type: DataType,
    // Mutable array holding the dictionary's values.
    pub inner: Box<dyn MutableArray>,
}

impl DynMutableDictionary {
    /// Creates a new mutable dictionary accumulator with `capacity`
    /// pre-allocated slots for the values.
    ///
    /// `data_type` must be `DataType::Dictionary`; any other variant is a
    /// logic error of the caller.
    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self> {
        let values_type = match &data_type {
            DataType::Dictionary(_, inner, _) => inner.as_ref(),
            _ => unreachable!(),
        };
        let inner = make_mutable(values_type, capacity)?;
        Ok(Self { data_type, inner })
    }
}

impl MutableArray for DynMutableDictionary {
    fn data_type(&self) -> &DataType {
        &self.data_type
    }

    // Length of the values array; each value becomes one dictionary entry.
    fn len(&self) -> usize {
        self.inner.len()
    }

    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
        self.inner.validity()
    }

    // Finalizes into a `DictionaryArray` whose keys are the identity mapping
    // 0..len — every accumulated value appears exactly once, in order.
    fn as_box(&mut self) -> Box<dyn Array> {
        let inner = self.inner.as_arc();
        match self.data_type.to_physical_type() {
            PhysicalType::Dictionary(key) => match_integer_type!(key, |$T| {
                // NOTE(review): `inner.len() as $T` truncates silently if the
                // length exceeds the key type's range — confirm this is bounded upstream.
                let keys = PrimitiveArray::<$T>::from_iter((0..inner.len() as $T).map(Some));
                Box::new(DictionaryArray::<$T>::from_data(keys, inner))
            }),
            // `try_with_capacity` only accepts `DataType::Dictionary`, so this
            // branch is not expected to be reachable.
            _ => todo!(),
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    // Not needed for statistics accumulation: nulls are pushed on `inner` directly.
    fn push_null(&mut self) {
        todo!()
    }

    fn shrink_to_fit(&mut self) {
        todo!()
    }
}
Loading

0 comments on commit bb4f7d8

Please sign in to comment.