Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed May 27, 2022
1 parent 6397393 commit f10a626
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 32 deletions.
41 changes: 33 additions & 8 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ where
List(inner) | LargeList(inner) | FixedSizeList(inner, _) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
vec![columns.pop().unwrap()],
columns,
types,
inner.as_ref().clone(),
init,
Expand All @@ -291,13 +291,10 @@ where
.map(|f| {
let mut init = init.clone();
init.push(InitNested::Struct(field.is_nullable));
columns_to_iter_recursive(
vec![columns.pop().unwrap()],
vec![types.pop().unwrap()],
f.clone(),
init,
chunk_size,
)
let n = n_columns(f);
let columns = columns.drain(columns.len() - n..).collect();
let types = types.drain(types.len() - n..).collect();
columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size)
})
.collect::<Result<Vec<_>>>()?;
let columns = columns.into_iter().rev().collect();
Expand All @@ -307,6 +304,34 @@ where
})
}

/// Returns the number of columns that `field` spans.
///
/// * Physical types matched by the first arm (null, boolean, primitive,
///   binary-like, utf8-like and dictionary) count as a single column.
/// * List-like types span as many columns as their inner field.
/// * Structs span the sum of the columns of their child fields.
///
/// # Panics
///
/// Hits `todo!()` for any other physical type.
fn n_columns(field: &Field) -> usize {
    use crate::datatypes::PhysicalType::*;
    match field.data_type.to_physical_type() {
        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
        | Dictionary(_) | LargeUtf8 => 1,
        // The physical type only says "some kind of list"; the inner field is
        // carried by the logical type.
        List | FixedSizeList | LargeList => match field.data_type().to_logical_type() {
            DataType::List(child)
            | DataType::LargeList(child)
            | DataType::FixedSizeList(child, _) => n_columns(child),
            _ => unreachable!(),
        },
        Struct => match field.data_type.to_logical_type() {
            DataType::Struct(children) => children.iter().map(n_columns).sum(),
            _ => unreachable!(),
        },
        _ => todo!(),
    }
}

/// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s.
///
/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
Expand Down
28 changes: 20 additions & 8 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub trait Nested: std::fmt::Debug + Send + Sync {

fn is_nullable(&self) -> bool;

/// Whether this nesting level is repeated.
///
/// The default implementation returns `false`; implementors that are
/// repeated override it.
fn is_repeated(&self) -> bool {
false
}

/// number of rows
fn len(&self) -> usize;

Expand Down Expand Up @@ -79,6 +83,10 @@ impl Nested for NestedOptional {
true
}

/// Overrides the default: this nesting level always reports itself as repeated.
fn is_repeated(&self) -> bool {
true
}

fn push(&mut self, value: i64, is_valid: bool) {
self.offsets.push(value);
self.validity.push(is_valid);
Expand Down Expand Up @@ -116,6 +124,10 @@ impl Nested for NestedValid {
false
}

/// Overrides the default: this nesting level always reports itself as repeated.
fn is_repeated(&self) -> bool {
true
}

/// Appends `value` to the offsets.
///
/// The validity flag is ignored (note the `_is_valid` name): only the
/// offset is recorded here.
fn push(&mut self, value: i64, _is_valid: bool) {
self.offsets.push(value);
}
Expand Down Expand Up @@ -184,11 +196,11 @@ impl NestedStruct {

impl Nested for NestedStruct {
fn inner(&mut self) -> (Vec<i64>, Option<MutableBitmap>) {
(Default::default(), None)
(Default::default(), Some(std::mem::take(&mut self.validity)))
}

fn is_nullable(&self) -> bool {
false
true
}

fn push(&mut self, _value: i64, is_valid: bool) {
Expand Down Expand Up @@ -389,7 +401,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi

let mut cum_sum = vec![0u32; nested.len() + 1];
for (i, nest) in nested.iter().enumerate() {
let delta = if nest.is_nullable() { 2 } else { 1 };
let delta = nest.is_nullable() as u32 + nest.is_repeated() as u32;
cum_sum[i + 1] = cum_sum[i] + delta;
}

Expand Down Expand Up @@ -422,18 +434,18 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi
#[derive(Debug)]
pub struct Optional<'a> {
iter: HybridRleDecoder<'a>,
max: u32,
max_def: u32,
}

impl<'a> Iterator for Optional<'a> {
type Item = bool;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().and_then(|x| {
if x == self.max {
self.iter.next().and_then(|def| {
if def == self.max_def {
Some(true)
} else if x == self.max - 1 {
} else if def == self.max_def - 1 {
Some(false)
} else {
self.next()
Expand All @@ -450,7 +462,7 @@ impl<'a> Optional<'a> {

Self {
iter: HybridRleDecoder::new(def_levels, get_bit_width(max_def), page.num_values()),
max: max_def as u32,
max_def: max_def as u32,
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/io/parquet/read/deserialize/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,13 @@ where
P: ParquetNativeType,
F: Copy + Fn(P) -> T,
{
pub fn new(iter: I, init: Vec<InitNested>, data_type: DataType, chunk_size: usize, op: F) -> Self {
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: DataType,
chunk_size: usize,
op: F,
) -> Self {
Self {
iter,
init,
Expand Down
38 changes: 23 additions & 15 deletions src/io/parquet/read/deserialize/struct_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,29 @@ impl<'a> Iterator for StructIterator<'a> {
if values.iter().any(|x| x.is_none()) {
return None;
}
let values = values
.into_iter()
.map(|x| x.unwrap().map(|x| x.1))
.collect::<Result<Vec<_>, ArrowError>>();

match values {
Ok(values) => Some(Ok((
NestedState::new(vec![]), // todo
Arc::new(StructArray::from_data(
DataType::Struct(self.fields.clone()),
values,
None,
)),
))),
Err(e) => Some(Err(e)),

// todo: unzip of Result not yet supported in stable Rust
let mut nested = vec![];
let mut new_values = vec![];
for x in values {
match x.unwrap() {
Ok((nest, values)) => {
new_values.push(values);
nested.push(nest);
}
Err(e) => return Some(Err(e)),
}
}
let mut nested = nested.pop().unwrap();
let (_, validity) = nested.nested.pop().unwrap().inner();

Some(Ok((
nested,
Arc::new(StructArray::from_data(
DataType::Struct(self.fields.clone()),
new_values,
validity.and_then(|x| x.into()),
)),
)))
}
}

0 comments on commit f10a626

Please sign in to comment.