parquet reader from core parts
ritchie46 committed Dec 22, 2021
1 parent ade3670 commit f93c66f
Showing 6 changed files with 94 additions and 8 deletions.
1 change: 1 addition & 0 deletions polars/polars-arrow/Cargo.toml
@@ -18,3 +18,4 @@ thiserror = "^1.0"
[features]
strings = []
compute = ["arrow/compute_cast"]
+parquet = ["arrow/io_parquet", "arrow/io_parquet_compression"]
5 changes: 5 additions & 0 deletions polars/polars-arrow/src/io/mod.rs
@@ -0,0 +1,5 @@
+#[cfg(feature = "parquet")]
+mod parquet;
+
+#[cfg(feature = "parquet")]
+pub use parquet::read_parquet;
63 changes: 63 additions & 0 deletions polars/polars-arrow/src/io/parquet.rs
@@ -0,0 +1,63 @@
+use arrow::datatypes::Schema;
+use arrow::io::parquet::read::FileMetaData;
+use arrow::{array::ArrayRef, error::Result, io::parquet::read};
+use std::borrow::Cow;
+use std::io::{Read, Seek};
+use std::sync::Arc;
+
+pub fn read_parquet<R: Read + Seek>(
+    mut reader: R,
+    limit: usize,
+    projection: Option<&[usize]>,
+    schema: &Schema,
+    metadata: Option<FileMetaData>,
+) -> Result<Vec<Vec<ArrayRef>>> {
+    let metadata = metadata
+        .map(Ok)
+        .unwrap_or_else(|| read::read_metadata(&mut reader))?;
+    let row_group_len = metadata.row_groups.len();
+
+    let projection = projection
+        .map(Cow::Borrowed)
+        .unwrap_or_else(|| Cow::Owned((0usize..schema.fields().len()).collect::<Vec<_>>()));
+
+    let mut rb = Vec::with_capacity(row_group_len);
+
+    let mut buf_1 = Vec::with_capacity(1024);
+    let mut buf_2 = Vec::with_capacity(1024);
+
+    let mut remaining_rows = limit;
+
+    for rg in 0..row_group_len {
+        let arrs = projection
+            .clone()
+            .iter()
+            .map(|column_i| {
+                let b1 = std::mem::take(&mut buf_1);
+                let b2 = std::mem::take(&mut buf_2);
+
+                // the get_column_iterator is an iterator of columns, each column contains compressed pages.
+                // get_column_iterator yields `Vec<Vec<CompressedPage>>`:
+                // outer `Vec` is len 1 for primitive types,
+                // inner `Vec` is whatever number of pages the chunk contains.
+                let column_iter =
+                    read::get_column_iterator(&mut reader, &metadata, rg, *column_i, None, b1);
+                let fld = schema.field(*column_i);
+                let (mut array, b1, b2) = read::column_iter_to_array(column_iter, fld, b2)?;
+
+                if array.len() > remaining_rows {
+                    array = array.slice(0, remaining_rows);
+                }
+                remaining_rows -= array.len();
+
+                buf_1 = b1;
+                buf_2 = b2;
+
+                Ok(Arc::from(array))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        rb.push(arrs)
+    }
+
+    Ok(rb)
+}
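For orientation, here is a minimal sketch of how this new helper might be driven from a downstream crate. The file path, the error plumbing, and reading the footer up front are illustrative assumptions, not part of this commit; it also assumes arrow2 is imported under the `arrow` alias, as in this repository. It only uses calls that appear in the diff itself:

use std::fs::File;

use arrow::io::parquet::read;
use polars_arrow::io::read_parquet;

fn read_all(path: &str) -> arrow::error::Result<Vec<Vec<arrow::array::ArrayRef>>> {
    let mut file = File::open(path)?;
    // Read the footer once so `read_parquet` does not have to do it again.
    let metadata = read::read_metadata(&mut file)?;
    let schema = read::schema::get_schema(&metadata)?;
    // `usize::MAX` means "no row limit"; a `None` projection reads every column.
    // The result is one `Vec<ArrayRef>` of columns per row group.
    read_parquet(file, usize::MAX, None, &schema, Some(metadata))
}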
1 change: 1 addition & 0 deletions polars/polars-arrow/src/lib.rs
@@ -4,6 +4,7 @@ pub mod bit_util;
pub mod compute;
pub mod error;
pub mod index;
+pub mod io;
pub mod is_valid;
pub mod kernels;
pub mod prelude;
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -85,7 +85,7 @@ dtype-u8 = []
dtype-u16 = []
dtype-categorical = []

-parquet = ["arrow/io_parquet"]
+parquet = ["arrow/io_parquet", "polars-arrow/parquet"]

docs-selection = [
"ndarray",
30 changes: 23 additions & 7 deletions polars/polars-io/src/parquet.rs
@@ -24,9 +24,11 @@ use arrow::io::parquet::{
    read,
    write::{self, *},
};
+use polars_arrow::io::read_parquet;
use polars_core::prelude::*;
use rayon::prelude::*;
use std::collections::VecDeque;
+use std::convert::TryFrom;
use std::io::{Read, Seek, Write};
use std::sync::Arc;

@@ -122,7 +124,6 @@ where
    }

    fn finish(mut self) -> Result<DataFrame> {
-        let rechunk = self.rechunk;
        let metadata = read::read_metadata(&mut self.reader)?;
        let schema = read::schema::get_schema(&metadata)?;

@@ -136,14 +137,28 @@
            self.projection = Some(prj);
        }

-        let reader = read::RecordReader::try_new(
+        let chunks = read_parquet(
            &mut self.reader,
-            self.projection,
-            self.n_rows,
-            None,
-            None,
+            self.n_rows.unwrap_or(usize::MAX),
+            self.projection.as_deref(),
+            &schema,
+            Some(metadata),
        )?;
-        finish_reader(reader, rechunk, self.n_rows, None, None)
+        let mut df = accumulate_dataframes_vertical(chunks.into_iter().map(|cols| {
+            DataFrame::new_no_checks(
+                cols.into_iter()
+                    .enumerate()
+                    .map(|(i, arr)| {
+                        Series::try_from((schema.field(i).name().as_str(), arr)).unwrap()
+                    })
+                    .collect(),
+            )
+        }))?;
+        if self.rechunk {
+            df.rechunk();
+        }
+
+        Ok(df)
    }
}
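From the caller's perspective the rewired reader is used as before; a hedged sketch (the path and error handling are illustrative, and it assumes the `SerReader` trait providing `new` and `finish` is exported from the polars-io prelude, as elsewhere in the crate):

use std::fs::File;

use polars_core::prelude::*;
use polars_io::prelude::*;

fn read_df(path: &str) -> Result<DataFrame> {
    let file = File::open(path)?;
    // `finish` now drives polars_arrow::io::read_parquet under the hood and
    // stitches the per-row-group chunks together with accumulate_dataframes_vertical.
    ParquetReader::new(file).finish()
}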

@@ -185,6 +200,7 @@ pub struct ParquetWriter<W> {
    compression: write::Compression,
}

+use polars_core::utils::accumulate_dataframes_vertical;
pub use write::Compression as ParquetCompression;

impl<W> ParquetWriter<W>
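For symmetry, a sketch of the writer whose struct and `Compression` re-export appear above. The builder method and the exact `finish` signature are assumptions inferred from the `compression` field and the surrounding code, not shown in this diff:

use std::fs::File;

use polars_core::prelude::*;
use polars_io::prelude::*;

fn write_df(df: &DataFrame, path: &str) -> Result<()> {
    let file = File::create(path)?;
    // ParquetCompression is arrow2's write::Compression, re-exported above.
    ParquetWriter::new(file)
        .with_compression(ParquetCompression::Snappy)
        .finish(df)
}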
