Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Migrated
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 21, 2022
1 parent bca3a95 commit aa6e4b7
Show file tree
Hide file tree
Showing 24 changed files with 131 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ ahash = { version = "0.7", optional = true }

# parquet support
#parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
parquet2 = { path = "../parquet2", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "write_indexes", optional = true, default_features = false, features = ["stream"] }

# avro support
avro-schema = { version = "0.2", optional = true }
Expand Down
3 changes: 1 addition & 2 deletions benches/write_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {

writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _ = writer.end(None)?;
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions examples/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Re

writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _size = writer.end(None)?;
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions src/doc/lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ fn main() -> Result<()> {
// Write the file.
writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _ = writer.end(None)?;
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion src/io/parquet/write/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub fn array_to_page<O: Offset>(
utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
Expand All @@ -97,7 +98,7 @@ pub fn array_to_page<O: Offset>(
)
}

pub(super) fn build_statistics<O: Offset>(
pub(crate) fn build_statistics<O: Offset>(
array: &BinaryArray<O>,
primitive_type: PrimitiveType,
) -> ParquetStatistics {
Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/write/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod basic;
mod nested;

pub use basic::array_to_page;
pub(crate) use basic::build_statistics;
pub(crate) use basic::encode_plain;
pub(super) use basic::{encode_delta, ord_binary};
pub use nested::array_to_page as nested_array_to_page;
1 change: 1 addition & 0 deletions src/io/parquet/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ where
utils::build_plain_page(
buffer,
levels::num_values(nested.offsets()),
nested.offsets().len().saturating_sub(1),
array.null_count(),
repetition_levels_byte_length,
definition_levels_byte_length,
Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/write/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub fn array_to_page(
utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ where
utils::build_plain_page(
buffer,
levels::num_values(nested.offsets()),
nested.offsets().len().saturating_sub(1),
array.null_count(),
repetition_levels_byte_length,
definition_levels_byte_length,
Expand Down
81 changes: 51 additions & 30 deletions src/io/parquet/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ use parquet2::{
encoding::{hybrid_rle::encode_u32, Encoding},
metadata::Descriptor,
page::{EncodedDictPage, EncodedPage},
statistics::ParquetStatistics,
write::{DynIter, WriteOptions},
};

use super::binary::build_statistics as binary_build_statistics;
use super::binary::encode_plain as binary_encode_plain;
use super::fixed_len_bytes::build_statistics as fixed_binary_build_statistics;
use super::fixed_len_bytes::encode_plain as fixed_binary_encode_plain;
use super::primitive::build_statistics as primitive_build_statistics;
use super::primitive::encode_plain as primitive_encode_plain;
use super::utf8::build_statistics as utf8_build_statistics;
use super::utf8::encode_plain as utf8_encode_plain;
use crate::bitmap::Bitmap;
use crate::datatypes::DataType;
Expand All @@ -20,9 +25,9 @@ use crate::{

fn encode_keys<K: DictionaryKey>(
array: &PrimitiveArray<K>,
// todo: merge this to not discard values' validity
validity: Option<&Bitmap>,
descriptor: Descriptor,
statistics: ParquetStatistics,
options: WriteOptions,
) -> Result<EncodedPage> {
let is_optional = is_nullable(&descriptor.primitive_type.field_info);
Expand Down Expand Up @@ -96,10 +101,11 @@ fn encode_keys<K: DictionaryKey>(
utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
None,
Some(statistics),
descriptor,
options,
Encoding::RleDictionary,
Expand All @@ -108,12 +114,15 @@ fn encode_keys<K: DictionaryKey>(
}

macro_rules! dyn_prim {
($from:ty, $to:ty, $array:expr, $options:expr) => {{
($from:ty, $to:ty, $array:expr, $options:expr, $descriptor:expr) => {{
let values = $array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
primitive_encode_plain::<$from, $to>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
(
EncodedDictPage::new(buffer, values.len()),
primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()),
)
}};
}

Expand All @@ -123,59 +132,66 @@ pub fn array_to_pages<K: DictionaryKey>(
options: WriteOptions,
encoding: Encoding,
) -> Result<DynIter<'static, Result<EncodedPage>>> {
println!("{descriptor:#?}");
match encoding {
Encoding::PlainDictionary | Encoding::RleDictionary => {
// write DictPage
let dict_page = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array, options),
DataType::Int16 => dyn_prim!(i16, i32, array, options),
let (dict_page, statistics) = match array.values().data_type().to_logical_type() {
DataType::Int8 => dyn_prim!(i8, i32, array, options, descriptor),
DataType::Int16 => dyn_prim!(i16, i32, array, options, descriptor),
DataType::Int32 | DataType::Date32 | DataType::Time32(_) => {
dyn_prim!(i32, i32, array, options)
dyn_prim!(i32, i32, array, options, descriptor)
}
DataType::Int64
| DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options),
DataType::UInt8 => dyn_prim!(u8, i32, array, options),
DataType::UInt16 => dyn_prim!(u16, i32, array, options),
DataType::UInt32 => dyn_prim!(u32, i32, array, options),
DataType::UInt64 => dyn_prim!(i64, i64, array, options),
DataType::Float32 => dyn_prim!(f32, f32, array, options),
DataType::Float64 => dyn_prim!(f64, f64, array, options),
| DataType::Duration(_) => dyn_prim!(i64, i64, array, options, descriptor),
DataType::UInt8 => dyn_prim!(u8, i32, array, options, descriptor),
DataType::UInt16 => dyn_prim!(u16, i32, array, options, descriptor),
DataType::UInt32 => dyn_prim!(u32, i32, array, options, descriptor),
DataType::UInt64 => dyn_prim!(i64, i64, array, options, descriptor),
DataType::Float32 => dyn_prim!(f32, f32, array, options, descriptor),
DataType::Float64 => dyn_prim!(f64, f64, array, options, descriptor),
DataType::Utf8 => {
let values = array.values().as_any().downcast_ref().unwrap();
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
utf8_encode_plain::<i32>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
utf8_encode_plain::<i32>(array, false, &mut buffer);
let stats = utf8_build_statistics(array, descriptor.primitive_type.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::LargeUtf8 => {
let values = array.values().as_any().downcast_ref().unwrap();
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
utf8_encode_plain::<i64>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
utf8_encode_plain::<i64>(array, false, &mut buffer);
let stats = utf8_build_statistics(array, descriptor.primitive_type.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::Binary => {
let values = array.values().as_any().downcast_ref().unwrap();
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i32>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
binary_encode_plain::<i32>(array, false, &mut buffer);
let stats = binary_build_statistics(array, descriptor.primitive_type.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::LargeBinary => {
let values = array.values().as_any().downcast_ref().unwrap();
let array = array.values().as_any().downcast_ref().unwrap();

let mut buffer = vec![];
binary_encode_plain::<i64>(values, false, &mut buffer);
EncodedDictPage::new(buffer, values.len())
binary_encode_plain::<i64>(array, false, &mut buffer);
let stats = binary_build_statistics(array, descriptor.primitive_type.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
DataType::FixedSizeBinary(_) => {
let mut buffer = vec![];
let array = array.values().as_any().downcast_ref().unwrap();
fixed_binary_encode_plain(array, false, &mut buffer);
EncodedDictPage::new(buffer, array.len())
let stats =
fixed_binary_build_statistics(array, descriptor.primitive_type.clone());
(EncodedDictPage::new(buffer, array.len()), stats)
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
Expand All @@ -187,8 +203,13 @@ pub fn array_to_pages<K: DictionaryKey>(
let dict_page = EncodedPage::Dict(dict_page);

// write DataPage pointing to DictPage
let data_page =
encode_keys(array.keys(), array.values().validity(), descriptor, options)?;
let data_page = encode_keys(
array.keys(),
array.values().validity(),
descriptor,
statistics,
options,
)?;

let iter = std::iter::once(Ok(dict_page)).chain(std::iter::once(Ok(data_page)));
Ok(DynIter::new(Box::new(iter)))
Expand Down
8 changes: 2 additions & 6 deletions src/io/parquet/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ impl<W: Write> FileWriter<W> {
}

/// Writes a row group to the file.
pub fn write(
&mut self,
row_group: RowGroupIter<'_, ArrowError>,
num_rows: usize,
) -> Result<()> {
Ok(self.writer.write(row_group, num_rows)?)
pub fn write(&mut self, row_group: RowGroupIter<'_, ArrowError>) -> Result<()> {
Ok(self.writer.write(row_group)?)
}

/// Writes the footer of the parquet file. Returns the total size of the file.
Expand Down
21 changes: 10 additions & 11 deletions src/io/parquet/write/fixed_len_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use parquet2::{
encoding::Encoding,
metadata::Descriptor,
page::DataPage,
statistics::{deserialize_statistics, serialize_statistics, ParquetStatistics},
schema::types::PrimitiveType,
statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics},
write::WriteOptions,
};

Expand Down Expand Up @@ -48,14 +49,15 @@ pub fn array_to_page(
encode_plain(array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
build_statistics(array, descriptor.clone())
Some(build_statistics(array, descriptor.primitive_type.clone()))
} else {
None
};

utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
Expand All @@ -68,11 +70,10 @@ pub fn array_to_page(

pub(super) fn build_statistics(
array: &FixedSizeBinaryArray,
descriptor: Descriptor,
) -> Option<ParquetStatistics> {
let pq_statistics = &ParquetStatistics {
max: None,
min: None,
primitive_type: PrimitiveType,
) -> ParquetStatistics {
let statistics = &FixedLenStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
distinct_count: None,
max_value: array
Expand All @@ -85,8 +86,6 @@ pub(super) fn build_statistics(
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
};
deserialize_statistics(pq_statistics, descriptor.primitive_type)
.map(|e| serialize_statistics(&*e))
.ok()
} as &dyn Statistics;
serialize_statistics(statistics)
}
2 changes: 2 additions & 0 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ pub(self) fn decimal_length_from_precision(precision: usize) -> usize {

/// Creates a parquet [`SchemaDescriptor`] from a [`Schema`].
///
/// Every field of `schema` is converted with `to_parquet_type`, and the resulting
/// parquet types are placed under a schema named `"root"`.
///
/// # Errors
/// Returns the error produced by `to_parquet_type` if any field cannot be converted;
/// conversion stops at the first failing field.
pub fn to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
    // Deliberately no debug printing: this is a library entry point and must not
    // write the schema to stdout on every call.
    let parquet_types = schema
        .fields
        .iter()
        .map(to_parquet_type)
        .collect::<Result<Vec<_>>>()?;
    Ok(SchemaDescriptor::new("root".to_string(), parquet_types))
}

Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/write/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ where
utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/write/primitive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod basic;
mod nested;

pub(crate) use basic::build_statistics;
pub use basic::array_to_page;
pub(crate) use basic::encode_plain;
pub use nested::array_to_page as nested_array_to_page;
1 change: 1 addition & 0 deletions src/io/parquet/write/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ where
utils::build_plain_page(
buffer,
levels::num_values(nested.offsets()),
nested.offsets().len().saturating_sub(1),
array.null_count(),
repetition_levels_byte_length,
definition_levels_byte_length,
Expand Down
16 changes: 6 additions & 10 deletions src/io/parquet/write/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,19 @@ impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> RowGro
impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = Result<Chunk<A>>>> Iterator
for RowGroupIterator<A, I>
{
type Item = Result<(RowGroupIter<'static, ArrowError>, usize)>;
type Item = Result<RowGroupIter<'static, ArrowError>>;

fn next(&mut self) -> Option<Self::Item> {
let options = self.options;

self.iter.next().map(|maybe_chunk| {
let chunk = maybe_chunk?;
let len = chunk.len();
let encodings = self.encodings.clone();
Ok((
row_group_iter(
chunk,
encodings,
self.parquet_schema.columns().to_vec(),
options,
),
len,
Ok(row_group_iter(
chunk,
encodings,
self.parquet_schema.columns().to_vec(),
options,
))
})
}
Expand Down
Loading

0 comments on commit aa6e4b7

Please sign in to comment.