Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed struct stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 25, 2022
1 parent 218a031 commit c9fe387
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 88 deletions.
10 changes: 9 additions & 1 deletion examples/parquet_read.rs
Expand Up @@ -13,7 +13,15 @@ fn main() -> Result<()> {
let reader = File::open(file_path)?;
let reader = read::FileReader::try_new(reader, Some(&[8]), None, None, None)?;

println!("{:#?}", reader.metadata());
println!("{:#?}", reader.schema());

// say we want to evaluate whether we can skip some row groups based on a field's value
let field = &reader.schema().fields[0];

// we can deserialize the parquet statistics from this field
let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;

println!("{:#?}", statistics);

let start = SystemTime::now();
for maybe_chunk in reader {
Expand Down
11 changes: 8 additions & 3 deletions src/io/parquet/read/statistics/dictionary.rs
@@ -1,5 +1,5 @@
use crate::array::*;
use crate::datatypes::DataType;
use crate::datatypes::{DataType, PhysicalType};
use crate::error::Result;

use super::make_mutable;
Expand Down Expand Up @@ -38,8 +38,13 @@ impl MutableArray for DynMutableDictionary {

fn as_box(&mut self) -> Box<dyn Array> {
let inner = self.inner.as_arc();
let keys = PrimitiveArray::<i32>::from_iter((0..inner.len() as i32).map(Some));
Box::new(DictionaryArray::<i32>::from_data(keys, inner))
match self.data_type.to_physical_type() {
PhysicalType::Dictionary(key) => match_integer_type!(key, |$T| {
let keys = PrimitiveArray::<$T>::from_iter((0..inner.len() as $T).map(Some));
Box::new(DictionaryArray::<$T>::from_data(keys, inner))
}),
_ => todo!(),
}
}

fn as_any(&self) -> &dyn std::any::Any {
Expand Down
153 changes: 128 additions & 25 deletions src/io/parquet/read/statistics/mod.rs
Expand Up @@ -23,19 +23,29 @@ mod dictionary;
mod fixlen;
mod list;
mod primitive;
mod struct_;
mod utf8;

use self::list::DynMutableListArray;

use super::get_field_columns;

/// Count statistics (null count or distinct count) of a field.
///
/// The variant follows the physical type of the count array: a struct-typed
/// count becomes [`Count::Struct`], anything else becomes [`Count::Single`].
#[derive(Debug, PartialEq)]
pub enum Count {
/// Counts of a non-struct field, stored as a single [`UInt64Array`].
Single(UInt64Array),
/// Counts of a struct field, stored as a [`StructArray`] that mirrors the
/// field layout of the struct, with `UInt64` in place of every non-struct type.
Struct(StructArray),
}

/// Arrow-deserialized parquet Statistics of a file
#[derive(Debug, PartialEq)]
pub struct Statistics {
/// number of nulls
pub null_count: UInt64Array,
/// number of nulls.
pub null_count: Count,
/// number of distinct values
pub distinct_count: UInt64Array,
pub distinct_count: Count,
/// Minimum
pub min_value: Box<dyn Array>,
/// Maximum
Expand All @@ -46,9 +56,9 @@ pub struct Statistics {
#[derive(Debug)]
struct MutableStatistics {
/// number of nulls
pub null_count: UInt64Vec,
pub null_count: Box<dyn MutableArray>,
/// number of distinct values
pub distinct_count: UInt64Vec,
pub distinct_count: Box<dyn MutableArray>,
/// Minimum
pub min_value: Box<dyn MutableArray>,
/// Maximum
Expand All @@ -57,9 +67,48 @@ struct MutableStatistics {

impl From<MutableStatistics> for Statistics {
fn from(mut s: MutableStatistics) -> Self {
let null_count = if let PhysicalType::Struct = s.null_count.data_type().to_physical_type() {
let a = s
.null_count
.as_box()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone();
Count::Struct(a)
} else {
let a = s
.null_count
.as_box()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.clone();
Count::Single(a)
};
let distinct_count =
if let PhysicalType::Struct = s.distinct_count.data_type().to_physical_type() {
let a = s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone();
Count::Struct(a)
} else {
let a = s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.clone();
Count::Single(a)
};
Self {
null_count: s.null_count.into(),
distinct_count: s.distinct_count.into(),
null_count,
distinct_count,
min_value: s.min_value.as_box(),
max_value: s.max_value.as_box(),
}
Expand Down Expand Up @@ -98,6 +147,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result<Box<dyn Mutable
PhysicalType::Dictionary(_) => Box::new(
dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?,
),
PhysicalType::Struct => Box::new(struct_::DynMutableStructArray::try_with_capacity(
data_type.clone(),
capacity,
)?),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing parquet stats from {:?} is still not implemented",
Expand All @@ -107,14 +160,28 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result<Box<dyn Mutable
})
}

/// Builds the [`DataType`] used to store count statistics for `data_type`.
///
/// A struct (by logical type) maps to a struct with the same field names and
/// nullability whose field types are converted recursively; every other type
/// maps to `DataType::UInt64`.
fn create_dt(data_type: &DataType) -> DataType {
    match data_type.to_logical_type() {
        DataType::Struct(fields) => {
            // keep the struct's shape, replacing each field's type by its count type
            let count_fields = fields
                .iter()
                .map(|field| {
                    let count_type = create_dt(&field.data_type);
                    Field::new(&field.name, count_type, field.is_nullable)
                })
                .collect();
            DataType::Struct(count_fields)
        }
        _ => DataType::UInt64,
    }
}

impl MutableStatistics {
fn try_new(field: &Field) -> Result<Self> {
let min_value = make_mutable(&field.data_type, 0)?;
let max_value = make_mutable(&field.data_type, 0)?;

let dt = create_dt(&field.data_type);
Ok(Self {
null_count: UInt64Vec::new(),
distinct_count: UInt64Vec::new(),
null_count: make_mutable(&dt, 0)?,
distinct_count: make_mutable(&dt, 0)?,
min_value,
max_value,
})
Expand Down Expand Up @@ -188,11 +255,11 @@ fn push_others(
}

fn push(
mut stats: VecDeque<(Option<Arc<dyn ParquetStatistics>>, ParquetPrimitiveType)>,
stats: &mut VecDeque<(Option<Arc<dyn ParquetStatistics>>, ParquetPrimitiveType)>,
min: &mut dyn MutableArray,
max: &mut dyn MutableArray,
distinct_count: &mut UInt64Vec,
null_count: &mut UInt64Vec,
distinct_count: &mut dyn MutableArray,
null_count: &mut dyn MutableArray,
) -> Result<()> {
match min.data_type().to_logical_type() {
List(_) | LargeList(_) => {
Expand Down Expand Up @@ -229,12 +296,51 @@ fn push(
null_count,
);
}
Struct(_) => {
let min = min
.as_mut_any()
.downcast_mut::<struct_::DynMutableStructArray>()
.unwrap();
let max = max
.as_mut_any()
.downcast_mut::<struct_::DynMutableStructArray>()
.unwrap();
let distinct_count = distinct_count
.as_mut_any()
.downcast_mut::<struct_::DynMutableStructArray>()
.unwrap();
let null_count = null_count
.as_mut_any()
.downcast_mut::<struct_::DynMutableStructArray>()
.unwrap();
return min
.inner
.iter_mut()
.zip(max.inner.iter_mut())
.zip(distinct_count.inner.iter_mut())
.zip(null_count.inner.iter_mut())
.try_for_each(|(((min, max), distinct_count), null_count)| {
push(
stats,
min.as_mut(),
max.as_mut(),
distinct_count.as_mut(),
null_count.as_mut(),
)
});
}
_ => {}
}

let (from, type_) = stats.pop_front().unwrap();
let from = from.as_deref();

let distinct_count = distinct_count
.as_mut_any()
.downcast_mut::<UInt64Vec>()
.unwrap();
let null_count = null_count.as_mut_any().downcast_mut::<UInt64Vec>().unwrap();

push_others(from, distinct_count, null_count);

let physical_type = &type_.physical_type;
Expand Down Expand Up @@ -288,21 +394,18 @@ fn push(
}
}

/// Deserializes [`ParquetStatistics`] into [`Statistics`] associated to `field`
/// Deserializes the statistics in the column chunks from all `row_groups`
/// into the [`Statistics`] associated with `field`'s name.
///
/// For non-nested types, it returns a single column.
/// For nested types, it returns one column per parquet primitive column.
pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Result<Statistics> {
if groups.is_empty() {
todo!("Return an empty statistics")
}

/// # Errors
/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
pub fn deserialize(field: &Field, row_groups: &[RowGroupMetaData]) -> Result<Statistics> {
let mut statistics = MutableStatistics::try_new(field)?;

// transpose
groups.iter().try_for_each(|group| {
row_groups.iter().try_for_each(|group| {
let columns = get_field_columns(group.columns(), field.name.as_ref());
let stats = columns
let mut stats = columns
.into_iter()
.map(|column| {
Ok((
Expand All @@ -312,11 +415,11 @@ pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Res
})
.collect::<Result<VecDeque<(Option<_>, ParquetPrimitiveType)>>>()?;
push(
stats,
&mut stats,
statistics.min_value.as_mut(),
statistics.max_value.as_mut(),
&mut statistics.distinct_count,
&mut statistics.null_count,
statistics.distinct_count.as_mut(),
statistics.null_count.as_mut(),
)
})?;

Expand Down
61 changes: 61 additions & 0 deletions src/io/parquet/read/statistics/struct_.rs
@@ -0,0 +1,61 @@
use crate::array::{Array, StructArray};
use crate::error::Result;
use crate::{array::MutableArray, datatypes::DataType};

use super::make_mutable;

/// A [`MutableArray`] for struct-typed data, made of one type-erased mutable
/// array per field of the struct.
#[derive(Debug)]
pub struct DynMutableStructArray {
/// The data type of this array; returned by `data_type()` and passed to the
/// [`StructArray`] built in `as_box`.
data_type: DataType,
/// The child mutable arrays, one per field of the struct and in the same
/// order as the fields of `data_type`.
pub inner: Vec<Box<dyn MutableArray>>,
}

impl DynMutableStructArray {
    /// Creates a new [`DynMutableStructArray`] holding one child per field of
    /// `data_type`, each created through `make_mutable` with `capacity`.
    ///
    /// # Errors
    /// Errors if `make_mutable` fails for any of the fields.
    ///
    /// # Panics
    /// Panics if the logical type of `data_type` is not `DataType::Struct`.
    pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result<Self> {
        let fields = if let DataType::Struct(fields) = data_type.to_logical_type() {
            fields
        } else {
            unreachable!()
        };

        // build the children in field order, stopping at the first failure
        let mut inner = Vec::with_capacity(fields.len());
        for field in fields {
            inner.push(make_mutable(field.data_type(), capacity)?);
        }

        Ok(Self { data_type, inner })
    }
}
impl MutableArray for DynMutableStructArray {
    /// The data type this array was created with.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }

    /// The number of slots in this array.
    ///
    /// This is the length of the children (taken from the first one), not the
    /// number of fields. A struct without fields reports a length of 0.
    fn len(&self) -> usize {
        self.inner.first().map_or(0, |child| child.len())
    }

    /// Always `None`: this array does not track a validity bitmap of its own.
    fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> {
        None
    }

    /// Converts every child via `as_arc` and assembles them into a
    /// [`StructArray`] with this array's data type and no validity.
    fn as_box(&mut self) -> Box<dyn Array> {
        let values = self.inner.iter_mut().map(|child| child.as_arc()).collect();

        Box::new(StructArray::new(self.data_type.clone(), values, None))
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Not implemented: this array keeps no validity, so a null slot cannot
    /// be represented yet.
    ///
    /// # Panics
    /// Always panics.
    fn push_null(&mut self) {
        todo!()
    }

    /// Shrinks every child array to fit its contents.
    fn shrink_to_fit(&mut self) {
        self.inner
            .iter_mut()
            .for_each(|child| child.shrink_to_fit());
    }
}

0 comments on commit c9fe387

Please sign in to comment.