refactor[rust]: prepare csv code for batched reading (#4999)
ritchie46 committed Sep 28, 2022
1 parent cfee8f4 commit a25ca08
Showing 18 changed files with 583 additions and 273 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -8,6 +8,7 @@ members = [
"polars/polars-utils",
"polars/polars-ops",
"polars/polars-algo",
"polars/polars-pipe",
"polars-sql",
"examples/read_csv",
"examples/read_parquet",
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv/mod.rs
@@ -68,7 +68,7 @@ use serde::{Deserialize, Serialize};
pub use write::CsvWriter;

use crate::aggregations::ScanAggregation;
use crate::csv::read_impl::{cast_columns, CoreReader};
use crate::csv::read_impl::CoreReader;
use crate::csv::utils::get_reader_bytes;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
238 changes: 122 additions & 116 deletions polars/polars-io/src/csv/read.rs
@@ -1,7 +1,5 @@
#[cfg(feature = "dtype-categorical")]
use polars_core::toggle_string_cache;

use super::*;
use crate::csv::read_impl::BatchedCsvReader;

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -130,6 +128,8 @@ where
skip_rows_after_header: usize,
parse_dates: bool,
row_count: Option<RowCount>,
// temporary schema needed for batch lifetimes
owned_schema: Option<Box<Schema>>,
}

impl<'a, R> CsvReader<'a, R>
@@ -326,9 +326,111 @@ impl<'a> CsvReader<'a, File> {
}
}

impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
fn core_reader<'b>(
&'b mut self,
schema: Option<&'b Schema>,
to_cast: Vec<Field>,
) -> PolarsResult<CoreReader<'b>>
where
'a: 'b,
{
let reader_bytes = get_reader_bytes(&mut self.reader)?;
CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows_before_header,
std::mem::take(&mut self.projection),
self.max_records,
self.delimiter,
self.has_header,
self.ignore_parser_errors,
self.schema,
std::mem::take(&mut self.columns),
self.encoding,
self.n_threads,
schema,
self.dtype_overwrite,
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
std::mem::take(&mut self.null_values),
std::mem::take(&mut self.predicate),
self.aggregate,
to_cast,
self.skip_rows_after_header,
std::mem::take(&mut self.row_count),
self.parse_dates,
)
}

fn prepare_schema_overwrite(&self, overwriting_schema: &Schema) -> (Schema, Vec<Field>, bool) {
// In this branch we check whether there are dtypes we cannot parse.
// We only support a few dtypes in the parser and cast to the required dtype later.
let mut to_cast = Vec::with_capacity(overwriting_schema.len());

let mut _has_categorical = false;

#[allow(clippy::unnecessary_filter_map)]
let fields: Vec<_> = overwriting_schema
.iter_fields()
.filter_map(|mut fld| {
use DataType::*;
match fld.data_type() {
Time => {
to_cast.push(fld);
// let inference decide the column type
None
}
Int8 | Int16 | UInt8 | UInt16 => {
// We have not compiled these buffers, so we cast them later.
to_cast.push(fld.clone());
fld.coerce(DataType::Int32);
Some(fld)
}
#[cfg(feature = "dtype-categorical")]
Categorical(_) => {
_has_categorical = true;
Some(fld)
}
_ => Some(fld),
}
})
.collect();
let schema = Schema::from(fields);
(schema, to_cast, _has_categorical)
}

pub fn batched(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
if let Some(schema) = self.schema_overwrite {
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
self.owned_schema = Some(Box::new(schema));

// Safety:
// We boxed the schema and refer to the boxed pointer.
// The schema is only dropped once `self` drops,
// so the reference is bound to 'a.
let schema = unsafe {
std::mem::transmute::<Option<&Schema>, Option<&Schema>>(
self.owned_schema.as_ref().map(|b| b.as_ref()),
)
};

let csv_reader = self.core_reader(schema, to_cast)?;
csv_reader.batched(has_cat)
} else {
let csv_reader = self.core_reader(self.schema, vec![])?;
csv_reader.batched(false)
}
}
}
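
The prepare_schema_overwrite helper above splits a user-supplied schema into what the parser can read directly and what has to be cast afterwards. A worked example may help; this is a sketch with illustrative column names, mirroring the Schema/Field API used in the diff, not code from this commit:

use polars_core::prelude::*;

// Hypothetical overwrite schema; column names are illustrative only.
fn example_overwrite() -> Schema {
    Schema::from(vec![
        Field::new("id", DataType::Int8),   // no dedicated parser buffer compiled
        Field::new("t", DataType::Time),    // cannot be parsed in place
        Field::new("name", DataType::Utf8), // supported directly
    ])
}
// prepare_schema_overwrite(&example_overwrite()) would return roughly:
//   schema  = {id: Int32, name: Utf8}   (the Time column is left to inference)
//   to_cast = [id: Int8, t: Time]       (cast back to these dtypes after parsing)
//   has_categorical = false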

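The batched entry point is the new public surface this refactor prepares: it hands out a BatchedCsvReader whose borrowed schema lives in the boxed owned_schema field, which is why the unsafe transmute above is sound for 'a. A hypothetical usage sketch follows; next_batches and its signature are an assumption based on later polars versions and may not match the BatchedCsvReader API at this commit:

use polars_core::prelude::*;
use polars_io::prelude::*;

fn read_in_batches(path: &str) -> PolarsResult<()> {
    // The reader must outlive the batches: `batched` borrows it mutably for 'a.
    let mut reader = CsvReader::from_path(path)?;
    let mut batched = reader.batched()?;
    // Assumed method: pull a few chunks at a time instead of materializing
    // the whole file the way `finish` does.
    while let Some(dfs) = batched.next_batches(4)? {
        for df in dfs {
            println!("read a chunk of {} rows", df.height());
        }
    }
    Ok(())
}
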
impl<'a, R> SerReader<R> for CsvReader<'a, R>
where
R: MmapBytesReader,
R: MmapBytesReader + 'a,
{
/// Create a new CsvReader from a file or stream.
fn new(reader: R) -> Self {
@@ -361,129 +463,40 @@ where
skip_rows_after_header: 0,
parse_dates: false,
row_count: None,
owned_schema: None,
}
}

/// Read the file and create the DataFrame.
fn finish(mut self) -> PolarsResult<DataFrame> {
let rechunk = self.rechunk;
// we cannot append categorical under local string cache, so we cast them later.
#[allow(unused_mut)]
let mut to_cast_local = vec![];
let schema_overwrite = self.schema_overwrite;
let dtype_overwrite = self.dtype_overwrite;
let should_parse_dates = self.parse_dates;
let low_memory = self.low_memory;

#[cfg(feature = "dtype-categorical")]
let mut has_categorical = false;

let mut df = if let Some(schema) = self.schema_overwrite {
// This branch we check if there are dtypes we cannot parse.
// We only support a few dtypes in the parser and later cast to the required dtype
let mut to_cast = Vec::with_capacity(schema.len());

#[allow(clippy::unnecessary_filter_map)]
let fields: Vec<_> = schema
.iter_fields()
.filter_map(|mut fld| {
use DataType::*;
match fld.data_type() {
Time => {
to_cast.push(fld);
// let inference decide the column type
None
}
Int8 | Int16 | UInt8 | UInt16 => {
// We have not compiled these buffers, so we cast them later.
to_cast.push(fld.clone());
fld.coerce(DataType::Int32);
Some(fld)
}
#[cfg(feature = "dtype-categorical")]
DataType::Categorical(_) => {
has_categorical = true;
Some(fld)
}
_ => Some(fld),
}
})
.collect();
let schema = Schema::from(fields);
let mut _cat_lock = None;

let mut df = if let Some(schema) = schema_overwrite {
let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(schema);

#[cfg(feature = "dtype-categorical")]
if has_categorical {
toggle_string_cache(true);
if _has_cat {
_cat_lock = Some(polars_core::IUseStringCache::new())
}

// we cannot overwrite self, because the lifetime is already instantiated with `a, and
// the lifetime that accompanies this scope is shorter.
// So we just build_csv_reader from here
let reader_bytes = get_reader_bytes(&mut self.reader)?;
let mut csv_reader = CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows_before_header,
self.projection,
self.max_records,
self.delimiter,
self.has_header,
self.ignore_parser_errors,
self.schema,
self.columns,
self.encoding,
self.n_threads,
Some(&schema),
self.dtype_overwrite,
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values,
self.predicate,
self.aggregate,
&to_cast,
self.skip_rows_after_header,
self.row_count,
self.parse_dates,
)?;
let mut csv_reader = self.core_reader(Some(&schema), to_cast)?;
csv_reader.as_df()?
} else {
let reader_bytes = get_reader_bytes(&mut self.reader)?;
let mut csv_reader = CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows_before_header,
self.projection,
self.max_records,
self.delimiter,
self.has_header,
self.ignore_parser_errors,
self.schema,
self.columns,
self.encoding,
self.n_threads,
self.schema,
self.dtype_overwrite,
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values,
self.predicate,
self.aggregate,
&[],
self.skip_rows_after_header,
self.row_count,
self.parse_dates,
)?;
let mut csv_reader = self.core_reader(self.schema, vec![])?;
csv_reader.as_df()?
};

// Important that this rechunk is never done in parallel.
// As that leads to great memory overhead.
if rechunk && df.n_chunks()? > 1 {
if self.low_memory {
if low_memory {
df.as_single_chunk();
} else {
df.as_single_chunk_par();
@@ -492,9 +505,9 @@

#[cfg(feature = "temporal")]
// only needed until we also can parse time columns in place
if self.parse_dates {
if should_parse_dates {
// determine the schema that's given by the user. That should not be changed
let fixed_schema = match (self.schema_overwrite, self.dtype_overwrite) {
let fixed_schema = match (schema_overwrite, dtype_overwrite) {
(Some(schema), _) => Cow::Borrowed(schema),
(None, Some(dtypes)) => {
let fields: Vec<_> = dtypes
@@ -509,13 +522,6 @@ where
};
df = parse_dates(df, &fixed_schema)
}

cast_columns(&mut df, &to_cast_local, true)?;

#[cfg(feature = "dtype-categorical")]
if has_categorical {
toggle_string_cache(false);
}
Ok(df)
}
}
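
For comparison, the refactor leaves the public eager path untouched; finish now routes through prepare_schema_overwrite and core_reader internally, but callers keep using the same builder. A minimal sketch, assuming the usual polars-io prelude and the csv-file feature:

use polars_core::prelude::*;
use polars_io::prelude::*;

fn read_eager(path: &str) -> PolarsResult<DataFrame> {
    CsvReader::from_path(path)?
        .has_header(true)
        .finish()
}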
