Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions benchmarks/compress-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use vortex_bench::Target;
use vortex_bench::compress::CompressMeasurements;
use vortex_bench::compress::CompressOp;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::READ_PROJECTION_COLUMNS;
use vortex_bench::compress::READ_PROJECTION_ROOT_COLUMNS;
use vortex_bench::compress::benchmark_compress;
use vortex_bench::compress::benchmark_decompress;
use vortex_bench::compress::calculate_ratios;
Expand Down Expand Up @@ -134,6 +136,13 @@ async fn run_compress(
StructListOfInts::new(100, 1000, 50),
StructListOfInts::new(1000, 1000, 50),
StructListOfInts::new(10000, 1000, 50),
// Very wide file: project a fixed 10k columns out of 100k, across 10 chunks.
StructListOfInts::new_with_projection(
READ_PROJECTION_ROOT_COLUMNS,
1000,
10,
Some(READ_PROJECTION_COLUMNS),
),
];

let datasets: Vec<&dyn Dataset> = [
Expand Down
9 changes: 8 additions & 1 deletion benchmarks/compress-bench/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::basic::Compression;
use parquet::basic::ZstdLevel;
use parquet::file::properties::WriterProperties;
use vortex_bench::Format;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::read_projection;

/// Compressor implementation for Parquet format with ZSTD compression.
pub struct ParquetCompressor {
Expand Down Expand Up @@ -108,7 +110,12 @@ pub fn parquet_compress_write(

#[inline(never)]
pub fn parquet_decompress_read(buf: Bytes) -> anyhow::Result<usize> {
let builder = ParquetRecordBatchReaderBuilder::try_new(buf)?;
let mut builder = ParquetRecordBatchReaderBuilder::try_new(buf)?;
if let Some(cols) = read_projection(builder.schema().fields().len()) {
// Project the given top-level (root) columns.
let mask = ProjectionMask::roots(builder.parquet_schema(), cols.iter().copied());
builder = builder.with_projection(mask);
}
let reader = builder.build()?;
let mut nbytes = 0;
for batch in reader {
Expand Down
15 changes: 14 additions & 1 deletion benchmarks/compress-bench/src/vortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ use bytes::Bytes;
use futures::StreamExt;
use futures::pin_mut;
use vortex::array::IntoArray;
use vortex::dtype::FieldNames;
use vortex::expr::root;
use vortex::expr::select;
use vortex::file::OpenOptionsSessionExt;
use vortex::file::WriteOptionsSessionExt;
use vortex_bench::Format;
use vortex_bench::SESSION;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::read_projection;
use vortex_bench::conversions::parquet_to_vortex_chunks;

/// Compressor implementation for Vortex format.
Expand Down Expand Up @@ -58,7 +62,16 @@ impl Compressor for VortexCompressor {
// Now decompress
let start = Instant::now();
let data = Bytes::from(buf);
let scan = SESSION.open_options().open_buffer(data)?.scan()?;
let mut scan = SESSION.open_options().open_buffer(data)?.scan()?;
let root_columns = scan
.dtype()?
.as_struct_fields_opt()
.map_or(0, |fields| fields.nfields());
if let Some(cols) = read_projection(root_columns) {
// Columns are named "0".."num_columns-1"; project the given subset.
let names: FieldNames = cols.iter().map(|i| i.to_string()).collect();
scan = scan.with_projection(select(names, root()));
}
let schema = Arc::new(scan.dtype()?.to_arrow_schema()?);

let stream = scan.into_record_batch_stream(schema)?;
Expand Down
9 changes: 8 additions & 1 deletion benchmarks/lance-bench/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tempfile::TempDir;
use vortex_bench::Format;
use vortex_bench::compress::Compressor;
use vortex_bench::compress::read_projection;

use crate::convert::convert_utf8view_batch;
use crate::convert::convert_utf8view_schema;
Expand All @@ -28,7 +29,13 @@ use crate::convert::convert_utf8view_schema;
pub async fn lance_decompress_read(path: &str) -> anyhow::Result<usize> {
// Open the Lance dataset from the filesystem path.
let dataset = Dataset::open(path).await?;
let scanner = dataset.scan();
let mut scanner = dataset.scan();

// Apply the fixed wide-table read projection, if any. Columns are named "0".."n-1".
if let Some(cols) = read_projection(dataset.schema().fields.len()) {
let names: Vec<String> = cols.iter().map(|i| i.to_string()).collect();
scanner.project(&names)?;
}

// Convert to a stream of RecordBatches.
let mut stream = scanner.try_into_stream().await?;
Expand Down
28 changes: 28 additions & 0 deletions vortex-bench/src/compress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@ use crate::Format;
use crate::measurements::CompressionTimingMeasurement;
use crate::measurements::CustomUnitMeasurement;

/// Number of top-level columns in the wide-table decompression projection benchmark.
pub const READ_PROJECTION_ROOT_COLUMNS: usize = 100_000;

/// Number of top-level columns read by the wide-table decompression projection benchmark.
pub const READ_PROJECTION_COLUMNS: usize = 10_000;

/// Fixed read projection for the wide-table decompression projection benchmark.
pub static READ_PROJECTION: [usize; READ_PROJECTION_COLUMNS] = make_read_projection();

const fn make_read_projection() -> [usize; READ_PROJECTION_COLUMNS] {
let stride = READ_PROJECTION_ROOT_COLUMNS / READ_PROJECTION_COLUMNS;
let mut projection = [0; READ_PROJECTION_COLUMNS];
let mut idx = 0;
while idx < READ_PROJECTION_COLUMNS {
projection[idx] = idx * stride;
idx += 1;
}
projection
}

/// Read projection for a file with `root_columns` top-level columns, if this benchmark projects it.
pub fn read_projection(root_columns: usize) -> Option<&'static [usize]> {
(root_columns == READ_PROJECTION_ROOT_COLUMNS).then_some(&READ_PROJECTION)
}

#[derive(Default)]
pub struct CompressMeasurements {
pub timings: Vec<CompressionTimingMeasurement>,
Expand Down Expand Up @@ -100,6 +125,9 @@ pub trait Compressor: Send + Sync {
///
/// This method first compresses the data to the target format, then decompresses it.
/// The timing returned should only measure the decompression phase.
///
/// Format implementations apply the fixed wide-table read projection when the input schema
/// matches the projection benchmark.
async fn decompress(&self, parquet_path: &Path) -> Result<Duration>;
}

Expand Down
17 changes: 16 additions & 1 deletion vortex-bench/src/datasets/struct_list_of_ints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,26 @@ pub struct StructListOfInts {

impl StructListOfInts {
pub fn new(num_columns: usize, row_count: usize, chunk_count: usize) -> Self {
Self::new_with_projection(num_columns, row_count, chunk_count, None)
}

/// Like [`StructListOfInts::new`], but names the dataset for a projected decompress benchmark.
pub fn new_with_projection(
num_columns: usize,
row_count: usize,
chunk_count: usize,
project_columns: Option<usize>,
) -> Self {
let mut name =
format!("wide table cols={num_columns} chunks={chunk_count} rows={row_count}");
if let Some(count) = project_columns {
name.push_str(&format!(" project={count}"));
}
Self {
num_columns,
row_count,
chunk_count,
name: format!("wide table cols={num_columns} chunks={chunk_count} rows={row_count}"),
name,
}
}
}
Expand Down
Loading