Skip to content

Commit

Permalink
csv: allow reading with different eol character (#4080)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 19, 2022
1 parent cf82cf3 commit 5c3430f
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 80 deletions.
136 changes: 84 additions & 52 deletions polars/polars-io/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ pub(crate) fn skip_bom(input: &[u8]) -> &[u8] {

/// Find the nearest next line position.
/// Does not check for end-of-line characters embedded in quoted string fields.
pub(crate) fn next_line_position_naive(input: &[u8]) -> Option<usize> {
let pos = input.iter().position(|b| *b == b'\n')? + 1;
pub(crate) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
let pos = input.iter().position(|b| *b == eol_char)? + 1;
if input.len() - pos == 0 {
return None;
}
Expand All @@ -29,19 +29,20 @@ pub(crate) fn next_line_position(
expected_fields: usize,
delimiter: u8,
quote_char: Option<u8>,
eol_char: u8,
) -> Option<usize> {
let mut total_pos = 0;
if input.is_empty() {
return None;
}
loop {
let pos = memchr::memchr(b'\n', input)? + 1;
let pos = memchr::memchr(eol_char, input)? + 1;
if input.len() - pos == 0 {
return None;
}
let line = SplitLines::new(&input[pos..], b'\n').next();
let line = SplitLines::new(&input[pos..], eol_char).next();
if let Some(line) = line {
if SplitFields::new(line, delimiter, quote_char)
if SplitFields::new(line, delimiter, quote_char, eol_char)
.into_iter()
.count()
== expected_fields
Expand All @@ -57,8 +58,8 @@ pub(crate) fn next_line_position(
}
}

pub(crate) fn is_line_ending(b: u8) -> bool {
b == b'\n' || b == b'\r'
pub(crate) fn is_line_ending(b: u8, eol_char: u8) -> bool {
b == eol_char || b == b'\r'
}

pub(crate) fn is_whitespace(b: u8) -> bool {
Expand Down Expand Up @@ -89,10 +90,10 @@ where
/// 'field_1,field_2'
/// and not with
/// '\nfield_1,field_1'
pub(crate) fn skip_header(input: &[u8]) -> (&[u8], usize) {
match next_line_position_naive(input) {
pub(crate) fn skip_header(input: &[u8], eol_char: u8) -> (&[u8], usize) {
match next_line_position_naive(input, eol_char) {
Some(mut pos) => {
if input[pos] == b'\n' {
if input[pos] == eol_char {
pos += 1;
}
(&input[pos..], pos)
Expand All @@ -116,19 +117,23 @@ pub(crate) fn skip_whitespace_exclude(input: &[u8], exclude: u8) -> &[u8] {

#[inline]
/// Can be used to skip whitespace, but exclude the delimiter
pub(crate) fn skip_whitespace_line_ending_exclude(input: &[u8], exclude: u8) -> &[u8] {
pub(crate) fn skip_whitespace_line_ending_exclude(
input: &[u8],
exclude: u8,
eol_char: u8,
) -> &[u8] {
skip_condition(input, |b| {
b != exclude && (is_whitespace(b) || is_line_ending(b))
b != exclude && (is_whitespace(b) || is_line_ending(b, eol_char))
})
}

#[inline]
pub(crate) fn skip_line_ending(input: &[u8]) -> &[u8] {
skip_condition(input, is_line_ending)
pub(crate) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
skip_condition(input, |b| is_line_ending(b, eol_char))
}

/// Get the mean and standard deviation of the line lengths, in bytes.
pub(crate) fn get_line_stats(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)> {
pub(crate) fn get_line_stats(bytes: &[u8], n_lines: usize, eol_char: u8) -> Option<(f32, f32)> {
let mut n_read = 0;
let mut lengths = Vec::with_capacity(n_lines);
let file_len = bytes.len();
Expand All @@ -139,7 +144,7 @@ pub(crate) fn get_line_stats(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)>
return None;
}
bytes_trunc = &bytes[n_read..];
match memchr::memchr(b'\n', bytes_trunc) {
match memchr::memchr(eol_char, bytes_trunc) {
Some(position) => {
n_read += position + 1;
lengths.push(position + 1);
Expand Down Expand Up @@ -216,11 +221,15 @@ impl<'a> Iterator for SplitLines<'a> {
}
}
}
// return line up to this position
let ret = Some(&self.v[..(pos - 1) as usize]);
// skip the '\n' token and update slice.
self.v = &self.v[pos as usize..];
ret

unsafe {
debug_assert!((pos as usize) <= self.v.len());
// return line up to this position
let ret = Some(self.v.get_unchecked(..(pos - 1) as usize));
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked(pos as usize..);
ret
}
}
}

Expand All @@ -232,22 +241,30 @@ pub(crate) struct SplitFields<'a> {
finished: bool,
quote_char: u8,
quoting: bool,
eol_char: u8,
}

impl<'a> SplitFields<'a> {
pub(crate) fn new(slice: &'a [u8], delimiter: u8, quote_char: Option<u8>) -> Self {
pub(crate) fn new(
slice: &'a [u8],
delimiter: u8,
quote_char: Option<u8>,
eol_char: u8,
) -> Self {
Self {
v: slice,
delimiter,
finished: false,
quote_char: quote_char.unwrap_or(b'"'),
quoting: quote_char.is_some(),
eol_char,
}
}

fn finish_eol(&mut self, need_escaping: bool, idx: usize) -> Option<(&'a [u8], bool)> {
unsafe fn finish_eol(&mut self, need_escaping: bool, idx: usize) -> Option<(&'a [u8], bool)> {
self.finished = true;
Some((&self.v[..idx], need_escaping))
debug_assert!(idx <= self.v.len());
Some((self.v.get_unchecked(..idx), need_escaping))
}

fn finish(&mut self, need_escaping: bool) -> Option<(&'a [u8], bool)> {
Expand All @@ -256,7 +273,7 @@ impl<'a> SplitFields<'a> {
}

fn eof_oel(&self, current_ch: u8) -> bool {
current_ch == self.delimiter || current_ch == b'\n'
current_ch == self.delimiter || current_ch == self.eol_char
}
}

Expand Down Expand Up @@ -322,8 +339,10 @@ impl<'a> Iterator for SplitFields<'a> {
}

if !in_field && self.eof_oel(c) {
if c == b'\n' {
return self.finish_eol(needs_escaping, current_idx as usize);
if c == self.eol_char {
// safety
// we are in bounds
return unsafe { self.finish_eol(needs_escaping, current_idx as usize) };
}
idx = current_idx;
break;
Expand All @@ -337,26 +356,36 @@ impl<'a> Iterator for SplitFields<'a> {

idx as usize
} else {
match memchr::memchr2(self.delimiter, b'\n', self.v) {
match memchr::memchr2(self.delimiter, self.eol_char, self.v) {
None => return self.finish(needs_escaping),
Some(idx) => match self.v[idx] {
b'\n' => return self.finish_eol(needs_escaping, idx),
_ => idx,
Some(idx) => unsafe {
// Safety:
// idx was just found
if *self.v.get_unchecked(idx) == self.eol_char {
return self.finish_eol(needs_escaping, idx);
} else {
idx
}
},
}
};

let ret = Some((&self.v[..pos], needs_escaping));
self.v = &self.v[pos + 1..];
ret
unsafe {
debug_assert!(pos <= self.v.len());
// safety
// we are in bounds
let ret = Some((self.v.get_unchecked(..pos), needs_escaping));
self.v = self.v.get_unchecked(pos + 1..);
ret
}
}
}

#[inline]
fn skip_this_line(bytes: &[u8], quote: Option<u8>) -> &[u8] {
fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
let pos = match quote {
Some(quote) => find_quoted(bytes, quote, b'\n'),
None => bytes.iter().position(|x| *x == b'\n'),
Some(quote) => find_quoted(bytes, quote, eol_char),
None => bytes.iter().position(|x| *x == eol_char),
};
match pos {
None => &[],
Expand All @@ -380,6 +409,7 @@ pub(super) fn parse_lines(
delimiter: u8,
comment_char: Option<u8>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<&NullValuesCompiled>,
projection: &[usize],
buffers: &mut [Buffer],
Expand All @@ -392,13 +422,6 @@ pub(super) fn parse_lines(
!projection.is_empty(),
"at least one column should be projected"
);
// only when we have one column \n should not be skipped
// other widths should have commas.
let skipwh = if schema_len > 1 {
skip_whitespace_line_ending_exclude
} else {
skip_whitespace_exclude
};

// we use the pointers to track the no of bytes read.
let start = bytes.as_ptr() as usize;
Expand All @@ -412,7 +435,13 @@ pub(super) fn parse_lines(
return Ok(end - start);
}

bytes = skipwh(bytes, delimiter);
// only when we have one column \n should not be skipped
// other widths should have commas.
bytes = if schema_len > 1 {
skip_whitespace_line_ending_exclude(bytes, delimiter, eol_char)
} else {
skip_whitespace_exclude(bytes, delimiter)
};
if bytes.is_empty() {
return Ok(original_bytes_len);
}
Expand All @@ -421,7 +450,7 @@ pub(super) fn parse_lines(
if let Some(c) = comment_char {
// line is a comment -> skip
if bytes[0] == c {
let bytes_rem = skip_this_line(bytes, quote_char);
let bytes_rem = skip_this_line(bytes, quote_char, eol_char);
bytes = bytes_rem;
continue;
}
Expand All @@ -434,7 +463,7 @@ pub(super) fn parse_lines(
let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
let mut processed_fields = 0;

let mut iter = SplitFields::new(bytes, delimiter, quote_char);
let mut iter = SplitFields::new(bytes, delimiter, quote_char, eol_char);
let mut idx = 0u32;
let mut read_sol = 0;
loop {
Expand Down Expand Up @@ -510,11 +539,14 @@ pub(super) fn parse_lines(
match projection_iter.next() {
Some(p) => next_projected = p,
None => {
if bytes.get(read_sol - 1) == Some(&b'\n') {
if bytes.get(read_sol - 1) == Some(&eol_char) {
bytes = &bytes[read_sol..];
} else {
let bytes_rem =
skip_this_line(&bytes[read_sol - 1..], quote_char);
let bytes_rem = skip_this_line(
&bytes[read_sol - 1..],
quote_char,
eol_char,
);
bytes = bytes_rem;
}
break;
Expand Down Expand Up @@ -550,14 +582,14 @@ mod test {
#[test]
fn test_splitfields() {
let input = "\"foo\",\"bar\"";
let mut fields = SplitFields::new(input.as_bytes(), b',', Some(b'"'));
let mut fields = SplitFields::new(input.as_bytes(), b',', Some(b'"'), b'\n');

assert_eq!(fields.next(), Some(("\"foo\"".as_bytes(), true)));
assert_eq!(fields.next(), Some(("\"bar\"".as_bytes(), true)));
assert_eq!(fields.next(), None);

let input2 = "\"foo\n bar\";\"baz\";12345";
let mut fields2 = SplitFields::new(input2.as_bytes(), b';', Some(b'"'));
let mut fields2 = SplitFields::new(input2.as_bytes(), b';', Some(b'"'), b'\n');

assert_eq!(fields2.next(), Some(("\"foo\n bar\"".as_bytes(), true)));
assert_eq!(fields2.next(), Some(("\"baz\"".as_bytes(), true)));
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ where
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValues>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&'a [ScanAggregation]>,
Expand Down Expand Up @@ -208,6 +209,11 @@ where
self
}

/// Set the byte that is treated as the end-of-line marker when reading.
///
/// Stores `eol_char` on the reader and returns the reader, so the call can be
/// chained with the other `with_*` builder methods. The reader is initialised
/// with `b'\n'` when this method is not called.
pub fn with_end_of_line_char(mut self, eol_char: u8) -> Self {
self.eol_char = eol_char;
self
}

/// Set values that will be interpreted as missing/null. Note that any value you set as a null value
/// will not be escaped, so if quotation marks are part of the null value you should include them.
pub fn with_null_values(mut self, null_values: Option<NullValues>) -> Self {
Expand Down Expand Up @@ -344,6 +350,7 @@ where
chunk_size: 1 << 18,
low_memory: false,
comment_char: None,
eol_char: b'\n',
null_values: None,
predicate: None,
aggregate: None,
Expand Down Expand Up @@ -424,6 +431,7 @@ where
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values,
self.predicate,
self.aggregate,
Expand Down Expand Up @@ -455,6 +463,7 @@ where
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
self.null_values,
self.predicate,
self.aggregate,
Expand Down

0 comments on commit 5c3430f

Please sign in to comment.