Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Moved internal structs
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 11, 2022
1 parent 4843260 commit 261137f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 68 deletions.
5 changes: 2 additions & 3 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ use crate::{
};

use super::super::utils::{
extend_from_decoder, next, BinaryIter, DecodedState, MaybeNext, OptionalPageValidity,
SizedBinaryIter,
extend_from_decoder, next, DecodedState, MaybeNext, OptionalPageValidity,
};
use super::super::DataPages;
use super::{super::utils, utils::Binary};
use super::{super::utils, utils::*};

/*
fn read_delta_optional<O: Offset>(
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
use super::basic::ValuesDictionary;
use super::utils::Binary;
use super::utils::*;
use super::{
super::utils,
basic::{finish, Required, TraitBinaryArray},
Expand All @@ -19,7 +19,7 @@ use super::{
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum State<'a> {
Optional(Optional<'a>, utils::BinaryIter<'a>),
Optional(Optional<'a>, BinaryIter<'a>),
Required(Required<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(Optional<'a>, ValuesDictionary<'a>),
Expand Down Expand Up @@ -64,7 +64,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
(Encoding::Plain, None, true) => {
let (_, _, values) = utils::split_buffer(page);

let values = utils::BinaryIter::new(values);
let values = BinaryIter::new(values);

Ok(State::Optional(Optional::new(page), values))
}
Expand Down
62 changes: 62 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,65 @@ impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
self.extend_constant(additional)
}
}


#[derive(Debug)]
pub struct BinaryIter<'a> {
values: &'a [u8],
}

impl<'a> BinaryIter<'a> {
pub fn new(values: &'a [u8]) -> Self {
Self { values }
}
}

impl<'a> Iterator for BinaryIter<'a> {
type Item = &'a [u8];

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.values.is_empty() {
return None;
}
let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
self.values = &self.values[4..];
let result = &self.values[..length];
self.values = &self.values[length..];
Some(result)
}
}

#[derive(Debug)]
pub struct SizedBinaryIter<'a> {
iter: BinaryIter<'a>,
remaining: usize,
}

impl<'a> SizedBinaryIter<'a> {
pub fn new(values: &'a [u8], size: usize) -> Self {
let iter = BinaryIter::new(values);
Self {
iter,
remaining: size,
}
}
}

impl<'a> Iterator for SizedBinaryIter<'a> {
type Item = &'a [u8];

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
} else {
self.remaining -= 1
};
self.iter.next()
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}
62 changes: 0 additions & 62 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::convert::TryInto;

use parquet2::encoding::hybrid_rle;
use parquet2::page::{split_buffer as _split_buffer, DataPage};
Expand All @@ -12,67 +11,6 @@ use crate::error::ArrowError;

use super::super::DataPages;

#[derive(Debug)]
pub struct BinaryIter<'a> {
values: &'a [u8],
}

impl<'a> BinaryIter<'a> {
pub fn new(values: &'a [u8]) -> Self {
Self { values }
}
}

impl<'a> Iterator for BinaryIter<'a> {
type Item = &'a [u8];

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.values.is_empty() {
return None;
}
let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
self.values = &self.values[4..];
let result = &self.values[..length];
self.values = &self.values[length..];
Some(result)
}
}

#[derive(Debug)]
pub struct SizedBinaryIter<'a> {
iter: BinaryIter<'a>,
remaining: usize,
}

impl<'a> SizedBinaryIter<'a> {
pub fn new(values: &'a [u8], size: usize) -> Self {
let iter = BinaryIter::new(values);
Self {
iter,
remaining: size,
}
}
}

impl<'a> Iterator for SizedBinaryIter<'a> {
type Item = &'a [u8];

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
} else {
self.remaining -= 1
};
self.iter.next()
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}

pub fn not_implemented(page: &DataPage) -> ArrowError {
let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let required = if is_optional { "optional" } else { "required" };
Expand Down

0 comments on commit 261137f

Please sign in to comment.