csv-parser add 'null_values' argument
ritchie46 committed Jul 9, 2021
1 parent 7766fb1 commit f5d5323
Showing 15 changed files with 220 additions and 17 deletions.
57 changes: 57 additions & 0 deletions polars/polars-io/src/csv.rs
@@ -136,6 +136,35 @@ pub enum CsvEncoding {
LossyUtf8,
}

#[derive(Clone, Debug)]
pub enum NullValues {
/// A single value that's used for all columns
AllColumns(String),
/// A different null value per column
Columns(Vec<String>),
/// Tuples that map a column name to the null value of that column
Named(Vec<(String, String)>),
}

impl NullValues {
/// Use the schema and the null values to produce a null value for every column.
pub(crate) fn process(self, schema: &Schema) -> Result<Vec<String>> {
let out = match self {
NullValues::Columns(v) => v,
NullValues::AllColumns(v) => (0..schema.len()).map(|_| v.clone()).collect(),
NullValues::Named(v) => {
let mut null_values = vec!["".to_string(); schema.len()];
for (name, null_value) in v {
let i = schema.index_of(&name)?;
null_values[i] = null_value;
}
null_values
}
};
Ok(out)
}
}
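
To make the expansion rule concrete, here is a minimal standalone sketch of the `Named` handling, using a plain slice of column names instead of a polars `Schema`. The helper name, column names, and markers are invented for illustration; unlike the real `process`, it silently skips unknown names instead of returning the error produced by `schema.index_of`.

// Hypothetical helper mirroring the NullValues::Named branch of `process`:
// columns without a mapping keep an empty marker, which never matches a non-empty field.
fn expand_named(columns: &[&str], named: Vec<(String, String)>) -> Vec<String> {
    let mut out = vec![String::new(); columns.len()];
    for (name, null_value) in named {
        if let Some(i) = columns.iter().position(|c| *c == name) {
            out[i] = null_value;
        }
    }
    out
}

fn main() {
    let columns = ["id", "name", "price"];
    let nulls = expand_named(&columns, vec![("price".to_string(), "n/a".to_string())]);
    assert_eq!(nulls, vec!["", "", "n/a"]);
}
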

/// Create a new DataFrame by reading a csv file.
///
/// # Example
@@ -181,6 +210,7 @@ where
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
null_values: Option<NullValues>,
}

impl<'a, R> CsvReader<'a, R>
@@ -251,6 +281,12 @@ where
self
}

/// Set values that will be interpreted as missing/null.
pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
self.null_values = null_values;
self
}

/// Overwrite the schema with the dtypes in this given Schema. The given schema may be a subset
/// of the total schema.
pub fn with_dtypes(mut self, schema: Option<&'a Schema>) -> Self {
@@ -333,6 +369,7 @@ where
self.chunk_size,
self.low_memory,
self.comment_char,
self.null_values,
)
}
}
@@ -372,6 +409,7 @@ where
chunk_size: 8192,
low_memory: false,
comment_char: None,
null_values: None,
}
}

@@ -427,6 +465,7 @@ where
self.chunk_size,
self.low_memory,
self.comment_char,
self.null_values,
)?;
let mut df = csv_reader.as_df(None, None)?;

@@ -929,4 +968,22 @@ AUDCAD,1616455921,0.96212,0.95666,1
assert_eq!(df.shape(), (3, 5));
Ok(())
}

#[test]
fn test_null_values_argument() -> Result<()> {
let csv = r"1,a,foo
null-value,b,bar,
3,null-value,ham
";

let file = Cursor::new(csv);
let df = CsvReader::new(file)
.has_header(false)
.with_null_values(NullValues::AllColumns("null-value".to_string()).into())
.finish()?;
// the field equal to "null-value" in the first column should be read as a null
assert_eq!(df.get_columns()[0].null_count(), 1);
Ok(())
}
}
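
For reference, a hedged usage sketch of the new option on the eager reader (not taken from this diff): the inline CSV, column names, and markers are made up, and the import paths assume the crate layout of this era, with `SerReader` exported from the polars-io crate root.

use std::io::Cursor;

use polars_core::error::Result;
use polars_core::frame::DataFrame;
use polars_io::csv::{CsvReader, NullValues};
use polars_io::SerReader;

fn read_with_named_nulls() -> Result<DataFrame> {
    // "NA" only counts as null in `name`; "n/a" only counts as null in `price`
    let csv = "id,name,price\n1,NA,3.5\n2,foo,n/a\n";
    CsvReader::new(Cursor::new(csv))
        .has_header(true)
        .with_null_values(Some(NullValues::Named(vec![
            ("name".to_string(), "NA".to_string()),
            ("price".to_string(), "n/a".to_string()),
        ])))
        .finish()
}

Each field that matches its column's marker is appended through `add_null` instead of going through the typed parser, as the buffer.rs and parser.rs hunks below show.
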
14 changes: 14 additions & 0 deletions polars/polars-io/src/csv_core/buffer.rs
@@ -294,6 +294,20 @@ impl Buffer {
}
}

pub(crate) fn add_null(&mut self) {
match self {
Buffer::Boolean(v) => v.append_null(),
Buffer::Int32(v) => v.append_null(),
Buffer::Int64(v) => v.append_null(),
Buffer::UInt32(v) => v.append_null(),
#[cfg(feature = "dtype-u64")]
Buffer::UInt64(v) => v.append_null(),
Buffer::Float32(v) => v.append_null(),
Buffer::Float64(v) => v.append_null(),
Buffer::Utf8(v) => v.builder.append_null(),
};
}

#[inline]
pub(crate) fn add(
&mut self,
9 changes: 8 additions & 1 deletion polars/polars-io/src/csv_core/csv.rs
@@ -1,4 +1,4 @@
use crate::csv::CsvEncoding;
use crate::csv::{CsvEncoding, NullValues};
use crate::csv_core::utils::*;
use crate::csv_core::{buffer::*, parser::*};
use crate::mmap::MmapBytesReader;
@@ -39,6 +39,7 @@ pub struct SequentialReader<R: Read + MmapBytesReader> {
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
null_values: Option<Vec<String>>,
}

impl<R> fmt::Debug for SequentialReader<R>
@@ -308,6 +309,7 @@ impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
read,
delimiter,
self.comment_char,
self.null_values.as_ref(),
projection,
&mut buffers,
ignore_parser_errors,
@@ -397,6 +399,7 @@ impl<R: Read + Sync + Send + MmapBytesReader> SequentialReader<R> {
read,
delimiter,
self.comment_char,
self.null_values.as_ref(),
projection,
&mut buffers,
ignore_parser_errors,
@@ -505,6 +508,7 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
null_values: Option<NullValues>,
) -> Result<SequentialReader<R>> {
// check if schema should be inferred
let delimiter = delimiter.unwrap_or(b',');
@@ -524,6 +528,8 @@
}
};

let null_values = null_values.map(|nv| nv.process(&schema)).transpose()?;

if let Some(cols) = columns {
let mut prj = Vec::with_capacity(cols.len());
for col in cols {
@@ -553,5 +559,6 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
chunk_size,
low_memory,
comment_char,
null_values,
})
}
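
The `null_values.map(|nv| nv.process(&schema)).transpose()?` line above leans on `Option::transpose` to turn an `Option<Result<...>>` inside out, so that `?` can propagate the error while the option itself is preserved. A small self-contained sketch of that idiom with toy types (nothing polars specific):

fn demo(maybe: Option<i32>) -> Result<Option<i32>, String> {
    // map:       Option<i32>                 -> Option<Result<i32, String>>
    // transpose: Option<Result<i32, String>> -> Result<Option<i32>, String>
    let flipped = maybe
        .map(|v| if v >= 0 { Ok(v * 10) } else { Err("negative".to_string()) })
        .transpose()?;
    Ok(flipped)
}

fn main() {
    assert_eq!(demo(Some(2)), Ok(Some(20)));
    assert_eq!(demo(None), Ok(None));
    assert_eq!(demo(Some(-1)), Err("negative".to_string()));
}
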
40 changes: 27 additions & 13 deletions polars/polars-io/src/csv_core/parser.rs
@@ -320,6 +320,7 @@ pub(crate) fn parse_lines(
offset: usize,
delimiter: u8,
comment_char: Option<u8>,
null_values: Option<&Vec<String>>,
projection: &[usize],
buffers: &mut [Buffer],
ignore_parser_errors: bool,
@@ -383,19 +384,32 @@
// SAFETY: processed fields index can never exceed the projection indices.
buffers.get_unchecked_mut(processed_fields)
};
// let buf = &mut buffers[processed_fields];
let mut add_null = false;

// if we have a null values argument, check whether this field equals the null value
if let Some(null_values) = &null_values {
if let Some(null_value) = null_values.get(processed_fields) {
if field == null_value.as_bytes() {
add_null = true;
}
}
}
if add_null {
buf.add_null()
} else {
buf.add(field, ignore_parser_errors, read, encoding)
.map_err(|e| {
PolarsError::Other(
format!(
"{:?} on thread line {}; on input: {}",
e,
idx,
String::from_utf8_lossy(field)
)
.into(),
)
})?;
}

processed_fields += 1;

@@ -421,7 +435,7 @@ pub(crate) fn parse_lines(
buffers.get_unchecked_mut(processed_fields)
};

buf.add(&[], true, read, encoding)?;
buf.add_null();
processed_fields += 1;
}

10 changes: 10 additions & 0 deletions polars/polars-lazy/src/frame.rs
@@ -20,6 +20,7 @@ use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::utils::combine_predicates_expr;
use crate::{logical_plan::FETCH_ROWS, prelude::*};
use polars_io::csv::NullValues;

#[derive(Clone)]
#[cfg(feature = "csv-file")]
@@ -35,6 +36,7 @@ pub struct LazyCsvReader<'a> {
schema_overwrite: Option<&'a Schema>,
low_memory: bool,
comment_char: Option<u8>,
null_values: Option<NullValues>,
}

#[cfg(feature = "csv-file")]
@@ -52,6 +54,7 @@ impl<'a> LazyCsvReader<'a> {
schema_overwrite: None,
low_memory: false,
comment_char: None,
null_values: None,
}
}

@@ -105,6 +108,12 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Set values that will be interpreted as missing/null.
pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
self.null_values = null_values;
self
}

/// Cache the DataFrame after reading.
pub fn with_cache(mut self, cache: bool) -> Self {
self.cache = cache;
@@ -130,6 +139,7 @@ impl<'a> LazyCsvReader<'a> {
self.schema_overwrite,
self.low_memory,
self.comment_char,
self.null_values,
)
.build()
.into();
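
A hedged sketch of the same option on the lazy reader; the helper name and the "NA" marker are hypothetical, and `polars_lazy::prelude` is assumed to re-export `LazyCsvReader`.

use polars_io::csv::NullValues;
use polars_lazy::prelude::*; // assumed to re-export LazyCsvReader

// Hypothetical helper: configure a lazy CSV scan so that the literal string
// "NA" is read as null in every column.
fn with_na_as_null(reader: LazyCsvReader) -> LazyCsvReader {
    reader.with_null_values(Some(NullValues::AllColumns("NA".to_string())))
}

From here the option travels through `CsvParserOptions` in the logical plan and is handed back to the eager `CsvReader` inside `CsvExec`, as the logical_plan and scan.rs hunks below show.
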
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
@@ -26,6 +26,7 @@ use crate::utils::{
rename_expr_root_name,
};
use crate::{prelude::*, utils};
use polars_io::csv::NullValues;

pub(crate) mod aexpr;
pub(crate) mod alp;
@@ -146,6 +147,7 @@ pub struct CsvParserOptions {
pub(crate) low_memory: bool,
pub(crate) ignore_errors: bool,
pub(crate) cache: bool,
pub(crate) null_values: Option<NullValues>,
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
@@ -976,6 +978,7 @@ impl LogicalPlanBuilder {
schema_overwrite: Option<&Schema>,
low_memory: bool,
comment_char: Option<u8>,
null_values: Option<NullValues>,
) -> Self {
let path = path.into();
let mut file = std::fs::File::open(&path).expect("could not open file");
@@ -1006,6 +1009,7 @@
low_memory,
cache,
comment_char,
null_values,
},
predicate: None,
aggregate: vec![],
1 change: 1 addition & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
@@ -169,6 +169,7 @@ impl Executor for CsvExec {
.with_stop_after_n_rows(stop_after_n_rows)
.with_columns(with_columns)
.low_memory(self.options.low_memory)
.with_null_values(self.options.null_values.clone())
.with_encoding(CsvEncoding::LossyUtf8);

let aggregate = if self.aggregate.is_empty() {
12 changes: 11 additions & 1 deletion py-polars/polars/frame.py
@@ -38,7 +38,7 @@
from . import datatypes
from .datatypes import DataType, pytype_to_polars_type
from ._html import NotebookFormatter
from .utils import coerce_arrow, _is_expr
from .utils import coerce_arrow, _is_expr, _process_null_values
import polars
import pandas as pd
import pyarrow as pa
@@ -176,6 +176,7 @@ def read_csv(
dtype: Optional[Dict[str, Type[DataType]]] = None,
low_memory: bool = False,
comment_char: Optional[str] = None,
null_values: Optional[Union[str, List[str], Dict[str, str]]] = None,
) -> "DataFrame":
"""
Read a CSV file into a Dataframe.
@@ -217,6 +218,12 @@
Reduce memory usage at the expense of performance.
comment_char
character that indicates the start of a comment line, for instance '#'.
null_values
Values to interpret as null values. You can provide a:
- str -> all values equal to this string will be null
- List[str] -> a null value per column.
- Dict[str, str] -> a dictionary that maps column names to null value strings.
Example
---
@@ -246,6 +253,8 @@
for k, v in dtype.items():
dtype_list.append((k, pytype_to_polars_type(v)))

null_values = _process_null_values(null_values) # type: ignore

self._df = PyDataFrame.read_csv(
file,
infer_schema_length,
@@ -264,6 +273,7 @@
dtype_list,
low_memory,
comment_char,
null_values,
)
return self
