diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
index 4fae152bfa3..4cf14fe82ef 100644
--- a/examples/parquet_read.rs
+++ b/examples/parquet_read.rs
@@ -11,7 +11,9 @@ fn main() -> Result<()> {
     let file_path = &args[1];
 
     let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, None, None, None)?;
+    let reader = read::FileReader::try_new(reader, Some(&[8]), None, None, None)?;
+
+    println!("{:#?}", reader.metadata());
 
     let start = SystemTime::now();
     for maybe_chunk in reader {
diff --git a/examples/parquet_read_parallel/Cargo.toml b/examples/parquet_read_parallel/Cargo.toml
index 9a102404d70..46f10ff2047 100644
--- a/examples/parquet_read_parallel/Cargo.toml
+++ b/examples/parquet_read_parallel/Cargo.toml
@@ -6,3 +6,5 @@ edition = "2021"
 [dependencies]
 arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
 rayon = { version = "1", default-features = false }
+log = "0.4"
+chrono = { version = "0.4", default_features = false, features = ["std", "clock"] }
diff --git a/examples/parquet_read_parallel/src/main.rs b/examples/parquet_read_parallel/src/main.rs
index 4e1a60b401f..3b6eb243bc8 100644
--- a/examples/parquet_read_parallel/src/main.rs
+++ b/examples/parquet_read_parallel/src/main.rs
@@ -4,46 +4,67 @@ use std::io::BufReader;
 use std::sync::Arc;
 use std::time::SystemTime;
 
+use log::trace;
 use rayon::prelude::*;
 
-use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};
+use arrow2::{
+    array::Array,
+    chunk::Chunk,
+    error::Result,
+    io::parquet::read::{self, ArrayIter},
+};
 
-fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
+mod logger;
+
+/// # Panic
+/// If the iterators are empty
+fn deserialize_parallel(columns: &mut [ArrayIter<'static>]) -> Result<Chunk<Arc<dyn Array>>> {
+    // CPU-bounded
+    let columns = columns
+        .par_iter_mut()
+        .map(|iter| iter.next().transpose())
+        .collect::<Result<Vec<_>>>()?;
+
+    Chunk::try_new(columns.into_iter().map(|x| x.unwrap()).collect())
+}
+
+fn parallel_read(path: &str, row_group: usize) -> Result<()> {
     let mut file = BufReader::new(File::open(path)?);
     let metadata = read::read_metadata(&mut file)?;
     let schema = read::infer_schema(&metadata)?;
 
-    // read (IO-bounded) all columns into memory (use a subset of the fields to project)
-    let columns = read::read_columns(
-        &mut file,
-        &metadata.row_groups[row_group],
-        schema.fields,
-        None,
-    )?;
+    let row_group = &metadata.row_groups[row_group];
 
-    // CPU-bounded
-    let columns = columns
-        .into_par_iter()
-        .map(|mut iter| {
-            // when chunk_size != None, `iter` must be iterated multiple times to get all the chunks
-            // see the implementation of `arrow2::io::parquet::read::RowGroupDeserializer::next`
-            // to see how this can be done.
-            iter.next().unwrap()
-        })
-        .collect::<Result<Vec<_>>>()?;
+    let chunk_size = 1024 * 8;
 
-    Chunk::try_new(columns)
+    // read (IO-bounded) all columns into memory (use a subset of the fields to project)
+    let mut columns =
+        read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;
+
+    // deserialize (CPU-bounded) to arrow
+    let mut num_rows = row_group.num_rows();
+    while num_rows > 0 {
+        num_rows = num_rows.saturating_sub(chunk_size);
+        trace!("[parquet/deserialize][start]");
+        let chunk = deserialize_parallel(&mut columns)?;
+        trace!("[parquet/deserialize][end][{}]", chunk.len());
+        assert!(!chunk.is_empty());
+    }
+    Ok(())
 }
 
 fn main() -> Result<()> {
+    log::set_logger(&logger::LOGGER)
+        .map(|()| log::set_max_level(log::LevelFilter::Trace))
+        .unwrap();
+
     use std::env;
     let args: Vec<String> = env::args().collect();
     let file_path = &args[1];
     let row_group = args[2].parse::<usize>().unwrap();
 
     let start = SystemTime::now();
-    let batch = parallel_read(file_path, row_group)?;
-    assert!(!batch.is_empty());
+    parallel_read(file_path, row_group)?;
     println!("took: {} ms", start.elapsed().unwrap().as_millis());
     Ok(())
 }
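
Note: the diff above adds `mod logger;` and references `logger::LOGGER`, but the new
`examples/parquet_read_parallel/src/logger.rs` file itself is not part of this excerpt.
Below is a minimal sketch of what such a logger could look like, built on the `log` and
`chrono` dependencies added in Cargo.toml; the struct name and message format are
assumptions, not the PR's actual code:

    // examples/parquet_read_parallel/src/logger.rs -- hypothetical sketch, not the PR's file
    use log::{Metadata, Record};

    pub struct SimpleLogger;

    // `main` passes a `&'static` reference to this value to `log::set_logger`
    pub static LOGGER: SimpleLogger = SimpleLogger;

    impl log::Log for SimpleLogger {
        fn enabled(&self, _metadata: &Metadata) -> bool {
            // filtering is handled globally via `log::set_max_level` in `main`
            true
        }

        fn log(&self, record: &Record) {
            // timestamp each message so the `[start]`/`[end]` trace pairs emitted
            // around the CPU-bound deserialization can be timed from the output
            println!(
                "{} {} - {}",
                chrono::Utc::now().to_rfc3339(),
                record.level(),
                record.args()
            );
        }

        fn flush(&self) {}
    }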