Skip to content

Commit

Permalink
fix schemas of groupby rolling/dynamic (#3028)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 1, 2022
1 parent dd580b0 commit b12a2e8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 1 deletion.
16 changes: 15 additions & 1 deletion polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,6 @@ impl<'a> ALogicalPlanBuilder<'a> {
maintain_order: bool,
options: GroupbyOptions,
) -> Self {
debug_assert!(!(keys.is_empty() && options.dynamic.is_none()));
let current_schema = self.schema();
// TODO! add this line if LogicalPlan is dropped in favor of ALogicalPlan
// let aggs = rewrite_projections(aggs, current_schema);
Expand All @@ -639,6 +638,21 @@ impl<'a> ALogicalPlanBuilder<'a> {
let other = aexprs_to_schema(&aggs, current_schema, Context::Aggregation, self.expr_arena);
schema.merge(other);

let index_columns = &[
options
.rolling
.as_ref()
.map(|options| &options.index_column),
options
.dynamic
.as_ref()
.map(|options| &options.index_column),
];
for &name in index_columns.iter().flatten() {
let dtype = current_schema.get(name).unwrap();
schema.with_column(name.clone(), dtype.clone());
}

let lp = ALogicalPlan::Aggregate {
input: self.root,
keys,
Expand Down
19 changes: 19 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,25 @@ impl LogicalPlanBuilder {
);
schema.merge(other);

let index_columns = &[
rolling_options
.as_ref()
.map(|options| &options.index_column),
dynamic_options
.as_ref()
.map(|options| &options.index_column),
];
for &name in index_columns.iter().flatten() {
let dtype = try_delayed!(
current_schema
.get(name)
.ok_or_else(|| PolarsError::NotFound(name.clone())),
self.0,
into
);
schema.with_column(name.clone(), dtype.clone());
}

LogicalPlan::Aggregate {
input: Box::new(self.0),
keys,
Expand Down
61 changes: 61 additions & 0 deletions polars/tests/it/lazy/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,64 @@ fn test_drop() -> Result<()> {
assert_eq!(out.width(), 0);
Ok(())
}

#[test]
#[cfg(feature = "dynamic_groupby")]
fn test_special_groupby_schemas() -> Result<()> {
let df = df![
"a" => [1, 2, 3, 4, 5],
"b" => [1, 2, 3, 4, 5],
]?;

let out = df
.clone()
.lazy()
.groupby_rolling(
[],
RollingGroupOptions {
index_column: "a".into(),
period: Duration::parse("2i"),
offset: Duration::parse("0i"),
closed_window: ClosedWindow::Left,
},
)
.agg([col("b").sum().alias("sum")])
.select([col("a"), col("sum")])
.collect()?;

assert_eq!(
out.column("sum")?
.i32()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[3, 5, 7, 9, 5]
);

let out = df
.lazy()
.groupby_dynamic(
[],
DynamicGroupOptions {
index_column: "a".into(),
every: Duration::parse("2i"),
period: Duration::parse("2i"),
offset: Duration::parse("0i"),
truncate: false,
include_boundaries: false,
closed_window: ClosedWindow::Left,
},
)
.agg([col("b").sum().alias("sum")])
.select([col("a"), col("sum")])
.collect()?;

assert_eq!(
out.column("sum")?
.i32()?
.into_no_null_iter()
.collect::<Vec<_>>(),
&[1, 5, 9]
);

Ok(())
}

0 comments on commit b12a2e8

Please sign in to comment.