Skip to content

Commit

Permalink
row_count in csv reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 8, 2022
1 parent b2cb777 commit f839325
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 122 deletions.
11 changes: 11 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,17 @@ impl DataFrame {
DataFrame::new(columns)
}

/// Add a row count column in place.
///
/// Inserts an unsigned 32-bit counter as the first column of this
/// `DataFrame`, named `name`, counting from `offset` (or 0 when `None`).
pub fn with_row_count_mut(&mut self, name: &str, offset: Option<u32>) -> &mut Self {
    let start = offset.unwrap_or(0);
    // One index per existing row; height is assumed to fit in u32.
    let end = start + self.height() as u32;
    let row_idx = UInt32Chunked::from_vec(name, (start..end).collect());
    self.columns.insert(0, row_idx.into_series());
    self
}

/// Create a new `DataFrame` but does not check the length or duplicate occurrence of the `Series`.
///
/// It is advised to use [Series::new](Series::new) in favor of this method.
Expand Down
21 changes: 12 additions & 9 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,8 @@ where
}

/// Add a `row_count` column.
pub fn with_row_count(mut self, name: Option<&str>, offset: u32) -> Self {
let rc = RowCount {
name: name.unwrap_or("row_count").to_string(),
offset,
};
self.row_count = Some(rc);
pub fn with_row_count(mut self, rc: Option<RowCount>) -> Self {
self.row_count = rc;
self
}

Expand Down Expand Up @@ -453,7 +449,7 @@ where
schema_overwrite: None,
dtype_overwrite: None,
sample_size: 1024,
chunk_size: 1 << 16,
chunk_size: 1 << 18,
low_memory: false,
comment_char: None,
null_values: None,
Expand Down Expand Up @@ -643,6 +639,7 @@ fn parse_dates(df: DataFrame, fixed_schema: &Schema) -> DataFrame {
#[cfg(test)]
mod test {
use crate::prelude::*;
use crate::RowCount;
use polars_core::datatypes::AnyValue;
use polars_core::prelude::*;
use std::io::Cursor;
Expand Down Expand Up @@ -1519,15 +1516,21 @@ foo,bar
#[test]
fn test_with_row_count() -> Result<()> {
let df = CsvReader::from_path(FOODS_CSV)?
.with_row_count(Some("rc"), 0)
.with_row_count(Some(RowCount {
name: "rc".into(),
offset: 0,
}))
.finish()?;
let rc = df.column("rc")?;
assert_eq!(
rc.u32()?.into_no_null_iter().collect::<Vec<_>>(),
(0u32..27).collect::<Vec<_>>()
);
let df = CsvReader::from_path(FOODS_CSV)?
.with_row_count(Some("rc_2"), 10)
.with_row_count(Some(RowCount {
name: "rc_2".into(),
offset: 10,
}))
.finish()?;
let rc = df.column("rc_2")?;
assert_eq!(
Expand Down
30 changes: 15 additions & 15 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ impl<'a> CoreReader<'a> {
let projection = &projection;

let mut read = bytes_offset_thread;
let mut df: Option<DataFrame> = None;
let mut dfs = Vec::with_capacity(256);

let mut last_read = usize::MAX;
loop {
Expand Down Expand Up @@ -486,6 +486,11 @@ impl<'a> CoreReader<'a> {
.map(|buf| buf.into_series())
.collect::<Result<_>>()?,
);
let current_row_count = local_df.height() as u32;
if let Some(rc) = &self.row_count {
local_df.with_row_count_mut(&rc.name, Some(rc.offset));
};

if let Some(predicate) = predicate {
let s = predicate.evaluate(&local_df)?;
let mask =
Expand Down Expand Up @@ -519,23 +524,18 @@ impl<'a> CoreReader<'a> {
}
}
}
match &mut df {
None => df = Some(local_df),
Some(df) => {
df.vstack_mut(&local_df).unwrap();
}
}
cast_columns(&mut local_df, self.to_cast, false)?;
dfs.push((local_df, current_row_count));
}

df.map(|mut df| {
cast_columns(&mut df, self.to_cast, false)?;
Ok(df)
})
.transpose()
Ok(dfs)
})
.collect::<Result<Vec<_>>>()
})?;
accumulate_dataframes_vertical(dfs.into_iter().flatten())
let mut dfs = dfs.into_iter().flatten().collect::<Vec<_>>();
if self.row_count.is_some() {
update_row_counts(&mut dfs)
}
accumulate_dataframes_vertical(dfs.into_iter().map(|t| t.0))
} else {
// let exponential growth solve the needed size. This leads to less memory overhead
// in the later rechunk. Because we have large chunks they are easier reused for the
Expand Down Expand Up @@ -608,7 +608,7 @@ impl<'a> CoreReader<'a> {

cast_columns(&mut df, self.to_cast, false)?;
if let Some(rc) = &self.row_count {
df = df.with_row_count(&rc.name, Some(rc.offset)).unwrap()
df.with_row_count_mut(&rc.name, Some(rc.offset));
}
let n_read = df.height() as u32;
Ok((df, n_read))
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#[cfg(any(feature = "ipc", feature = "parquet"))]
use crate::ArrowSchema;
use crate::RowCount;
use dirs::home_dir;
use polars_core::frame::DataFrame;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -33,7 +32,9 @@ pub(crate) fn update_row_counts(dfs: &mut [(DataFrame, u32)]) {
if !dfs.is_empty() {
let mut previous = dfs[0].1;
for (df, n_read) in &mut dfs[1..] {
df.get_columns_mut().get_mut(0).map(|s| *s = &*s + previous);
if let Some(s) = df.get_columns_mut().get_mut(0) {
*s = &*s + previous;
}
previous = *n_read;
}
}
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use polars_core::prelude::*;
use polars_io::csv::{CsvEncoding, NullValues};
use polars_io::csv_core::utils::get_reader_bytes;
use polars_io::csv_core::utils::infer_file_schema;
use polars_io::RowCount;

#[derive(Clone)]
#[cfg(feature = "csv-file")]
Expand All @@ -25,6 +26,7 @@ pub struct LazyCsvReader<'a> {
rechunk: bool,
skip_rows_after_header: usize,
encoding: CsvEncoding,
row_count: Option<RowCount>,
}

#[cfg(feature = "csv-file")]
Expand All @@ -49,6 +51,7 @@ impl<'a> LazyCsvReader<'a> {
rechunk: true,
skip_rows_after_header: 0,
encoding: CsvEncoding::Utf8,
row_count: None,
}
}

Expand All @@ -59,6 +62,13 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Add a `row_count` column.
#[must_use]
pub fn with_row_count(mut self, rc: Option<RowCount>) -> Self {
self.row_count = rc;
self
}

/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
/// be guaranteed.
#[must_use]
Expand Down Expand Up @@ -213,6 +223,7 @@ impl<'a> LazyCsvReader<'a> {
self.rechunk,
self.skip_rows_after_header,
self.encoding,
self.row_count,
)?
.build()
.into();
Expand Down
55 changes: 34 additions & 21 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::prelude::{
use crate::logical_plan::FETCH_ROWS;
use crate::utils::{combine_predicates_expr, expr_to_root_column_names};
use polars_arrow::prelude::QuantileInterpolOptions;
use polars_io::RowCount;

#[derive(Clone, Debug)]
pub struct JoinOptions {
Expand Down Expand Up @@ -991,27 +992,39 @@ impl LazyFrame {
}

/// Add a new column at index 0 that counts the rows.
pub fn with_row_count(self, name: &str, offset: Option<u32>) -> LazyFrame {
let schema = self.schema();

let mut fields = schema.fields().clone();
fields.insert(0, Field::new(name, DataType::UInt32));
let new_schema = Schema::new(fields);

let name = name.to_owned();

let opt = AllowedOptimizations {
slice_pushdown: false,
predicate_pushdown: false,
..Default::default()
};

self.map(
move |df: DataFrame| df.with_row_count(&name, offset),
Some(opt),
Some(new_schema),
Some("WITH ROW COUNT"),
)
/// Add a new column at index 0 that counts the rows.
///
/// `offset` is the first row number emitted; `None` starts at 0.
/// For a CSV scan the row count is pushed down into the scan options so the
/// reader can produce it directly; for any other plan a `map` node is
/// appended that calls `DataFrame::with_row_count`.
pub fn with_row_count(mut self, name: &str, offset: Option<u32>) -> LazyFrame {
    match &mut self.logical_plan {
        // Do the row count at scan
        #[cfg(feature = "csv-file")]
        LogicalPlan::CsvScan { options, .. } => {
            options.row_count = Some(RowCount {
                name: name.to_string(),
                offset: offset.unwrap_or(0),
            });
            self
        }
        _ => {
            let schema = self.schema();

            // The new column goes at index 0, so prepend it to the schema too.
            let mut fields = schema.fields().clone();
            fields.insert(0, Field::new(name, DataType::UInt32));
            let new_schema = Schema::new(fields);

            let name = name.to_owned();

            // Disable slice/predicate pushdown: presumably either would drop
            // rows before the counter runs and change the numbering —
            // NOTE(review): confirm this is the intent.
            let opt = AllowedOptimizations {
                slice_pushdown: false,
                predicate_pushdown: false,
                ..Default::default()
            };
            self.map(
                move |df: DataFrame| df.with_row_count(&name, offset),
                Some(opt),
                Some(new_schema),
                Some("WITH ROW COUNT"),
            )
        }
    }
}
}

Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use polars_io::csv_core::utils::infer_file_schema;
use polars_io::ipc::IpcReader;
#[cfg(feature = "parquet")]
use polars_io::parquet::ParquetReader;
use polars_io::RowCount;
#[cfg(feature = "csv-file")]
use polars_io::{
csv::NullValues,
Expand Down Expand Up @@ -103,6 +104,7 @@ impl LogicalPlanBuilder {
rechunk: bool,
skip_rows_after_header: usize,
encoding: CsvEncoding,
row_count: Option<RowCount>,
) -> Result<Self> {
let path = path.into();
let mut file = std::fs::File::open(&path)?;
Expand Down Expand Up @@ -149,6 +151,7 @@ impl LogicalPlanBuilder {
null_values,
rechunk,
encoding,
row_count,
},
predicate: None,
aggregate: vec![],
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::prelude::*;
use polars_core::prelude::*;
use polars_io::csv::{CsvEncoding, NullValues};
use polars_io::RowCount;

#[derive(Clone, Debug)]
pub struct CsvParserOptions {
Expand All @@ -17,6 +18,7 @@ pub struct CsvParserOptions {
pub(crate) null_values: Option<NullValues>,
pub(crate) rechunk: bool,
pub(crate) encoding: CsvEncoding,
pub(crate) row_count: Option<RowCount>,
}
#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl Executor for CsvExec {
.with_quote_char(self.options.quote_char)
.with_encoding(self.options.encoding)
.with_rechunk(self.options.rechunk)
.with_row_count(std::mem::take(&mut self.options.row_count))
.finish()?;

if self.options.cache {
Expand Down
27 changes: 27 additions & 0 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use polars_io::RowCount;

#[test]
fn test_parquet_exec() -> Result<()> {
Expand Down Expand Up @@ -328,3 +329,29 @@ fn skip_rows_and_slice() -> Result<()> {
assert_eq!(out.shape(), (1, 4));
Ok(())
}

#[test]
fn test_row_count() -> Result<()> {
    // The row count column must start at the requested offset.
    for offset in [0u32, 10] {
        let df = LazyCsvReader::new(FOODS_CSV.to_string())
            .with_row_count(Some(RowCount {
                name: "rc".into(),
                // field-init shorthand (was the redundant `offset: offset`)
                offset,
            }))
            .finish()?
            .collect()?;

        let rc = df.column("rc")?;
        // FOODS_CSV has 27 data rows, so we expect offset..offset+27.
        assert_eq!(
            rc.u32()?.into_no_null_iter().collect::<Vec<_>>(),
            (offset..27 + offset).collect::<Vec<_>>()
        );
    }

    // `with_row_count` on a CSV scan should be pushed into the scan node.
    let lf = LazyCsvReader::new(FOODS_CSV.to_string())
        .finish()?
        .with_row_count("foo", None);
    assert!(row_count_at_scan(lf));

    Ok(())
}
19 changes: 19 additions & 0 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ fn get_arenas() -> (Arena<AExpr>, Arena<ALogicalPlan>) {
(expr_arena, lp_arena)
}

/// Returns `true` when the optimized plan for `q` contains a CSV scan node
/// that carries a `row_count`, i.e. the row count was pushed into the scan.
pub(crate) fn row_count_at_scan(q: LazyFrame) -> bool {
    let (mut expr_arena, mut lp_arena) = get_arenas();
    let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    (&lp_arena).iter(lp).any(|(_, lp)| {
        use ALogicalPlan::*;
        // `matches!` replaces the verbose `match … => true, _ => false`
        // (clippy::match_like_matches_macro).
        matches!(
            lp,
            CsvScan {
                options: CsvParserOptions {
                    row_count: Some(_),
                    ..
                },
                ..
            }
        )
    })
}

pub(crate) fn predicate_at_scan(q: LazyFrame) -> bool {
let (mut expr_arena, mut lp_arena) = get_arenas();
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
Expand Down

0 comments on commit f839325

Please sign in to comment.