Skip to content

Commit

Permalink
fix row_count behavior in lazy optimizations (#2567)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 7, 2022
1 parent 13e5062 commit 71a2491
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 15 deletions.
11 changes: 7 additions & 4 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl DataFrame {
/// let df1: DataFrame = df!("Name" => &["James", "Mary", "John", "Patricia"])?;
/// assert_eq!(df1.shape(), (4, 1));
///
/// let df2: DataFrame = df1.with_row_count("Id")?;
/// let df2: DataFrame = df1.with_row_count("Id", None)?;
/// assert_eq!(df2.shape(), (4, 2));
/// println!("{}", df2);
///
Expand All @@ -300,10 +300,13 @@ impl DataFrame {
/// | 3 | Patricia |
/// +-----+----------+
/// ```
pub fn with_row_count(&self, name: &str) -> Result<Self> {
pub fn with_row_count(&self, name: &str, offset: Option<u32>) -> Result<Self> {
let mut columns = Vec::with_capacity(self.columns.len() + 1);
columns
.push(UInt32Chunked::from_vec(name, (0..self.height() as u32).collect()).into_series());
let offset = offset.unwrap_or(0);
columns.push(
UInt32Chunked::from_vec(name, (offset..(self.height() as u32) + offset).collect())
.into_series(),
);

self.columns.iter().for_each(|s| columns.push(s.clone()));
DataFrame::new(columns)
Expand Down
12 changes: 9 additions & 3 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ impl LazyFrame {
}

/// Add a new column at index 0 that counts the rows.
pub fn with_row_count(self, name: &str) -> LazyFrame {
pub fn with_row_count(self, name: &str, offset: Option<u32>) -> LazyFrame {
let schema = self.schema();

let mut fields = schema.fields().clone();
Expand All @@ -1000,9 +1000,15 @@ impl LazyFrame {

let name = name.to_owned();

let opt = AllowedOptimizations {
slice_pushdown: false,
predicate_pushdown: false,
..Default::default()
};

self.map(
move |df: DataFrame| df.with_row_count(&name),
Some(AllowedOptimizations::default()),
move |df: DataFrame| df.with_row_count(&name, offset),
Some(opt),
Some(new_schema),
Some("WITH ROW COUNT"),
)
Expand Down
75 changes: 75 additions & 0 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,78 @@ fn test_lazy_filter_and_rename() {

assert_eq!(lf.collect().unwrap().get_column_names(), &["x", "b", "c"]);
}

#[test]
fn test_with_row_count_opts() -> Result<()> {
let df = df![
"a" => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
]?;

let out = df
.clone()
.lazy()
.with_row_count("row_nr", None)
.tail(5)
.collect()?;
let expected = df![
"row_nr" => [5, 6, 7, 8, 9],
"a" => [5, 6, 7, 8, 9],
]?;
assert!(out.frame_equal(&expected));
let out = df
.clone()
.lazy()
.with_row_count("row_nr", None)
.slice(1, 2)
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[1, 2]
);

let out = df
.clone()
.lazy()
.with_row_count("row_nr", None)
.filter(col("a").eq(lit(3i32)))
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[3]
);

let out = df
.clone()
.lazy()
.slice(1, 2)
.with_row_count("row_nr", None)
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[0, 1]
);

let out = df
.lazy()
.filter(col("a").eq(lit(3i32)))
.with_row_count("row_nr", None)
.collect()?;
assert_eq!(
out.column("row_nr")?
.u32()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[0]
);

Ok(())
}
6 changes: 4 additions & 2 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2288,16 +2288,18 @@ def pipe(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
"""
return func(self, *args, **kwargs)

def with_row_count(self, name: str = "row_nr") -> "DataFrame":
def with_row_count(self, name: str = "row_nr", offset: int = 0) -> "DataFrame":
"""
Add a column at index 0 that counts the rows.
Parameters
----------
name
Name of the column to add.
offset
Start the row count at this offset. Default = 0
"""
return wrap_df(self._df.with_row_count(name))
return wrap_df(self._df.with_row_count(name, offset))

def groupby(
self,
Expand Down
6 changes: 4 additions & 2 deletions py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1080,16 +1080,18 @@ def first(self) -> "LazyFrame":
"""
return self.slice(0, 1)

def with_row_count(self, name: str = "row_nr") -> "LazyFrame":
def with_row_count(self, name: str = "row_nr", offset: int = 0) -> "LazyFrame":
"""
Add a column at index 0 that counts the rows.
Parameters
----------
name
Name of the column to add.
offset
Start the row count at this offset
"""
return wrap_ldf(self._ldf.with_row_count(name))
return wrap_ldf(self._ldf.with_row_count(name, offset))

def fill_null(self, fill_value: Union[int, str, "pli.Expr"]) -> "LazyFrame":
"""
Expand Down
7 changes: 5 additions & 2 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,8 +806,11 @@ impl PyDataFrame {
}
}

pub fn with_row_count(&self, name: &str) -> PyResult<Self> {
let df = self.df.with_row_count(name).map_err(PyPolarsEr::from)?;
pub fn with_row_count(&self, name: &str, offset: Option<u32>) -> PyResult<Self> {
let df = self
.df
.with_row_count(name, offset)
.map_err(PyPolarsEr::from)?;
Ok(df.into())
}

Expand Down
4 changes: 2 additions & 2 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,9 @@ impl PyLazyFrame {
ldf.melt(id_vars, value_vars).into()
}

pub fn with_row_count(&self, name: &str) -> Self {
pub fn with_row_count(&self, name: &str, offset: Option<u32>) -> Self {
let ldf = self.ldf.clone();
ldf.with_row_count(name).into()
ldf.with_row_count(name, offset).into()
}

pub fn map(&self, lambda: PyObject, predicate_pd: bool, projection_pd: bool) -> Self {
Expand Down

0 comments on commit 71a2491

Please sign in to comment.