Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
fix: fix nested decimal read and write (#1573)
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil authored Oct 7, 2023
1 parent 420936e commit 710d6b3
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 29 deletions.
20 changes: 16 additions & 4 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,14 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
pa.field("list_bool", pa.list_(pa.bool_())),
pa.field("list_utf8", pa.list_(pa.utf8())),
pa.field("list_large_binary", pa.list_(pa.large_binary())),
pa.field("list_decimal", pa.list_(pa.decimal128(9, 0))),
pa.field("list_decimal256", pa.list_(pa.decimal256(9, 0))),
pa.field("list_decimal_9", pa.list_(pa.decimal128(9, 0))),
pa.field("list_decimal_18", pa.list_(pa.decimal128(18, 0))),
pa.field("list_decimal_26", pa.list_(pa.decimal128(26, 0))),
pa.field("list_decimal256_9", pa.list_(pa.decimal256(9, 0))),
pa.field("list_decimal256_18", pa.list_(pa.decimal256(18, 0))),
pa.field("list_decimal256_26", pa.list_(pa.decimal256(26, 0))),
pa.field("list_decimal256_39", pa.list_(pa.decimal256(39, 0))),
pa.field("list_decimal256_76", pa.list_(pa.decimal256(76, 0))),
pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))),
pa.field("list_nested_decimal", pa.list_(pa.list_(pa.decimal128(9, 0)))),
pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))),
Expand Down Expand Up @@ -266,8 +272,14 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
"list_bool": boolean,
"list_utf8": string,
"list_large_binary": string,
"list_decimal": decimal_nullable,
"list_decimal256": decimal_nullable,
"list_decimal_9": decimal_nullable,
"list_decimal_18": decimal_nullable,
"list_decimal_26": decimal_nullable,
"list_decimal256_9": decimal_nullable,
"list_decimal256_18": decimal_nullable,
"list_decimal256_26": decimal_nullable,
"list_decimal256_39": decimal_nullable,
"list_decimal256_76": decimal_nullable,
"list_nested_i64": items_nested,
"list_nested_decimal": decimal_nested,
"list_nested_inner_required_i64": items_required_nested,
Expand Down
8 changes: 4 additions & 4 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ pub(super) struct Required<'a> {
}

impl<'a> Required<'a> {
pub(super) fn new(page: &'a DataPage, size: usize) -> Self {
let values = page.buffer();
pub(super) fn try_new(page: &'a DataPage, size: usize) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;
assert_eq!(values.len() % size, 0);
let values = values.chunks_exact(size);
Self { values }
Ok(Self { values })
}

#[inline]
Expand Down Expand Up @@ -171,7 +171,7 @@ impl<'a> Decoder<'a> for BinaryDecoder {
Ok(State::Optional(Optional::try_new(page, self.size)?))
}
(Encoding::Plain, _, false, false) => {
Ok(State::Required(Required::new(page, self.size)))
Ok(State::Required(Required::try_new(page, self.size)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
RequiredDictionary::try_new(page, dict).map(State::RequiredDictionary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder {
Ok(State::Optional(Optional::try_new(page, self.size)?))
}
(Encoding::Plain, _, false, false) => {
Ok(State::Required(Required::new(page, self.size)))
Ok(State::Required(Required::try_new(page, self.size)?))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
RequiredDictionary::try_new(page, dict).map(State::RequiredDictionary)
Expand Down
32 changes: 31 additions & 1 deletion src/io/parquet/write/fixed_len_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use parquet2::{
};

use super::{binary::ord_binary, utils, WriteOptions};
use crate::io::parquet::write::{nested, Nested};
use crate::types::i256;
use crate::{
array::{Array, FixedSizeBinaryArray, PrimitiveArray},
Expand Down Expand Up @@ -62,7 +63,36 @@ pub fn array_to_page(
)
}

pub(super) fn build_statistics(
/// Builds a plain-encoded [`DataPage`] from a [`FixedSizeBinaryArray`] that
/// sits inside a nested type.
///
/// The page buffer is filled in two steps: first the repetition and definition
/// levels derived from `nested`, then the plain-encoded values of `array`.
///
/// # Errors
/// Returns any error raised while writing the levels or while assembling the
/// final page.
///
/// # Panics
/// Panics if `nested` is empty, because the length of its first entry is read
/// by index.
pub fn nested_array_to_page(
    array: &FixedSizeBinaryArray,
    options: WriteOptions,
    type_: PrimitiveType,
    statistics: Option<FixedLenStatistics>,
    nested: &[Nested],
) -> Result<DataPage> {
    let values_are_optional = is_nullable(&type_.field_info);

    let mut page_bytes = Vec::new();

    // Levels are written ahead of the values; their byte lengths are handed
    // to the page builder further down.
    let (rep_levels_len, def_levels_len) =
        nested::write_rep_and_def(options.version, nested, &mut page_bytes)?;

    // The values are appended to the same buffer, right after the levels.
    encode_plain(array, values_are_optional, &mut page_bytes);

    let value_count = nested::num_values(nested);
    let first_nested_len = nested[0].len();
    let null_count = array.null_count();
    let serialized_statistics = statistics.map(|stats| serialize_statistics(&stats));

    utils::build_plain_page(
        page_bytes,
        value_count,
        first_nested_len,
        null_count,
        rep_levels_len,
        def_levels_len,
        serialized_statistics,
        type_,
        options,
        Encoding::Plain,
    )
}

pub fn build_statistics(
array: &FixedSizeBinaryArray,
primitive_type: PrimitiveType,
) -> FixedLenStatistics {
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ fn array_to_page_nested(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_len_bytes::nested_array_to_page(&array, options, type_, statistics, nested)
}
}
Decimal256(precision, _) => {
Expand Down Expand Up @@ -782,7 +782,7 @@ fn array_to_page_nested(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_len_bytes::nested_array_to_page(&array, options, type_, statistics, nested)
} else {
let size = 32;
let array = array
Expand All @@ -807,7 +807,7 @@ fn array_to_page_nested(
array.validity().cloned(),
);

fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_len_bytes::nested_array_to_page(&array, options, type_, statistics, nested)
}
}
other => Err(Error::NotYetImplemented(format!(
Expand Down
164 changes: 158 additions & 6 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,21 +240,67 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box<dyn Array> {
Some(b"bbb".to_vec()),
Some(b"".to_vec()),
])),
"list_decimal" => {
"list_decimal_9" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| x as i128))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(values).to(DataType::Decimal(9, 0)))
}
"list_decimal256" => {
"list_decimal_18" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| x as i128))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(values).to(DataType::Decimal(18, 0)))
}
"list_decimal_26" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| x as i128))
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(values).to(DataType::Decimal(26, 0)))
}
"list_decimal256_9" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
let array = PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(9, 0));
Box::new(array)
}
"list_decimal256_18" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
let array = PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(18, 0));
Box::new(array)
}
"list_decimal256_26" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
let array = PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(26, 0));
Box::new(array)
}
"list_decimal256_39" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
let array = PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(39, 0));
Box::new(array)
}
"list_decimal256_76" => {
let values = i64_values
.iter()
.map(|x| x.map(|x| i256(x.as_i256())))
.collect::<Vec<_>>();
let array = PrimitiveArray::<i256>::from(values).to(DataType::Decimal256(76, 0));
Box::new(array)
}
"list_nested_i64"
| "list_nested_decimal"
| "list_nested_inner_required_i64"
Expand Down Expand Up @@ -479,8 +525,14 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box<dyn Array> {
"list_bool" => Field::new("item", DataType::Boolean, true),
"list_utf8" => Field::new("item", DataType::Utf8, true),
"list_large_binary" => Field::new("item", DataType::LargeBinary, true),
"list_decimal" => Field::new("item", DataType::Decimal(9, 0), true),
"list_decimal256" => Field::new("item", DataType::Decimal256(9, 0), true),
"list_decimal_9" => Field::new("item", DataType::Decimal(9, 0), true),
"list_decimal_18" => Field::new("item", DataType::Decimal(18, 0), true),
"list_decimal_26" => Field::new("item", DataType::Decimal(26, 0), true),
"list_decimal256_9" => Field::new("item", DataType::Decimal256(9, 0), true),
"list_decimal256_18" => Field::new("item", DataType::Decimal256(18, 0), true),
"list_decimal256_26" => Field::new("item", DataType::Decimal256(26, 0), true),
"list_decimal256_39" => Field::new("item", DataType::Decimal256(39, 0), true),
"list_decimal256_76" => Field::new("item", DataType::Decimal256(76, 0), true),
"list_struct_nullable" => Field::new("item", values.data_type().clone(), true),
"list_struct_list_nullable" => Field::new("item", values.data_type().clone(), true),
other => unreachable!("{}", other),
Expand Down Expand Up @@ -927,7 +979,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics {
min_value: new_list(Box::new(BinaryArray::<i64>::from_slice([b""])), true).boxed(),
max_value: new_list(Box::new(BinaryArray::<i64>::from_slice([b"ccc"])), true).boxed(),
},
"list_decimal" => Statistics {
"list_decimal_9" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Expand All @@ -941,7 +993,35 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics {
)
.boxed(),
},
"list_decimal256" => Statistics {
"list_decimal_18" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Box::new(Int128Array::from_slice([0]).to(DataType::Decimal(18, 0))),
true,
)
.boxed(),
max_value: new_list(
Box::new(Int128Array::from_slice([10]).to(DataType::Decimal(18, 0))),
true,
)
.boxed(),
},
"list_decimal_26" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Box::new(Int128Array::from_slice([0]).to(DataType::Decimal(26, 0))),
true,
)
.boxed(),
max_value: new_list(
Box::new(Int128Array::from_slice([10]).to(DataType::Decimal(26, 0))),
true,
)
.boxed(),
},
"list_decimal256_9" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Expand All @@ -959,6 +1039,78 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics {
)
.boxed(),
},
"list_decimal256_18" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Box::new(
Int256Array::from_slice([i256(0.as_i256())]).to(DataType::Decimal256(18, 0)),
),
true,
)
.boxed(),
max_value: new_list(
Box::new(
Int256Array::from_slice([i256(10.as_i256())]).to(DataType::Decimal256(18, 0)),
),
true,
)
.boxed(),
},
"list_decimal256_26" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Box::new(
Int256Array::from_slice([i256(0.as_i256())]).to(DataType::Decimal256(26, 0)),
),
true,
)
.boxed(),
max_value: new_list(
Box::new(
Int256Array::from_slice([i256(10.as_i256())]).to(DataType::Decimal256(26, 0)),
),
true,
)
.boxed(),
},
"list_decimal256_39" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Box::new(
Int256Array::from_slice([i256(0.as_i256())]).to(DataType::Decimal256(39, 0)),
),
true,
)
.boxed(),
max_value: new_list(
Box::new(
Int256Array::from_slice([i256(10.as_i256())]).to(DataType::Decimal256(39, 0)),
),
true,
)
.boxed(),
},
"list_decimal256_76" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
min_value: new_list(
Box::new(
Int256Array::from_slice([i256(0.as_i256())]).to(DataType::Decimal256(76, 0)),
),
true,
)
.boxed(),
max_value: new_list(
Box::new(
Int256Array::from_slice([i256(10.as_i256())]).to(DataType::Decimal256(76, 0)),
),
true,
)
.boxed(),
},
"list_int64" => Statistics {
distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(),
null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(),
Expand Down
Loading

0 comments on commit 710d6b3

Please sign in to comment.