Skip to content

Commit

Permalink
fix row count for u64 idx (#3285)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 3, 2022
1 parent 29e1737 commit ee76815
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 24 deletions.
18 changes: 12 additions & 6 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,25 +321,31 @@ impl DataFrame {
/// | 3 | Patricia |
/// +-----+----------+
/// ```
pub fn with_row_count(&self, name: &str, offset: Option<u32>) -> Result<Self> {
pub fn with_row_count(&self, name: &str, offset: Option<IdxSize>) -> Result<Self> {
let mut columns = Vec::with_capacity(self.columns.len() + 1);
let offset = offset.unwrap_or(0);
columns.push(
UInt32Chunked::from_vec(name, (offset..(self.height() as u32) + offset).collect())
.into_series(),
IdxCa::from_vec(
name,
(offset..(self.height() as IdxSize) + offset).collect(),
)
.into_series(),
);

columns.extend_from_slice(&self.columns);
DataFrame::new(columns)
}

/// Add a row count in place.
pub fn with_row_count_mut(&mut self, name: &str, offset: Option<u32>) -> &mut Self {
pub fn with_row_count_mut(&mut self, name: &str, offset: Option<IdxSize>) -> &mut Self {
let offset = offset.unwrap_or(0);
self.columns.insert(
0,
UInt32Chunked::from_vec(name, (offset..(self.height() as u32) + offset).collect())
.into_series(),
IdxCa::from_vec(
name,
(offset..(self.height() as IdxSize) + offset).collect(),
)
.into_series(),
);
self
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl<'a> CoreReader<'a> {
.map(|buf| buf.into_series())
.collect::<Result<_>>()?,
);
let current_row_count = local_df.height() as u32;
let current_row_count = local_df.height() as IdxSize;
if let Some(rc) = &self.row_count {
local_df.with_row_count_mut(&rc.name, Some(rc.offset));
};
Expand Down Expand Up @@ -630,7 +630,7 @@ impl<'a> CoreReader<'a> {
if let Some(rc) = &self.row_count {
df.with_row_count_mut(&rc.name, Some(rc.offset));
}
let n_read = df.height() as u32;
let n_read = df.height() as IdxSize;
Ok((df, n_read))
})
.collect::<Result<Vec<_>>>()
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub(crate) fn finish_reader<R: ArrowReader>(
let mut parsed_dfs = Vec::with_capacity(1024);

while let Some(batch) = reader.next_record_batch()? {
let current_num_rows = num_rows as u32;
let current_num_rows = num_rows as IdxSize;
num_rows += batch.len();
let mut df = DataFrame::try_from((batch, arrow_schema.fields.as_slice()))?;

Expand Down
3 changes: 2 additions & 1 deletion polars/polars-io/src/options.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use polars_arrow::prelude::IdxSize;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RowCount {
pub name: String,
pub offset: u32,
pub offset: IdxSize,
}
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn read_parquet<R: MmapBytesReader>(
let mut previous_row_count = 0;
for rg in 0..row_group_len {
let md = &file_metadata.row_groups[rg];
let current_row_count = md.num_rows() as u32;
let current_row_count = md.num_rows() as IdxSize;
if let Some(pred) = &predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? {
Expand Down
3 changes: 1 addition & 2 deletions polars/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use crate::ArrowSchema;
use dirs::home_dir;
use polars_core::frame::DataFrame;
#[cfg(any(feature = "ipc", feature = "avro", feature = "parquet"))]
use polars_core::prelude::*;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -69,7 +68,7 @@ pub(crate) fn columns_to_projection(

/// Because of threading every row starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
pub(crate) fn update_row_counts(dfs: &mut [(DataFrame, u32)]) {
pub(crate) fn update_row_counts(dfs: &mut [(DataFrame, IdxSize)]) {
if !dfs.is_empty() {
let mut previous = dfs[0].1;
for (df, n_read) in &mut dfs[1..] {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ impl LazyFrame {
/// # Warning
/// This can have a negative effect on query performance.
/// This may for instance block predicate pushdown optimization.
pub fn with_row_count(mut self, name: &str, offset: Option<u32>) -> LazyFrame {
pub fn with_row_count(mut self, name: &str, offset: Option<IdxSize>) -> LazyFrame {
match &mut self.logical_plan {
// Do the row count at scan
#[cfg(feature = "csv-file")]
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ fn skip_rows_and_slice() -> Result<()> {
#[test]
fn test_row_count() -> Result<()> {
let _guard = SINGLE_LOCK.lock().unwrap();
for offset in [0u32, 10] {
for offset in [0 as IdxSize, 10] {
let lf = LazyCsvReader::new(FOODS_CSV.to_string())
.with_row_count(Some(RowCount {
name: "rc".into(),
Expand All @@ -347,7 +347,7 @@ fn test_row_count() -> Result<()> {
let df = lf.collect()?;
let rc = df.column("rc")?;
assert_eq!(
rc.u32()?.into_no_null_iter().collect::<Vec<_>>(),
rc.idx()?.into_no_null_iter().collect::<Vec<_>>(),
(offset..27 + offset).collect::<Vec<_>>()
);

Expand All @@ -365,7 +365,7 @@ fn test_row_count() -> Result<()> {
let df = lf.collect()?;
let rc = df.column("rc")?;
assert_eq!(
rc.u32()?.into_no_null_iter().collect::<Vec<_>>(),
rc.idx()?.into_no_null_iter().collect::<Vec<_>>(),
(offset..27 + offset).collect::<Vec<_>>()
);

Expand All @@ -384,7 +384,7 @@ fn test_row_count() -> Result<()> {
let df = lf.collect()?;
let rc = df.column("rc")?;
assert_eq!(
rc.u32()?.into_no_null_iter().collect::<Vec<_>>(),
rc.idx()?.into_no_null_iter().collect::<Vec<_>>(),
(offset..27 + offset).collect::<Vec<_>>()
);
}
Expand Down
10 changes: 5 additions & 5 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ fn test_with_row_count_opts() -> Result<()> {
.tail(5)
.collect()?;
let expected = df![
"row_nr" => [5_u32, 6, 7, 8, 9],
"row_nr" => [5 as IdxSize, 6, 7, 8, 9],
"a" => [5, 6, 7, 8, 9],
]?;

Expand All @@ -328,7 +328,7 @@ fn test_with_row_count_opts() -> Result<()> {
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.idx()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[1, 2]
Expand All @@ -342,7 +342,7 @@ fn test_with_row_count_opts() -> Result<()> {
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.idx()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[3]
Expand All @@ -356,7 +356,7 @@ fn test_with_row_count_opts() -> Result<()> {
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.idx()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[0, 1]
Expand All @@ -369,7 +369,7 @@ fn test_with_row_count_opts() -> Result<()> {
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.idx()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[0]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/tests/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ fn test_row_count_pd() -> Result<()> {
.collect()?;

let expected = df![
"row_count" => [0u32, 1, 2],
"row_count" => [0 as IdxSize, 1, 2],
"x" => [3i32, 6, 9]
]?;

Expand Down

0 comments on commit ee76815

Please sign in to comment.