Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Refactored parquet statistics deserialization #962

Merged
merged 3 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion examples/parquet_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ fn main() -> Result<()> {
let reader = File::open(file_path)?;
let reader = read::FileReader::try_new(reader, Some(&[8]), None, None, None)?;

println!("{:#?}", reader.metadata());
println!("{:#?}", reader.schema());

// say we want to evaluate if the we can skip some row groups based on a field's value
let field = &reader.schema().fields[0];

// we can deserialize the parquet statistics from this field
let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;

println!("{:#?}", statistics);

let start = SystemTime::now();
for maybe_chunk in reader {
Expand Down
19 changes: 9 additions & 10 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@


def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9]
int64 = [-256, -1, None, 3, None, 5, 6, 7, None, 9]
uint32 = [0, 1, None, 3, None, 5, 6, 7, None, 9]
float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
boolean = [True, None, False, False, None, True, None, None, True, True]
Expand All @@ -24,7 +25,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
pa.field("float64", pa.float64()),
pa.field("string", pa.utf8()),
pa.field("bool", pa.bool_()),
pa.field("date", pa.timestamp("ms")),
pa.field("timestamp_ms", pa.timestamp("ms")),
pa.field("uint32", pa.uint32()),
pa.field("string_large", pa.utf8()),
# decimal testing
Expand All @@ -44,8 +45,8 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:
"float64": float64,
"string": string,
"bool": boolean,
"date": int64,
"uint32": int64,
"timestamp_ms": uint32,
"uint32": uint32,
"string_large": string_large,
"decimal_9": decimal,
"decimal_18": decimal,
Expand All @@ -61,7 +62,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]:


def case_basic_required() -> Tuple[dict, pa.Schema, str]:
int64 = [-256, -1, 0, 1, 2, 3, 4, 5, 6, 7]
int64 = [-256, -1, 2, 3, 4, 5, 6, 7, 8, 9]
uint32 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
string = ["Hello", "bbb", "aa", "", "bbb", "abc", "bbb", "bbb", "def", "aaa"]
Expand All @@ -74,10 +75,8 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
pa.field("string", pa.utf8(), nullable=False),
pa.field("bool", pa.bool_(), nullable=False),
pa.field(
"date",
pa.timestamp(
"ms",
),
"timestamp_ms",
pa.timestamp("ms"),
nullable=False,
),
pa.field("uint32", pa.uint32(), nullable=False),
Expand All @@ -93,7 +92,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]:
"float64": float64,
"string": string,
"bool": boolean,
"date": int64,
"timestamp_ms": uint32,
"uint32": uint32,
"decimal_9": decimal,
"decimal_18": decimal,
Expand Down
134 changes: 22 additions & 112 deletions src/io/parquet/read/statistics/binary.rs
Original file line number Diff line number Diff line change
@@ -1,113 +1,23 @@
use std::any::Any;
use std::convert::TryFrom;

use crate::datatypes::DataType;
use parquet2::statistics::BinaryStatistics as ParquetByteArrayStatistics;

use super::Statistics;
use crate::error::{ArrowError, Result};

/// Represents the parquet statistics of a `Binary` or `LargeBinary` column
#[derive(Debug, Clone, PartialEq)]
pub struct BinaryStatistics {
    /// number of nulls
    pub null_count: Option<i64>,
    /// number of distinct values
    pub distinct_count: Option<i64>,
    /// Minimum value, if available
    pub min_value: Option<Vec<u8>>,
    /// Maximum value, if available
    pub max_value: Option<Vec<u8>>,
}

impl Statistics for BinaryStatistics {
fn data_type(&self) -> &DataType {
&DataType::Binary
}

fn as_any(&self) -> &dyn Any {
self
}

fn null_count(&self) -> Option<i64> {
self.null_count
}
}

impl From<&ParquetByteArrayStatistics> for BinaryStatistics {
fn from(stats: &ParquetByteArrayStatistics) -> Self {
Self {
null_count: stats.null_count,
distinct_count: stats.distinct_count,
min_value: stats.min_value.clone(),
max_value: stats.max_value.clone(),
}
}
}

/// Statistics of a string (`Utf8`) parquet column
#[derive(Debug, Clone, PartialEq)]
pub struct Utf8Statistics {
    /// number of nulls
    pub null_count: Option<i64>,
    /// number of distinct values
    pub distinct_count: Option<i64>,
    /// Minimum value, if available
    pub min_value: Option<String>,
    /// Maximum value, if available
    pub max_value: Option<String>,
}

impl Statistics for Utf8Statistics {
fn data_type(&self) -> &DataType {
&DataType::Utf8
}

fn as_any(&self) -> &dyn Any {
self
}

fn null_count(&self) -> Option<i64> {
self.null_count
}
}

impl TryFrom<&ParquetByteArrayStatistics> for Utf8Statistics {
type Error = ArrowError;

fn try_from(stats: &ParquetByteArrayStatistics) -> Result<Self> {
Ok(Self {
null_count: stats.null_count,
distinct_count: stats.distinct_count,
min_value: stats
.min_value
.as_ref()
.map(|x| simdutf8::basic::from_utf8(x).map(|x| x.to_string()))
.transpose()?,
max_value: stats
.max_value
.as_ref()
.map(|x| simdutf8::basic::from_utf8(x).map(|x| x.to_string()))
.transpose()?,
})
}
}

pub(super) fn statistics_from_byte_array(
stats: &ParquetByteArrayStatistics,
data_type: DataType,
) -> Result<Box<dyn Statistics>> {
use DataType::*;
Ok(match data_type {
Utf8 => Box::new(Utf8Statistics::try_from(stats)?),
LargeUtf8 => Box::new(Utf8Statistics::try_from(stats)?),
Binary => Box::new(BinaryStatistics::from(stats)),
LargeBinary => Box::new(BinaryStatistics::from(stats)),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Can't read {:?} from parquet",
other
)))
}
})
use crate::array::{MutableArray, MutableBinaryArray, Offset};
use parquet2::statistics::{BinaryStatistics, Statistics as ParquetStatistics};

use crate::error::Result;

/// Appends the `min_value`/`max_value` of the given binary parquet statistics
/// into `min`/`max` respectively, pushing a null when the statistics (or the
/// respective value) are absent.
///
/// # Panics
/// Panics iff `min`/`max` are not `MutableBinaryArray<O>` or `from` is not
/// a `BinaryStatistics`.
pub(super) fn push<O: Offset>(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    // the caller guarantees the concrete type of the statistics
    let stats = from.map(|s| s.as_any().downcast_ref::<BinaryStatistics>().unwrap());

    min.as_mut_any()
        .downcast_mut::<MutableBinaryArray<O>>()
        .unwrap()
        .push(stats.and_then(|s| s.min_value.as_ref()));
    max.as_mut_any()
        .downcast_mut::<MutableBinaryArray<O>>()
        .unwrap()
        .push(stats.and_then(|s| s.max_value.as_ref()));

    Ok(())
}
60 changes: 20 additions & 40 deletions src/io/parquet/read/statistics/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,23 @@
use crate::datatypes::DataType;
use parquet2::statistics::BooleanStatistics as ParquetBooleanStatistics;
use std::any::Any;
use crate::array::{MutableArray, MutableBooleanArray};
use parquet2::statistics::{BooleanStatistics, Statistics as ParquetStatistics};

use super::Statistics;
use crate::error::Result;

/// Statistics of a boolean parquet column
#[derive(Debug, Clone, PartialEq)]
pub struct BooleanStatistics {
    /// number of nulls
    pub null_count: Option<i64>,
    /// number of distinct values
    pub distinct_count: Option<i64>,
    /// Minimum value, if available
    pub min_value: Option<bool>,
    /// Maximum value, if available
    pub max_value: Option<bool>,
}

impl Statistics for BooleanStatistics {
fn data_type(&self) -> &DataType {
&DataType::Boolean
}

fn as_any(&self) -> &dyn Any {
self
}

fn null_count(&self) -> Option<i64> {
self.null_count
}
}

impl From<&ParquetBooleanStatistics> for BooleanStatistics {
fn from(stats: &ParquetBooleanStatistics) -> Self {
Self {
null_count: stats.null_count,
distinct_count: stats.distinct_count,
min_value: stats.min_value,
max_value: stats.max_value,
}
}
/// Appends the boolean `min_value`/`max_value` of `from` into `min`/`max`
/// respectively, pushing a null when the statistics (or the respective value)
/// are absent.
///
/// # Panics
/// Panics iff `min`/`max` are not `MutableBooleanArray` or `from` is not
/// a `BooleanStatistics`.
pub(super) fn push(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    // the caller guarantees the concrete type of the statistics
    let stats = from.map(|s| s.as_any().downcast_ref::<BooleanStatistics>().unwrap());

    min.as_mut_any()
        .downcast_mut::<MutableBooleanArray>()
        .unwrap()
        .push(stats.and_then(|s| s.min_value));
    max.as_mut_any()
        .downcast_mut::<MutableBooleanArray>()
        .unwrap()
        .push(stats.and_then(|s| s.max_value));

    Ok(())
}
65 changes: 65 additions & 0 deletions src/io/parquet/read/statistics/dictionary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::array::*;
use crate::datatypes::{DataType, PhysicalType};
use crate::error::Result;

use super::make_mutable;

/// A mutable array for dictionary-encoded types whose values builder is
/// type-erased behind `Box<dyn MutableArray>`.
#[derive(Debug)]
pub struct DynMutableDictionary {
    data_type: DataType,
    pub inner: Box<dyn MutableArray>,
}

impl DynMutableDictionary {
    /// Creates a new [`DynMutableDictionary`] whose values builder has
    /// `capacity` reserved slots.
    ///
    /// # Panics
    /// Panics iff `data_type` is not `DataType::Dictionary`.
    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self> {
        let values_type = match &data_type {
            DataType::Dictionary(_, values, _) => values.as_ref(),
            _ => unreachable!(),
        };
        let inner = make_mutable(values_type, capacity)?;
        Ok(Self { data_type, inner })
    }
}

impl MutableArray for DynMutableDictionary {
    fn data_type(&self) -> &DataType {
        // the dictionary data type, not the inner values' type
        &self.data_type
    }

    fn len(&self) -> usize {
        // delegates to the values builder
        self.inner.len()
    }

    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
        self.inner.validity()
    }

    fn as_box(&mut self) -> Box<dyn Array> {
        let inner = self.inner.as_arc();
        match self.data_type.to_physical_type() {
            // keys are 0..len, i.e. each accumulated value is referenced
            // exactly once, in insertion order
            PhysicalType::Dictionary(key) => match_integer_type!(key, |$T| {
                let keys = PrimitiveArray::<$T>::from_iter((0..inner.len() as $T).map(Some));
                Box::new(DictionaryArray::<$T>::from_data(keys, inner))
            }),
            _ => todo!(),
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn push_null(&mut self) {
        // NOTE(review): unimplemented — presumably never called on the
        // statistics deserialization path; confirm before wider use
        todo!()
    }

    fn shrink_to_fit(&mut self) {
        // NOTE(review): unimplemented — same caveat as `push_null`
        todo!()
    }
}
Loading