ignore error option for readers; closes #69
ritchie46 committed Sep 11, 2020
1 parent 4c4ec9b commit 90d8a88
Showing 7 changed files with 146 additions and 38 deletions.
39 changes: 16 additions & 23 deletions polars/src/chunked_array/aggregate.rs
@@ -22,17 +22,18 @@ macro_rules! cmp_float_with_nans {

macro_rules! agg_float_with_nans {
($self:ident, $agg_method:ident, $precision:ty) => {{
if $self.null_count() == 0 {
$self.into_no_null_iter()
.$agg_method(|&a, &b| cmp_float_with_nans!(a, b, $precision))
} else {
$self.into_iter()
.filter(|opt| opt.is_some())
.map(|opt| opt.unwrap())
.$agg_method(|&a, &b| cmp_float_with_nans!(a, b, $precision))
}

}}
if $self.null_count() == 0 {
$self
.into_no_null_iter()
.$agg_method(|&a, &b| cmp_float_with_nans!(a, b, $precision))
} else {
$self
.into_iter()
.filter(|opt| opt.is_some())
.map(|opt| opt.unwrap())
.$agg_method(|&a, &b| cmp_float_with_nans!(a, b, $precision))
}
}};
}

impl<T> ChunkAgg<T::Native> for ChunkedArray<T>
@@ -60,12 +61,8 @@ where

fn min(&self) -> Option<T::Native> {
match T::get_data_type() {
ArrowDataType::Float32 => {
agg_float_with_nans!(self, min_by, f32)
},
ArrowDataType::Float64 => {
agg_float_with_nans!(self, min_by, f64)
}
ArrowDataType::Float32 => agg_float_with_nans!(self, min_by, f32),
ArrowDataType::Float64 => agg_float_with_nans!(self, min_by, f64),
_ => self
.downcast_chunks()
.iter()
@@ -76,12 +73,8 @@ where

fn max(&self) -> Option<T::Native> {
match T::get_data_type() {
ArrowDataType::Float32 => {
agg_float_with_nans!(self, max_by, f32)
},
ArrowDataType::Float64 => {
agg_float_with_nans!(self, max_by, f64)
}
ArrowDataType::Float32 => agg_float_with_nans!(self, max_by, f32),
ArrowDataType::Float64 => agg_float_with_nans!(self, max_by, f64),
_ => self
.downcast_chunks()
.iter()
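The hunks above are formatting-only changes to the float aggregation macro. As a side note for readers of this diff, here is a minimal, hypothetical sketch of the kind of NaN-aware comparator that `cmp_float_with_nans!` plugs into `min_by`/`max_by`; the exact NaN ordering is an assumption and not taken from this commit:

```rust
use std::cmp::Ordering;

// Hypothetical stand-in for cmp_float_with_nans!(a, b, f64): give NaN a fixed
// place in the ordering so min_by/max_by have a total order to work with.
fn cmp_f64_with_nans(a: f64, b: f64) -> Ordering {
    match (a.is_nan(), b.is_nan()) {
        (true, true) => Ordering::Equal,
        (true, false) => Ordering::Greater, // treat NaN as the largest value here
        (false, true) => Ordering::Less,
        (false, false) => a.partial_cmp(&b).unwrap(), // both are ordinary floats
    }
}

fn main() {
    let vals = [2.5_f64, f64::NAN, 1.0];
    let min = vals.iter().copied().min_by(|a, b| cmp_f64_with_nans(*a, *b));
    assert_eq!(min, Some(1.0)); // the NaN does not win the comparison
}
```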
72 changes: 68 additions & 4 deletions polars/src/frame/ser/csv.rs
@@ -48,9 +48,10 @@
//! assert_eq!("sepal.length", df.get_columns()[0].name());
//! # assert_eq!(1, df.column("sepal.length").unwrap().chunks().len());
//! ```
use crate::frame::ser::finish_reader;
use crate::prelude::*;
use crate::{frame::ser::finish_reader, utils::clone};
pub use arrow::csv::{ReaderBuilder, WriterBuilder};
use arrow::error::ArrowError;
use std::io::{Read, Seek, Write};
use std::sync::Arc;

@@ -138,6 +139,10 @@ where
reader_builder: ReaderBuilder,
/// Aggregates chunks afterwards to a single chunk.
rechunk: bool,
/// Continue with next batch when a ParserError is encountered.
ignore_parser_error: bool,
// used by error ignore logic
max_records: Option<usize>,
}

impl<R> SerReader<R> for CsvReader<R>
@@ -150,6 +155,8 @@ where
reader,
reader_builder: ReaderBuilder::new(),
rechunk: true,
ignore_parser_error: false,
max_records: None,
}
}

@@ -159,10 +166,41 @@ where
self
}

/// Continue with next batch when a ParserError is encountered.
fn with_ignore_parser_error(mut self) -> Self {
self.ignore_parser_error = true;
self
}

/// Read the file and create the DataFrame.
fn finish(self) -> Result<DataFrame> {
let rechunk = self.rechunk;
finish_reader(self.reader_builder.build(self.reader)?, rechunk)
fn finish(mut self) -> Result<DataFrame> {
// It could be that we could not infer the schema due to invalid lines.
// If we get a CsvError or ParseError we halve the number of lines used for
// schema inference
let reader = if self.ignore_parser_error && self.max_records.is_some() {
if self.max_records < Some(1) {
return Err(PolarsError::Other("Could not infer schema".to_string()));
}
let reader_val;
loop {
let rb = clone(&self.reader_builder);
match rb.build(clone(&self.reader)) {
Err(ArrowError::CsvError(_)) | Err(ArrowError::ParseError(_)) => {
self.max_records = self.max_records.map(|v| v / 2);
self.reader_builder = self.reader_builder.infer_schema(self.max_records);
}
Err(e) => return Err(PolarsError::ArrowError(e)),
Ok(reader) => {
reader_val = reader;
break;
}
}
}
reader_val
} else {
self.reader_builder.build(self.reader)?
};
finish_reader(reader, self.rechunk, self.ignore_parser_error)
}
}

@@ -208,6 +246,8 @@ where

/// Set the CSV reader to infer the schema of the file
pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
// used by error ignore logic
self.max_records = max_records;
self.reader_builder = self.reader_builder.infer_schema(max_records);
self
}
@@ -241,4 +281,28 @@ mod test {
let csv = std::str::from_utf8(&buf).unwrap();
assert_eq!("days,temp\n0,22.1\n1,19.9\n2,7\n3,2\n4,3\n", csv);
}

#[test]
fn test_parser_error_ignore() {
use std::io::Cursor;

let s = r#"
"sepal.length","sepal.width","petal.length","petal.width","variety"
5.1,3.5,1.4,.2,"Setosa"
5.1,3.5,1.4,.2,"Setosa"
4.9,3,1.4,.2,"Setosa", "extra-column"
"#;

let file = Cursor::new(s);

// just checks if unwrap doesn't panic
CsvReader::new(file)
// we also check if infer schema ignores errors
.infer_schema(Some(10))
.has_header(true)
.with_batch_size(2)
.with_ignore_parser_error()
.finish()
.unwrap();
}
}
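For reference, a minimal usage sketch of the new option outside the test module (the CSV content is made up; the builder calls and the `polars::prelude` re-exports are assumed to match what the test above exercises). Note how `finish` now interacts with schema inference: with `with_ignore_parser_error` set, a `CsvError`/`ParseError` while building the reader triggers a retry with `max_records` halved each time (10 → 5 → 2 → 1), and a `max_records` below 1 is rejected up front with `PolarsError::Other("Could not infer schema")`.

```rust
use polars::prelude::*;
use std::io::Cursor;

fn main() -> Result<()> {
    // Hypothetical CSV: the last data row has an extra column and would
    // normally abort both schema inference and parsing.
    let csv = "a,b\n1,2\n3,4\n5,6,7\n";

    let df = CsvReader::new(Cursor::new(csv))
        .infer_schema(Some(10))     // max_records also bounds the inference retries
        .has_header(true)
        .with_batch_size(2)
        .with_ignore_parser_error() // skip batches that fail to parse
        .finish()?;

    println!("{:?}", df);
    Ok(())
}
```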
11 changes: 9 additions & 2 deletions polars/src/frame/ser/ipc.rs
@@ -46,6 +46,7 @@ pub struct IPCReader<R> {
reader: R,
/// Aggregates chunks afterwards to a single chunk.
rechunk: bool,
ignore_parser_error: bool,
}

impl<R> ArrowReader for ArrowIPCFileReader<R>
@@ -69,17 +70,23 @@ where
IPCReader {
reader,
rechunk: true,
ignore_parser_error: false,
}
}
fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}

fn with_ignore_parser_error(mut self) -> Self {
self.ignore_parser_error = true;
self
}

fn finish(self) -> Result<DataFrame> {
let rechunk = self.rechunk;
let ipc_reader = ArrowIPCFileReader::try_new(self.reader)?;
finish_reader(ipc_reader, rechunk)
finish_reader(ipc_reader, rechunk, self.ignore_parser_error)
}
}

@@ -104,7 +111,7 @@ where
fn new(writer: &'a mut W) -> Self {
IPCWriter {
writer,
batch_size: 1000,
batch_size: 200_000,
}
}

14 changes: 13 additions & 1 deletion polars/src/frame/ser/json.rs
@@ -72,6 +72,7 @@ where
reader: R,
reader_builder: ReaderBuilder,
rechunk: bool,
ignore_parser_error: bool,
}

impl<R> SerReader<R> for JsonReader<R>
@@ -83,17 +84,27 @@ where
reader,
reader_builder: ReaderBuilder::new(),
rechunk: true,
ignore_parser_error: false,
}
}

fn with_ignore_parser_error(mut self) -> Self {
self.ignore_parser_error = true;
self
}

fn set_rechunk(mut self, rechunk: bool) -> Self {
self.rechunk = rechunk;
self
}

fn finish(self) -> Result<DataFrame> {
let rechunk = self.rechunk;
finish_reader(self.reader_builder.build(self.reader)?, rechunk)
finish_reader(
self.reader_builder.build(self.reader)?,
rechunk,
self.ignore_parser_error,
)
}
}

@@ -114,6 +125,7 @@ where
}

/// Set the batch size (number of records to load at one time)
/// This heavily influences loading time.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.reader_builder = self.reader_builder.with_batch_size(batch_size);
self
30 changes: 24 additions & 6 deletions polars/src/frame/ser/mod.rs
@@ -6,8 +6,8 @@ pub mod json;
pub mod parquet;
use crate::prelude::*;
use arrow::{
csv::Reader as ArrowCsvReader, error::Result as ArrowResult, json::Reader as ArrowJsonReader,
record_batch::RecordBatch,
csv::Reader as ArrowCsvReader, error::ArrowError, error::Result as ArrowResult,
json::Reader as ArrowJsonReader, record_batch::RecordBatch,
};
use std::io::{Read, Seek, Write};
use std::sync::Arc;
@@ -21,6 +21,9 @@
/// Rechunk to a single chunk after Reading file.
fn set_rechunk(self, rechunk: bool) -> Self;

/// Continue with next batch when a ParserError is encountered.
fn with_ignore_parser_error(self) -> Self;

/// Take the SerReader and return a parsed DataFrame.
fn finish(self) -> Result<DataFrame>;
}
@@ -59,7 +62,11 @@ impl<R: Read> ArrowReader for ArrowJsonReader<R> {
}
}

pub fn finish_reader<R: ArrowReader>(mut reader: R, rechunk: bool) -> Result<DataFrame> {
pub fn finish_reader<R: ArrowReader>(
mut reader: R,
rechunk: bool,
ignore_parser_error: bool,
) -> Result<DataFrame> {
fn init_ca<T>(field: &Field) -> ChunkedArray<T>
where
T: PolarsDataType,
@@ -117,19 +124,30 @@ pub fn finish_reader<R: ArrowReader>(mut reader: R, rechunk: bool) -> Result<DataFrame> {
Series::TimestampSecond(init_ca(field))
}
ArrowDataType::LargeList(_) => Series::LargeList(init_ca(field)),
_ => unimplemented!(),
t => panic!(format!("Arrow datatype {:?} is not supported", t)),
})
.collect::<Vec<_>>();

while let Some(batch) = reader.next()? {
loop {
let batch = match reader.next() {
Err(ArrowError::ParseError(s)) => {
if ignore_parser_error {
continue;
} else {
return Err(PolarsError::ArrowError(ArrowError::ParseError(s)));
}
}
Err(e) => return Err(PolarsError::ArrowError(e)),
Ok(None) => break,
Ok(Some(batch)) => batch,
};
batch
.columns()
.into_iter()
.zip(&mut columns)
.map(|(arr, ser)| ser.append_array(arr.clone()))
.collect::<Result<Vec<_>>>()?;
}

if rechunk {
columns = columns
.into_iter()
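Because `with_ignore_parser_error` is now part of the `SerReader` trait, every reader gains it with the same semantics: with the flag, `finish_reader` skips a batch whose read fails with `ArrowError::ParseError`; without it, that error is returned as `PolarsError::ArrowError`. A hypothetical generic helper, not part of this commit, shows how the trait composes (`new` is part of the trait, as the impls show; the `R: Read + Seek` bound on the trait is assumed from the hunk context):

```rust
use polars::prelude::*;
use std::io::{Read, Seek};

// Hypothetical helper: configure any SerReader implementation the same way.
fn read_leniently<R, S>(reader: R) -> Result<DataFrame>
where
    R: Read + Seek,
    S: SerReader<R>,
{
    S::new(reader)
        .set_rechunk(true)          // collapse to a single chunk after reading
        .with_ignore_parser_error() // unparsable batches are skipped, not fatal
        .finish()
}
```

A caller would pick the concrete reader via the turbofish, e.g. `read_leniently::<_, CsvReader<_>>(file)`, assuming `CsvReader` and `SerReader` are reachable through the prelude.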
13 changes: 11 additions & 2 deletions polars/src/frame/ser/parquet.rs
@@ -30,6 +30,7 @@ pub struct ParquetReader<R> {
reader: R,
rechunk: bool,
batch_size: usize,
ignore_parser_error: bool,
}

impl ArrowReader for ParquetRecordBatchReader {
@@ -44,6 +45,7 @@ impl ArrowReader for ParquetRecordBatchReader {

impl<R> ParquetReader<R> {
/// Set the size of the read buffer. Batch size is the amount of rows read at once.
/// This heavily influences loading time.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
@@ -58,7 +60,9 @@
ParquetReader {
reader,
rechunk: true,
batch_size: 2048,
// parquets are often large, so use a large batch size
batch_size: 524288,
ignore_parser_error: false,
}
}

@@ -67,13 +71,18 @@
self
}

fn with_ignore_parser_error(mut self) -> Self {
self.ignore_parser_error = true;
self
}

fn finish(self) -> Result<DataFrame> {
let rechunk = self.rechunk;

let file_reader = Rc::new(SerializedFileReader::new(self.reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let record_reader = arrow_reader.get_record_reader(self.batch_size)?;
finish_reader(record_reader, rechunk)
finish_reader(record_reader, rechunk, self.ignore_parser_error)
}
}

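And a short Parquet sketch to go with the batch-size change above (the path is hypothetical; `ParquetReader` plus the `SerReader` methods are assumed to be reachable through the prelude). The larger default trades memory for fewer, bigger record batches, which the doc comment flags as the main lever on loading time:

```rust
use polars::prelude::*;
use std::fs::File;

fn main() -> Result<()> {
    let file = File::open("data/example.parquet").expect("hypothetical path");
    let df = ParquetReader::new(file)
        .with_batch_size(524_288) // same as the new default; lower it to save memory
        .finish()?;
    println!("{:?}", df);
    Ok(())
}
```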
5 changes: 5 additions & 0 deletions polars/src/utils.rs
@@ -108,3 +108,8 @@ macro_rules! match_arrow_data_type_apply_macro {
}
}};
}

/// Clone if upstream hasn't implemented clone
pub(crate) fn clone<T>(t: &T) -> T {
unsafe { mem::transmute_copy(t) }
}
