Improved performance of reading utf8 required from parquet (-15%) (#670)
jorgecarleitao committed Dec 10, 2021
1 parent 15dc6c5 commit 2b86bc9
Showing 4 changed files with 39 additions and 12 deletions.
29 changes: 20 additions & 9 deletions benches/read_parquet.rs
@@ -6,16 +6,23 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;
 
-fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
+fn to_buffer(
+    size: usize,
+    nullable: bool,
+    dict: bool,
+    multi_page: bool,
+    compressed: bool,
+) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");
 
     let dict = if dict { "dict/" } else { "" };
     let multi_page = if multi_page { "multi/" } else { "" };
     let compressed = if compressed { "snappy/" } else { "" };
+    let nullable = if nullable { "" } else { "_required" };
 
     let path = PathBuf::from(dir).join(format!(
-        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
-        dict, multi_page, compressed, size
+        "fixtures/pyarrow3/v1/{}{}{}benches{}_{}.parquet",
+        dict, multi_page, compressed, nullable, size
     ));
 
     let metadata = fs::metadata(&path).expect("unable to read metadata");
@@ -40,7 +47,7 @@ fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size, false, false, false);
+        let buffer = to_buffer(size, true, false, false, false);
         let a = format!("read i64 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
@@ -53,25 +60,29 @@ fn add_benchmark(c: &mut Criterion) {
         let a = format!("read bool 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
 
-        let buffer = to_buffer(size, true, false, false);
+        let buffer = to_buffer(size, true, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
-        let buffer = to_buffer(size, false, false, true);
+        let buffer = to_buffer(size, true, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
-        let buffer = to_buffer(size, false, true, false);
+        let buffer = to_buffer(size, true, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
-        let buffer = to_buffer(size, false, true, true);
+        let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
-        let buffer = to_buffer(size, false, true, true);
+        let buffer = to_buffer(size, true, false, true, true);
        let a = format!("read i64 multi snappy 2^{}", i);
         c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, false, false, false);
+        let a = format!("read required utf8 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
     });
 }
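The new `nullable` argument selects between the existing nullable fixtures and the new `benches_required_*` ones. As a minimal sketch of that path resolution (the `fixture_path` helper below is hypothetical, distilled from `to_buffer` above):

```rust
// Hypothetical helper mirroring the path logic of `to_buffer` above;
// it only illustrates which fixture file each benchmark reads.
fn fixture_path(size: usize, nullable: bool) -> String {
    let nullable = if nullable { "" } else { "_required" };
    format!("fixtures/pyarrow3/v1/benches{}_{}.parquet", nullable, size)
}

fn main() {
    // The new "read required utf8" bench passes nullable = false:
    assert_eq!(
        fixture_path(1024, false),
        "fixtures/pyarrow3/v1/benches_required_1024.parquet"
    );
    // All pre-existing benches pass nullable = true:
    assert_eq!(
        fixture_path(1024, true),
        "fixtures/pyarrow3/v1/benches_1024.parquet"
    );
}
```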

10 changes: 10 additions & 0 deletions parquet_integration/write_parquet.py
@@ -284,12 +284,22 @@ def case_benches(size):
     return data, schema, f"benches_{size}.parquet"
 
 
+def case_benches_required(size):
+    assert size % 8 == 0
+    data, schema, _ = case_basic_required(1)
+    for k in data:
+        data[k] = data[k][:8] * (size // 8)
+    return data, schema, f"benches_required_{size}.parquet"
+
+
 # for read benchmarks
 for i in range(10, 22, 2):
     # two pages (dict)
     write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
     # single page
     write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
+    # single page required
+    write_pyarrow(case_benches_required, 2 ** i, 1, False, False, False)
     # multiple pages
     write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
     # multiple compressed pages
6 changes: 5 additions & 1 deletion src/io/parquet/read/binary/basic.rs
@@ -214,19 +214,23 @@ fn read_plain_optional<O: Offset>(
 
 pub(super) fn read_plain_required<O: Offset>(
     buffer: &[u8],
-    _length: usize,
+    additional: usize,
     offsets: &mut MutableBuffer<O>,
     values: &mut MutableBuffer<u8>,
 ) {
     let mut last_offset = *offsets.as_mut_slice().last().unwrap();
 
     let values_iterator = utils::BinaryIter::new(buffer);
 
+    // each value is stored as a 4-byte length prefix followed by its bytes => reserve accordingly.
+    values.reserve(buffer.len() - 4 * additional);
+    let a = values.capacity();
    for value in values_iterator {
         last_offset += O::from_usize(value.len()).unwrap();
         values.extend_from_slice(value);
         offsets.push(last_offset);
     }
+    debug_assert_eq!(a, values.capacity());
 }
 
 pub(super) fn extend_from_page<O: Offset>(
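The reservation above is exact because PLAIN-encoded `BYTE_ARRAY` values are laid out as a 4-byte little-endian length followed by the value bytes, so `additional` values carry exactly `buffer.len() - 4 * additional` bytes of payload. A self-contained sketch of that layout (the `encode_plain` helper is hypothetical, for illustration only):

```rust
// Hypothetical encoder for the PLAIN BYTE_ARRAY layout assumed above:
// [len: u32 little-endian][bytes; len], repeated once per value.
fn encode_plain(values: &[&[u8]]) -> Vec<u8> {
    let mut buffer = Vec::new();
    for v in values {
        buffer.extend_from_slice(&(v.len() as u32).to_le_bytes());
        buffer.extend_from_slice(v);
    }
    buffer
}

fn main() {
    let values: [&[u8]; 3] = [b"a", b"bc", b"def"];
    let buffer = encode_plain(&values);
    let additional = values.len();
    // Subtracting the 4-byte prefixes leaves exactly the payload bytes,
    // which is what `values.reserve(buffer.len() - 4 * additional)` relies on.
    assert_eq!(buffer.len() - 4 * additional, 1 + 2 + 3);
}
```

Reserving once up front moves the allocation out of the per-value loop; the `debug_assert_eq!` then verifies that no reallocation happened while extending.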
6 changes: 4 additions & 2 deletions src/io/parquet/read/utils.rs
@@ -1,4 +1,6 @@
-use parquet2::encoding::{get_length, Encoding};
+use std::convert::TryInto;
+
+use parquet2::encoding::Encoding;
 use parquet2::metadata::ColumnDescriptor;
 use parquet2::page::{split_buffer as _split_buffer, DataPage, DataPageHeader};
 
@@ -22,7 +24,7 @@ impl<'a> Iterator for BinaryIter<'a> {
         if self.values.is_empty() {
            return None;
         }
-        let length = get_length(self.values) as usize;
+        let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
         self.values = &self.values[4..];
         let result = &self.values[..length];
         self.values = &self.values[length..];
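For context, a sketch of the complete iterator after this change. The struct body and `new` constructor sit outside the hunk, so their shape here is an assumption based on the `values` field and the `utils::BinaryIter::new(buffer)` call visible in the diff; only `next` appears verbatim above:

```rust
use std::convert::TryInto;

// Assumed struct shape; the diff only shows the `values` field being used.
pub struct BinaryIter<'a> {
    values: &'a [u8],
}

impl<'a> BinaryIter<'a> {
    pub fn new(values: &'a [u8]) -> Self {
        Self { values }
    }
}

impl<'a> Iterator for BinaryIter<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.values.is_empty() {
            return None;
        }
        // Decode the 4-byte little-endian length prefix inline instead of
        // calling `parquet2::encoding::get_length`; together with the
        // up-front reservation, this is where the -15% comes from.
        let length = u32::from_le_bytes(self.values[0..4].try_into().unwrap()) as usize;
        self.values = &self.values[4..];
        let result = &self.values[..length];
        self.values = &self.values[length..];
        Some(result)
    }
}
```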