Skip to content

Commit

Permalink
test[rust]: assert file chaches are emptied (#4561)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 25, 2022
1 parent 8fbee3d commit 40e4e77
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 31 deletions.
67 changes: 37 additions & 30 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,38 @@ impl LazyFrame {
lp_arena.replace(lp_top, alp);
}

// make sure its before slice pushdown.
if projection_pushdown {
rules.push(Box::new(FastProjection {}));
}
rules.push(Box::new(DelayRechunk {}));

if slice_pushdown {
let slice_pushdown_opt = SlicePushDown {};
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;

lp_arena.replace(lp_top, alp);

// expressions use the stack optimizer
rules.push(Box::new(slice_pushdown_opt));
}
if type_coercion {
rules.push(Box::new(TypeCoercionRule {}))
}
// this optimization removes branches, so we must do it when type coercion
// is completed
if simplify_expr {
rules.push(Box::new(SimplifyBooleanRule {}));
}

if aggregate_pushdown {
rules.push(Box::new(AggregatePushdown::new()))
}

// make sure that we do that once slice pushdown
// and predicate pushdown are done. At that moment
// the file fingerprints are finished.
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
if agg_scan_projection {
// we do this so that expressions are simplified created by the pushdown optimizations
Expand Down Expand Up @@ -623,36 +655,6 @@ impl LazyFrame {
);
}

// make sure its before slice pushdown.
if projection_pushdown {
rules.push(Box::new(FastProjection {}));
}
rules.push(Box::new(DelayRechunk {}));

if slice_pushdown {
let slice_pushdown_opt = SlicePushDown {};
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;

lp_arena.replace(lp_top, alp);

// expressions use the stack optimizer
rules.push(Box::new(slice_pushdown_opt));
}

if type_coercion {
rules.push(Box::new(TypeCoercionRule {}))
}
// this optimization removes branches, so we must do it when type coercion
// is completed
if simplify_expr {
rules.push(Box::new(SimplifyBooleanRule {}));
}

if aggregate_pushdown {
rules.push(Box::new(AggregatePushdown::new()))
}

rules.push(Box::new(ReplaceDropNulls {}));

lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top);
Expand Down Expand Up @@ -729,6 +731,11 @@ impl LazyFrame {

let mut state = ExecutionState::with_finger_prints(finger_prints);
let out = physical_plan.execute(&mut state);
#[cfg(debug_assertions)]
{
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
state.file_cache.assert_empty();
}
#[cfg(feature = "dtype-categorical")]
if use_string_cache {
toggle_string_cache(!use_string_cache);
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-lazy/src/physical_plan/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ impl FileCache {

Self { inner }
}

#[cfg(debug_assertions)]
pub(crate) fn assert_empty(&self) {
for (_, guard) in self.inner.iter() {
let state = guard.lock();
assert!(state.1.is_empty());
}
}

pub(crate) fn read<F>(
&self,
finger_print: FileFingerPrint,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct ExecutionState {
df_cache: Arc<Mutex<PlHashMap<usize, DataFrame>>>,
// cache file reads until all branches got there file, then we delete it
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
pub(super) file_cache: FileCache,
pub(crate) file_cache: FileCache,
pub(super) schema_cache: Option<SchemaRef>,
/// Used by Window Expression to prevent redundant grouping
pub(super) group_tuples: GroupsProxyCache,
Expand Down

0 comments on commit 40e4e77

Please sign in to comment.