
Commit b2d447a
Simplified code (jorgecarleitao#1401)
Simpler code
jorgecarleitao authored and ritchie46 committed Apr 5, 2023
1 parent 567706d commit b2d447a
Showing 4 changed files with 55 additions and 96 deletions.
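In short, the commit deletes the `TraitBinaryArray<O>` trait and the `A: TraitBinaryArray<O>` type parameter that was threaded through the binary page iterators; `finish` now inspects `data_type.to_physical_type()` at runtime and returns a type-erased `Box<dyn Array>`. Below is a minimal, self-contained sketch of that idea, using stand-in types rather than the crate's actual ones:

```rust
use std::fmt::Debug;

// Stand-in for arrow2's `dyn Array`: any finished, type-erased array.
trait AnyArray: Debug {}

#[derive(Debug)]
struct Utf8Like(Vec<String>);
#[derive(Debug)]
struct BinaryLike(Vec<Vec<u8>>);

impl AnyArray for Utf8Like {}
impl AnyArray for BinaryLike {}

// Stand-in for `DataType::to_physical_type()`.
#[derive(Clone, Copy)]
enum PhysicalType {
    Utf8,
    Binary,
}

// Previously a `TraitBinaryArray<O>` bound picked the output array at compile
// time, which forced an extra type parameter on every iterator. Dispatching on
// the physical type at the end keeps a single decoding path and erases the type.
fn finish(physical_type: PhysicalType, raw: Vec<Vec<u8>>) -> Box<dyn AnyArray> {
    match physical_type {
        PhysicalType::Utf8 => Box::new(Utf8Like(
            raw.into_iter()
                .map(|bytes| String::from_utf8(bytes).expect("valid utf8"))
                .collect(),
        )),
        PhysicalType::Binary => Box::new(BinaryLike(raw)),
    }
}

fn main() {
    let raw = vec![b"simpler".to_vec(), b"code".to_vec()];
    println!("{:?}", finish(PhysicalType::Utf8, raw));
}
```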
77 changes: 26 additions & 51 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -10,11 +10,10 @@ use parquet2::{

use crate::{
array::{Array, BinaryArray, Utf8Array},
bitmap::{Bitmap, MutableBitmap},
buffer::Buffer,
datatypes::DataType,
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::{Error, Result},
offset::{Offset, OffsetsBuffer},
offset::Offset,
};

use super::super::utils::{
@@ -225,39 +224,6 @@ impl<'a> utils::PageState<'a> for State<'a> {
}
}

pub trait TraitBinaryArray<O: Offset>: Array + 'static {
fn try_new(
data_type: DataType,
offsets: OffsetsBuffer<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self>
where
Self: Sized;
}

impl<O: Offset> TraitBinaryArray<O> for BinaryArray<O> {
fn try_new(
data_type: DataType,
offsets: OffsetsBuffer<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self> {
Self::try_new(data_type, offsets, values, validity)
}
}

impl<O: Offset> TraitBinaryArray<O> for Utf8Array<O> {
fn try_new(
data_type: DataType,
offsets: OffsetsBuffer<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self> {
Self::try_new(data_type, offsets, values, validity)
}
}

impl<O: Offset> DecodedState for (Binary<O>, MutableBitmap) {
fn len(&self) -> usize {
self.0.len()
@@ -475,34 +441,44 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}

pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
pub(super) fn finish<O: Offset>(
data_type: &DataType,
mut values: Binary<O>,
mut validity: MutableBitmap,
) -> Result<A> {
) -> Result<Box<dyn Array>> {
values.offsets.shrink_to_fit();
values.values.shrink_to_fit();
validity.shrink_to_fit();

A::try_new(
data_type.clone(),
values.offsets.into(),
values.values.into(),
validity.into(),
)
match data_type.to_physical_type() {
PhysicalType::Binary | PhysicalType::LargeBinary => BinaryArray::<O>::try_new(
data_type.clone(),
values.offsets.into(),
values.values.into(),
validity.into(),
)
.map(|x| x.boxed()),
PhysicalType::Utf8 | PhysicalType::LargeUtf8 => Utf8Array::<O>::try_new(
data_type.clone(),
values.offsets.into(),
values.values.into(),
validity.into(),
)
.map(|x| x.boxed()),
_ => unreachable!(),
}
}

pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: Pages> {
pub struct Iter<O: Offset, I: Pages> {
iter: I,
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
dict: Option<Dict>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iter<O, A, I> {
impl<O: Offset, I: Pages> Iter<O, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
Self {
iter,
@@ -511,13 +487,12 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iter<O, A, I> {
dict: None,
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
}

impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iterator for Iter<O, A, I> {
type Item = Result<A>;
impl<O: Offset, I: Pages> Iterator for Iter<O, I> {
type Item = Result<Box<dyn Array>>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = next(
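Because `finish` and the public `Iter` now produce `Box<dyn Array>` rather than a concrete `BinaryArray`/`Utf8Array`, a caller that needs the concrete type recovers it by downcasting. A small illustrative sketch, assuming arrow2's usual `Array::as_any` downcast and the `from_slice`/`boxed` conveniences:

```rust
use arrow2::array::{Array, Utf8Array};

// With the `A: TraitBinaryArray<O>` parameter gone, the concrete array type
// is recovered (when needed) by downcasting the type-erased item.
fn utf8_values(array: &dyn Array) -> Option<Vec<Option<&str>>> {
    let utf8 = array.as_any().downcast_ref::<Utf8Array<i32>>()?;
    Some(utf8.iter().collect())
}

fn main() {
    // Stand-in for one item yielded by the new `Iter`.
    let boxed: Box<dyn Array> = Utf8Array::<i32>::from_slice(["simpler", "code"]).boxed();
    println!("{:?}", utf8_values(boxed.as_ref()));
}
```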
16 changes: 7 additions & 9 deletions src/io/parquet/read/deserialize/binary/nested.rs
@@ -7,8 +7,8 @@ use parquet2::{
};

use crate::{
bitmap::MutableBitmap, datatypes::DataType, error::Result, io::parquet::read::Pages,
offset::Offset,
array::Array, bitmap::MutableBitmap, datatypes::DataType, error::Result,
io::parquet::read::Pages, offset::Offset,
};

use super::super::utils::MaybeNext;
@@ -17,7 +17,7 @@ use super::utils::*;
use super::{super::nested_utils::*, basic::deserialize_plain};
use super::{
super::utils,
basic::{finish, Dict, TraitBinaryArray},
basic::{finish, Dict},
};

#[derive(Debug)]
@@ -136,18 +136,17 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
}

pub struct NestedIter<O: Offset, A: TraitBinaryArray<O>, I: Pages> {
pub struct NestedIter<O: Offset, I: Pages> {
iter: I,
data_type: DataType,
init: Vec<InitNested>,
items: VecDeque<(NestedState, (Binary<O>, MutableBitmap))>,
dict: Option<Dict>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> NestedIter<O, A, I> {
impl<O: Offset, I: Pages> NestedIter<O, I> {
pub fn new(
iter: I,
init: Vec<InitNested>,
@@ -163,13 +162,12 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> NestedIter<O, A, I> {
dict: None,
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
}

impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iterator for NestedIter<O, A, I> {
type Item = Result<(NestedState, A)>;
impl<O: Offset, I: Pages> Iterator for NestedIter<O, I> {
type Item = Result<(NestedState, Box<dyn Array>)>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = next(
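Dropping the `A` type parameter is also what lets the `phantom_a: std::marker::PhantomData<A>` fields disappear from both iterators: the marker existed only to tie an otherwise-unused parameter to the struct. A general Rust illustration (not the crate's code):

```rust
use std::marker::PhantomData;

// A struct must "use" every type parameter it declares; a parameter that only
// influences the item type produced elsewhere needs a PhantomData field.
#[allow(dead_code)]
struct TypedIter<A> {
    remaining: usize,
    phantom_a: PhantomData<A>, // keeps the otherwise-unused `A` alive
}

// Once the output is type-erased, the parameter and its phantom field go away.
#[allow(dead_code)]
struct ErasedIter {
    remaining: usize,
}

fn main() {
    let _typed = TypedIter::<String> { remaining: 3, phantom_a: PhantomData };
    let _erased = ErasedIter { remaining: 3 };
}
```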
46 changes: 19 additions & 27 deletions src/io/parquet/read/deserialize/nested.rs
@@ -1,14 +1,28 @@
use parquet2::schema::types::PrimitiveType;

use crate::{
array::{BinaryArray, MapArray, Utf8Array},
array::MapArray,
datatypes::{DataType, Field},
error::{Error, Result},
};

use super::nested_utils::{InitNested, NestedArrayIter};
use super::*;

/// Converts an iterator of arrays to a trait object returning trait objects
#[inline]
fn remove_nested<'a, I>(iter: I) -> NestedArrayIter<'a>
where
I: Iterator<Item = Result<(NestedState, Box<dyn Array>)>> + Send + Sync + 'a,
{
Box::new(iter.map(|x| {
x.map(|(mut nested, array)| {
let _ = nested.nested.pop().unwrap(); // the primitive
(nested, array)
})
}))
}

/// Converts an iterator of arrays to a trait object returning trait objects
#[inline]
fn primitive<'a, A, I>(iter: I) -> NestedArrayIter<'a>
@@ -185,43 +199,21 @@ where
|x: f64| x,
))
}
Utf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive(binary::NestedIter::<i32, Utf8Array<i32>, _>::new(
columns.pop().unwrap(),
init,
field.data_type().clone(),
num_rows,
chunk_size,
))
}
LargeUtf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive(binary::NestedIter::<i64, Utf8Array<i64>, _>::new(
columns.pop().unwrap(),
init,
field.data_type().clone(),
num_rows,
chunk_size,
))
}
Binary => {
Binary | Utf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive(binary::NestedIter::<i32, BinaryArray<i32>, _>::new(
remove_nested(binary::NestedIter::<i32, _>::new(
columns.pop().unwrap(),
init,
field.data_type().clone(),
num_rows,
chunk_size,
))
}
LargeBinary => {
LargeBinary | LargeUtf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
primitive(binary::NestedIter::<i64, BinaryArray<i64>, _>::new(
remove_nested(binary::NestedIter::<i64, _>::new(
columns.pop().unwrap(),
init,
field.data_type().clone(),
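The new `remove_nested` helper is what allows the `Binary | Utf8` and `LargeBinary | LargeUtf8` arms to share a path: the binary `NestedIter` already yields a finished `Box<dyn Array>`, so the adapter only has to pop the innermost (primitive) level off the nesting state before passing the pair along. A simplified, self-contained sketch of that adapter, with stand-in types in place of the crate's `NestedState` and `NestedArrayIter`:

```rust
type Result<T> = std::result::Result<T, String>;

#[derive(Debug)]
struct NestedState {
    // Innermost level last, e.g. ["list", "primitive"].
    nested: Vec<&'static str>,
}

// Adapt an iterator of (nesting-state, boxed-array) pairs by dropping the
// innermost (primitive) nesting level, mirroring the shape of `remove_nested`.
fn remove_nested<'a, I, A>(iter: I) -> Box<dyn Iterator<Item = Result<(NestedState, A)>> + 'a>
where
    I: Iterator<Item = Result<(NestedState, A)>> + 'a,
    A: 'a,
{
    Box::new(iter.map(|x| {
        x.map(|(mut nested, array)| {
            let _ = nested.nested.pop().unwrap(); // the primitive level
            (nested, array)
        })
    }))
}

fn main() {
    let items = vec![Ok((NestedState { nested: vec!["list", "primitive"] }, "array"))];
    for item in remove_nested(items.into_iter()) {
        println!("{item:?}"); // nesting now ends at the list level
    }
}
```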
12 changes: 3 additions & 9 deletions src/io/parquet/read/deserialize/simple.rs
@@ -6,7 +6,7 @@ use parquet2::{
};

use crate::{
array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, Utf8Array},
array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{Error, Result},
types::{days_ms, NativeType},
@@ -278,16 +278,10 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>(
|x: f64| x,
))),

Binary => dyn_iter(binary::Iter::<i32, BinaryArray<i32>, _>::new(
Utf8 | Binary => Box::new(binary::Iter::<i32, _>::new(
pages, data_type, chunk_size, num_rows,
)),
LargeBinary => dyn_iter(binary::Iter::<i64, BinaryArray<i64>, _>::new(
pages, data_type, chunk_size, num_rows,
)),
Utf8 => dyn_iter(binary::Iter::<i32, Utf8Array<i32>, _>::new(
pages, data_type, chunk_size, num_rows,
)),
LargeUtf8 => dyn_iter(binary::Iter::<i64, Utf8Array<i64>, _>::new(
LargeBinary | LargeUtf8 => Box::new(binary::Iter::<i64, _>::new(
pages, data_type, chunk_size, num_rows,
)),

