Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in writing ListArray to parquet (#984)
Browse files Browse the repository at this point in the history
* Internal refactor to fix bug

* Fixed error in writing lists
  • Loading branch information
jorgecarleitao committed May 7, 2022
1 parent bfbc071 commit fc91dd8
Show file tree
Hide file tree
Showing 19 changed files with 413 additions and 273 deletions.
6 changes: 6 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
pa.list_(pa.field("item", pa.int64(), False)),
False,
),
pa.field(
"list_int64_optional_required",
pa.list_(pa.field("item", pa.int64(), True)),
False,
),
pa.field("list_int16", pa.list_(pa.int16())),
pa.field("list_bool", pa.list_(pa.bool_())),
pa.field("list_utf8", pa.list_(pa.utf8())),
Expand All @@ -182,6 +187,7 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
"list_int64": items_nullable,
"list_int64_required": items_required,
"list_int64_required_required": all_required,
"list_int64_optional_required": all_required,
"list_int16": i16,
"list_bool": boolean,
"list_utf8": string,
Expand Down
9 changes: 4 additions & 5 deletions src/io/parquet/write/binary/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use parquet2::{
encoding::{delta_bitpacked, Encoding},
metadata::Descriptor,
page::DataPage,
schema::types::PrimitiveType,
statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics},
Expand Down Expand Up @@ -43,11 +42,11 @@ pub(crate) fn encode_plain<O: Offset>(
pub fn array_to_page<O: Offset>(
array: &BinaryArray<O>,
options: WriteOptions,
descriptor: Descriptor,
type_: PrimitiveType,
encoding: Encoding,
) -> Result<DataPage> {
let validity = array.validity();
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];
utils::write_def_levels(
Expand Down Expand Up @@ -79,7 +78,7 @@ pub fn array_to_page<O: Offset>(
}

let statistics = if options.write_statistics {
Some(build_statistics(array, descriptor.primitive_type.clone()))
Some(build_statistics(array, type_.clone()))
} else {
None
};
Expand All @@ -92,7 +91,7 @@ pub fn array_to_page<O: Offset>(
0,
definition_levels_byte_length,
statistics,
descriptor,
type_,
options,
encoding,
)
Expand Down
12 changes: 6 additions & 6 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use parquet2::metadata::Descriptor;
use parquet2::schema::types::PrimitiveType;
use parquet2::{encoding::Encoding, page::DataPage};

use super::super::{levels, utils, WriteOptions};
Expand All @@ -12,28 +12,28 @@ use crate::{
pub fn array_to_page<O, OO>(
array: &BinaryArray<O>,
options: WriteOptions,
descriptor: Descriptor,
type_: PrimitiveType,
nested: levels::NestedInfo<OO>,
) -> Result<DataPage>
where
OO: Offset,
O: Offset,
{
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let is_optional = is_nullable(&type_.field_info);

let validity = array.validity();

let mut buffer = vec![];
levels::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

levels::write_def_levels(&mut buffer, &nested, validity, options.version)?;
levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, descriptor.primitive_type.clone()))
Some(build_statistics(array, type_.clone()))
} else {
None
};
Expand All @@ -46,7 +46,7 @@ where
repetition_levels_byte_length,
definition_levels_byte_length,
statistics,
descriptor,
type_,
options,
Encoding::Plain,
)
Expand Down
8 changes: 4 additions & 4 deletions src/io/parquet/write/boolean/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use parquet2::{
encoding::{hybrid_rle::bitpacked_encode, Encoding},
metadata::Descriptor,
page::DataPage,
schema::types::PrimitiveType,
statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics},
};

Expand Down Expand Up @@ -41,9 +41,9 @@ pub(super) fn encode_plain(
pub fn array_to_page(
array: &BooleanArray,
options: WriteOptions,
descriptor: Descriptor,
type_: PrimitiveType,
) -> Result<DataPage> {
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let is_optional = is_nullable(&type_.field_info);

let validity = array.validity();

Expand Down Expand Up @@ -74,7 +74,7 @@ pub fn array_to_page(
0,
definition_levels_byte_length,
statistics,
descriptor,
type_,
options,
Encoding::Plain,
)
Expand Down
11 changes: 6 additions & 5 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage};
use parquet2::schema::types::PrimitiveType;
use parquet2::{encoding::Encoding, page::DataPage};

use super::super::{levels, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
Expand All @@ -11,21 +12,21 @@ use crate::{
pub fn array_to_page<O>(
array: &BooleanArray,
options: WriteOptions,
descriptor: Descriptor,
type_: PrimitiveType,
nested: levels::NestedInfo<O>,
) -> Result<DataPage>
where
O: Offset,
{
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let is_optional = is_nullable(&type_.field_info);

let validity = array.validity();

let mut buffer = vec![];
levels::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

levels::write_def_levels(&mut buffer, &nested, validity, options.version)?;
levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

encode_plain(array, is_optional, &mut buffer)?;
Expand All @@ -44,7 +45,7 @@ where
repetition_levels_byte_length,
definition_levels_byte_length,
statistics,
descriptor,
type_,
options,
Encoding::Plain,
)
Expand Down
48 changes: 23 additions & 25 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use parquet2::{
encoding::{hybrid_rle::encode_u32, Encoding},
metadata::Descriptor,
page::{EncodedDictPage, EncodedPage},
schema::types::PrimitiveType,
statistics::{serialize_statistics, ParquetStatistics},
write::DynIter,
};
Expand All @@ -27,11 +27,11 @@ use crate::{
fn encode_keys<K: DictionaryKey>(
array: &PrimitiveArray<K>,
validity: Option<&Bitmap>,
descriptor: Descriptor,
type_: PrimitiveType,
statistics: ParquetStatistics,
options: WriteOptions,
) -> Result<EncodedPage> {
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let is_optional = is_nullable(&type_.field_info);

let mut buffer = vec![];

Expand Down Expand Up @@ -107,90 +107,88 @@ fn encode_keys<K: DictionaryKey>(
0,
definition_levels_byte_length,
Some(statistics),
descriptor,
type_,
options,
Encoding::RleDictionary,
)
.map(EncodedPage::Data)
}

macro_rules! dyn_prim {
($from:ty, $to:ty, $array:expr, $options:expr, $descriptor:expr) => {{
($from:ty, $to:ty, $array:expr, $options:expr, $type_:expr) => {{
let values = $array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
primitive_encode_plain::<$from, $to>(values, false, &mut buffer);
let stats =
primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone());
let stats = primitive_build_statistics::<$from, $to>(values, $type_.clone());
let stats = serialize_statistics(&stats);
(EncodedDictPage::new(buffer, values.len()), stats)
}};
}

pub fn array_to_pages<K: DictionaryKey>(
array: &DictionaryArray<K>,
descriptor: Descriptor,
type_: PrimitiveType,
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<EncodedPage>>> {
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
let (dict_page, statistics) = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array, options, descriptor),
DataType::Int16 => dyn_prim!(i16, i32, array, options, descriptor),
DataType::Int8 => dyn_prim!(i8, i32, array, options, type_),
DataType::Int16 => dyn_prim!(i16, i32, array, options, type_),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
dyn_prim!(i32, i32, array, options, descriptor)
dyn_prim!(i32, i32, array, options, type_)
}
DataType::Int64
| DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options, descriptor),
DataType::UInt8 => dyn_prim!(u8, i32, array, options, descriptor),
DataType::UInt16 => dyn_prim!(u16, i32, array, options, descriptor),
DataType::UInt32 => dyn_prim!(u32, i32, array, options, descriptor),
DataType::UInt64 => dyn_prim!(i64, i64, array, options, descriptor),
DataType::Float32 => dyn_prim!(f32, f32, array, options, descriptor),
DataType::Float64 => dyn_prim!(f64, f64, array, options, descriptor),
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options, type_),
DataType::UInt8 => dyn_prim!(u8, i32, array, options, type_),
DataType::UInt16 => dyn_prim!(u16, i32, array, options, type_),
DataType::UInt32 => dyn_prim!(u32, i32, array, options, type_),
DataType::UInt64 => dyn_prim!(i64, i64, array, options, type_),
DataType::Float32 => dyn_prim!(f32, f32, array, options, type_),
DataType::Float64 => dyn_prim!(f64, f64, array, options, type_),
DataType::Utf8 => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
utf8_encode_plain::<i32>(array, false, &mut buffer);
let stats = utf8_build_statistics(array, descriptor.primitive_type.clone());
let stats = utf8_build_statistics(array, type_.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::LargeUtf8 => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
utf8_encode_plain::<i64>(array, false, &mut buffer);
let stats = utf8_build_statistics(array, descriptor.primitive_type.clone());
let stats = utf8_build_statistics(array, type_.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::Binary => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i32>(array, false, &mut buffer);
let stats = binary_build_statistics(array, descriptor.primitive_type.clone());
let stats = binary_build_statistics(array, type_.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::LargeBinary => {
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i64>(array, false, &mut buffer);
let stats = binary_build_statistics(array, descriptor.primitive_type.clone());
let stats = binary_build_statistics(array, type_.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::FixedSizeBinary(_) => {
let mut buffer = vec![];
let array = array.values().as_any().downcast_ref().unwrap();
fixed_binary_encode_plain(array, false, &mut buffer);
let stats =
fixed_binary_build_statistics(array, descriptor.primitive_type.clone());
let stats = fixed_binary_build_statistics(array, type_.clone());
let stats = serialize_statistics(&stats);
(EncodedDictPage::new(buffer, array.len()), stats)
}
Expand All @@ -207,7 +205,7 @@ pub fn array_to_pages<K: DictionaryKey>(
let data_page = encode_keys(
array.keys(),
array.values().validity(),
descriptor,
type_,
statistics,
options,
)?;
Expand Down
7 changes: 3 additions & 4 deletions src/io/parquet/write/fixed_len_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use parquet2::{
encoding::Encoding,
metadata::Descriptor,
page::DataPage,
schema::types::PrimitiveType,
statistics::{serialize_statistics, FixedLenStatistics},
Expand Down Expand Up @@ -29,10 +28,10 @@ pub(crate) fn encode_plain(array: &FixedSizeBinaryArray, is_optional: bool, buff
pub fn array_to_page(
array: &FixedSizeBinaryArray,
options: WriteOptions,
descriptor: Descriptor,
type_: PrimitiveType,
statistics: Option<FixedLenStatistics>,
) -> Result<DataPage> {
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
let is_optional = is_nullable(&type_.field_info);
let validity = array.validity();

let mut buffer = vec![];
Expand All @@ -56,7 +55,7 @@ pub fn array_to_page(
0,
definition_levels_byte_length,
statistics.map(|x| serialize_statistics(&x)),
descriptor,
type_,
options,
Encoding::Plain,
)
Expand Down
Loading

0 comments on commit fc91dd8

Please sign in to comment.