Improve partitioned groupby (#3263)
ritchie46 committed Apr 30, 2022
1 parent e093f90 commit c8cc566
Showing 20 changed files with 431 additions and 320 deletions.
4 changes: 2 additions & 2 deletions polars/Makefile
@@ -46,8 +46,8 @@ integration-tests:
 miri:
 	# not tested on all features because miri does not support SIMD
 	# some tests are also filtered, because miri cannot deal with the rayon threadpool
-	# Miri also reports UB in prettytable.rs, so we must toggle that feature off.
-	MIRIFLAGS="-Zmiri-disable-isolation" \
+	# we ignore leaks because the thread pool of rayon is never killed.
+	MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" \
 	POLARS_ALLOW_EXTENSION=1 \
 	cargo miri test \
 	--no-default-features \
43 changes: 36 additions & 7 deletions polars/polars-core/src/utils/mod.rs
@@ -139,15 +139,44 @@ pub fn split_series(s: &Series, n: usize) -> Result<Vec<Series>> {
 #[cfg(feature = "private")]
 #[doc(hidden)]
 pub fn split_df(df: &DataFrame, n: usize) -> Result<Vec<DataFrame>> {
-    trait Len {
-        fn len(&self) -> usize;
-    }
-    impl Len for DataFrame {
-        fn len(&self) -> usize {
-            self.height()
-        }
-    }
-    split_array!(df, n, i64)
+    let total_len = df.height();
+    let chunk_size = total_len / n;
+    let mut out = Vec::with_capacity(n);
+
+    for i in 0..n {
+        let offset = i * chunk_size;
+        let len = if i == (n - 1) {
+            total_len - offset
+        } else {
+            chunk_size
+        };
+        let df = df.slice((i * chunk_size) as i64, len);
+        if df.n_chunks()? > 1 {
+            let iter = df.iter_chunks().map(|chunk| {
+                DataFrame::new_no_checks(
+                    df.iter()
+                        .zip(chunk.into_arrays())
+                        .map(|(s, arr)| {
+                            // Safety:
+                            // datatypes are correct
+                            unsafe {
+                                Series::from_chunks_and_dtype_unchecked(
+                                    s.name(),
+                                    vec![arr],
+                                    s.dtype(),
+                                )
+                            }
+                        })
+                        .collect(),
+                )
+            });
+            out.extend(iter)
+        } else {
+            out.push(df)
+        }
+    }
+
+    Ok(out)
 }

 pub fn slice_slice<T>(vals: &[T], offset: i64, len: usize) -> &[T] {
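The rewritten split_df slices the frame into n pieces of floor(height / n) rows, letting the last piece absorb the remainder, and additionally splits any multi-chunk slice into one frame per chunk so each output is a single contiguous chunk. A minimal standalone sketch of the offset/length arithmetic (chunk_bounds is a hypothetical helper, not a polars API):

    // Compute (offset, len) for each of the n slices; the final slice
    // absorbs whatever rows the integer division leaves over.
    fn chunk_bounds(total_len: usize, n: usize) -> Vec<(usize, usize)> {
        let chunk_size = total_len / n;
        (0..n)
            .map(|i| {
                let offset = i * chunk_size;
                let len = if i == n - 1 {
                    total_len - offset
                } else {
                    chunk_size
                };
                (offset, len)
            })
            .collect()
    }

    fn main() {
        // 10 rows over 3 splits -> [(0, 3), (3, 3), (6, 4)]
        println!("{:?}", chunk_bounds(10, 3));
    }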
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv_core/csv.rs
@@ -331,7 +331,7 @@ impl<'a> CoreReader<'a> {
             total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;

             // if we only need to parse n_rows,
-            // we first try to use the line statistics the total bytes we need to process
+            // we first try to use the line statistics to estimate the total bytes we need to process
             if let Some(n_rows) = self.n_rows {
                 total_rows = std::cmp::min(n_rows, total_rows);
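The fixed comment refers to the estimate in the context line above: total_rows is guessed by dividing the byte length by the sampled mean line length, shaded down by a hundredth of the standard deviation so the guess errs high. A standalone sketch of that arithmetic, assuming the same formula:

    // Estimate row count from total bytes and sampled line-length stats;
    // the slightly reduced divisor biases the estimate upward.
    fn estimate_total_rows(n_bytes: usize, mean: f32, std: f32) -> usize {
        (n_bytes as f32 / (mean - 0.01 * std)) as usize
    }

    fn main() {
        // 1_000_000 bytes of ~50-byte lines -> 20016, a bit over 1_000_000 / 50
        println!("{}", estimate_total_rows(1_000_000, 50.0, 4.0));
    }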
4 changes: 2 additions & 2 deletions polars/polars-io/src/csv_core/parser.rs
@@ -34,7 +34,7 @@ pub(crate) fn next_line_position(
         return None;
     }
     loop {
-        let pos = input.iter().position(|b| *b == b'\n')? + 1;
+        let pos = memchr::memchr(b'\n', input)? + 1;
         if input.len() - pos == 0 {
             return None;
         }
@@ -133,7 +133,7 @@ pub(crate) fn get_line_stats(bytes: &[u8], n_lines: usize) -> Option<(f32, f32)>
             return None;
         }
         bytes_trunc = &bytes[n_read..];
-        match bytes_trunc.iter().position(|&b| b == b'\n') {
+        match memchr::memchr(b'\n', bytes_trunc) {
             Some(position) => {
                 n_read += position + 1;
                 lengths.push(position + 1);
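Both hunks replace a byte-at-a-time Iterator::position scan with memchr::memchr from the memchr crate, which finds the first occurrence of a single byte using vectorized search. A small equivalence sketch (assumes memchr is available as a dependency):

    // memchr::memchr returns the index of the first matching byte,
    // just like the Iterator::position search it replaces.
    fn main() {
        let input: &[u8] = b"a,b,c\nd,e,f\n";
        let via_iter = input.iter().position(|b| *b == b'\n');
        let via_memchr = memchr::memchr(b'\n', input);
        assert_eq!(via_iter, via_memchr); // both Some(5)
    }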
11 changes: 9 additions & 2 deletions polars/polars-io/src/csv_core/utils.rs
@@ -201,7 +201,7 @@ pub fn infer_file_schema(

     let byterecord = SplitFields::new(header_line, delimiter, quote_char);
     if has_header {
-        byterecord
+        let headers = byterecord
             .map(|(slice, needs_escaping)| {
                 let slice_escaped = if needs_escaping && (slice.len() >= 2) {
                     &slice[1..(slice.len() - 1)]
@@ -211,7 +211,14 @@
                 let s = parse_bytes_with_encoding(slice_escaped, encoding)?;
                 Ok(s.into())
             })
-            .collect::<Result<_>>()?
+            .collect::<Result<Vec<_>>>()?;
+
+        if PlHashSet::from_iter(headers.iter()).len() != headers.len() {
+            return Err(PolarsError::ComputeError(
+                "CSV header contains duplicate column names".into(),
+            ));
+        }
+        headers
     } else {
         let mut column_names: Vec<String> = byterecord
             .enumerate()
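The added check rejects CSV headers containing repeated column names by comparing the number of headers against the size of a hash set built from them. A standalone sketch using the standard library's HashSet in place of polars' PlHashSet alias:

    use std::collections::HashSet;

    // If collecting the headers into a set loses elements, at least one
    // column name occurred more than once.
    fn has_duplicate_headers(headers: &[&str]) -> bool {
        headers.iter().collect::<HashSet<_>>().len() != headers.len()
    }

    fn main() {
        assert!(has_duplicate_headers(&["a", "b", "a"]));
        assert!(!has_duplicate_headers(&["a", "b", "c"]));
    }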
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
@@ -99,7 +99,7 @@ regex = { version = "1.5", optional = true }
 serde = { version = "1", features = ["derive"], optional = true }

 polars-arrow = { version = "0.21.0", path = "../polars-arrow" }
-polars-core = { version = "0.21.0", path = "../polars-core", features = ["lazy", "private", "zip_with"], default-features = false }
+polars-core = { version = "0.21.0", path = "../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
 polars-io = { version = "0.21.0", path = "../polars-io", features = ["lazy", "csv-file", "private"], default-features = false }
 polars-time = { version = "0.21.0", path = "../polars-time", optional = true }
 polars-utils = { version = "0.21.0", path = "../polars-utils" }
