Parquet writing: reduce heap allocs (#3879)
ritchie46 committed Jul 4, 2022
1 parent 665edbb commit 585c10c
Showing 5 changed files with 25 additions and 57 deletions.
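
In short: the parquet writer used to collect every compressed page of a column into a `VecDeque<CompressedPage>` behind a small adapter struct (`Bla`, deleted below) before the file writer consumed them. This commit drops that intermediate buffering and streams pages through arrow2's `Compressor` instead, which is where the saved heap allocations come from. As a reading aid, here is a minimal self-contained sketch of the adapter pattern being deleted (simplified from the removed code; the real trait is the external `FallibleStreamingIterator`, and `Buffered` here is a hypothetical stand-in for `Bla`):

```rust
use std::collections::VecDeque;

// Simplified stand-in for the external `FallibleStreamingIterator` trait
// that the deleted `Bla` struct implemented.
trait FallibleStreamingIter {
    type Item;
    type Error;
    fn advance(&mut self) -> Result<(), Self::Error>;
    fn get(&self) -> Option<&Self::Item>;
}

// The deleted pattern: all pages are materialized up front, so every
// compressed page of a column is heap-resident before any page is written.
struct Buffered<T> {
    pages: VecDeque<T>,
    current: Option<T>,
}

impl<T> FallibleStreamingIter for Buffered<T> {
    type Item = T;
    type Error = std::convert::Infallible;

    fn advance(&mut self) -> Result<(), Self::Error> {
        self.current = self.pages.pop_front();
        Ok(())
    }

    fn get(&self) -> Option<&T> {
        self.current.as_ref()
    }
}

fn main() {
    let mut pages = Buffered {
        pages: VecDeque::from(vec!["page-1", "page-2"]),
        current: None,
    };
    // The writer-side loop: advance, then borrow the current item.
    loop {
        pages.advance().unwrap();
        match pages.get() {
            Some(p) => println!("writing {p}"),
            None => break,
        }
    }
}
```

The catch with this pattern is visible in the types: the `VecDeque` must be fully populated before iteration starts, so peak memory and allocation count scale with the number of pages per column.
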
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
@@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f57dbd5dbc61b940a71decd5f81d0fd4c93b158d", features = ["compute_concatenate"], default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe", features = ["compute_concatenate"], default-features = false }
 # arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
 # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "arity_assign", features = ["compute_concatenate"], default-features = false }
 # arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -174,7 +174,7 @@ thiserror = "^1.0"
 package = "arrow2"
 git = "https://github.com/jorgecarleitao/arrow2"
 # git = "https://github.com/ritchie46/arrow2"
-rev = "f57dbd5dbc61b940a71decd5f81d0fd4c93b158d"
+rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe"
 # path = "../../../arrow2"
 # branch = "arity_assign"
 # version = "0.12"
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
@@ -37,7 +37,7 @@ private = ["polars-time/private"]
 [dependencies]
 ahash = "0.7"
 anyhow = "1.0"
-arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f57dbd5dbc61b940a71decd5f81d0fd4c93b158d", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "f5f6b7e3aa10b80dc574abacf96b30e0927410fe", default-features = false }
 # arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "arity_assign", default-features = false }
 # arrow = { package = "arrow2", version = "0.12", default-features = false }
 # arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
74 changes: 21 additions & 53 deletions polars/polars-io/src/parquet/write.rs
@@ -1,4 +1,3 @@
-use super::ArrowResult;
 use arrow::array::Array;
 use arrow::chunk::Chunk;
 use arrow::datatypes::DataType as ArrowDataType;
@@ -9,40 +8,11 @@ use arrow::io::parquet::write::{self, FileWriter, *};
 use arrow::io::parquet::write::{DynIter, DynStreamingIterator, Encoding};
 use polars_core::prelude::*;
 use rayon::prelude::*;
-use std::collections::VecDeque;
 use std::io::Write;
 
 use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df};
 pub use write::{BrotliLevel, CompressionOptions as ParquetCompression, GzipLevel, ZstdLevel};
 
-struct Bla {
-    columns: VecDeque<CompressedPage>,
-    current: Option<CompressedPage>,
-}
-
-impl Bla {
-    pub fn new(columns: VecDeque<CompressedPage>) -> Self {
-        Self {
-            columns,
-            current: None,
-        }
-    }
-}
-
-impl FallibleStreamingIterator for Bla {
-    type Item = CompressedPage;
-    type Error = ArrowError;
-
-    fn advance(&mut self) -> ArrowResult<()> {
-        self.current = self.columns.pop_front();
-        Ok(())
-    }
-
-    fn get(&self) -> Option<&Self::Item> {
-        self.current.as_ref()
-    }
-}
-
 /// Write a DataFrame to parquet format
 ///
 /// # Example
@@ -128,12 +98,8 @@ where
     let row_group_iter = rb_iter.filter_map(|batch| match batch.len() {
         0 => None,
         _ => {
-            let row_group = create_serializer(
-                batch,
-                parquet_schema.fields().to_vec(),
-                encodings.clone(),
-                options,
-            );
+            let row_group =
+                create_serializer(batch, parquet_schema.fields().to_vec(), &encodings, options);
 
             Some(row_group)
         }
@@ -152,38 +118,40 @@
 fn create_serializer(
     batch: Chunk<Box<dyn Array>>,
     fields: Vec<ParquetType>,
-    encodings: Vec<Vec<Encoding>>,
+    encodings: &[Vec<Encoding>],
     options: WriteOptions,
 ) -> std::result::Result<RowGroupIter<'static, ArrowError>, ArrowError> {
     let columns = batch
         .columns()
         .par_iter()
         .zip(fields)
         .zip(encodings)
-        .flat_map(move |((array, type_), encoding)| {
+        .map(move |((array, type_), encoding)| {
             let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap();
 
             encoded_columns
                 .into_iter()
                 .map(|encoded_pages| {
-                    let encoded_pages = DynIter::new(
-                        encoded_pages
-                            .into_iter()
-                            .map(|x| x.map_err(|e| ParquetError::General(e.to_string()))),
-                    );
-                    encoded_pages
-                        .map(|page| {
-                            compress(page?, vec![], options.compression).map_err(|x| x.into())
-                        })
-                        .collect::<ArrowResult<VecDeque<_>>>()
+                    // iterator over pages
+                    let pages = DynStreamingIterator::new(
+                        Compressor::new_from_vec(
+                            encoded_pages.map(|result| {
+                                result.map_err(|e| ParquetError::General(format!("{}", e)))
+                            }),
+                            options.compression,
+                            vec![],
+                        )
+                        .map_err(|e| ArrowError::External(format!("{}", e), Box::new(e))),
+                    );
+
+                    Ok(pages)
                 })
                 .collect::<Vec<_>>()
         })
-        .collect::<ArrowResult<Vec<VecDeque<CompressedPage>>>>()?;
-    let row_group = DynIter::new(
-        columns
-            .into_iter()
-            .map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
-    );
+        .flatten()
+        .collect::<Vec<_>>();
+
+    let row_group = DynIter::new(columns.into_iter());
+
    Ok(row_group)
 }
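
Two allocation patterns disappear in the rewrite above. First, the old code compressed each page via `compress(page?, vec![], options.compression)`, passing a fresh `vec![]` for every page, while `Compressor::new_from_vec(.., vec![])` is handed one scratch buffer up front that it can keep reusing. Second, compression now happens lazily inside a `DynStreamingIterator` as the writer pulls pages, so a column's compressed pages no longer all sit in memory at once. A toy sketch of the buffer-reuse idea (illustrative only; `ScratchCompressor` is hypothetical, not the arrow2 API):

```rust
// Toy illustration of the scratch-buffer idea: `scratch` is allocated once
// and reused, so steady-state compression does no per-page heap allocation.
struct ScratchCompressor {
    scratch: Vec<u8>,
}

impl ScratchCompressor {
    fn compress_page(&mut self, raw: &[u8]) -> &[u8] {
        self.scratch.clear(); // keeps capacity; no reallocation
        // Stand-in for a real codec: copy the bytes into the reused buffer.
        self.scratch.extend_from_slice(raw);
        &self.scratch
    }
}

fn main() {
    let mut compressor = ScratchCompressor { scratch: Vec::new() };
    for page in [&b"page-1"[..], &b"page-2"[..]] {
        // Every iteration reuses the same backing allocation.
        println!("{} bytes", compressor.compress_page(page).len());
    }
}
```

`Vec::clear` keeps the existing capacity, so after the first few pages such a loop reaches a steady state with no per-page allocation at all.
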
2 changes: 1 addition & 1 deletion py-polars/Cargo.lock

(Generated file; the lockfile diff is not rendered.)
