Skip to content

Commit

Permalink
fix invalid read statistic in scan csv
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 18, 2021
1 parent 200f769 commit 3f7d7cc
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 36 deletions.
3 changes: 2 additions & 1 deletion polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ where
schema_overwrite: None,
dtype_overwrite: None,
sample_size: 1024,
chunk_size: 8192,
chunk_size: 1 << 16,
low_memory: false,
comment_char: None,
null_values: None,
Expand Down Expand Up @@ -1263,6 +1263,7 @@ linenum,last_name,first_name
Ok(())
}

#[test]
fn test_projection_and_quoting() -> Result<()> {
let csv = "a,b,c,d
A1,'B1',C1,1
Expand Down
16 changes: 1 addition & 15 deletions polars/polars-io/src/csv_core/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ trait ParsedBuffer<T> {
&mut self,
bytes: &[u8],
ignore_errors: bool,
start_pos: usize,
_needs_escaping: bool,
) -> Result<()>;
}
Expand All @@ -91,7 +90,6 @@ where
&mut self,
bytes: &[u8],
ignore_errors: bool,
_start_pos: usize,
_needs_escaping: bool,
) -> Result<()> {
if bytes.is_empty() {
Expand Down Expand Up @@ -159,7 +157,6 @@ impl ParsedBuffer<Utf8Type> for Utf8Field {
&mut self,
bytes: &[u8],
ignore_errors: bool,
_start_pos: usize,
needs_escaping: bool,
) -> Result<()> {
// Only for lossy utf8 we check utf8 now. Otherwise we check all utf8 at the end.
Expand Down Expand Up @@ -224,7 +221,6 @@ impl ParsedBuffer<BooleanType> for BooleanChunkedBuilder {
&mut self,
bytes: &[u8],
ignore_errors: bool,
start_pos: usize,
_needs_escaping: bool,
) -> Result<()> {
if bytes.eq_ignore_ascii_case(b"false") {
Expand All @@ -236,8 +232,7 @@ impl ParsedBuffer<BooleanType> for BooleanChunkedBuilder {
} else {
return Err(PolarsError::ComputeError(
format!(
"Error while parsing value {} at byte position {} as boolean",
start_pos,
"Error while parsing value {} as boolean",
String::from_utf8_lossy(bytes)
)
.into(),
Expand Down Expand Up @@ -400,7 +395,6 @@ impl Buffer {
&mut self,
bytes: &[u8],
ignore_errors: bool,
start_pos: usize,
needs_escaping: bool,
) -> Result<()> {
use Buffer::*;
Expand All @@ -409,15 +403,13 @@ impl Buffer {
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
),
Int32(buf) => {
<PrimitiveChunkedBuilder<Int32Type> as ParsedBuffer<Int32Type>>::parse_bytes(
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
)
}
Expand All @@ -426,7 +418,6 @@ impl Buffer {
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
)
}
Expand All @@ -435,7 +426,6 @@ impl Buffer {
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
)
}
Expand All @@ -444,7 +434,6 @@ impl Buffer {
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
)
}
Expand All @@ -453,7 +442,6 @@ impl Buffer {
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
)
}
Expand All @@ -462,15 +450,13 @@ impl Buffer {
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
)
}
Utf8(buf) => <Utf8Field as ParsedBuffer<Utf8Type>>::parse_bytes(
buf,
bytes,
ignore_errors,
start_pos,
needs_escaping,
),
}
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ impl<'a> CoreReader<'a> {
let local_bytes = &bytes[read..stop_at_nbytes];

last_read = read;
read = parse_lines(
read += parse_lines(
local_bytes,
read,
delimiter,
Expand Down Expand Up @@ -530,7 +530,7 @@ impl<'a> CoreReader<'a> {
let local_bytes = &bytes[read..stop_at_nbytes];

last_read = read;
read = parse_lines(
read += parse_lines(
local_bytes,
read,
delimiter,
Expand All @@ -542,7 +542,7 @@ impl<'a> CoreReader<'a> {
ignore_parser_errors,
// chunk size doesn't really matter anymore,
// less calls if we increase the size
chunk_size * 320000,
usize::MAX,
)?;
}
Ok(DataFrame::new_no_checks(
Expand Down
29 changes: 12 additions & 17 deletions polars/polars-io/src/csv_core/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,30 +395,29 @@ pub(crate) fn parse_lines(
ignore_parser_errors: bool,
n_lines: usize,
) -> Result<usize> {
// we use the pointers to track the no of bytes read.
let start = bytes.as_ptr() as usize;
let original_bytes_len = bytes.len();
let n_lines = n_lines as u32;
// This variable will store the number of bytes we read. It is important to do this bookkeeping
// to be able to correctly parse the strings later.
let mut read = offset;

let mut line_count = 0u32;
loop {
if line_count > n_lines {
return Ok(read);
let end = bytes.as_ptr() as usize;
return Ok(end - start);
}
let (b, r) = skip_whitespace(bytes);
let (b, _) = skip_whitespace(bytes);
bytes = b;
if bytes.is_empty() {
return Ok(read);
return Ok(original_bytes_len);
}
read += r;

// deal with comments
if let Some(c) = comment_char {
// line is a comment -> skip
if bytes[0] == c {
let (bytes_rem, len) = skip_this_line(bytes, quote_char, 0);
let (bytes_rem, _) = skip_this_line(bytes, quote_char, 0);
bytes = bytes_rem;
read += len;
continue;
}
}
Expand All @@ -437,8 +436,7 @@ pub(crate) fn parse_lines(
loop {
let mut read_sol = 0;

let track_bytes = |read: &mut usize, bytes: &mut &[u8], read_sol: usize| {
*read += read_sol;
let track_bytes = |bytes: &mut &[u8], read_sol: usize| {
// benchmarking showed it is 6% faster to take the min of these two
// not needed for correctness.
*bytes = &bytes[std::cmp::min(read_sol, bytes.len())..];
Expand Down Expand Up @@ -478,7 +476,7 @@ pub(crate) fn parse_lines(
if add_null {
buf.add_null()
} else {
buf.add(field, ignore_parser_errors, read, needs_escaping)
buf.add(field, ignore_parser_errors, needs_escaping)
.map_err(|e| {
PolarsError::ComputeError(
format!(
Expand All @@ -499,18 +497,15 @@ pub(crate) fn parse_lines(
// if we have all projected columns we are done with this line
match a {
Some(p) => {
track_bytes(&mut read, &mut bytes, read_sol);
track_bytes(&mut bytes, read_sol);
next_projected = p
}
None => {
if let Some(b'\n') = bytes.get(0) {
bytes = &bytes[read_sol..];
read += read_sol
} else {
let (bytes_rem, len) =
skip_this_line(bytes, quote_char, offset);
let (bytes_rem, _) = skip_this_line(bytes, quote_char, offset);
bytes = bytes_rem;
read += len;
}
break;
}
Expand Down

0 comments on commit 3f7d7cc

Please sign in to comment.