Commit f73e8fe

row_count at scan for ipc and parquet

ritchie46 committed Feb 9, 2022
1 parent 947d6a3 commit f73e8fe

Showing 25 changed files with 393 additions and 90 deletions.
2 changes: 1 addition & 1 deletion polars/polars-io/src/avro.rs
@@ -72,7 +72,7 @@ where
            schema.clone().fields,
        );

-        finish_reader(avro_reader, rechunk, None, None, None, &schema)
+        finish_reader(avro_reader, rechunk, None, None, None, &schema, None)
     }
 }
29 changes: 27 additions & 2 deletions polars/polars-io/src/ipc.rs
@@ -67,6 +67,7 @@ pub struct IpcReader<R> {
     n_rows: Option<usize>,
     projection: Option<Vec<usize>>,
     columns: Option<Vec<String>>,
+    row_count: Option<RowCount>,
 }

 impl<R: Read + Seek> IpcReader<R> {
@@ -93,6 +94,12 @@ impl<R: Read + Seek> IpcReader<R> {
         self
     }

+    /// Add a `row_count` column.
+    pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
+        self.row_count = row_count;
+        self
+    }
+
     /// Set the reader's column projection. This counts from 0, meaning that
     /// `vec![0, 4]` would select the 1st and 5th column.
     pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
@@ -124,7 +131,15 @@ impl<R: Read + Seek> IpcReader<R> {

         let reader = read::FileReader::new(&mut self.reader, metadata, projection);

-        finish_reader(reader, rechunk, self.n_rows, predicate, aggregate, &schema)
+        finish_reader(
+            reader,
+            rechunk,
+            self.n_rows,
+            predicate,
+            aggregate,
+            &schema,
+            self.row_count,
+        )
     }
 }

@@ -148,6 +163,7 @@ where
             n_rows: None,
             columns: None,
             projection: None,
+            row_count: None,
         }
     }

@@ -203,7 +219,15 @@ where
         };

         let ipc_reader = read::FileReader::new(&mut self.reader, metadata, self.projection);
-        finish_reader(ipc_reader, rechunk, self.n_rows, None, None, &schema)
+        finish_reader(
+            ipc_reader,
+            rechunk,
+            self.n_rows,
+            None,
+            None,
+            &schema,
+            self.row_count,
+        )
     }
 }

@@ -232,6 +256,7 @@ pub struct IpcWriter<W> {
 }

 use crate::aggregations::ScanAggregation;
+use crate::RowCount;
 use polars_core::frame::ArrowChunk;
 pub use write::Compression as IpcCompression;
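To see the new IPC option end to end, here is a minimal sketch of the eager reader with a row count attached at scan time. The file path and column name are hypothetical, and it assumes the polars-io crate as patched above:

    use polars_core::prelude::*;
    use polars_io::prelude::*;
    use polars_io::RowCount;
    use std::fs::File;

    fn main() -> Result<()> {
        // hypothetical path; any Arrow IPC file works
        let file = File::open("data.ipc")?;
        let df = IpcReader::new(file)
            .with_row_count(Some(RowCount {
                name: "row_nr".into(), // hypothetical column name
                offset: 0,
            }))
            .finish()?;
        // `df` now starts with a `row_nr` column: 0, 1, 2, ...
        println!("{:?}", df);
        Ok(())
    }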
8 changes: 7 additions & 1 deletion polars/polars-io/src/lib.rs
@@ -102,17 +102,22 @@ pub(crate) fn finish_reader<R: ArrowReader>(
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     aggregate: Option<&[ScanAggregation]>,
     arrow_schema: &ArrowSchema,
+    row_count: Option<RowCount>,
 ) -> Result<DataFrame> {
     use polars_core::utils::accumulate_dataframes_vertical;

     let mut num_rows = 0;
     let mut parsed_dfs = Vec::with_capacity(1024);

     while let Some(batch) = reader.next_record_batch()? {
+        let current_num_rows = num_rows as u32;
         num_rows += batch.len();

         let mut df = DataFrame::try_from((batch, arrow_schema.fields.as_slice()))?;

+        if let Some(rc) = &row_count {
+            df.with_row_count_mut(&rc.name, Some(current_num_rows + rc.offset));
+        }
+
         if let Some(predicate) = &predicate {
             let s = predicate.evaluate(&df)?;
             let mask = s.bool().expect("filter predicates was not of type boolean");
@@ -122,6 +127,7 @@ pub(crate) fn finish_reader<R: ArrowReader>(
         apply_aggregations(&mut df, aggregate)?;

         parsed_dfs.push(df);
+
         if let Some(n) = n_rows {
             if num_rows >= n {
                 break;
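The key detail in finish_reader is that each batch's starting row number is captured before num_rows is advanced. A self-contained sketch of the same accounting, with made-up batch sizes and offset, shows why the order matters:

    fn main() {
        let batch_lens = [3usize, 2, 4]; // hypothetical record-batch sizes
        let offset = 10u32; // user-supplied RowCount offset

        let mut num_rows = 0usize;
        for len in batch_lens {
            // capture this batch's starting offset *before* adding its length,
            // mirroring `current_num_rows` above
            let current_num_rows = num_rows as u32;
            num_rows += len;

            let first = current_num_rows + offset;
            let last = first + len as u32 - 1;
            println!("batch gets row numbers {}..={}", first, last);
        }
        // prints 10..=12, then 13..=14, then 15..=18
    }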
11 changes: 11 additions & 0 deletions polars/polars-io/src/parquet/read.rs
@@ -3,6 +3,7 @@ use crate::mmap::MmapBytesReader;
 use crate::parquet::read_impl::read_parquet;
 use crate::predicates::PhysicalIoExpr;
 use crate::prelude::*;
+use crate::RowCount;
 use arrow::io::parquet::read;
 use polars_core::prelude::*;
 use std::io::{Read, Seek};
@@ -17,6 +18,7 @@ pub struct ParquetReader<R: Read + Seek> {
     columns: Option<Vec<String>>,
     projection: Option<Vec<usize>>,
     parallel: bool,
+    row_count: Option<RowCount>,
 }

 impl<R: MmapBytesReader> ParquetReader<R> {
@@ -42,6 +44,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
             predicate,
             aggregate,
             self.parallel,
+            self.row_count,
         )
         .map(|mut df| {
             if rechunk {
@@ -77,6 +80,12 @@ impl<R: MmapBytesReader> ParquetReader<R> {
         self
     }

+    /// Add a `row_count` column.
+    pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
+        self.row_count = row_count;
+        self
+    }
+
     pub fn schema(mut self) -> Result<Schema> {
         let metadata = read::read_metadata(&mut self.reader)?;

@@ -94,6 +103,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
             columns: None,
             projection: None,
             parallel: true,
+            row_count: None,
         }
     }

@@ -125,6 +135,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
             None,
             None,
             self.parallel,
+            self.row_count,
         )
         .map(|mut df| {
             if self.rechunk {
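The eager Parquet reader gains the same builder; a short sketch under the same assumptions (hypothetical path and column name), this time with a non-zero offset:

    use polars_core::prelude::*;
    use polars_io::prelude::*;
    use polars_io::RowCount;
    use std::fs::File;

    fn main() -> Result<()> {
        let file = File::open("data.parquet")?; // hypothetical file
        let df = ParquetReader::new(file)
            .with_row_count(Some(RowCount {
                name: "row_nr".into(),
                offset: 100, // numbering starts at 100 instead of 0
            }))
            .finish()?;
        println!("{:?}", df);
        Ok(())
    }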
9 changes: 9 additions & 0 deletions polars/polars-io/src/parquet/read_impl.rs
@@ -3,6 +3,7 @@ use crate::mmap::{MmapBytesReader, ReaderBytes};
 use crate::parquet::predicates::collect_statistics;
 use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr};
 use crate::utils::apply_projection;
+use crate::RowCount;
 use arrow::io::parquet::read;
 use arrow::io::parquet::read::{to_deserializer, FileMetaData};
 use polars_core::prelude::*;
@@ -25,6 +26,7 @@ pub fn read_parquet<R: MmapBytesReader>(
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     aggregate: Option<&[ScanAggregation]>,
     parallel: bool,
+    row_count: Option<RowCount>,
 ) -> Result<DataFrame> {
     let reader = ReaderBytes::from(&reader);
     let bytes = reader.deref();
@@ -43,14 +45,17 @@ pub fn read_parquet<R: MmapBytesReader>(

     let mut remaining_rows = limit;

+    let mut previous_row_count = 0;
     for rg in 0..row_group_len {
         let md = &file_metadata.row_groups[rg];
+        let current_row_count = md.num_rows() as u32;
         if let Some(pred) = &predicate {
             if let Some(pred) = pred.as_stats_evaluator() {
                 if let Some(stats) = collect_statistics(md.columns(), schema)? {
                     let should_read = pred.should_read(&stats);
                     // a parquet file may not have statistics of all columns
                     if matches!(should_read, Ok(false)) {
+                        previous_row_count += current_row_count;
                         continue;
                     } else if !matches!(should_read, Err(PolarsError::NotFound(_))) {
                         let _ = should_read?;
@@ -102,10 +107,14 @@ pub fn read_parquet<R: MmapBytesReader>(
         remaining_rows = file_metadata.row_groups[rg].num_rows() as usize;

         let mut df = DataFrame::new_no_checks(columns);
+        if let Some(rc) = &row_count {
+            df.with_row_count_mut(&rc.name, Some(previous_row_count + rc.offset));
+        }

         apply_predicate(&mut df, predicate.as_deref())?;
         apply_aggregations(&mut df, aggregate)?;

+        previous_row_count += current_row_count;
         dfs.push(df)
     }
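One subtlety above: a row group skipped by the statistics-based predicate still advances previous_row_count, so surviving groups keep their absolute positions in the file. A standalone sketch of that bookkeeping (group sizes and the skip rule are invented):

    fn main() {
        // hypothetical row-group sizes; pretend statistics prove that
        // group 1 cannot contain matching rows
        let group_sizes = [100u32, 50, 75];
        let skipped = |rg: usize| rg == 1;

        let mut previous_row_count = 0u32;
        for (rg, &current_row_count) in group_sizes.iter().enumerate() {
            if skipped(rg) {
                // no DataFrame is built, but the offset still moves forward
                previous_row_count += current_row_count;
                continue;
            }
            // in read_parquet this becomes `previous_row_count + rc.offset`
            println!("group {} starts at row {}", rg, previous_row_count);
            previous_row_count += current_row_count;
        }
        // group 0 starts at 0; group 2 starts at 150, not at 100
    }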
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/frame/csv.rs
@@ -64,8 +64,8 @@ impl<'a> LazyCsvReader<'a> {

     /// Add a `row_count` column.
     #[must_use]
-    pub fn with_row_count(mut self, rc: Option<RowCount>) -> Self {
-        self.row_count = rc;
+    pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
+        self.row_count = row_count;
         self
     }
23 changes: 17 additions & 6 deletions polars/polars-lazy/src/frame/ipc.rs
@@ -1,12 +1,14 @@
 use crate::functions::concat;
 use crate::prelude::*;
 use polars_core::prelude::*;
+use polars_io::RowCount;

-#[derive(Copy, Clone)]
+#[derive(Clone)]
 pub struct ScanArgsIpc {
     pub n_rows: Option<usize>,
     pub cache: bool,
     pub rechunk: bool,
+    pub row_count: Option<RowCount>,
 }

 impl Default for ScanArgsIpc {
@@ -15,16 +17,19 @@ impl Default for ScanArgsIpc {
             n_rows: None,
             cache: true,
             rechunk: true,
+            row_count: None,
         }
     }
 }

 impl LazyFrame {
     fn scan_ipc_impl(path: String, args: ScanArgsIpc) -> Result<Self> {
-        let options = LpScanOptions {
+        dbg!(&args.row_count);
+        let options = IpcScanOptions {
             n_rows: args.n_rows,
             cache: args.cache,
             with_columns: None,
+            row_count: args.row_count,
         };
         let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into();
         lf.opt_state.agg_scan_projection = true;
@@ -41,18 +46,24 @@ impl LazyFrame {
             .map(|r| {
                 let path = r.map_err(|e| PolarsError::ComputeError(format!("{}", e).into()))?;
                 let path_string = path.to_string_lossy().into_owned();
+                let mut args = args.clone();
+                args.row_count = None;
                 Self::scan_ipc_impl(path_string, args)
             })
             .collect::<Result<Vec<_>>>()?;

         concat(&lfs, args.rechunk)
             .map_err(|_| PolarsError::ComputeError("no matching files found".into()))
-            .map(|lf| {
+            .map(|mut lf| {
                 if let Some(n_rows) = args.n_rows {
-                    lf.slice(0, n_rows as u32)
-                } else {
-                    lf
-                }
+                    lf = lf.slice(0, n_rows as u32);
+                };
+
+                if let Some(rc) = args.row_count {
+                    lf = lf.with_row_count(&rc.name, Some(rc.offset))
+                }
+
+                lf
             })
     } else {
         Self::scan_ipc_impl(path, args)
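A sketch of the lazy entry point with the new argument; the path is hypothetical and everything else uses the defaults shown above:

    use polars_core::prelude::*;
    use polars_io::RowCount;
    use polars_lazy::prelude::*;

    fn main() -> Result<()> {
        let args = ScanArgsIpc {
            row_count: Some(RowCount {
                name: "row_nr".into(), // hypothetical column name
                offset: 0,
            }),
            ..Default::default()
        };
        // the row count is attached at scan time, not as a later projection
        let df = LazyFrame::scan_ipc("data.ipc".into(), args)?.collect()?;
        println!("{:?}", df);
        Ok(())
    }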
20 changes: 20 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
@@ -992,6 +992,10 @@ impl LazyFrame {
     }

     /// Add a new column at index 0 that counts the rows.
+    ///
+    /// # Warning
+    /// This can have a negative effect on query performance.
+    /// This may for instance block predicate pushdown optimization.
     pub fn with_row_count(mut self, name: &str, offset: Option<u32>) -> LazyFrame {
         match &mut self.logical_plan {
             // Do the row count at scan
@@ -1003,6 +1007,22 @@ impl LazyFrame {
                 });
                 self
             }
+            #[cfg(feature = "ipc")]
+            LogicalPlan::IpcScan { options, .. } => {
+                options.row_count = Some(RowCount {
+                    name: name.to_string(),
+                    offset: offset.unwrap_or(0),
+                });
+                self
+            }
+            #[cfg(feature = "parquet")]
+            LogicalPlan::ParquetScan { options, .. } => {
+                options.row_count = Some(RowCount {
+                    name: name.to_string(),
+                    offset: offset.unwrap_or(0),
+                });
+                self
+            }
             _ => {
                 let schema = self.schema();
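Because with_row_count now rewrites the scan options when the plan root is a CsvScan, IpcScan, or ParquetScan, calling it directly after a scan gets the counting done during the read. A minimal sketch, assuming a hypothetical path and the parquet feature enabled:

    use polars_core::prelude::*;
    use polars_lazy::prelude::*;

    fn main() -> Result<()> {
        let df = LazyFrame::scan_parquet("data.parquet".into(), ScanArgsParquet::default())?
            .with_row_count("row_nr", None) // offset defaults to 0
            .collect()?;
        // the plan root was a ParquetScan, so the count was pushed into the scan
        println!("{:?}", df);
        Ok(())
    }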
30 changes: 19 additions & 11 deletions polars/polars-lazy/src/frame/parquet.rs
@@ -1,13 +1,15 @@
 use crate::functions::concat;
 use crate::prelude::*;
 use polars_core::prelude::*;
+use polars_io::RowCount;

-#[derive(Copy, Clone)]
+#[derive(Clone)]
 pub struct ScanArgsParquet {
     pub n_rows: Option<usize>,
     pub cache: bool,
     pub parallel: bool,
     pub rechunk: bool,
+    pub row_count: Option<RowCount>,
 }

 impl Default for ScanArgsParquet {
@@ -17,6 +19,7 @@ impl Default for ScanArgsParquet {
             cache: true,
             parallel: true,
             rechunk: true,
+            row_count: None,
         }
     }
 }
@@ -27,10 +30,12 @@ impl LazyFrame {
         n_rows: Option<usize>,
         cache: bool,
         parallel: bool,
+        row_count: Option<RowCount>,
     ) -> Result<Self> {
-        let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, n_rows, cache, parallel)?
-            .build()
-            .into();
+        let mut lf: LazyFrame =
+            LogicalPlanBuilder::scan_parquet(path, n_rows, cache, parallel, row_count)?
+                .build()
+                .into();
         lf.opt_state.agg_scan_projection = true;
         Ok(lf)
     }
@@ -45,21 +50,24 @@ impl LazyFrame {
             .map(|r| {
                 let path = r.map_err(|e| PolarsError::ComputeError(format!("{}", e).into()))?;
                 let path_string = path.to_string_lossy().into_owned();
-                Self::scan_parquet_impl(path_string, args.n_rows, args.cache, false)
+                Self::scan_parquet_impl(path_string, args.n_rows, args.cache, false, None)
             })
             .collect::<Result<Vec<_>>>()?;

         concat(&lfs, args.rechunk)
             .map_err(|_| PolarsError::ComputeError("no matching files found".into()))
-            .map(|lf| {
+            .map(|mut lf| {
                 if let Some(n_rows) = args.n_rows {
-                    lf.slice(0, n_rows as u32)
-                } else {
-                    lf
-                }
+                    lf = lf.slice(0, n_rows as u32)
+                };
+
+                if let Some(rc) = args.row_count {
+                    lf = lf.with_row_count(&rc.name, Some(rc.offset))
+                };
+                lf
             })
     } else {
-        Self::scan_parquet_impl(path, args.n_rows, args.cache, args.parallel)
+        Self::scan_parquet_impl(path, args.n_rows, args.cache, args.parallel, args.row_count)
     }
 }
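With a glob pattern, each file is scanned with row_count cleared and the column is added once after concat, so numbering runs continuously across all matched files instead of restarting per file. A sketch under the same assumptions (hypothetical glob):

    use polars_core::prelude::*;
    use polars_io::RowCount;
    use polars_lazy::prelude::*;

    fn main() -> Result<()> {
        let args = ScanArgsParquet {
            row_count: Some(RowCount {
                name: "row_nr".into(),
                offset: 0,
            }),
            ..Default::default()
        };
        let df = LazyFrame::scan_parquet("data/*.parquet".into(), args)?.collect()?;
        // row_nr is 0..total_rows across every matched file
        println!("{:?}", df);
        Ok(())
    }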
