From 2b86bc9142650c1611d99b12e0bdcd7373a2e8ec Mon Sep 17 00:00:00 2001
From: Jorge Leitao
Date: Fri, 10 Dec 2021 19:52:27 +0100
Subject: [PATCH] Improved performance of reading utf8 required from parquet
 (-15%) (#670)

---
 benches/read_parquet.rs              | 29 +++++++++++++++++++---------
 parquet_integration/write_parquet.py | 10 ++++++++++
 src/io/parquet/read/binary/basic.rs  |  6 +++++-
 src/io/parquet/read/utils.rs         |  6 ++++--
 4 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 8f536ed6842..82fef98dba2 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -6,16 +6,23 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;
 
-fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
+fn to_buffer(
+    size: usize,
+    nullable: bool,
+    dict: bool,
+    multi_page: bool,
+    compressed: bool,
+) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");
 
     let dict = if dict { "dict/" } else { "" };
     let multi_page = if multi_page { "multi/" } else { "" };
     let compressed = if compressed { "snappy/" } else { "" };
+    let nullable = if nullable { "" } else { "_required" };
 
     let path = PathBuf::from(dir).join(format!(
-        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
-        dict, multi_page, compressed, size
+        "fixtures/pyarrow3/v1/{}{}{}benches{}_{}.parquet",
+        dict, multi_page, compressed, nullable, size
     ));
 
     let metadata = fs::metadata(&path).expect("unable to read metadata");
@@ -40,7 +47,7 @@ fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size, false, false, false);
+        let buffer = to_buffer(size, true, false, false, false);
         let a = format!("read i64 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
@@ -53,25 +60,29 @@
         let a = format!("read bool 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
 
-        let buffer = to_buffer(size, true, false, false);
+        let buffer = to_buffer(size, true, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
-        let buffer = to_buffer(size, false, false, true);
+        let buffer = to_buffer(size, true, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
-        let buffer = to_buffer(size, false, true, false);
+        let buffer = to_buffer(size, true, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
-        let buffer = to_buffer(size, false, true, true);
+        let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
-        let buffer = to_buffer(size, false, true, true);
+        let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, false, false, false);
+        let a = format!("read required utf8 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
     });
 }
 
diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py
index ef781534e0c..9f4199d7487 100644
--- a/parquet_integration/write_parquet.py
+++ b/parquet_integration/write_parquet.py
@@ -284,12 +284,22 @@ def case_benches(size):
     return data, schema, f"benches_{size}.parquet"
 
 
+def case_benches_required(size):
+    assert size % 8 == 0
+    data, schema, _ = case_basic_required(1)
+    for k in data:
+        data[k] = data[k][:8] * (size // 8)
+    return data, schema, f"benches_required_{size}.parquet"
+
+
 # for read benchmarks
 for i in range(10, 22, 2):
     # two pages (dict)
     write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
     # single page
     write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
+    # single page required
+    write_pyarrow(case_benches_required, 2 ** i, 1, False, False, False)
     # multiple pages
     write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
     # multiple compressed pages
diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs
index 5d7d0ced562..96b5b21adb0 100644
--- a/src/io/parquet/read/binary/basic.rs
+++ b/src/io/parquet/read/binary/basic.rs
@@ -214,7 +214,7 @@ fn read_plain_optional<O: Offset>(
 
 pub(super) fn read_plain_required<O: Offset>(
     buffer: &[u8],
-    _length: usize,
+    additional: usize,
     offsets: &mut MutableBuffer<O>,
     values: &mut MutableBuffer<u8>,
 ) {
@@ -222,11 +222,15 @@ pub(super) fn read_plain_required<O: Offset>(
 
     let values_iterator = utils::BinaryIter::new(buffer);
 
+    // each value is prefixed by its length, declared in 4 bytes => reserve accordingly.
+    values.reserve(buffer.len() - 4 * additional);
+    let a = values.capacity();
     for value in values_iterator {
         last_offset += O::from_usize(value.len()).unwrap();
         values.extend_from_slice(value);
         offsets.push(last_offset);
     }
+    debug_assert_eq!(a, values.capacity());
 }
 
 pub(super) fn extend_from_page(
diff --git a/src/io/parquet/read/utils.rs b/src/io/parquet/read/utils.rs
index 5659f2d6440..85b40a95a6b 100644
--- a/src/io/parquet/read/utils.rs
+++ b/src/io/parquet/read/utils.rs
@@ -1,4 +1,6 @@
-use parquet2::encoding::{get_length, Encoding};
+use std::convert::TryInto;
+
+use parquet2::encoding::Encoding;
 use parquet2::metadata::ColumnDescriptor;
 use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader};
 
@@ -22,7 +24,7 @@ impl<'a> Iterator for BinaryIter<'a> {
         if self.values.is_empty() {
             return None;
         }
-        let length = get_length(self.values) as usize;
+        let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
         self.values = &self.values[4..];
         let result = &self.values[..length];
         self.values = &self.values[length..];
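
Taken together, the two Rust changes work as follows: `BinaryIter` walks a
PLAIN-encoded binary page, where each value is stored as a 4-byte
little-endian length prefix followed by that many bytes (the inlined
`u32::from_le_bytes` replaces the `get_length` helper, making the 4-byte read
visible to the optimizer), and `read_plain_required` now receives the value
count (`additional`), so it can reserve the exact payload size,
`buffer.len() - 4 * additional`, before the copy loop; `extend_from_slice`
then never reallocates, which the `debug_assert_eq!` on the capacity checks
in debug builds. Below is a minimal, self-contained sketch of the same idea,
assuming plain `Vec<u8>`/`Vec<i32>` in place of arrow2's
`MutableBuffer<u8>`/`MutableBuffer<O>`; it is an illustration, not the
library code:

    use std::convert::TryInto;

    /// PLAIN-encoded binary values: a 4-byte little-endian length prefix,
    /// then that many bytes (mirrors arrow2's `BinaryIter`).
    struct BinaryIter<'a> {
        values: &'a [u8],
    }

    impl<'a> Iterator for BinaryIter<'a> {
        type Item = &'a [u8];

        fn next(&mut self) -> Option<Self::Item> {
            if self.values.is_empty() {
                return None;
            }
            // inlined length decode, as in the utils.rs change
            let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
            self.values = &self.values[4..];
            let result = &self.values[..length];
            self.values = &self.values[length..];
            Some(result)
        }
    }

    /// `additional` values, each with a 4-byte prefix => the payload is
    /// exactly `buffer.len() - 4 * additional` bytes; reserve it upfront.
    fn read_plain_required(
        buffer: &[u8],
        additional: usize,
        offsets: &mut Vec<i32>,
        values: &mut Vec<u8>,
    ) {
        let mut last_offset = *offsets.last().unwrap();

        values.reserve(buffer.len() - 4 * additional);
        let capacity_before = values.capacity();
        for value in (BinaryIter { values: buffer }) {
            last_offset += value.len() as i32;
            values.extend_from_slice(value);
            offsets.push(last_offset);
        }
        // the single reservation was sufficient: the loop never reallocated
        debug_assert_eq!(capacity_before, values.capacity());
    }

    fn main() {
        // "ab" and "cde", PLAIN-encoded
        let buffer = [2, 0, 0, 0, b'a', b'b', 3, 0, 0, 0, b'c', b'd', b'e'];
        let mut offsets = vec![0i32];
        let mut values = Vec::new();
        read_plain_required(&buffer, 2, &mut offsets, &mut values);
        assert_eq!(offsets, [0, 2, 5]);
        assert_eq!(values, b"abcde");
    }

Replacing amortized growth inside the hot loop with a single upfront
reservation is the likely source of the reported -15% on required
(non-nullable) utf8 columns, since the required path copies every value.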