Reorder read csv args (#2026)
* Rename "has_headers" to "has_header" and "stop_after_n_rows" to "n_rows" (see the Rust sketch just after this list).

* Remove "projection" as a keyword argument and fold its functionality into "columns".

* Map the old argument names ("has_headers", "stop_after_n_rows") to the new ones ("has_header", "n_rows") so existing calls that use the renamed arguments keep working.
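For the Rust readers this is a pure rename of the builder method, as the diffs below show. A minimal before/after sketch (the file path is a placeholder):

```rust
use polars::prelude::*;

fn read_head() -> Result<DataFrame> {
    // Before this commit:
    //     CsvReader::from_path("data.csv")?
    //         .with_stop_after_n_rows(Some(100))
    //         .finish()
    // After the rename:
    CsvReader::from_path("data.csv")?
        .with_n_rows(Some(100)) // renamed from `with_stop_after_n_rows`
        .finish()
}
```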
ghuls committed Dec 11, 2021
1 parent 365bbe9 commit 04aa99c
Showing 26 changed files with 490 additions and 438 deletions.
2 changes: 1 addition & 1 deletion polars/benches/csv.rs
@@ -6,7 +6,7 @@ fn prepare_reader() -> Result<CsvReader<'static, File>> {
     let path =
         std::env::var("CSV_SRC").expect("env var CSV_SRC pointing to the csv_file is not set");
 
-    Ok(CsvReader::from_path(&path)?.with_stop_after_n_rows(Some(10000)))
+    Ok(CsvReader::from_path(&path)?.with_n_rows(Some(10000)))
 }
 
 fn csv_parsing_benchmark(c: &mut Criterion) {
2 changes: 1 addition & 1 deletion polars/benches/groupby.rs
@@ -11,7 +11,7 @@ lazy_static! {
         let mut df = CsvReader::from_path(&path)
             .expect("could not read file")
             // 1M rows
-            .with_stop_after_n_rows(Some(1000000))
+            .with_n_rows(Some(1000000))
             .finish()
             .unwrap();
         df.may_apply("id1", |s| s.cast(&DataType::Categorical))
12 changes: 6 additions & 6 deletions polars/polars-io/src/csv.rs
@@ -199,7 +199,7 @@ where
     /// Aggregates chunk afterwards to a single chunk.
     rechunk: bool,
     /// Stop reading from the csv after this number of rows is reached
-    stop_after_n_rows: Option<usize>,
+    n_rows: Option<usize>,
     // used by error ignore logic
     max_records: Option<usize>,
     skip_rows: usize,
@@ -246,8 +246,8 @@ where
 
     /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
     /// be guaranteed.
-    pub fn with_stop_after_n_rows(mut self, num_rows: Option<usize>) -> Self {
-        self.stop_after_n_rows = num_rows;
+    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
+        self.n_rows = num_rows;
         self
     }
 
@@ -415,7 +415,7 @@ where
         CsvReader {
             reader,
             rechunk: true,
-            stop_after_n_rows: None,
+            n_rows: None,
             max_records: Some(128),
             skip_rows: 0,
             projection: None,
@@ -490,7 +490,7 @@ where
         let reader_bytes = get_reader_bytes(&mut self.reader)?;
         let mut csv_reader = CoreReader::new(
             reader_bytes,
-            self.stop_after_n_rows,
+            self.n_rows,
             self.skip_rows,
             self.projection,
             self.max_records,
@@ -523,7 +523,7 @@ where
         let reader_bytes = get_reader_bytes(&mut self.reader)?;
         let mut csv_reader = CoreReader::new(
             reader_bytes,
-            self.stop_after_n_rows,
+            self.n_rows,
             self.skip_rows,
             self.projection,
             self.max_records,
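As the doc comment on `with_n_rows` notes, the cap is best-effort under multithreaded parsing. A minimal sketch of the renamed builder on an in-memory source (assuming the era's `SerReader::new`, which accepts any `Read + Seek` reader):

```rust
use polars::prelude::*;
use std::io::Cursor;

fn head_two() -> Result<DataFrame> {
    let csv = "a,b\n1,x\n2,y\n3,z\n";
    CsvReader::new(Cursor::new(csv))
        .with_n_rows(Some(2)) // try to stop after two data rows; multithreaded parsing may overshoot
        .finish()
}
```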
18 changes: 6 additions & 12 deletions polars/polars-io/src/ipc.rs
@@ -63,7 +63,7 @@ pub struct IpcReader<R> {
     reader: R,
     /// Aggregates chunks afterwards to a single chunk.
     rechunk: bool,
-    stop_after_n_rows: Option<usize>,
+    n_rows: Option<usize>,
     projection: Option<Vec<usize>>,
     columns: Option<Vec<String>>,
 }
@@ -81,8 +81,8 @@ impl<R: Read + Seek> IpcReader<R> {
         Ok(metadata.schema().clone())
     }
     /// Stop reading when `n` rows are read.
-    pub fn with_stop_after_n_rows(mut self, num_rows: Option<usize>) -> Self {
-        self.stop_after_n_rows = num_rows;
+    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
+        self.n_rows = num_rows;
         self
     }
 
@@ -119,13 +119,7 @@ impl<R: Read + Seek> IpcReader<R> {
             }),
         );
 
-        finish_reader(
-            reader,
-            rechunk,
-            self.stop_after_n_rows,
-            predicate,
-            aggregate,
-        )
+        finish_reader(reader, rechunk, self.n_rows, predicate, aggregate)
     }
 }
 
@@ -150,7 +144,7 @@ where
         IpcReader {
             reader,
             rechunk: true,
-            stop_after_n_rows: None,
+            n_rows: None,
             columns: None,
             projection: None,
         }
@@ -199,7 +193,7 @@ where
         }
 
         let ipc_reader = read::FileReader::new(&mut self.reader, metadata, self.projection);
-        finish_reader(ipc_reader, rechunk, self.stop_after_n_rows, None, None)
+        finish_reader(ipc_reader, rechunk, self.n_rows, None, None)
     }
 }
 
10 changes: 5 additions & 5 deletions polars/polars-io/src/lib.rs
@@ -68,18 +68,18 @@ pub trait ArrowReader {
 pub(crate) fn finish_reader<R: ArrowReader>(
     mut reader: R,
     rechunk: bool,
-    stop_after_n_rows: Option<usize>,
+    n_rows: Option<usize>,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     aggregate: Option<&[ScanAggregation]>,
 ) -> Result<DataFrame> {
     use polars_core::utils::accumulate_dataframes_vertical;
     use std::convert::TryFrom;
 
-    let mut n_rows = 0;
+    let mut num_rows = 0;
     let mut parsed_dfs = Vec::with_capacity(1024);
 
     while let Some(batch) = reader.next_record_batch()? {
-        n_rows += batch.num_rows();
+        num_rows += batch.num_rows();
 
         let mut df = DataFrame::try_from(batch)?;
 
@@ -102,8 +102,8 @@ pub(crate) fn finish_reader<R: ArrowReader>(
         }
 
         parsed_dfs.push(df);
-        if let Some(n) = stop_after_n_rows {
-            if n_rows >= n {
+        if let Some(n) = n_rows {
+            if num_rows >= n {
                 break;
             }
         }
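Renaming the parameter to `n_rows` would have collided with the local row counter, so the counter becomes `num_rows`. A self-contained sketch of the early-stop loop above, with batch sizes standing in for record batches:

```rust
/// Count rows batch by batch and stop once an optional cap is reached,
/// mirroring the loop in `finish_reader`.
fn rows_consumed(batch_sizes: &[usize], n_rows: Option<usize>) -> usize {
    let mut num_rows = 0; // local counter, no longer sharing a name with the cap
    for &batch in batch_sizes {
        num_rows += batch;
        if let Some(n) = n_rows {
            if num_rows >= n {
                break; // whole batches are consumed, so the cap can be overshot
            }
        }
    }
    num_rows
}

// rows_consumed(&[100, 100, 100], Some(150)) == 200: the batch in progress is
// kept, which is why `n` is only a best-effort bound.
```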
22 changes: 8 additions & 14 deletions polars/polars-io/src/parquet.rs
@@ -34,7 +34,7 @@ use std::sync::Arc;
 pub struct ParquetReader<R: Read + Seek> {
     reader: R,
     rechunk: bool,
-    stop_after_n_rows: Option<usize>,
+    n_rows: Option<usize>,
     columns: Option<Vec<String>>,
     projection: Option<Vec<usize>>,
 }
@@ -56,24 +56,18 @@ where
         let reader = read::RecordReader::try_new(
             &mut self.reader,
             projection.map(|x| x.to_vec()),
-            self.stop_after_n_rows,
+            self.n_rows,
             None,
             None,
         )?;
 
-        finish_reader(
-            reader,
-            rechunk,
-            self.stop_after_n_rows,
-            predicate,
-            aggregate,
-        )
+        finish_reader(reader, rechunk, self.n_rows, predicate, aggregate)
     }
 
     /// Stop parsing when `n` rows are parsed. By settings this parameter the csv will be parsed
     /// sequentially.
-    pub fn with_stop_after_n_rows(mut self, num_rows: Option<usize>) -> Self {
-        self.stop_after_n_rows = num_rows;
+    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
+        self.n_rows = num_rows;
         self
     }
 
@@ -116,7 +110,7 @@ where
         ParquetReader {
             reader,
             rechunk: false,
-            stop_after_n_rows: None,
+            n_rows: None,
             columns: None,
             projection: None,
         }
@@ -145,11 +139,11 @@ where
         let reader = read::RecordReader::try_new(
             &mut self.reader,
             self.projection,
-            self.stop_after_n_rows,
+            self.n_rows,
             None,
             None,
         )?;
-        finish_reader(reader, rechunk, self.stop_after_n_rows, None, None)
+        finish_reader(reader, rechunk, self.n_rows, None, None)
     }
 }
 
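Per the doc comment in this file, setting the row cap makes the read sequential instead of parallel. A minimal sketch of the renamed method (the path is a placeholder):

```rust
use polars::prelude::*;
use std::fs::File;

fn read_parquet_head(path: &str) -> Result<DataFrame> {
    let file = File::open(path)?; // io::Error converts into PolarsError
    ParquetReader::new(file)
        .with_n_rows(Some(1_000)) // renamed from `with_stop_after_n_rows`
        .finish()
}
```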
14 changes: 5 additions & 9 deletions polars/polars-lazy/src/datafusion/conversion.rs
@@ -291,7 +291,7 @@ pub fn to_datafusion_lp(lp: LogicalPlan) -> Result<DLogicalPlan> {
             delimiter,
             ignore_errors,
             skip_rows,
-            stop_after_n_rows,
+            n_rows,
             ..
         } => {
             let schema = schema.to_arrow();
@@ -300,24 +300,20 @@ pub fn to_datafusion_lp(lp: LogicalPlan) -> Result<DLogicalPlan> {
                 .delimiter(delimiter)
                 .schema(&schema);
             if ignore_errors || skip_rows > 0 {
-                return Err(PolarsError::ComputeError("DataFusion does not support `ignore_errors`, `skip_rows`, `stop_after_n_rows`, `with_columns`".into()));
+                return Err(PolarsError::ComputeError("DataFusion does not support `ignore_errors`, `skip_rows`, `n_rows`, `with_columns`".into()));
            }
             let builder =
                 LogicalPlanBuilder::scan_csv(try_path_to_str(&path)?, options, None).unwrap();
-            match stop_after_n_rows {
+            match n_rows {
                 Some(n) => builder.limit(n).unwrap().build().unwrap(),
                 None => builder.build().unwrap(),
             }
         }
         #[cfg(feature = "parquet")]
-        ParquetScan {
-            path,
-            stop_after_n_rows,
-            ..
-        } => {
+        ParquetScan { path, n_rows, .. } => {
             let builder =
                 LogicalPlanBuilder::scan_parquet(try_path_to_str(&path)?, None, 8).unwrap();
-            match stop_after_n_rows {
+            match n_rows {
                 Some(n) => builder.limit(n).unwrap().build().unwrap(),
                 None => builder.build().unwrap(),
             }
22 changes: 9 additions & 13 deletions polars/polars-lazy/src/frame.rs
@@ -39,7 +39,7 @@ pub struct LazyCsvReader<'a> {
     has_header: bool,
     ignore_errors: bool,
     skip_rows: usize,
-    stop_after_n_rows: Option<usize>,
+    n_rows: Option<usize>,
     cache: bool,
     schema: Option<SchemaRef>,
     schema_overwrite: Option<&'a Schema>,
@@ -59,7 +59,7 @@ impl<'a> LazyCsvReader<'a> {
             has_header: true,
             ignore_errors: false,
             skip_rows: 0,
-            stop_after_n_rows: None,
+            n_rows: None,
             cache: true,
             schema: None,
             schema_overwrite: None,
@@ -73,8 +73,8 @@ impl<'a> LazyCsvReader<'a> {
 
     /// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
     /// be guaranteed.
-    pub fn with_stop_after_n_rows(mut self, num_rows: Option<usize>) -> Self {
-        self.stop_after_n_rows = num_rows;
+    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
+        self.n_rows = num_rows;
         self
     }
 
@@ -184,7 +184,7 @@ impl<'a> LazyCsvReader<'a> {
             self.has_header,
             self.ignore_errors,
             self.skip_rows,
-            self.stop_after_n_rows,
+            self.n_rows,
             self.cache,
             self.schema,
             self.schema_overwrite,
@@ -295,12 +295,8 @@ impl LazyFrame {
 
     /// Create a LazyFrame directly from a parquet scan.
     #[cfg(feature = "parquet")]
-    pub fn scan_parquet(
-        path: String,
-        stop_after_n_rows: Option<usize>,
-        cache: bool,
-    ) -> Result<Self> {
-        let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, stop_after_n_rows, cache)?
+    pub fn scan_parquet(path: String, n_rows: Option<usize>, cache: bool) -> Result<Self> {
+        let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, n_rows, cache)?
             .build()
             .into();
         lf.opt_state.agg_scan_projection = true;
@@ -309,9 +305,9 @@ impl LazyFrame {
 
     /// Create a LazyFrame directly from a ipc scan.
     #[cfg(feature = "ipc")]
-    pub fn scan_ipc(path: String, stop_after_n_rows: Option<usize>, cache: bool) -> Result<Self> {
+    pub fn scan_ipc(path: String, n_rows: Option<usize>, cache: bool) -> Result<Self> {
         let options = IpcOptions {
-            stop_after_n_rows,
+            n_rows,
             cache,
             with_columns: None,
         };
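The lazy scan entry points keep their positional signatures and only pick up the shorter name, as the hunks above show. A minimal sketch (paths are placeholders, and the `parquet`/`ipc` features are assumed enabled):

```rust
use polars::prelude::*;

fn lazy_heads() -> Result<(DataFrame, DataFrame)> {
    // The second argument is the renamed `n_rows` cap; the third toggles caching.
    let pq = LazyFrame::scan_parquet("data.parquet".into(), Some(500), true)?.collect()?;
    let ipc = LazyFrame::scan_ipc("data.ipc".into(), Some(500), true)?.collect()?;
    Ok((pq, ipc))
}
```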
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/logical_plan/alp.rs
@@ -60,7 +60,7 @@ pub enum ALogicalPlan {
         with_columns: Option<Vec<String>>,
         predicate: Option<Node>,
         aggregate: Vec<Node>,
-        stop_after_n_rows: Option<usize>,
+        n_rows: Option<usize>,
         cache: bool,
     },
     DataFrameScan {
@@ -382,7 +382,7 @@ impl ALogicalPlan {
                 output_schema,
                 with_columns,
                 predicate,
-                stop_after_n_rows,
+                n_rows,
                 cache,
                 ..
             } => {
@@ -398,7 +398,7 @@ impl ALogicalPlan {
                     with_columns: with_columns.clone(),
                     predicate: new_predicate,
                     aggregate: exprs,
-                    stop_after_n_rows: *stop_after_n_rows,
+                    n_rows: *n_rows,
                     cache: *cache,
                 }
             }
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/logical_plan/conversion.rs
@@ -228,7 +228,7 @@ pub(crate) fn to_alp(
             with_columns,
             predicate,
             aggregate,
-            stop_after_n_rows,
+            n_rows,
             cache,
         } => ALogicalPlan::ParquetScan {
             path,
@@ -240,7 +240,7 @@ pub(crate) fn to_alp(
                 .into_iter()
                 .map(|expr| to_aexpr(expr, expr_arena))
                 .collect(),
-            stop_after_n_rows,
+            n_rows,
             cache,
         },
         LogicalPlan::DataFrameScan {
@@ -695,15 +695,15 @@ pub(crate) fn node_to_lp(
             with_columns,
             predicate,
             aggregate,
-            stop_after_n_rows,
+            n_rows,
             cache,
         } => LogicalPlan::ParquetScan {
             path,
             schema,
             with_columns,
             predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
             aggregate: nodes_to_exprs(&aggregate, expr_arena),
-            stop_after_n_rows,
+            n_rows,
             cache,
         },
         ALogicalPlan::DataFrameScan {
