Preserve dictionary encoding from parquet (apache#171)
tustvold committed Jan 18, 2022
1 parent 0cc0c05 commit 3142ec0
Showing 12 changed files with 1,235 additions and 226 deletions.
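
For context on what is being preserved: in Arrow, a dictionary-encoded column is a DictionaryArray that stores each distinct value once plus one small integer key per row pointing into that set, mirroring Parquet's dictionary pages. A minimal illustrative sketch of that layout (the values and the Int32 key type here are arbitrary assumptions, not taken from the commit):

use arrow::array::{Array, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Illustrative values only, not taken from the commit.
    // Collecting strings into a DictionaryArray deduplicates them: the
    // dictionary holds each distinct value once, the keys hold one small
    // integer per row pointing into that dictionary.
    let dict: DictionaryArray<Int32Type> =
        vec!["hello", "world", "hello", "hello", "world"]
            .into_iter()
            .collect();

    assert_eq!(dict.len(), 5); // one key per row
    assert_eq!(dict.values().len(), 2); // "hello" and "world" stored once

    let values = dict
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "hello");
}
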
68 changes: 26 additions & 42 deletions parquet/src/arrow/array_reader.rs
@@ -56,11 +56,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
@@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
@@ -81,9 +80,15 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;

mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod offset_buffer;

#[cfg(test)]
mod test_util;

pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
@@ -271,7 +276,8 @@
.clone(),
};

let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
@@ -829,17 +835,18 @@ fn remove_indices(
size
),
ArrowType::Struct(fields) => {
let struct_array = arr.as_any()
let struct_array = arr
.as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");

// Recursively call remove_indices on each of the struct's fields
let new_columns = fields.into_iter()
let new_columns = fields
.into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
Ok((field,
remove_indices(column.clone(), dt, indices.clone())?))
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;

@@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
@@ -2025,7 +2009,7 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
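
The practical effect of the ArrayReaderBuilder hunk above, which now routes Some(ArrowType::Dictionary(_, _)) to make_byte_array_dictionary_reader, is that such a column comes back from the reader as a DictionaryArray instead of being expanded into a flat StringArray first. A hedged sketch of that end-to-end path, assuming a hypothetical local file strings.parquet whose embedded Arrow schema declares the column as Dictionary(Int32, Utf8), and the ParquetFileArrowReader / ArrowReader API of this era of the crate:

use std::fs::File;
use std::sync::Arc;

use arrow::array::{Array, DictionaryArray};
use arrow::datatypes::Int32Type;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; its Arrow schema metadata is assumed to
    // mark the string column as Dictionary(Int32, Utf8).
    let file = File::open("strings.parquet")?;
    let file_reader = Arc::new(SerializedFileReader::new(file)?);
    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);

    let mut batches = arrow_reader.get_record_reader(1024)?;
    let batch = batches.next().unwrap()?;

    // With this change the column is read directly into a dictionary
    // array, carrying the Parquet dictionary pages through, rather than
    // decoding one owned string per row.
    let dict = batch
        .column(0)
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()
        .expect("dictionary-encoded column");
    println!("{} rows, {} distinct values", dict.len(), dict.values().len());
    Ok(())
}
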
71 changes: 7 additions & 64 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -579,69 +579,18 @@ impl ByteArrayDecoderDictionary
#[cfg(test)]
mod tests {
use super::*;
use crate::arrow::array_reader::test_util::{
byte_array_all_encodings, utf8_column,
};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::basic::Type as PhysicalType;
use crate::data_type::{ByteArray, ByteArrayType};
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
use crate::util::memory::MemTracker;
use arrow::array::{Array, StringArray};
use std::sync::Arc;

fn column() -> ColumnDescPtr {
let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
.with_converted_type(ConvertedType::UTF8)
.build()
.unwrap();

Arc::new(ColumnDescriptor::new(
Arc::new(t),
1,
0,
ColumnPath::new(vec![]),
))
}

fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
let descriptor = column();
let mem_tracker = Arc::new(MemTracker::new());
let mut encoder =
get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();

encoder.put(data).unwrap();
encoder.flush_buffer().unwrap()
}

#[test]
fn test_byte_array_decoder() {
let data: Vec<_> = vec!["hello", "world", "a", "b"]
.into_iter()
.map(ByteArray::from)
.collect();

let mut dict_encoder =
DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));

dict_encoder.put(&data).unwrap();
let encoded_rle = dict_encoder.flush_buffer().unwrap();
let encoded_dictionary = dict_encoder.write_dict().unwrap();

// A column chunk with all the encodings!
let pages = vec![
(Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)),
(
Encoding::DELTA_BYTE_ARRAY,
get_encoded(Encoding::DELTA_BYTE_ARRAY, &data),
),
(
Encoding::DELTA_LENGTH_BYTE_ARRAY,
get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data),
),
(Encoding::PLAIN_DICTIONARY, encoded_rle.clone()),
(Encoding::RLE_DICTIONARY, encoded_rle),
];
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

let column_desc = column();
let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
@@ -668,15 +617,9 @@ mod tests {
assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

let valid = vec![false, false, true, true, false, true, true, false, false];
let rev_position_iter = valid
.iter()
.enumerate()
.rev()
.filter_map(|(i, valid)| valid.then(|| i));

let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), rev_position_iter);
output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

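
The pad_nulls call in the updated test now takes the packed validity bitmap directly (valid_buffer.as_slice()) rather than a reversed iterator of valid positions. As a rough illustration of what that padding step does, here is a simplified stand-in written against plain slices, not the crate's actual ValuesBuffer API (which operates on raw buffers and a packed bitmap): the densely packed decoded values are spread out to their final row positions, leaving default slots where the validity mask is false.

// Simplified illustration of null padding; not the parquet crate's API.
fn pad_nulls<T: Default + Clone>(dense: &[T], valid: &[bool]) -> Vec<T> {
    // Start with one default-valued slot per output row.
    let mut out = vec![T::default(); valid.len()];
    let mut next = 0;
    for (slot, &is_valid) in valid.iter().enumerate() {
        if is_valid {
            // Each valid slot consumes the next densely packed value.
            out[slot] = dense[next].clone();
            next += 1;
        }
    }
    out
}

fn main() {
    // Same shape as the test above: 4 decoded values, 9 output rows.
    let dense = vec!["hello", "world", "a", "b"];
    let valid = vec![false, false, true, true, false, true, true, false, false];
    let padded = pad_nulls(&dense, &valid);
    assert_eq!(padded[2], "hello");
    assert_eq!(padded[6], "b");
}
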
