Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in writing lists
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 7, 2022
1 parent d773af5 commit 5c3397b
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 97 deletions.
6 changes: 6 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
pa.list_(pa.field("item", pa.int64(), False)),
False,
),
pa.field(
"list_int64_optional_required",
pa.list_(pa.field("item", pa.int64(), True)),
False,
),
pa.field("list_int16", pa.list_(pa.int16())),
pa.field("list_bool", pa.list_(pa.bool_())),
pa.field("list_utf8", pa.list_(pa.utf8())),
Expand All @@ -182,6 +187,7 @@ def case_nested() -> Tuple[dict, pa.Schema, str]:
"list_int64": items_nullable,
"list_int64_required": items_required,
"list_int64_required_required": all_required,
"list_int64_optional_required": all_required,
"list_int16": i16,
"list_bool": boolean,
"list_utf8": string,
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ where
levels::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

levels::write_def_levels(&mut buffer, &nested, validity, options.version)?;
levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

encode_plain(array, is_optional, &mut buffer);
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where
levels::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

levels::write_def_levels(&mut buffer, &nested, validity, options.version)?;
levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

encode_plain(array, is_optional, &mut buffer)?;
Expand Down
217 changes: 130 additions & 87 deletions src/io/parquet/write/levels.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use parquet2::encoding::hybrid_rle::encode_u32;
use parquet2::write::Version;

use crate::{
array::Offset,
bitmap::{utils::BitmapIter, Bitmap},
error::Result,
};
use crate::{array::Offset, bitmap::Bitmap, error::Result};

pub fn num_values<O: Offset>(offsets: &[O]) -> usize {
offsets
Expand Down Expand Up @@ -72,87 +68,57 @@ impl<O: Offset> Iterator for RepLevelsIter<'_, O> {
}
}

enum OffsetsIter<'a, O> {
Optional(std::iter::Zip<std::slice::Windows<'a, O>, BitmapIter<'a>>),
Required(std::slice::Windows<'a, O>),
}

/// Iterator adapter of parquet / dremel definition levels
pub struct DefLevelsIter<'a, O: Offset> {
iter: OffsetsIter<'a, O>,
primitive_validity: Option<BitmapIter<'a>>,
pub struct DefLevelsIter<'a, O: Offset, II: Iterator<Item = u32>, I: Iterator<Item = u32>> {
iter: std::iter::Zip<std::slice::Windows<'a, O>, II>,
primitive_validity: I,
remaining: usize,
is_valid: bool,
length: usize,
is_valid: u32,
total_size: usize,
}

impl<'a, O: Offset> DefLevelsIter<'a, O> {
pub fn new(
offsets: &'a [O],
validity: Option<&'a Bitmap>,
primitive_validity: Option<&'a Bitmap>,
) -> Self {
impl<'a, O: Offset, II: Iterator<Item = u32>, I: Iterator<Item = u32>> DefLevelsIter<'a, O, II, I> {
pub fn new(offsets: &'a [O], validity: II, primitive_validity: I) -> Self {
let total_size = num_values(offsets);

let primitive_validity = primitive_validity.map(|x| x.iter());

let iter = validity
.map(|x| OffsetsIter::Optional(offsets.windows(2).zip(x.iter())))
.unwrap_or_else(|| OffsetsIter::Required(offsets.windows(2)));
let iter = offsets.windows(2).zip(validity);

Self {
iter,
primitive_validity,
remaining: 0,
length: 0,
is_valid: false,
is_valid: 0,
total_size,
}
}
}

impl<O: Offset> Iterator for DefLevelsIter<'_, O> {
impl<O: Offset, II: Iterator<Item = u32>, I: Iterator<Item = u32>> Iterator
for DefLevelsIter<'_, O, II, I>
{
type Item = u32;

fn next(&mut self) -> Option<Self::Item> {
if self.remaining == self.length {
match &mut self.iter {
OffsetsIter::Optional(iter) => {
let (w, is_valid) = iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = is_valid;
if self.length == 0 {
self.total_size -= 1;
return Some(self.is_valid as u32);
}
}
OffsetsIter::Required(iter) => {
let w = iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = true;
if self.length == 0 {
self.total_size -= 1;
return Some(0);
}
}
if self.remaining == 0 {
let (w, is_valid) = self.iter.next()?;
let start = w[0].to_usize();
let end = w[1].to_usize();
self.remaining = end - start;
self.is_valid = is_valid + 1;
if self.remaining == 0 {
self.total_size -= 1;
return Some(self.is_valid - 1);
}
}
self.remaining += 1;
self.remaining -= 1;
self.total_size -= 1;

let (base_def, p_is_valid) = self
.primitive_validity
.as_mut()
.map(|x| (1, x.next().unwrap() as u32))
.next()
.map(|x| (1, x))
.unwrap_or((0, 0));
let def_ = (base_def + 1) * self.is_valid as u32 + p_is_valid;
let def_ = self.is_valid + p_is_valid;
Some(def_)
}

Expand All @@ -163,15 +129,15 @@ impl<O: Offset> Iterator for DefLevelsIter<'_, O> {

#[derive(Debug)]
pub struct NestedInfo<'a, O: Offset> {
_is_optional: bool,
is_optional: bool,
offsets: &'a [O],
validity: Option<&'a Bitmap>,
}

impl<'a, O: Offset> NestedInfo<'a, O> {
pub fn new(offsets: &'a [O], validity: Option<&'a Bitmap>, is_optional: bool) -> Self {
Self {
_is_optional: is_optional,
is_optional,
offsets,
validity,
}
Expand All @@ -182,7 +148,7 @@ impl<'a, O: Offset> NestedInfo<'a, O> {
}
}

fn write_levels_v1<F: Fn(&mut Vec<u8>) -> Result<()>>(
fn write_levels_v1<F: FnOnce(&mut Vec<u8>) -> Result<()>>(
buffer: &mut Vec<u8>,
encode: F,
) -> Result<()> {
Expand Down Expand Up @@ -226,30 +192,107 @@ pub fn write_rep_levels<O: Offset>(
Ok(())
}

fn write_def_levels1<I: Iterator<Item = u32>>(
buffer: &mut Vec<u8>,
levels: I,
num_bits: u8,
version: Version,
) -> Result<()> {
match version {
Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec<u8>| {
encode_u32(buffer, levels, num_bits)?;
Ok(())
}),
Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?),
}
}

/// writes the rep levels to a `Vec<u8>`.
pub fn write_def_levels<O: Offset>(
buffer: &mut Vec<u8>,
nested: &NestedInfo<O>,
validity: Option<&Bitmap>,
primitive_is_optional: bool,
version: Version,
) -> Result<()> {
let num_bits = 1 + validity.is_some() as u8;

match version {
Version::V1 => {
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity);
encode_u32(buffer, levels, num_bits)?;
Ok(())
})?;
let mut num_bits = 1 + nested.is_optional as u8 + primitive_is_optional as u8;
if num_bits == 3 {
// brute-force log2 - this needs to be generalized for e.g. list of list
num_bits = 2
};

// this match ensures that irrespectively of the arrays' validities, we write def levels
// that are consistent with the declared parquet schema.
// see comments on some of the variants
match (
nested.is_optional,
nested.validity.as_ref(),
primitive_is_optional,
validity.as_ref(),
) {
// if the validity is optional and there is no validity in the array, we
// need to write 1 to mark the fields as valid
(true, None, true, None) => {
let nested_validity = std::iter::repeat(1);
let validity = std::iter::repeat(1);
let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity);
write_def_levels1(buffer, levels, num_bits, version)
}
Version::V2 => {
let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity);
encode_u32(buffer, levels, num_bits)?;
(true, Some(nested_validity), true, None) => {
let levels = DefLevelsIter::new(
nested.offsets,
nested_validity.iter().map(|x| x as u32),
std::iter::repeat(1),
);
write_def_levels1(buffer, levels, num_bits, version)
}
(true, None, true, Some(validity)) => {
let levels = DefLevelsIter::new(
nested.offsets,
std::iter::repeat(1),
validity.iter().map(|x| x as u32),
);
write_def_levels1(buffer, levels, num_bits, version)
}
(true, Some(nested_validity), true, Some(validity)) => {
let levels = DefLevelsIter::new(
nested.offsets,
nested_validity.iter().map(|x| x as u32),
validity.iter().map(|x| x as u32),
);
write_def_levels1(buffer, levels, num_bits, version)
}
(true, None, false, _) => {
let nested_validity = std::iter::repeat(1);
let validity = std::iter::repeat(0);
let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity);
write_def_levels1(buffer, levels, num_bits, version)
}
(true, Some(nested_validity), false, _) => {
let nested_validity = nested_validity.iter().map(|x| x as u32);
let validity = std::iter::repeat(0);
let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity);
write_def_levels1(buffer, levels, num_bits, version)
}
(false, _, true, None) => {
let nested_validity = std::iter::repeat(0);
let validity = std::iter::repeat(1);
let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity);
write_def_levels1(buffer, levels, num_bits, version)
}
(false, _, true, Some(validity)) => {
let nested_validity = std::iter::repeat(0);
let validity = validity.iter().map(|x| x as u32);
let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity);
write_def_levels1(buffer, levels, num_bits, version)
}
(false, _, false, _) => {
let nested_validity = std::iter::repeat(0);
let validity = std::iter::repeat(0);
let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity);
write_def_levels1(buffer, levels, num_bits, version)
}
}

Ok(())
}

#[cfg(test)]
Expand All @@ -269,33 +312,33 @@ mod tests {
fn test_def_levels() {
// [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]]
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
let validity = Some(Bitmap::from([
true, false, true, true, true, true, false, true,
]));
let primitive_validity = Some(Bitmap::from([
let validity = [true, false, true, true, true, true, false, true]
.into_iter()
.map(|x| x as u32);
let primitive_validity = [
true, true, //[0, 1]
true, false, true, //[2, None, 3]
true, true, true, //[4, 5, 6]
true, true, true, //[7, 8, 9]
true, //[10]
]));
]
.into_iter()
.map(|x| x as u32);
let expected = vec![3u32, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3];

let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref())
.collect::<Vec<_>>();
let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::<Vec<_>>();
assert_eq!(result, expected)
}

#[test]
fn test_def_levels1() {
// [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]]
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
let validity = None;
let primitive_validity = None;
let validity = std::iter::repeat(0);
let primitive_validity = std::iter::repeat(0);
let expected = vec![1u32, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1];

let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref())
.collect::<Vec<_>>();
let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::<Vec<_>>();
assert_eq!(result, expected)
}
}
2 changes: 1 addition & 1 deletion src/io/parquet/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ where
levels::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

levels::write_def_levels(&mut buffer, &nested, validity, options.version)?;
levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

encode_plain(array, is_optional, &mut buffer);
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/write/utf8/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ where
levels::write_rep_levels(&mut buffer, &nested, options.version)?;
let repetition_levels_byte_length = buffer.len();

levels::write_def_levels(&mut buffer, &nested, validity, options.version)?;
levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?;
let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length;

encode_plain(array, is_optional, &mut buffer);
Expand Down
Loading

0 comments on commit 5c3397b

Please sign in to comment.