Skip to content

Commit

Permalink
improve parquet statistics to composite predicates (#2376)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 14, 2022
1 parent 5b90000 commit a3e0a8a
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
98 changes: 67 additions & 31 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,48 +251,62 @@ mod stats {
use polars_io::parquet::predicates::BatchStats;
use polars_io::predicates::StatsEvaluator;

impl Operator {
fn invert_arguments(&self) -> Option<Self> {
use Operator::*;
let op = match self {
Eq => Eq,
NotEq => NotEq,
Lt => Gt,
Gt => Lt,
LtEq => GtEq,
GtEq => LtEq,
_ => return None,
};
Some(op)
fn apply_operator_stats_rhs_lit(min_max: &Series, literal: &Series, op: Operator) -> bool {
match op {
Operator::Gt => {
// literal is bigger than max value
// selection needs all rows
ChunkCompare::<&Series>::gt(min_max, literal).all()
}
Operator::GtEq => {
// literal is bigger than max value
// selection needs all rows
ChunkCompare::<&Series>::gt_eq(min_max, literal).all()
}
Operator::Lt => {
// literal is smaller than min value
// selection needs all rows
ChunkCompare::<&Series>::lt(min_max, literal).all()
}
Operator::LtEq => {
// literal is smaller than min value
// selection needs all rows
ChunkCompare::<&Series>::lt_eq(min_max, literal).all()
}
// default: read the file
_ => true,
}
}

pub fn apply_operator_stats(literal: &Series, min_max: &Series, op: Operator) -> bool {
fn apply_operator_stats_lhs_lit(literal: &Series, min_max: &Series, op: Operator) -> bool {
match op {
Operator::Eq => {
// if literal equal min and max all are equal
!ChunkCompare::<&Series>::equal(literal, min_max).all()
}
Operator::Gt => {
// literal is bigger than max value
// selection needs all rows
!ChunkCompare::<&Series>::gt(literal, min_max).all()
ChunkCompare::<&Series>::gt(literal, min_max).all()
}
Operator::GtEq => {
// literal is bigger than max value
// selection needs all rows
ChunkCompare::<&Series>::gt_eq(literal, min_max).all()
}
Operator::Lt => {
// literal is smaller than min value
// selection needs all rows
!ChunkCompare::<&Series>::lt(literal, min_max).all()
ChunkCompare::<&Series>::lt(literal, min_max).all()
}
Operator::LtEq => {
// literal is smaller than min value
// selection needs all rows
ChunkCompare::<&Series>::lt_eq(literal, min_max).all()
}
// default: read the file
_ => true,
}
}

impl StatsEvaluator for BinaryExpr {
fn should_read(&self, stats: &BatchStats) -> Result<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
}
impl BinaryExpr {
fn impl_should_read(&self, stats: &BatchStats) -> Result<bool> {
let schema = stats.schema();
let fld_l = self.left.to_field(schema)?;
let fld_r = self.right.to_field(schema)?;
Expand All @@ -309,7 +323,7 @@ mod stats {
None => Ok(true),
Some(min_max_s) => {
let lit_s = self.right.evaluate(&dummy, &state).unwrap();
Ok(apply_operator_stats(&lit_s, &min_max_s, self.op))
Ok(apply_operator_stats_rhs_lit(&min_max_s, &lit_s, self.op))
}
}
}
Expand All @@ -319,11 +333,7 @@ mod stats {
None => Ok(true),
Some(min_max_s) => {
let lit_s = self.left.evaluate(&dummy, &state).unwrap();
if let Some(op) = self.op.invert_arguments() {
Ok(apply_operator_stats(&lit_s, &min_max_s, op))
} else {
Ok(true)
}
Ok(apply_operator_stats_lhs_lit(&lit_s, &min_max_s, self.op))
}
}
}
Expand All @@ -340,4 +350,30 @@ mod stats {
})
}
}

impl StatsEvaluator for BinaryExpr {
fn should_read(&self, stats: &BatchStats) -> Result<bool> {
if std::env::var("POLARS_NO_PARQUET_STATISTICS").is_ok() {
return Ok(true);
}

match (
self.left.as_stats_evaluator(),
self.right.as_stats_evaluator(),
) {
(Some(l), Some(r)) => match self.op {
Operator::And => Ok(l.should_read(stats)? && r.should_read(stats)?),
Operator::Or => Ok(l.should_read(stats)? || r.should_read(stats)?),
_ => Ok(true),
},
// This branch is probably never hit
(Some(other), None) | (None, Some(other)) => match self.op {
Operator::And => Ok(self.should_read(stats)? && other.should_read(stats)?),
Operator::Or => Ok(self.should_read(stats)? || other.should_read(stats)?),
_ => Ok(true),
},
_ => self.impl_should_read(stats),
}
}
}
}
48 changes: 48 additions & 0 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ fn test_parquet_statistics_no_skip() {
.collect()
.unwrap();
assert_eq!(out.shape(), (27, 4));

// Or operation
let out = scan_foods_parquet(par)
.filter(
col("sugars_g")
.lt(lit(0i32))
.or(col("fats_g").lt(lit(1000.0))),
)
.collect()
.unwrap();
assert_eq!(out.shape(), (27, 4));
}

#[test]
Expand All @@ -74,6 +85,7 @@ fn test_parquet_statistics() -> Result<()> {
std::env::set_var("POLARS_PANIC_IF_PARQUET_PARSED", "1");
let par = true;

// Test single predicates
let out = scan_foods_parquet(par)
.filter(col("calories").lt(lit(0i32)))
.collect()?;
Expand All @@ -93,6 +105,42 @@ fn test_parquet_statistics() -> Result<()> {
.filter(lit(1000i32).lt(col("calories")))
.collect()?;
assert_eq!(out.shape(), (0, 4));

// Test multiple predicates

// And operation
let out = scan_foods_parquet(par)
.filter(col("calories").lt(lit(0i32)))
.filter(col("calories").gt(lit(1000)))
.collect()?;
assert_eq!(out.shape(), (0, 4));

let out = scan_foods_parquet(par)
.filter(col("calories").lt(lit(0i32)))
.filter(col("calories").gt(lit(1000)))
.filter(col("calories").lt(lit(50i32)))
.collect()?;
assert_eq!(out.shape(), (0, 4));

let out = scan_foods_parquet(par)
.filter(
col("calories")
.lt(lit(0i32))
.and(col("fats_g").lt(lit(0.0))),
)
.collect()?;
assert_eq!(out.shape(), (0, 4));

// Or operation
let out = scan_foods_parquet(par)
.filter(
col("sugars_g")
.lt(lit(0i32))
.or(col("fats_g").gt(lit(1000.0))),
)
.collect()?;
assert_eq!(out.shape(), (0, 4));

std::env::remove_var("POLARS_PANIC_IF_PARQUET_PARSED");

Ok(())
Expand Down

0 comments on commit a3e0a8a

Please sign in to comment.