Skip to content

Commit

Permalink
reuse buffers in parallel parquet reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 23, 2021
1 parent 1c5cf4e commit ced3286
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 4 deletions.
2 changes: 2 additions & 0 deletions polars/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ clippy:
-p polars-io \
-p polars-lazy \
-p polars-arrow \
-p polars-utils \
-p polars-time

clippy-default:
Expand All @@ -30,6 +31,7 @@ test:
-p polars-core \
-p polars-arrow \
-p polars-time \
-p polars-utils \
-- \
--test-threads=2

Expand Down
3 changes: 2 additions & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ json = ["arrow/io_json"]
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# ipc = []
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression"]
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "polars-utils"]
dtype-datetime = ["polars-core/dtype-datetime", "polars-core/temporal"]
dtype-date = ["polars-core/dtype-date"]
dtype-time = ["polars-core/dtype-time", "polars-core/temporal"]
Expand Down Expand Up @@ -45,6 +45,7 @@ num = "^0.4.0"
num_cpus = "1.13.0"
polars-arrow = { version = "0.18.0", path = "../polars-arrow" }
polars-core = { version = "0.18.0", path = "../polars-core", features = ["private"], default-features = false }
polars-utils = { version = "0.1.0", path = "../polars-utils", optional = true }
rayon = "1.5"
regex = "1.4"
simdutf8 = "0.1"
Expand Down
16 changes: 13 additions & 3 deletions polars/polars-io/src/parquet/read_par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::io::parquet::read::{FileMetaData, MutStreamingIterator};
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::POOL;
use polars_utils::contention_pool::LowContentionPool;
use rayon::prelude::*;
use std::borrow::Cow;
use std::convert::TryFrom;
Expand Down Expand Up @@ -42,6 +43,12 @@ pub(crate) fn parallel_read<R: MmapBytesReader>(
let n_groups = file_metadata.row_groups.len();
let mut dfs = Vec::with_capacity(n_groups);

// we need to store two buffers to be reused, so the contention pool size
// is the number of threads * 3 b1, b2, and column_chunks
let pool_size = POOL.current_num_threads() * 2 + 1;

let cont_pool = LowContentionPool::<Vec<u8>>::new(pool_size);

for row_group in 0..n_groups {
let columns = POOL.install(|| {
parq_fields
Expand All @@ -57,14 +64,15 @@ pub(crate) fn parallel_read<R: MmapBytesReader>(
// create a new reader
let reader = Cursor::new(bytes);

let b1 = cont_pool.get();
// get compressed column pages
let mut columns = read::get_column_iterator(
reader,
&file_metadata,
row_group,
field_i,
None,
Vec::with_capacity(64),
b1,
);

let mut column_chunks = Vec::with_capacity(64);
Expand All @@ -81,8 +89,10 @@ pub(crate) fn parallel_read<R: MmapBytesReader>(
let columns = read::ReadColumnIterator::new(field.clone(), column_chunks);
let field = &arrow_schema.fields()[field_i];

let (arr, _b1, _b2_) =
read::column_iter_to_array(columns, field, Vec::with_capacity(64))?;
let b2 = cont_pool.get();
let (arr, b1, b2) = read::column_iter_to_array(columns, field, b2)?;
cont_pool.set(b1);
cont_pool.set(b2);
Series::try_from((field.name().as_str(), Arc::from(arr) as ArrayRef))
})
.collect::<Result<Vec<_>>>()
Expand Down
1 change: 1 addition & 0 deletions polars/polars-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
parking_lot = "0.11"
35 changes: 35 additions & 0 deletions polars/polars-utils/src/contention_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct LowContentionPool<T> {
stack: Vec<Mutex<T>>,
size: AtomicUsize,
}

impl<T: Default> LowContentionPool<T> {
pub fn new(size: usize) -> Self {
let mut stack = Vec::with_capacity(size);
for _ in 0..size {
stack.push(Mutex::new(T::default()))
}
let size = AtomicUsize::new(size);

Self { stack, size }
}

pub fn get(&self) -> T {
let size = self.size.fetch_sub(1, Ordering::AcqRel);
// implementation error if this fails
assert!(size <= self.stack.len());
let mut locked = self.stack[size - 1].lock();
std::mem::take(&mut *locked)
}

pub fn set(&self, value: T) {
let size = self.size.fetch_add(1, Ordering::AcqRel);
// implementation error if this fails
assert!(size <= self.stack.len());
let mut locked = self.stack[size - 1].lock();
*locked = value;
}
}
1 change: 1 addition & 0 deletions polars/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod arena;
pub mod contention_pool;
mod error;
4 changes: 4 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ced3286

Please sign in to comment.