Skip to content

Commit

Permalink
more csv-optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 14, 2021
1 parent 9cb382a commit 692c6ac
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 39 deletions.
3 changes: 2 additions & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2",
#arrow = { package = "arrow2", version="0.5.3", --default-features=false }
polars-core = {version = "0.16.0", path = "../polars-core", features = ["private"], default-features=false}
polars-arrow = {version = "0.16.0", path = "../polars-arrow"}
lexical = {version = "6", optional = true}
lexical = {version = "6", optional = true, default-features=false, features = ["std", "parse-floats", "parse-integers"]}
num_cpus = "1.13.0"
csv-core = {version = "0.1.10", optional=true}
regex = "1.4"
lazy_static = "1.4"
memmap = { package = "memmap2", version = "0.5.0", optional=true}
memchr = "2.4"
anyhow = "1.0"
rayon = "1.5"
ahash = "0.7"
Expand Down
27 changes: 11 additions & 16 deletions polars/polars-io/src/csv_core/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,15 @@ where
self.append_null()
} else {
let bytes = drop_quotes(bytes);
let result = T::parse(bytes);
// its faster to work on options.
// if we need to throw an error, we parse again to be able to throw the error
let result = T::parse(bytes).ok();

match (result, ignore_errors) {
(Ok(value), _) => self.append_value(value),
(Err(_), true) => self.append_null(),
(Err(err), _) => {
return Err(err);
(Some(value), _) => self.append_value(value),
(None, true) => self.append_null(),
(None, _) => {
T::parse(bytes)?;
}
};
}
Expand Down Expand Up @@ -161,11 +163,10 @@ impl ParsedBuffer<Utf8Type> for Utf8Field {
needs_escaping: bool,
) -> Result<()> {
// Only for lossy utf8 we check utf8 now. Otherwise we check all utf8 at the end.
let parse_result = if !delay_utf8_validation(self.encoding, ignore_errors) {
// first check utf8 validity
simdutf8::basic::from_utf8(bytes).is_ok()
} else {
let parse_result = if delay_utf8_validation(self.encoding, ignore_errors) {
true
} else {
simdutf8::basic::from_utf8(bytes).is_ok()
};
let data_len = self.data.len();

Expand All @@ -179,13 +180,7 @@ impl ParsedBuffer<Utf8Type> for Utf8Field {
let n_written = if needs_escaping {
// Safety:
// we just allocated enough capacity and data_len is correct.
unsafe {
let out_buf = std::slice::from_raw_parts_mut(
self.data.as_mut_ptr().add(data_len),
bytes.len(),
);
escape_field(bytes, self.quote_char, out_buf)
}
unsafe { escape_field(bytes, self.quote_char, &mut self.data[data_len..]) }
} else {
self.data.extend_from_slice(bytes);
bytes.len()
Expand Down
35 changes: 13 additions & 22 deletions polars/polars-io/src/csv_core/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,21 +259,13 @@ impl<'a> SplitFields<'a> {
}

fn finish_eol(&mut self, need_escaping: bool, idx: usize) -> Option<(&'a [u8], bool)> {
if self.finished {
None
} else {
self.finished = true;
Some((&self.v[..idx], need_escaping))
}
self.finished = true;
Some((&self.v[..idx], need_escaping))
}

fn finish(&mut self, need_escaping: bool) -> Option<(&'a [u8], bool)> {
if self.finished {
None
} else {
self.finished = true;
Some((self.v, need_escaping))
}
self.finished = true;
Some((self.v, need_escaping))
}

fn eof_oel(&self, current_ch: u8) -> bool {
Expand Down Expand Up @@ -309,15 +301,15 @@ impl<'a> Iterator for SplitFields<'a> {

#[inline]
fn next(&mut self) -> Option<(&'a [u8], bool)> {
if self.finished {
if self.v.is_empty() || self.finished {
return None;
}

let mut needs_escaping = false;
// There can be strings with delimiters:
// "Street, City",

let pos = if self.quoting && !self.v.is_empty() && self.v[0] == self.quote_char {
let pos = if self.quoting && self.v[0] == self.quote_char {
needs_escaping = true;
// There can be pair of double-quotes within string.
// Each of the embedded double-quote characters must be represented
Expand Down Expand Up @@ -355,11 +347,7 @@ impl<'a> Iterator for SplitFields<'a> {

idx as usize
} else {
match self
.v
.iter()
.position(|x| *x == self.delimiter || *x == b'\n')
{
match memchr::memchr2(self.delimiter, b'\n', self.v) {
None => return self.finish(needs_escaping),
Some(idx) => match self.v[idx] {
b'\n' => return self.finish_eol(needs_escaping, idx),
Expand Down Expand Up @@ -407,11 +395,12 @@ pub(crate) fn parse_lines(
ignore_parser_errors: bool,
n_lines: usize,
) -> Result<usize> {
let n_lines = n_lines as u32;
// This variable will store the number of bytes we read. It is important to do this bookkeeping
// to be able to correctly parse the strings later.
let mut read = offset;

let mut line_count = 0;
let mut line_count = 0u32;
loop {
if line_count > n_lines {
return Ok(read);
Expand Down Expand Up @@ -444,12 +433,14 @@ pub(crate) fn parse_lines(
let mut processed_fields = 0;

let mut iter = SplitFields::new(bytes, delimiter, quote_char);
let mut idx = 0;
let mut idx = 0u32;
loop {
let mut read_sol = 0;

let track_bytes = |read: &mut usize, bytes: &mut &[u8], read_sol: usize| {
*read += read_sol;
// benchmarking showed it is 6% faster to take the min of these two
// not needed for correctness.
*bytes = &bytes[std::cmp::min(read_sol, bytes.len())..];
};

Expand All @@ -462,7 +453,7 @@ pub(crate) fn parse_lines(
// +1 is the split character that is consumed by the iterator.
read_sol += field_len + 1;

if (idx - 1) == next_projected {
if (idx - 1) == next_projected as u32 {
// the iterator is finished when it encounters a `\n`
// this could be preceded by a '\r'
if field_len > 0 && field[field_len - 1] == b'\r' {
Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 692c6ac

Please sign in to comment.