Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed example of parallel read via rayon (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 22, 2022
1 parent 7556ebc commit debf00a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 23 deletions.
4 changes: 3 additions & 1 deletion examples/parquet_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ fn main() -> Result<()> {
let file_path = &args[1];

let reader = File::open(file_path)?;
let reader = read::FileReader::try_new(reader, None, None, None, None)?;
let reader = read::FileReader::try_new(reader, Some(&[8]), None, None, None)?;

println!("{:#?}", reader.metadata());

let start = SystemTime::now();
for maybe_chunk in reader {
Expand Down
2 changes: 2 additions & 0 deletions examples/parquet_read_parallel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ edition = "2021"
[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
rayon = { version = "1", default-features = false }
log = "0.4"
chrono = { version = "0.4", default-features = false, features = ["std", "clock"] }
65 changes: 43 additions & 22 deletions examples/parquet_read_parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,67 @@ use std::io::BufReader;
use std::sync::Arc;
use std::time::SystemTime;

use log::trace;
use rayon::prelude::*;

use arrow2::{array::Array, chunk::Chunk, error::Result, io::parquet::read};
use arrow2::{
array::Array,
chunk::Chunk,
error::Result,
io::parquet::read::{self, ArrayIter},
};

fn parallel_read(path: &str, row_group: usize) -> Result<Chunk<Arc<dyn Array>>> {
mod logger;

/// # Panic
/// If the iterators are empty
fn deserialize_parallel(columns: &mut [ArrayIter<'static>]) -> Result<Chunk<Arc<dyn Array>>> {
// CPU-bounded
let columns = columns
.par_iter_mut()
.map(|iter| iter.next().transpose())
.collect::<Result<Vec<_>>>()?;

Chunk::try_new(columns.into_iter().map(|x| x.unwrap()).collect())
}

fn parallel_read(path: &str, row_group: usize) -> Result<()> {
let mut file = BufReader::new(File::open(path)?);
let metadata = read::read_metadata(&mut file)?;
let schema = read::infer_schema(&metadata)?;

// read (IO-bounded) all columns into memory (use a subset of the fields to project)
let columns = read::read_columns(
&mut file,
&metadata.row_groups[row_group],
schema.fields,
None,
)?;
let row_group = &metadata.row_groups[row_group];

// CPU-bounded
let columns = columns
.into_par_iter()
.map(|mut iter| {
// when chunk_size != None, `iter` must be iterated multiple times to get all the chunks
// see the implementation of `arrow2::io::parquet::read::RowGroupDeserializer::next`
// to see how this can be done.
iter.next().unwrap()
})
.collect::<Result<Vec<_>>>()?;
let chunk_size = 1024 * 8;

Chunk::try_new(columns)
// read (IO-bounded) all columns into memory (use a subset of the fields to project)
let mut columns =
read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;

// deserialize (CPU-bounded) to arrow
let mut num_rows = row_group.num_rows();
while num_rows > 0 {
num_rows = num_rows.saturating_sub(chunk_size);
trace!("[parquet/deserialize][start]");
let chunk = deserialize_parallel(&mut columns)?;
trace!("[parquet/deserialize][end][{}]", chunk.len());
assert!(!chunk.is_empty());
}
Ok(())
}

fn main() -> Result<()> {
log::set_logger(&logger::LOGGER)
.map(|()| log::set_max_level(log::LevelFilter::Trace))
.unwrap();

use std::env;
let args: Vec<String> = env::args().collect();
let file_path = &args[1];
let row_group = args[2].parse::<usize>().unwrap();

let start = SystemTime::now();
let batch = parallel_read(file_path, row_group)?;
assert!(!batch.is_empty());
parallel_read(file_path, row_group)?;
println!("took: {} ms", start.elapsed().unwrap().as_millis());

Ok(())
Expand Down

0 comments on commit debf00a

Please sign in to comment.