Skip to content

Commit

Permalink
update arrow with parquet fixes (#2564)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 6, 2022
1 parent ccb0216 commit 6f08d15
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 280 deletions.
4 changes: 2 additions & 2 deletions polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b46a636f31d70a20bd54df9c7f9e9363053ade08", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "cherry_pick", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "0175d9e3c54f6c3c81e3a6db3f1c39b4ba3b93fe", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "cherry_pick", default-features = false }
# arrow = { package = "arrow2", version = "0.9", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ unsafe_unwrap = "^0.1.0"

[dependencies.arrow]
package = "arrow2"
# git = "https://github.com/jorgecarleitao/arrow2"
git = "https://github.com/ritchie46/arrow2"
# rev = "b46a636f31d70a20bd54df9c7f9e9363053ade08"
branch = "cherry_pick"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "0175d9e3c54f6c3c81e3a6db3f1c39b4ba3b93fe"
# branch = "cherry_pick"
# version = "0.9"
default-features = false
features = [
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ json = ["arrow/io_json"]
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# ipc = []
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "polars-utils"]
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "polars-utils", "memmap"]
dtype-datetime = ["polars-core/dtype-datetime", "polars-core/temporal"]
dtype-date = ["polars-core/dtype-date"]
dtype-time = ["polars-core/dtype-time", "polars-core/temporal"]
Expand All @@ -31,8 +31,8 @@ private = []
[dependencies]
ahash = "0.7"
anyhow = "1.0"
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "b46a636f31d70a20bd54df9c7f9e9363053ade08", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "cherry_pick", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "0175d9e3c54f6c3c81e3a6db3f1c39b4ba3b93fe", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "cherry_pick", default-features = false }
# arrow = { package = "arrow2", version = "0.9", default-features = false }
csv-core = { version = "0.1.10", optional = true }
dirs = "4.0"
Expand Down
7 changes: 2 additions & 5 deletions polars/polars-io/src/parquet/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,8 @@ pub(crate) fn collect_statistics(
let mut fields = vec![];
let mut stats = vec![];

for (column_chunk_md, fld) in md.iter().zip(&schema.fields) {
if let Some(parquet_stats) = column_chunk_md.statistics() {
let parquet_stats = parquet_stats?;
let st = deserialize_statistics(&*parquet_stats)?;

for fld in &schema.fields {
for st in deserialize_statistics(fld, md)?.into_iter().flatten() {
fields.push(fld.into());
stats.push(ColumnStats(st));
}
Expand Down
59 changes: 15 additions & 44 deletions polars/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use super::{ArrowReader, ArrowResult};
use crate::aggregations::ScanAggregation;
use crate::mmap::MmapBytesReader;
use crate::parquet::read_impl::{parallel_read, read_parquet};
use crate::parquet::read_impl::read_parquet;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use arrow::io::parquet::read;
use polars_core::frame::ArrowChunk;
use polars_core::prelude::*;
use std::io::{Read, Seek};
use std::sync::Arc;
Expand All @@ -32,22 +30,18 @@ impl<R: MmapBytesReader> ParquetReader<R> {
) -> Result<DataFrame> {
// this path takes predicates and parallelism into account
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::get_schema(&metadata)?;

let f = match self.parallel {
true => parallel_read,
false => read_parquet,
};
let schema = read::schema::infer_schema(&metadata)?;

let rechunk = self.rechunk;
f(
read_parquet(
self.reader,
self.n_rows.unwrap_or(usize::MAX),
projection,
&schema,
Some(metadata),
predicate,
aggregate,
self.parallel,
)
.map(|mut df| {
if rechunk {
Expand Down Expand Up @@ -86,17 +80,11 @@ impl<R: MmapBytesReader> ParquetReader<R> {
pub fn schema(mut self) -> Result<Schema> {
let metadata = read::read_metadata(&mut self.reader)?;

let schema = read::get_schema(&metadata)?;
let schema = read::infer_schema(&metadata)?;
Ok(schema.into())
}
}

// Adapter impl: exposes arrow2's `read::RecordReader` through this crate's
// `ArrowReader` trait so callers can pull record batches generically.
// NOTE(review): this impl is the *removed* side of the diff shown here — the
// commit drops it along with the old `RecordReader`-based read path.
impl<R: Read + Seek> ArrowReader for read::RecordReader<R> {
    // Fetch the next batch. `self.next()` yields `Option<ArrowResult<ArrowChunk>>`;
    // exhaustion (`None`) becomes `Ok(None)`, while a yielded
    // `Result<ArrowChunk, _>` is rewrapped so the chunk lands as `Ok(Some(chunk))`
    // and any read error propagates unchanged.
    fn next_record_batch(&mut self) -> ArrowResult<Option<ArrowChunk>> {
        self.next().map_or(Ok(None), |v| v.map(Some))
    }
}

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
fn new(reader: R) -> Self {
ParquetReader {
Expand All @@ -116,7 +104,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {

fn finish(mut self) -> Result<DataFrame> {
let metadata = read::read_metadata(&mut self.reader)?;
let schema = read::schema::get_schema(&metadata)?;
let schema = read::schema::infer_schema(&metadata)?;

if let Some(cols) = self.columns {
let mut prj = Vec::with_capacity(cols.len());
Expand All @@ -128,38 +116,21 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
self.projection = Some(prj);
}

if self.parallel {
let rechunk = self.rechunk;
return parallel_read(
self.reader,
self.n_rows.unwrap_or(usize::MAX),
self.projection.as_deref(),
&schema,
Some(metadata),
None,
None,
)
.map(|mut df| {
if rechunk {
df.rechunk();
};
df
});
}

let mut df = read_parquet(
read_parquet(
self.reader,
self.n_rows.unwrap_or(usize::MAX),
self.projection.as_deref(),
&schema,
Some(metadata),
None,
None,
)?;
if self.rechunk {
df.rechunk();
}

Ok(df)
self.parallel,
)
.map(|mut df| {
if self.rechunk {
df.rechunk();
}
df
})
}
}

0 comments on commit 6f08d15

Please sign in to comment.