Improved parquet read benches (#533)
jorgecarleitao committed Oct 16, 2021
1 parent fb8b6f3 commit ee68d18
Showing 7 changed files with 207 additions and 58 deletions.
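In short: the Rust read benches now also cover dictionary-encoded, multi-page, and snappy-compressed fixtures via a renamed `read_batch` helper; `parquet_integration/write_parquet.py` generates those fixture variants; and a new `benchmarks/` harness times pyarrow on the same files so the two engines can be compared.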
53 changes: 37 additions & 16 deletions benches/read_parquet.rs
@@ -6,17 +6,26 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;
 
-fn to_buffer(size: usize) -> Vec<u8> {
+fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");
-    let path = PathBuf::from(dir).join(format!("fixtures/pyarrow3/v1/benches_{}.parquet", size));
+
+    let dict = if dict { "dict/" } else { "" };
+    let multi_page = if multi_page { "multi/" } else { "" };
+    let compressed = if compressed { "snappy/" } else { "" };
+
+    let path = PathBuf::from(dir).join(format!(
+        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
+        dict, multi_page, compressed, size
+    ));
 
     let metadata = fs::metadata(&path).expect("unable to read metadata");
     let mut file = fs::File::open(path).unwrap();
     let mut buffer = vec![0; metadata.len() as usize];
     file.read_exact(&mut buffer).expect("buffer overflow");
     buffer
 }
 
-fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
     let file = Cursor::new(buffer);
 
     let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
@@ -31,26 +40,38 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size);
+        let buffer = to_buffer(size, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 0).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 2).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 6).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 3).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
+
+        let buffer = to_buffer(size, true, false, false);
+        let a = format!("read utf8 dict 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, false, true);
+        let a = format!("read i64 snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, true, false);
+        let a = format!("read utf8 multi 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read utf8 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read i64 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
     });
 }
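For reference, the fixture layout that the new `to_buffer` composes with `format!` can be mirrored in Python; this helper is illustrative only (the name `fixture_path` is not part of the commit):

    def fixture_path(size: int, use_dict: bool, multi_page: bool, compressed: bool) -> str:
        # flags map to nested directories, in the same order as the Rust code above
        d = "dict/" if use_dict else ""
        m = "multi/" if multi_page else ""
        c = "snappy/" if compressed else ""
        return f"fixtures/pyarrow3/v1/{d}{m}{c}benches_{size}.parquet"

    # e.g. the "read utf8 multi snappy 2^10" bench reads:
    assert fixture_path(1024, False, True, True) == "fixtures/pyarrow3/v1/multi/snappy/benches_1024.parquet"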

1 change: 1 addition & 0 deletions benchmarks/.gitignore
@@ -0,0 +1 @@
runs
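(The ignored `runs` directory is where the new `bench_read.py`, below, writes its pyarrow timing reports.)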
49 changes: 49 additions & 0 deletions benchmarks/bench_read.py
@@ -0,0 +1,49 @@
import timeit
import io
import os
import json

import pyarrow.parquet


def _bench_single(log2_size: int, column: str, use_dict: bool) -> float:
    if use_dict:
        path = f"fixtures/pyarrow3/v1/dict/benches_{2**log2_size}.parquet"
    else:
        path = f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet"
    with open(path, "rb") as f:
        data = f.read()
    data = io.BytesIO(data)

    def f():
        pyarrow.parquet.read_table(data, columns=[column])

    seconds = timeit.Timer(f).timeit(number=512) / 512
    ns = seconds * 1000 * 1000 * 1000
    return ns


def _report(name: str, result: float):
    path = f"benchmarks/runs/{name}/new"
    os.makedirs(path, exist_ok=True)
    with open(f"{path}/estimates.json", "w") as f:
        json.dump({"mean": {"point_estimate": result}}, f)


def _bench(size, ty):
    column, use_dict = {
        "i64": ("int64", False),
        "bool": ("bool", False),
        "utf8": ("string", False),
        "utf8 dict": ("string", True),
    }[ty]

    result = _bench_single(size, column, use_dict)
    print(result)
    _report(f"read {ty} 2_{size}", result)


for size in range(10, 22, 2):
    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
        print(size, ty)
        _bench(size, ty)
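`_report` mimics criterion's on-disk layout (`<name>/new/estimates.json` with `mean.point_estimate` in nanoseconds), so `summarize.py` can consume both engines' results with the same parser. A sketch of the file one run produces (illustrative values, not part of the commit):

    import json
    import os

    # after `_bench(10, "i64")`, the file
    # benchmarks/runs/read i64 2_10/new/estimates.json holds a single
    # JSON object like this one (point_estimate in nanoseconds)
    os.makedirs("benchmarks/runs/read i64 2_10/new", exist_ok=True)
    with open("benchmarks/runs/read i64 2_10/new/estimates.json", "w") as f:
        json.dump({"mean": {"point_estimate": 123456.0}}, f)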
20 changes: 20 additions & 0 deletions benchmarks/run.py
@@ -0,0 +1,20 @@
import subprocess


# run pyarrow
subprocess.call(["python", "benchmarks/bench_read.py"])


for ty in ["i64", "bool", "utf8", "utf8 dict"]:
    args = [
        "cargo",
        "bench",
        "--features",
        "io_parquet,io_parquet_compression",
        "--bench",
        "read_parquet",
        "--",
        f"{ty} 2",
    ]

    subprocess.call(args)
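The filter `f"{ty} 2"` is passed to criterion, which selects every benchmark whose name contains it, e.g. `"i64 2"` matches `read i64 2^10` through `read i64 2^20`. End to end, the comparison is then driven from the repository root; a sketch of the sequence (illustrative, assumes `python` and `cargo` are on PATH and the fixtures were already generated by `write_parquet.py`):

    import subprocess

    # run pyarrow and the cargo benches, then print the comparison
    subprocess.check_call(["python", "benchmarks/run.py"])
    subprocess.check_call(["python", "benchmarks/summarize.py"])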
51 changes: 51 additions & 0 deletions benchmarks/summarize.py
@@ -0,0 +1,51 @@
import json
import os


def _read_reports(engine: str):
    root = {
        "arrow2": "target/criterion",
        "pyarrow": "benchmarks/runs",
    }[engine]

    result = []
    for item in os.listdir(root):
        if item == "report":
            continue

        with open(os.path.join(root, item, "new", "estimates.json")) as f:
            data = json.load(f)

        ms = data["mean"]["point_estimate"] / 1000
        task = item.split()[0]
        type = " ".join(item.split()[1:-1])
        size = int(item.split()[-1].split("_")[1])
        result.append(
            {
                "engine": engine,
                "task": task,
                "type": type,
                "size": size,
                "time": ms,
            }
        )
    return result


def _print_report(result):
    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
        print(ty)
        r = filter(lambda x: x["type"] == ty, result)
        r = sorted(r, key=lambda x: x["size"])
        for row in r:
            print(row["time"])


def print_report():
    for engine in ["arrow2", "pyarrow"]:
        print(engine)
        result = _read_reports(engine)
        _print_report(result)


print_report()
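Two details worth noting: `_read_reports` skips criterion's aggregate `report` directory, and since `point_estimate` is in nanoseconds (both criterion's and the one `bench_read.py` writes), the `ms` variable above actually holds microseconds. The name parsing can be exercised in isolation (illustrative, not part of the commit):

    import json
    import os
    import tempfile

    # write one report the way bench_read.py's _report does, then parse its
    # directory name the way _read_reports does
    with tempfile.TemporaryDirectory() as root:
        name = "read i64 2_10"
        os.makedirs(os.path.join(root, name, "new"))
        with open(os.path.join(root, name, "new", "estimates.json"), "w") as f:
            json.dump({"mean": {"point_estimate": 123000.0}}, f)  # nanoseconds

        item = os.listdir(root)[0]
        task = item.split()[0]                      # "read"
        ty = " ".join(item.split()[1:-1])           # "i64"
        size = int(item.split()[-1].split("_")[1])  # 10
        assert (task, ty, size) == ("read", "i64", 10)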
26 changes: 0 additions & 26 deletions parquet_integration/bench_read.py

This file was deleted.

65 changes: 49 additions & 16 deletions parquet_integration/write_parquet.py
@@ -11,7 +11,9 @@ def case_basic_nullable(size=1):
     float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
     string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
     boolean = [True, None, False, False, None, True, None, None, True, True]
-    string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"] * 10
+    string_large = [
+        "ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"
+    ] * 10
     decimal = [Decimal(e) if e is not None else None for e in int64]
 
     fields = [
@@ -23,9 +25,9 @@ def case_basic_nullable(size=1):
         pa.field("uint32", pa.uint32()),
         pa.field("string_large", pa.utf8()),
         # decimal testing
-        pa.field("decimal_9", pa.decimal128(9,0)),
-        pa.field("decimal_18", pa.decimal128(18,0)),
-        pa.field("decimal_26", pa.decimal128(26,0)),
+        pa.field("decimal_9", pa.decimal128(9, 0)),
+        pa.field("decimal_18", pa.decimal128(18, 0)),
+        pa.field("decimal_26", pa.decimal128(26, 0)),
     ]
     schema = pa.schema(fields)
@@ -67,9 +69,9 @@ def case_basic_required(size=1):
             nullable=False,
         ),
         pa.field("uint32", pa.uint32(), nullable=False),
-        pa.field("decimal_9", pa.decimal128(9,0), nullable=False),
-        pa.field("decimal_18", pa.decimal128(18,0), nullable=False),
-        pa.field("decimal_26", pa.decimal128(26,0), nullable=False),
+        pa.field("decimal_9", pa.decimal128(9, 0), nullable=False),
+        pa.field("decimal_18", pa.decimal128(18, 0), nullable=False),
+        pa.field("decimal_26", pa.decimal128(26, 0), nullable=False),
     ]
     schema = pa.schema(fields)
@@ -156,42 +158,73 @@ def case_nested(size):
     )
 
 
-def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
+def write_pyarrow(
+    case,
+    size: int,
+    page_version: int,
+    use_dictionary: bool,
+    multiple_pages: bool,
+    compression: bool,
+):
     data, schema, path = case(size)
 
     base_path = f"{PYARROW_PATH}/v{page_version}"
     if use_dictionary:
         base_path = f"{base_path}/dict"
 
+    if multiple_pages:
+        base_path = f"{base_path}/multi"
+
+    if compression:
+        base_path = f"{base_path}/snappy"
+
+    if compression:
+        compression = "snappy"
+    else:
+        compression = None
+
+    if multiple_pages:
+        data_page_size = 2 ** 10  # i.e. a small number to ensure multiple pages
+    else:
+        data_page_size = 2 ** 40  # i.e. a large number to ensure a single page
+
     t = pa.table(data, schema=schema)
     os.makedirs(base_path, exist_ok=True)
     pa.parquet.write_table(
         t,
         f"{base_path}/{path}",
         row_group_size=2 ** 40,
         use_dictionary=use_dictionary,
-        compression=None,
+        compression=compression,
         write_statistics=True,
-        data_page_size=2 ** 40,  # i.e. a large number to ensure a single page
+        data_page_size=data_page_size,
         data_page_version=f"{page_version}.0",
     )
 
 
 for case in [case_basic_nullable, case_basic_required, case_nested]:
     for version in [1, 2]:
         for use_dict in [True, False]:
-            write_pyarrow(case, 1, version, use_dict)
+            write_pyarrow(case, 1, version, use_dict, False, False)
 
 
 def case_benches(size):
     assert size % 8 == 0
-    size //= 8
-    data, schema, path = case_basic_nullable(1)
+    data, schema, _ = case_basic_nullable(1)
     for k in data:
-        data[k] = data[k][:8] * size
+        data[k] = data[k][:8] * (size // 8)
     return data, schema, f"benches_{size}.parquet"
 
 
 # for read benchmarks
-for i in range(3 + 10, 3 + 22, 2):
-    write_pyarrow(case_benches, 2 ** i, 1)  # V1
+for i in range(10, 22, 2):
+    # two pages (dict)
+    write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
+    # single page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
+    # multiple pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
+    # multiple compressed pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, True)
+    # single compressed page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, True)
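With this change the fixtures are self-consistent: `benches_{size}.parquet` now contains exactly `size` rows (previously `case_benches` was called with `2 ** (3 + i)` rows and used an eighth of that for the file name, so `benches_1024.parquet` actually held 8192 rows), which is why the Rust bench above passes `size` rather than `size * 8` to `read_batch`. A quick sanity check of one generated variant (illustrative, not part of the commit):

    import pyarrow.parquet as pq

    # the multi-page, snappy-compressed 2^10 fixture written above
    meta = pq.ParquetFile("fixtures/pyarrow3/v1/multi/snappy/benches_1024.parquet").metadata
    assert meta.num_rows == 1024
    assert meta.row_group(0).column(0).compression == "SNAPPY"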
