Skip to content

Commit

Permalink
csv: ignore quoted lines in skip lines (#4191)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 31, 2022
1 parent a1eff29 commit 0c649c3
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 15 deletions.
20 changes: 17 additions & 3 deletions polars/polars-io/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) fn skip_bom(input: &[u8]) -> &[u8] {
/// Find the nearest next line position.
/// Does not check for new line characters embedded in String fields.
pub(crate) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
let pos = input.iter().position(|b| *b == eol_char)? + 1;
let pos = memchr::memchr(eol_char, input)? + 1;
if input.len() - pos == 0 {
return None;
}
Expand All @@ -40,7 +40,10 @@ pub(crate) fn next_line_position(
if input.len() - pos == 0 {
return None;
}
let line = SplitLines::new(&input[pos..], eol_char).next();
debug_assert!(pos <= input.len());
let new_input = unsafe { input.get_unchecked(pos..) };
let line = SplitLines::new(new_input, eol_char).next();

if let Some(line) = line {
if SplitFields::new(line, delimiter, quote_char, eol_char)
.into_iter()
Expand All @@ -49,10 +52,21 @@ pub(crate) fn next_line_position(
{
return Some(total_pos + pos);
} else {
input = &input[pos + 1..];
debug_assert!(pos < input.len());
unsafe {
input = input.get_unchecked(pos + 1..);
}
total_pos += pos + 1;
}
} else {
// no new line found, check latest line (without eol) for number of fields
if SplitFields::new(new_input, delimiter, quote_char, eol_char)
.into_iter()
.count()
== expected_fields
{
return Some(total_pos + pos);
}
return None;
}
}
Expand Down
12 changes: 6 additions & 6 deletions polars/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ where
n_rows: Option<usize>,
// used by error ignore logic
max_records: Option<usize>,
skip_rows: usize,
skip_rows_before_header: usize,
/// Optional indexes of the columns to project
projection: Option<Vec<usize>>,
/// Optional column names to project/ select.
Expand Down Expand Up @@ -179,9 +179,9 @@ where
self
}

/// Skip the first `n` rows during parsing. The header will be parsed an `n` lines.
/// Skip the first `n` rows during parsing. The header will be parsed at line `n`.
pub fn with_skip_rows(mut self, skip_rows: usize) -> Self {
self.skip_rows = skip_rows;
self.skip_rows_before_header = skip_rows;
self
}

Expand Down Expand Up @@ -334,7 +334,7 @@ where
rechunk: true,
n_rows: None,
max_records: Some(128),
skip_rows: 0,
skip_rows_before_header: 0,
projection: None,
delimiter: None,
has_header: true,
Expand Down Expand Up @@ -414,7 +414,7 @@ where
let mut csv_reader = CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows,
self.skip_rows_before_header,
self.projection,
self.max_records,
self.delimiter,
Expand Down Expand Up @@ -446,7 +446,7 @@ where
let mut csv_reader = CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows,
self.skip_rows_before_header,
self.projection,
self.max_records,
self.delimiter,
Expand Down
33 changes: 27 additions & 6 deletions polars/polars-io/src/csv/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ pub(crate) struct CoreReader<'a> {
/// Current line number, used in error reporting
line_number: usize,
ignore_parser_errors: bool,
skip_rows: usize,
skip_rows_before_header: usize,
// after the header, we need to take embedded lines into account
skip_rows_after_header: usize,
n_rows: Option<usize>,
encoding: CsvEncoding,
n_threads: Option<usize>,
Expand Down Expand Up @@ -234,7 +236,6 @@ impl<'a> CoreReader<'a> {
}
}
};
skip_rows += skip_rows_after_header;
if let Some(dtypes) = dtype_overwrite {
let mut s = schema.into_owned();
for (index, dt) in dtypes.iter().enumerate() {
Expand Down Expand Up @@ -267,7 +268,8 @@ impl<'a> CoreReader<'a> {
projection,
line_number: if has_header { 1 } else { 0 },
ignore_parser_errors,
skip_rows,
skip_rows_before_header: skip_rows,
skip_rows_after_header,
n_rows,
encoding,
n_threads,
Expand Down Expand Up @@ -307,15 +309,34 @@ impl<'a> CoreReader<'a> {
bytes = skip_header(bytes, eol_char).0;
}

if self.skip_rows > 0 {
for _ in 0..self.skip_rows {
// This does not check embedding of new line chars in string quotes.
if self.skip_rows_before_header > 0 {
for _ in 0..self.skip_rows_before_header {
let pos = next_line_position_naive(bytes, eol_char)
.ok_or_else(|| PolarsError::NoData("not enough lines to skip".into()))?;
bytes = &bytes[pos..];
}
}

if self.skip_rows_after_header > 0 {
for _ in 0..self.skip_rows_after_header {
let pos = match bytes.first() {
Some(first) if Some(*first) == self.comment_char => {
next_line_position_naive(bytes, eol_char)
}
_ => next_line_position(
bytes,
self.schema.len(),
self.delimiter,
self.quote_char,
eol_char,
),
}
.ok_or_else(|| PolarsError::NoData("not enough lines to skip".into()))?;

bytes = &bytes[pos..];
}
}

let starting_point_offset = bytes.as_ptr() as usize - starting_point_offset;

Ok((bytes, starting_point_offset))
Expand Down
1 change: 1 addition & 0 deletions polars/tests/it/io/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn test_skip_rows() -> Result<()> {
.with_delimiter(b' ')
.finish()?;

dbg!(&df);
assert_eq!(df.height(), 3);
Ok(())
}
Expand Down
16 changes: 16 additions & 0 deletions py-polars/tests/io/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,19 @@ def test_csv_write_escape_newlines() -> None:
f.seek(0)
read_df = pl.read_csv(f)
assert df.frame_equal(read_df)


def test_skip_new_line_embedded_lines() -> None:
    # The quoted fields below contain literal backslash-n escape sequences
    # (raw string, so "\n" is two characters, not a newline). Skipping one
    # row after the header must skip a whole CSV record, honoring quotes,
    # rather than cutting at the first raw line boundary.
    csv = r"""a,b,c,d,e\n
1,2,3,"\n Test",\n
4,5,6,"Test A",\n
7,8,9,"Test B \n",\n"""

    df = pl.read_csv(csv.encode(), skip_rows_after_header=1, infer_schema_length=0)

    # Row "1,2,3,..." was skipped; the two remaining records survive intact,
    # including the escape sequences inside and outside quotes.
    expected = {
        "a": ["4", "7"],
        "b": ["5", "8"],
        "c": ["6", "9"],
        "d": ["Test A", "Test B \\n"],
        "e\\n": ["\\n", "\\n"],
    }
    assert df.to_dict(False) == expected

0 comments on commit 0c649c3

Please sign in to comment.