Skip to content

Commit

Permalink
fix(rust, python): don't recurse assign unions as it stack overflows on > 5k files (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 5, 2022
1 parent 6bede93 commit 2989a0f
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 130 deletions.
262 changes: 134 additions & 128 deletions polars/polars-lazy/polars-plan/src/logical_plan/optimizer/file_caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,155 +264,161 @@ impl FileCacher {
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
// if behind cache we should not add a projection
behind_cache: bool,
) {
let lp = lp_arena.take(root);
match lp {
#[cfg(feature = "parquet")]
ALogicalPlan::ParquetScan {
path,
schema,
output_schema,
predicate,
aggregate,
mut options,
} => {
let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
let finger_print = FileFingerPrint {
path,
predicate: predicate_expr,
slice: (0, options.n_rows),
};

let with_columns = self.extract_columns_and_count(&finger_print);
options.file_counter = with_columns.as_ref().map(|t| t.0).unwrap_or(0);
let with_columns = with_columns.and_then(|t| {
if t.1.len() != schema.len() {
Some(t.1)
} else {
None
}
});
scratch.clear();
let mut stack = Vec::with_capacity(lp_arena.len());
stack.push((root, false));

options.with_columns = with_columns;
let lp = ALogicalPlan::ParquetScan {
path: finger_print.path.clone(),
// if behind cache we should not add a projection
while let Some((root, behind_cache)) = stack.pop() {
let lp = lp_arena.take(root);
match lp {
#[cfg(feature = "parquet")]
ALogicalPlan::ParquetScan {
path,
schema,
output_schema,
predicate,
aggregate,
options: options.clone(),
};
let lp = self.finish_rewrite(
lp,
expr_arena,
lp_arena,
&finger_print,
options.with_columns,
behind_cache,
);
lp_arena.replace(root, lp);
}
#[cfg(feature = "csv-file")]
ALogicalPlan::CsvScan {
path,
schema,
output_schema,
predicate,
aggregate,
mut options,
} => {
let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
let finger_print = FileFingerPrint {
path,
predicate: predicate_expr,
slice: (options.skip_rows, options.n_rows),
};
mut options,
} => {
let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
let finger_print = FileFingerPrint {
path,
predicate: predicate_expr,
slice: (0, options.n_rows),
};

let with_columns = self.extract_columns_and_count(&finger_print);
options.file_counter = with_columns.as_ref().map(|t| t.0).unwrap_or(0);
let with_columns = with_columns.and_then(|t| {
if t.1.len() != schema.len() {
Some(t.1)
} else {
None
}
});
let with_columns = self.extract_columns_and_count(&finger_print);
options.file_counter = with_columns.as_ref().map(|t| t.0).unwrap_or(0);
let with_columns = with_columns.and_then(|t| {
if t.1.len() != schema.len() {
Some(t.1)
} else {
None
}
});

options.with_columns = with_columns;
let lp = ALogicalPlan::CsvScan {
path: finger_print.path.clone(),
options.with_columns = with_columns;
let lp = ALogicalPlan::ParquetScan {
path: finger_print.path.clone(),
schema,
output_schema,
predicate,
aggregate,
options: options.clone(),
};
let lp = self.finish_rewrite(
lp,
expr_arena,
lp_arena,
&finger_print,
options.with_columns,
behind_cache,
);
lp_arena.replace(root, lp);
}
#[cfg(feature = "csv-file")]
ALogicalPlan::CsvScan {
path,
schema,
output_schema,
predicate,
aggregate,
options: options.clone(),
};
let lp = self.finish_rewrite(
lp,
expr_arena,
lp_arena,
&finger_print,
options.with_columns,
behind_cache,
);
lp_arena.replace(root, lp);
}
#[cfg(feature = "ipc")]
ALogicalPlan::IpcScan {
path,
schema,
output_schema,
predicate,
aggregate,
mut options,
} => {
let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
let finger_print = FileFingerPrint {
path,
predicate: predicate_expr,
slice: (0, options.n_rows),
};
mut options,
} => {
let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
let finger_print = FileFingerPrint {
path,
predicate: predicate_expr,
slice: (options.skip_rows, options.n_rows),
};

let with_columns = self.extract_columns_and_count(&finger_print);
options.file_counter = with_columns.as_ref().map(|t| t.0).unwrap_or(0);
let with_columns = with_columns.and_then(|t| {
if t.1.len() != schema.len() {
Some(t.1)
} else {
None
}
});
let with_columns = self.extract_columns_and_count(&finger_print);
options.file_counter = with_columns.as_ref().map(|t| t.0).unwrap_or(0);
let with_columns = with_columns.and_then(|t| {
if t.1.len() != schema.len() {
Some(t.1)
} else {
None
}
});

options.with_columns = with_columns;
let lp = ALogicalPlan::IpcScan {
path: finger_print.path.clone(),
options.with_columns = with_columns;
let lp = ALogicalPlan::CsvScan {
path: finger_print.path.clone(),
schema,
output_schema,
predicate,
aggregate,
options: options.clone(),
};
let lp = self.finish_rewrite(
lp,
expr_arena,
lp_arena,
&finger_print,
options.with_columns,
behind_cache,
);
lp_arena.replace(root, lp);
}
#[cfg(feature = "ipc")]
ALogicalPlan::IpcScan {
path,
schema,
output_schema,
predicate,
aggregate,
options: options.clone(),
};
let lp = self.finish_rewrite(
lp,
expr_arena,
lp_arena,
&finger_print,
options.with_columns,
behind_cache,
);
lp_arena.replace(root, lp);
}
lp => {
let behind_cache = behind_cache || matches!(&lp, ALogicalPlan::Cache { .. });
mut options,
} => {
let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
let finger_print = FileFingerPrint {
path,
predicate: predicate_expr,
slice: (0, options.n_rows),
};

let with_columns = self.extract_columns_and_count(&finger_print);
options.file_counter = with_columns.as_ref().map(|t| t.0).unwrap_or(0);
let with_columns = with_columns.and_then(|t| {
if t.1.len() != schema.len() {
Some(t.1)
} else {
None
}
});

lp.copy_inputs(scratch);
while let Some(input) = scratch.pop() {
self.assign_unions(input, lp_arena, expr_arena, scratch, behind_cache)
options.with_columns = with_columns;
let lp = ALogicalPlan::IpcScan {
path: finger_print.path.clone(),
schema,
output_schema,
predicate,
aggregate,
options: options.clone(),
};
let lp = self.finish_rewrite(
lp,
expr_arena,
lp_arena,
&finger_print,
options.with_columns,
behind_cache,
);
lp_arena.replace(root, lp);
}
lp => {
let behind_cache = behind_cache || matches!(&lp, ALogicalPlan::Cache { .. });

lp.copy_inputs(scratch);
while let Some(input) = scratch.pop() {
stack.push((input, behind_cache))
}
lp_arena.replace(root, lp);
}
lp_arena.replace(root, lp);
}
}
scratch.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ pub fn optimize(
);

let mut file_cacher = FileCacher::new(file_predicate_to_columns_and_count);
file_cacher.assign_unions(lp_top, lp_arena, expr_arena, &mut scratch, false);
scratch.clear();
file_cacher.assign_unions(lp_top, lp_arena, expr_arena, &mut scratch);

#[cfg(feature = "cse")]
if cse_changed {
Expand Down

0 comments on commit 2989a0f

Please sign in to comment.