Skip to content

Commit

Permalink
fix bug in aggscanprojection
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 12, 2022
1 parent 2070233 commit 20b0857
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,8 @@ pub fn ternary_expr(predicate: Expr, truthy: Expr, falsy: Expr) -> Expr {
}

impl Expr {
#[cfg(feature = "private")]
/// overwrite the function name used for formatting
/// this is not intended to be used
pub fn with_fmt(self, name: &'static str) -> Expr {
if let Self::Function {
input,
Expand Down
41 changes: 21 additions & 20 deletions polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl fmt::Debug for LogicalPlan {
if let Some(columns) = &options.with_columns {
n_columns = format!("{}", columns.len());
}
write!(
f,
"PARQUET SCAN {}; PROJECT {}/{} COLUMNS; SELECTION: {:?}",
Expand Down Expand Up @@ -53,10 +54,10 @@ impl fmt::Debug for LogicalPlan {
)
}
Selection { predicate, input } => {
write!(f, "FILTER\n\t{:?}\nFROM\n\t{:?}", predicate, input)
write!(f, "FILTER {:?}\nFROM\n{:?}", predicate, input)
}
Melt { input, .. } => {
write!(f, "MELT\n\t{:?}", input)
write!(f, "MELT {:?}", input)
}
#[cfg(feature = "csv-file")]
CsvScan {
Expand Down Expand Up @@ -111,9 +112,9 @@ impl fmt::Debug for LogicalPlan {
Projection { expr, input, .. } => {
write!(
f,
"SELECT {:?} COLUMNS\n\
{:?}
\nFROM\n{:?}",
"SELECT {:?} COLUMNS: {:?}
FROM
{:?}",
expr.len(),
expr,
input
Expand Down Expand Up @@ -154,7 +155,7 @@ impl fmt::Debug for LogicalPlan {
Slice { input, offset, len } => {
write!(f, "{:?}\nSLICE[offset: {}, len: {}]", input, offset, len)
}
Udf { input, .. } => write!(f, "UDF {:?}", input),
Udf { input, .. } => write!(f, "UDF \n{:?}", input),
}
}
}
Expand Down Expand Up @@ -209,20 +210,20 @@ impl fmt::Debug for Expr {
Agg(agg) => {
use AggExpr::*;
match agg {
Min(expr) => write!(f, "AGG MIN {:?}", expr),
Max(expr) => write!(f, "AGG MAX {:?}", expr),
Median(expr) => write!(f, "AGG MEDIAN {:?}", expr),
Mean(expr) => write!(f, "AGG MEAN {:?}", expr),
First(expr) => write!(f, "AGG FIRST {:?}", expr),
Last(expr) => write!(f, "AGG LAST {:?}", expr),
List(expr) => write!(f, "AGG LIST {:?}", expr),
NUnique(expr) => write!(f, "AGG N UNIQUE {:?}", expr),
Sum(expr) => write!(f, "AGG SUM {:?}", expr),
AggGroups(expr) => write!(f, "AGG GROUPS {:?}", expr),
Count(expr) => write!(f, "AGG COUNT {:?}", expr),
Var(expr) => write!(f, "AGG VAR {:?}", expr),
Std(expr) => write!(f, "AGG STD {:?}", expr),
Quantile { expr, .. } => write!(f, "AGG QUANTILE {:?}", expr),
Min(expr) => write!(f, "{:?}.min()", expr),
Max(expr) => write!(f, "{:?}.max()", expr),
Median(expr) => write!(f, "{:?}.median()", expr),
Mean(expr) => write!(f, "{:?}.mean()", expr),
First(expr) => write!(f, "{:?}.first()", expr),
Last(expr) => write!(f, "{:?}.last()", expr),
List(expr) => write!(f, "{:?}.list()", expr),
NUnique(expr) => write!(f, "{:?}.n_unique()", expr),
Sum(expr) => write!(f, "{:?}.sum()", expr),
AggGroups(expr) => write!(f, "{:?}.groups()", expr),
Count(expr) => write!(f, "{:?}.count()", expr),
Var(expr) => write!(f, "{:?}.var()", expr),
Std(expr) => write!(f, "{:?}.std()", expr),
Quantile { expr, .. } => write!(f, "{:?}.quantile()", expr),
}
}
Cast {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ impl OptimizationRule for AggScanProjection {
mut options,
} = lp
{
let with_columns = self
.columns
.get(&path)
.map(|agg| agg.iter().cloned().collect());
let with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|name| schema.index_of(name).ok());
columns
});
// prevent infinite loop
if options.with_columns == with_columns {
let lp = ALogicalPlan::IpcScan {
Expand Down Expand Up @@ -148,10 +150,12 @@ impl OptimizationRule for AggScanProjection {
mut options,
} = lp
{
let mut with_columns = self
.columns
.get(&path)
.map(|agg| agg.iter().cloned().collect());
let mut with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|name| schema.index_of(name).ok());
columns
});
// prevent infinite loop
if options.with_columns == with_columns {
let lp = ALogicalPlan::ParquetScan {
Expand Down Expand Up @@ -192,10 +196,12 @@ impl OptimizationRule for AggScanProjection {
aggregate,
} = lp
{
let with_columns = self
.columns
.get(&path)
.map(|agg| agg.iter().cloned().collect());
let with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|name| schema.index_of(name).ok());
columns
});
if options.with_columns == with_columns {
let lp = ALogicalPlan::CsvScan {
path,
Expand Down
18 changes: 18 additions & 0 deletions polars/polars-lazy/src/tests/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,21 @@ fn test_join_suffix_and_drop() -> Result<()> {

Ok(())
}

#[test]
fn test_union_and_aggscan() -> Result<()> {
    // Regression test: a union vstacks columns, while the aggscan optimization gathers the
    // columns to read in a hashmap. If those columns are not put back in sorted order the
    // vstack panics.
    let pattern = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.parquet";
    let scanned = LazyFrame::scan_parquet(pattern.into(), Default::default())?;

    // Keep only the vegetable rows before aggregating.
    let only_vegetables = col("category").eq(lit("vegetables"));
    let filtered = scanned.filter(only_vegetables);

    // Two aggregations over the same source column, one of them after a cast.
    let fats_sum = col("fats_g").sum().alias("sum");
    let fats_mean = col("fats_g").cast(DataType::Float64).mean().alias("mean");

    let out = filtered.select([fats_sum, fats_mean]).collect()?;

    // One aggregated row, two output columns.
    assert_eq!(out.shape(), (1, 2));

    Ok(())
}

0 comments on commit 20b0857

Please sign in to comment.