Skip to content

Commit

Permalink
feat(rust, python): inversely scale chunk_size with thread count in streaming (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 2, 2022
1 parent 9db4312 commit afb755b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 8 deletions.
8 changes: 4 additions & 4 deletions polars/polars-io/src/csv/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ use crate::prelude::NullValues;

pub(crate) fn get_file_chunks(
bytes: &[u8],
n_threads: usize,
n_chunks: usize,
expected_fields: usize,
delimiter: u8,
quote_char: Option<u8>,
eol_char: u8,
) -> Vec<(usize, usize)> {
let mut last_pos = 0;
let total_len = bytes.len();
let chunk_size = total_len / n_threads;
let mut offsets = Vec::with_capacity(n_threads);
for _ in 0..n_threads {
let chunk_size = total_len / n_chunks;
let mut offsets = Vec::with_capacity(n_chunks);
for _ in 0..n_chunks {
let search_pos = last_pos + chunk_size;

if search_pos >= bytes.len() {
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ impl<R: MmapBytesReader> ParquetReader<R> {
}

impl<R: MmapBytesReader + 'static> ParquetReader<R> {
pub fn batched(self) -> PolarsResult<BatchedParquetReader> {
pub fn batched(self, chunk_size: usize) -> PolarsResult<BatchedParquetReader> {
BatchedParquetReader::new(
Box::new(self.reader),
self.n_rows.unwrap_or(usize::MAX),
self.projection,
self.row_count,
chunk_size,
)
}
}
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ pub struct BatchedParquetReader {
n_row_groups: usize,
chunks_fifo: VecDeque<DataFrame>,
parallel: ParallelStrategy,
chunk_size: usize,
}

impl BatchedParquetReader {
Expand All @@ -324,6 +325,7 @@ impl BatchedParquetReader {
limit: usize,
projection: Option<Vec<usize>>,
row_count: Option<RowCount>,
chunk_size: usize,
) -> PolarsResult<Self> {
let metadata = read::read_metadata(&mut reader)?;
let schema = read::schema::infer_schema(&metadata)?;
Expand Down Expand Up @@ -360,6 +362,7 @@ impl BatchedParquetReader {
n_row_groups,
chunks_fifo: VecDeque::with_capacity(POOL.current_num_threads()),
parallel,
chunk_size,
})
}

Expand Down Expand Up @@ -408,7 +411,7 @@ impl BatchedParquetReader {

for mut df in dfs {
// make sure that the chunks are not too large
let n = df.shape().0 / 50_000;
let n = df.shape().0 / self.chunk_size;
if n > 1 {
for df in split_df(&mut df, n)? {
self.chunks_fifo.push_back(df)
Expand Down
6 changes: 5 additions & 1 deletion polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl CsvSource {
let schema_ref =
unsafe { std::mem::transmute::<&Schema, &'static Schema>(schema.as_ref()) };

// inversely scale the chunk size by the number of threads so that we reduce memory pressure
// in streaming
let chunk_size = std::cmp::max(CHUNK_SIZE * 12 / POOL.current_num_threads(), 10_000);

let reader = CsvReader::from_path(&path)
.unwrap()
.has_header(options.has_header)
Expand All @@ -59,7 +63,7 @@ impl CsvSource {
.with_end_of_line_char(options.eol_char)
.with_encoding(options.encoding)
.with_rechunk(options.rechunk)
.with_chunk_size(CHUNK_SIZE)
.with_chunk_size(chunk_size)
.with_row_count(options.row_count)
.with_parse_dates(options.parse_dates);

Expand Down
4 changes: 3 additions & 1 deletion polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs (filename inferred from `ParquetSource`/`use crate::operators` below — verify against the commit)
Original file line number	Diff line number	Diff line change
Expand Up @@ -9,6 +9,7 @@ use polars_plan::prelude::ParquetOptions;
use polars_utils::IdxSize;

use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
use crate::CHUNK_SIZE;

pub struct ParquetSource {
batched_reader: BatchedParquetReader,
Expand All @@ -30,11 +31,15 @@ impl ParquetSource {
});

let file = std::fs::File::open(path).unwrap();

// inversely scale the chunk size by the number of threads so that we reduce memory pressure
// in streaming
let chunk_size = std::cmp::max(CHUNK_SIZE * 12 / POOL.current_num_threads(), 10_000);
let batched_reader = ParquetReader::new(file)
.with_n_rows(options.n_rows)
.with_row_count(options.row_count)
.with_projection(projection)
.batched()?;
.batched(chunk_size)?;

Ok(ParquetSource {
batched_reader,
Expand Down

0 comments on commit afb755b

Please sign in to comment.