From aee8124377372bc09462e52e2797801d448c1ea6 Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Fri, 17 Sep 2021 23:40:56 +0100
Subject: [PATCH] Added support for required dict utf8 read. (#419)

---
 src/io/parquet/read/binary/basic.rs | 42 +++++++++++++++++++++++++++++
 tests/it/io/parquet/read.rs         | 10 +++++++
 2 files changed, 52 insertions(+)

diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs
index dc92797ff38..44d52857e99 100644
--- a/src/io/parquet/read/binary/basic.rs
+++ b/src/io/parquet/read/binary/basic.rs
@@ -80,6 +80,38 @@ fn read_dict_buffer<O: Offset>(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+fn read_dict_required<O: Offset>(
+    indices_buffer: &[u8],
+    additional: usize,
+    dict: &BinaryPageDict,
+    offsets: &mut MutableBuffer<O>,
+    values: &mut MutableBuffer<u8>,
+    validity: &mut MutableBitmap,
+) {
+    let dict_values = dict.values();
+    let dict_offsets = dict.offsets();
+    let mut last_offset = *offsets.as_mut_slice().last().unwrap();
+
+    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
+    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
+    let bit_width = indices_buffer[0];
+    let indices_buffer = &indices_buffer[1..];
+
+    let indices = hybrid_rle::HybridRleDecoder::new(indices_buffer, bit_width as u32, additional);
+
+    for index in indices {
+        let index = index as usize;
+        let dict_offset_i = dict_offsets[index] as usize;
+        let dict_offset_ip1 = dict_offsets[index + 1] as usize;
+        let length = dict_offset_ip1 - dict_offset_i;
+        last_offset += O::from_usize(length).unwrap();
+        offsets.push(last_offset);
+        values.extend_from_slice(&dict_values[dict_offset_i..dict_offset_ip1]);
+    }
+    validity.extend_constant(additional, true);
+}
+
 fn read_delta_optional<O: Offset>(
     validity_buffer: &[u8],
     values_buffer: &[u8],
@@ -226,6 +258,16 @@ fn extend_from_page<O: Offset>(
                 validity,
             )
         }
+        (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
+            read_dict_required::<O>(
+                values_buffer,
+                additional,
+                dict.as_any().downcast_ref().unwrap(),
+                offsets,
+                values,
+                validity,
+            )
+        }
         (Encoding::DeltaLengthByteArray, None, true) => read_delta_optional::<O>(
             validity_buffer,
             values_buffer,
diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs
index 35ddcf3b5ce..93544b3d73a 100644
--- a/tests/it/io/parquet/read.rs
+++ b/tests/it/io/parquet/read.rs
@@ -142,6 +142,16 @@ fn v1_utf8_nullable_dict() -> Result<()> {
     test_pyarrow_integration(2, 1, "basic", true, false)
 }
 
+#[test]
+fn v2_utf8_required_dict() -> Result<()> {
+    test_pyarrow_integration(2, 2, "basic", true, true)
+}
+
+#[test]
+fn v1_utf8_required_dict() -> Result<()> {
+    test_pyarrow_integration(2, 1, "basic", true, true)
+}
+
 #[test]
 fn v2_boolean_nullable() -> Result<()> {
     test_pyarrow_integration(3, 2, "basic", false, false)