Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in decimal stats
Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 25, 2022
1 parent 3df39d7 commit 218a031
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 44 deletions.
11 changes: 6 additions & 5 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use parquet2::{
encoding::{hybrid_rle::encode_u32, Encoding},
metadata::Descriptor,
page::{EncodedDictPage, EncodedPage},
statistics::ParquetStatistics,
statistics::{serialize_statistics, ParquetStatistics},
write::DynIter,
};

Expand Down Expand Up @@ -120,10 +120,10 @@ macro_rules! dyn_prim {

let mut buffer = vec![];
primitive_encode_plain::<$from, $to>(values, false, &mut buffer);
(
EncodedDictPage::new(buffer, values.len()),
primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()),
)
let stats =
primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone());
let stats = serialize_statistics(&stats);
(EncodedDictPage::new(buffer, values.len()), stats)
}};
}

Expand Down Expand Up @@ -191,6 +191,7 @@ pub fn array_to_pages<K: DictionaryKey>(
fixed_binary_encode_plain(array, false, &mut buffer);
let stats =
fixed_binary_build_statistics(array, descriptor.primitive_type.clone());
let stats = serialize_statistics(&stats);
(EncodedDictPage::new(buffer, array.len()), stats)
}
other => {
Expand Down
42 changes: 29 additions & 13 deletions src/io/parquet/write/fixed_len_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use parquet2::{
metadata::Descriptor,
page::DataPage,
schema::types::PrimitiveType,
statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics},
statistics::{serialize_statistics, FixedLenStatistics},
};

use super::{binary::ord_binary, utils, WriteOptions};
use crate::{
array::{Array, FixedSizeBinaryArray},
array::{Array, FixedSizeBinaryArray, PrimitiveArray},
error::Result,
io::parquet::read::schema::is_nullable,
};
Expand All @@ -30,6 +30,7 @@ pub fn array_to_page(
array: &FixedSizeBinaryArray,
options: WriteOptions,
descriptor: Descriptor,
statistics: Option<FixedLenStatistics>,
) -> Result<DataPage> {
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let validity = array.validity();
Expand All @@ -47,20 +48,14 @@ pub fn array_to_page(

encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, descriptor.primitive_type.clone()))
} else {
None
};

utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
statistics,
statistics.map(|x| serialize_statistics(&x)),
descriptor,
options,
Encoding::Plain,
Expand All @@ -70,8 +65,8 @@ pub fn array_to_page(
pub(super) fn build_statistics(
array: &FixedSizeBinaryArray,
primitive_type: PrimitiveType,
) -> ParquetStatistics {
let statistics = &FixedLenStatistics {
) -> FixedLenStatistics {
FixedLenStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
distinct_count: None,
Expand All @@ -85,6 +80,27 @@ pub(super) fn build_statistics(
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
} as &dyn Statistics;
serialize_statistics(statistics)
}
}

/// Builds parquet `FixedLenStatistics` for a decimal column backed by an
/// `i128` array.
///
/// The min/max are taken over the non-null values and serialized as the
/// trailing `size` bytes of each value's big-endian representation
/// (assumes every value fits in `size` bytes — presumably guaranteed by the
/// caller via `decimal_length_from_precision`; TODO confirm).
pub(super) fn build_statistics_decimal(
    array: &PrimitiveArray<i128>,
    primitive_type: PrimitiveType,
    size: usize,
) -> FixedLenStatistics {
    // Serialize an i128 as its `size` low-order big-endian bytes.
    let to_fixed_bytes = |value: &i128| value.to_be_bytes()[16 - size..].to_vec();

    // Iterate only over valid (non-null) values.
    let max_value = array.iter().flatten().max().map(to_fixed_bytes);
    let min_value = array.iter().flatten().min().map(to_fixed_bytes);

    FixedLenStatistics {
        primitive_type,
        null_count: Some(array.null_count() as i64),
        distinct_count: None,
        max_value,
        min_value,
    }
}
52 changes: 44 additions & 8 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,15 @@ pub fn array_to_page(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, descriptor)
let statistics = if options.write_statistics {
Some(fixed_len_bytes::build_statistics(
&array,
descriptor.primitive_type.clone(),
))
} else {
None
};
fixed_len_bytes::array_to_page(&array, options, descriptor, statistics)
}
DataType::Interval(IntervalUnit::DayTime) => {
let array = array
Expand All @@ -261,13 +269,29 @@ pub fn array_to_page(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, descriptor)
let statistics = if options.write_statistics {
Some(fixed_len_bytes::build_statistics(
&array,
descriptor.primitive_type.clone(),
))
} else {
None
};
fixed_len_bytes::array_to_page(&array, options, descriptor, statistics)
}
DataType::FixedSizeBinary(_) => {
let array = array.as_any().downcast_ref().unwrap();
let statistics = if options.write_statistics {
Some(fixed_len_bytes::build_statistics(
array,
descriptor.primitive_type.clone(),
))
} else {
None
};

fixed_len_bytes::array_to_page(array, options, descriptor, statistics)
}
DataType::FixedSizeBinary(_) => fixed_len_bytes::array_to_page(
array.as_any().downcast_ref().unwrap(),
options,
descriptor,
),
DataType::Decimal(precision, _) => {
let precision = *precision;
let array = array
Expand Down Expand Up @@ -298,6 +322,18 @@ pub fn array_to_page(
primitive::array_to_page::<i64, i64>(&array, options, descriptor)
} else {
let size = decimal_length_from_precision(precision);

let statistics = if options.write_statistics {
let stats = fixed_len_bytes::build_statistics_decimal(
array,
descriptor.primitive_type.clone(),
size,
);
Some(stats)
} else {
None
};

let mut values = Vec::<u8>::with_capacity(size * array.len());
array.values().iter().for_each(|x| {
let bytes = &x.to_be_bytes()[16 - size..];
Expand All @@ -308,7 +344,7 @@ pub fn array_to_page(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, descriptor)
fixed_len_bytes::array_to_page(&array, options, descriptor, statistics)
}
}
DataType::FixedSizeList(_, _) | DataType::List(_) | DataType::LargeList(_) => {
Expand Down
14 changes: 8 additions & 6 deletions src/io/parquet/write/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use parquet2::{
metadata::Descriptor,
page::DataPage,
schema::types::PrimitiveType,
statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics},
statistics::{serialize_statistics, PrimitiveStatistics},
types::NativeType,
};

Expand Down Expand Up @@ -67,7 +67,10 @@ where
encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, descriptor.primitive_type.clone()))
Some(serialize_statistics(&build_statistics(
array,
descriptor.primitive_type.clone(),
)))
} else {
None
};
Expand All @@ -89,13 +92,13 @@ where
pub fn build_statistics<T, R>(
array: &PrimitiveArray<T>,
primitive_type: PrimitiveType,
) -> ParquetStatistics
) -> PrimitiveStatistics<R>
where
T: ArrowNativeType,
R: NativeType,
T: num_traits::AsPrimitive<R>,
{
let statistics = &PrimitiveStatistics::<R> {
PrimitiveStatistics::<R> {
primitive_type,
null_count: Some(array.null_count() as i64),
distinct_count: None,
Expand All @@ -115,6 +118,5 @@ where
x
})
.min_by(|x, y| x.ord(y)),
} as &dyn Statistics;
serialize_statistics(statistics)
}
}
6 changes: 5 additions & 1 deletion src/io/parquet/write/primitive/nested.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use parquet2::statistics::serialize_statistics;
use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType};

use super::super::levels;
Expand Down Expand Up @@ -37,7 +38,10 @@ where
encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, descriptor.primitive_type.clone()))
Some(serialize_statistics(&build_statistics(
array,
descriptor.primitive_type.clone(),
)))
} else {
None
};
Expand Down
20 changes: 10 additions & 10 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,49 +560,49 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics {
match column {
"list_int16" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: UInt64Array::from([Some(4)]), // this should be 1, see ARROW-16299
null_count: UInt64Array::from([Some(1)]),
min_value: new_list(Arc::new(Int16Array::from_slice([0])), true),
max_value: new_list(Arc::new(Int16Array::from_slice([10])), true),
},
"list_bool" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: UInt64Array::from([Some(4)]),
null_count: UInt64Array::from([Some(1)]),
min_value: new_list(Arc::new(BooleanArray::from_slice([false])), true),
max_value: new_list(Arc::new(BooleanArray::from_slice([true])), true),
},
"list_utf8" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(4)].into(),
null_count: [Some(1)].into(),
min_value: new_list(Arc::new(Utf8Array::<i32>::from_slice([""])), true),
max_value: new_list(Arc::new(Utf8Array::<i32>::from_slice(["ccc"])), true),
},
"list_large_binary" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(4)].into(), // this should be 1, see ARROW-16299
null_count: [Some(1)].into(),
min_value: new_list(Arc::new(BinaryArray::<i64>::from_slice([b""])), true),
max_value: new_list(Arc::new(BinaryArray::<i64>::from_slice([b"ccc"])), true),
},
"list_int64" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(4)].into(), // this should be 1, see ARROW-16299
null_count: [Some(1)].into(),
min_value: new_list(Arc::new(Int64Array::from_slice([0])), true),
max_value: new_list(Arc::new(Int64Array::from_slice([10])), true),
},
"list_int64_required" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(3)].into(), // this should be 1, see ARROW-16299
null_count: [Some(1)].into(),
min_value: new_list(Arc::new(Int64Array::from_slice([0])), false),
max_value: new_list(Arc::new(Int64Array::from_slice([10])), false),
},
"list_int64_required_required" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(3)].into(), // this should be 0, see ARROW-16299
null_count: [Some(0)].into(),
min_value: new_list(Arc::new(Int64Array::from_slice([0])), false),
max_value: new_list(Arc::new(Int64Array::from_slice([10])), false),
},
"list_nested_i64" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(7)].into(), // this should be 2, see ARROW-16299
null_count: [Some(2)].into(),
min_value: new_list(
new_list(Arc::new(Int64Array::from_slice([0])), true).into(),
true,
Expand All @@ -614,7 +614,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics {
},
"list_nested_inner_required_required_i64" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(3)].into(), // this should be 0, see ARROW-16299
null_count: [Some(0)].into(),
min_value: new_list(
new_list(Arc::new(Int64Array::from_slice([0])), true).into(),
true,
Expand All @@ -626,7 +626,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics {
},
"list_nested_inner_required_i64" => Statistics {
distinct_count: UInt64Array::from([None]),
null_count: [Some(4)].into(), // this should be 0, see ARROW-16299
null_count: [Some(0)].into(),
min_value: new_list(
new_list(Arc::new(Int64Array::from_slice([0])), true).into(),
true,
Expand Down
18 changes: 17 additions & 1 deletion tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,23 @@ fn test_pyarrow_integration(
};

assert_eq!(expected.as_ref(), array.as_ref());
assert_eq!(expected_statistics, statistics);
if ![
"list_int16",
"list_large_binary",
"list_int64",
"list_int64_required",
"list_int64_required_required",
"list_nested_i64",
"list_utf8",
"list_bool",
"list_nested_inner_required_required_i64",
"list_nested_inner_required_i64",
]
.contains(&column)
{
// pyarrow outputs an incorrect number of null count for nested types - ARROW-16299
assert_eq!(expected_statistics, statistics);
}

Ok(())
}
Expand Down

0 comments on commit 218a031

Please sign in to comment.