fix stackoverflow in parquet stats evaluation (#2677)
ritchie46 committed Feb 17, 2022
1 parent 7617eca commit b92af07
Showing 4 changed files with 57 additions and 12 deletions.
4 changes: 4 additions & 0 deletions polars/polars-io/src/parquet/predicates.rs
@@ -18,6 +18,10 @@ impl ColumnStats {
         self.0.data_type().into()
     }
 
+    pub fn null_count(&self) -> Option<usize> {
+        self.0.null_count().map(|v| v as usize)
+    }
+
     pub fn to_min_max(&self) -> Option<Series> {
         let name = "";
         use DataType::*;
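ColumnStats already exposes min/max bounds via to_min_max; this commit adds the per-row-group null count alongside them. Together these statistics let a scan prove that a predicate such as col("a") > 4 or col("a").is_null() cannot match inside a row group and skip it entirely. A tiny stand-alone sketch of that kind of decision, using hypothetical names rather than polars' own types:

// Hypothetical stand-alone sketch (not polars' internals) of how row-group
// statistics feed a read/skip decision.
struct ColStats {
    max: Option<i64>,
    null_count: Option<usize>,
}

// Predicate `col > value`: skip only when the max proves nothing can match.
fn should_read_gt(stats: &ColStats, value: i64) -> bool {
    !matches!(stats.max, Some(max) if max <= value)
}

// Predicate `col.is_null()`: skip only when the row group provably has no nulls.
fn should_read_is_null(stats: &ColStats) -> bool {
    !matches!(stats.null_count, Some(0))
}

fn main() {
    let stats = ColStats { max: Some(3), null_count: Some(0) };
    assert!(!should_read_gt(&stats, 4));   // all values <= 4: skip
    assert!(should_read_gt(&stats, 2));    // values above 2 may exist: read
    assert!(!should_read_is_null(&stats)); // no nulls in this row group: skip
}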
10 changes: 2 additions & 8 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
@@ -241,8 +241,8 @@ impl PhysicalExpr for BinaryExpr {
         }
     }
 
-    fn to_field(&self, _input_schema: &Schema) -> Result<Field> {
-        todo!()
+    fn to_field(&self, input_schema: &Schema) -> Result<Field> {
+        self.expr.to_field(input_schema, Context::Default)
     }
 
     fn as_agg_expr(&self) -> Result<&dyn PhysicalAggregation> {
@@ -408,12 +408,6 @@ mod stats {
                    Operator::Or => Ok(l.should_read(stats)? || r.should_read(stats)?),
                    _ => Ok(true),
                },
-                // This branch is probably never hit
-                (Some(other), None) | (None, Some(other)) => match self.op {
-                    Operator::And => Ok(self.should_read(stats)? && other.should_read(stats)?),
-                    Operator::Or => Ok(self.should_read(stats)? || other.should_read(stats)?),
-                    _ => Ok(true),
-                },
                _ => self.impl_should_read(stats),
            }
        }
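The deleted match arm is where the stack overflow in the commit title came from: when only one side of the binary expression offered a statistics evaluator, the arm called self.should_read(stats) on the very expression whose should_read was already executing, so the same arm matched again and the recursion never reached a base case. The surviving arms recurse only into the child evaluators l and r. A minimal sketch of that structural difference, with hypothetical types rather than polars code:

// Hypothetical sketch (not polars code) of the recursion structure.
// Recursing on strictly smaller child nodes terminates at the leaves;
// re-invoking the function on `self` from inside one of its own arms,
// as the removed branch effectively did, never makes progress and
// eventually overflows the stack.
enum Pred {
    Leaf(bool),                // a leaf predicate with a known answer
    And(Box<Pred>, Box<Pred>), // a binary node
}

fn should_read(p: &Pred) -> bool {
    match p {
        Pred::Leaf(v) => *v,
        // Only ever recurse into the children, never into `p` itself.
        Pred::And(l, r) => should_read(l) && should_read(r),
    }
}

fn main() {
    let p = Pred::And(Box::new(Pred::Leaf(true)), Box::new(Pred::Leaf(false)));
    assert!(!should_read(&p));
}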
27 changes: 27 additions & 0 deletions polars/polars-lazy/src/physical_plan/expressions/is_null.rs
@@ -1,7 +1,12 @@
 use crate::physical_plan::state::ExecutionState;
 use crate::prelude::*;
+use crate::utils::expr_to_root_column_name;
 use polars_core::frame::groupby::GroupsProxy;
 use polars_core::prelude::*;
+#[cfg(feature = "parquet")]
+use polars_io::predicates::StatsEvaluator;
+#[cfg(feature = "parquet")]
+use polars_io::prelude::predicates::BatchStats;
 use std::sync::Arc;
 
 pub struct IsNullExpr {
@@ -45,4 +50,26 @@ impl PhysicalExpr for IsNullExpr {
     fn to_field(&self, _input_schema: &Schema) -> Result<Field> {
         Ok(Field::new("is_null", DataType::Boolean))
     }
+    #[cfg(feature = "parquet")]
+    fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
+        Some(self)
+    }
 }
+
+#[cfg(feature = "parquet")]
+impl StatsEvaluator for IsNullExpr {
+    fn should_read(&self, stats: &BatchStats) -> Result<bool> {
+        let root = expr_to_root_column_name(&self.expr)?;
+
+        let read = true;
+        let skip = false;
+
+        match stats.get_stats(&root).ok() {
+            Some(st) => match st.null_count() {
+                Some(0) => Ok(skip),
+                _ => Ok(read),
+            },
+            None => Ok(read),
+        }
+    }
+}
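IsNullExpr now opts into the same hook BinaryExpr uses: as_stats_evaluator returns Some(self), so a caller holding only a physical expression can ask whether statistics-based skipping is available without downcasting, and should_read skips a row group only when its statistics report a null count of zero. A rough, self-contained sketch of that wiring under simplified, hypothetical trait definitions:

// Simplified, hypothetical traits (not polars' actual definitions) showing
// the opt-in pattern: expressions that can use row-group statistics expose
// an evaluator; everything else inherits the default and is simply read.
trait StatsEvaluator {
    fn should_read(&self, null_count: Option<usize>) -> bool;
}

trait PhysicalExpr {
    // Default: no statistics support. Implementations opt in by overriding.
    fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
        None
    }
}

struct IsNull;

impl StatsEvaluator for IsNull {
    fn should_read(&self, null_count: Option<usize>) -> bool {
        // Skip only when the statistics prove there are no nulls.
        !matches!(null_count, Some(0))
    }
}

impl PhysicalExpr for IsNull {
    fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
        Some(self)
    }
}

fn main() {
    let expr: &dyn PhysicalExpr = &IsNull;
    if let Some(eval) = expr.as_stats_evaluator() {
        assert!(!eval.should_read(Some(0))); // row group provably null-free: skip
        assert!(eval.should_read(None));     // no statistics: must read
    }
}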
28 changes: 24 additions & 4 deletions py-polars/tests/lazy_io/test_parquet.py
@@ -1,10 +1,18 @@
 from os import path
 
+import pandas as pd
+import pytest
+
 import polars as pl
 
 
-def test_categorical_parquet_statistics() -> None:
-    file = path.join(path.dirname(__file__), "books.parquet")
+@pytest.fixture
+def cwd() -> str:
+    return path.dirname(__file__)
+
+
+def test_categorical_parquet_statistics(cwd: str) -> None:
+    file = path.join(cwd, "books.parquet")
     (
         pl.DataFrame(
             {
@@ -35,9 +43,21 @@ def test_categorical_parquet_statistics() -> None:
     assert df.shape == (4, 3)
 
 
-def test_null_parquet() -> None:
-    file = path.join(path.dirname(__file__), "null.parquet")
+def test_null_parquet(cwd: str) -> None:
+    file = path.join(cwd, "null.parquet")
     df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)])
     df.to_parquet(file)
     out = pl.read_parquet(file)
     assert out.frame_equal(df)
+
+
+def test_binary_parquet_stats(cwd: str) -> None:
+    file = path.join(cwd, "binary_stats.parquet")
+    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
+    df1.to_parquet(file, engine="pyarrow")
+    df = (
+        pl.scan_parquet(file)
+        .filter(pl.col("a").is_not_null() & (pl.col("a") > 4))
+        .collect()
+    )
+    assert df["a"].to_list() == [5.0, 5.0]