Skip to content

Commit

Permalink
refactor(rust): rename to IR (#15571)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 10, 2024
1 parent e72b868 commit 89468f7
Show file tree
Hide file tree
Showing 55 changed files with 431 additions and 471 deletions.
10 changes: 5 additions & 5 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,27 +552,27 @@ impl LazyFrame {

pub fn optimize(
self,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Node> {
self.optimize_with_scratch(lp_arena, expr_arena, &mut vec![], false)
}

pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena<FullAccessIR>, Arena<AExpr>)> {
pub fn to_alp_optimized(self) -> PolarsResult<(Node, Arena<IR>, Arena<AExpr>)> {
let mut lp_arena = Arena::with_capacity(16);
let mut expr_arena = Arena::with_capacity(16);
let node =
self.optimize_with_scratch(&mut lp_arena, &mut expr_arena, &mut vec![], false)?;
Ok((node, lp_arena, expr_arena))
}

pub fn to_alp(self) -> PolarsResult<(Node, Arena<FullAccessIR>, Arena<AExpr>)> {
pub fn to_alp(self) -> PolarsResult<(Node, Arena<IR>, Arena<AExpr>)> {
self.logical_plan.to_alp()
}

pub(crate) fn optimize_with_scratch(
self,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
_fmt: bool,
Expand Down Expand Up @@ -641,7 +641,7 @@ impl LazyFrame {

// sink should be replaced
let no_file_sink = if check_sink {
!matches!(lp_arena.get(lp_top), FullAccessIR::Sink { .. })
!matches!(lp_arena.get(lp_top), IR::Sink { .. })
} else {
true
};
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ fn partitionable_gb(

pub fn create_physical_plan(
root: Node,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Box<dyn Executor>> {
use FullAccessIR::*;
use IR::*;

let logical_plan = lp_arena.take(root);
match logical_plan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ fn to_physical_piped_expr(

fn jit_insert_slice(
node: Node,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
sink_nodes: &mut Vec<(usize, Node, Rc<RefCell<u32>>)>,
operator_offset: usize,
) {
// if the join/union has a slice, we add a new slice node
// note that we take the offset + 1, because we want to
// slice AFTER the join has happened and the join will be an
// operator
use FullAccessIR::*;
use IR::*;
let (offset, len) = match lp_arena.get(node) {
Join { options, .. } if options.args.slice.is_some() => {
let Some((offset, len)) = options.args.slice else {
Expand Down Expand Up @@ -101,11 +101,11 @@ fn jit_insert_slice(

pub(super) fn construct(
tree: Tree,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
fmt: bool,
) -> PolarsResult<Option<Node>> {
use FullAccessIR::*;
use IR::*;

let mut pipelines = Vec::with_capacity(tree.len());
let mut callbacks = CallBacks::new();
Expand Down Expand Up @@ -146,7 +146,7 @@ pub(super) fn construct(
// The file sink is always to the top of the tree
// not every branch has a final sink. For instance rhs join branches
if let Some(node) = branch.get_final_sink() {
if matches!(lp_arena.get(node), FullAccessIR::Sink { .. }) {
if matches!(lp_arena.get(node), IR::Sink { .. }) {
final_sink = Some(node)
}
}
Expand Down Expand Up @@ -251,22 +251,22 @@ impl SExecutionContext for ExecutionState {
}

fn get_pipeline_node(
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
mut pipelines: Vec<PipeLine>,
schema: SchemaRef,
original_lp: Option<LogicalPlan>,
) -> FullAccessIR {
) -> IR {
// create a dummy input as the map function will call the input
// so we just create a scan that returns an empty df
let dummy = lp_arena.add(FullAccessIR::DataFrameScan {
let dummy = lp_arena.add(IR::DataFrameScan {
df: Arc::new(DataFrame::empty()),
schema: Arc::new(Schema::new()),
output_schema: None,
projection: None,
selection: None,
});

FullAccessIR::MapFunction {
IR::MapFunction {
function: FunctionNode::Pipeline {
function: Arc::new(move |_df: DataFrame| {
let mut state = ExecutionState::new();
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn process_non_streamable_node(
stack: &mut Vec<StackFrame>,
scratch: &mut Vec<Node>,
pipeline_trees: &mut Vec<Vec<Branch>>,
lp: &FullAccessIR,
lp: &IR,
) {
lp.copy_inputs(scratch);
while let Some(input) = scratch.pop() {
Expand All @@ -69,11 +69,11 @@ fn process_non_streamable_node(
state.streamable = false;
}

fn insert_file_sink(mut root: Node, lp_arena: &mut Arena<FullAccessIR>) -> Node {
fn insert_file_sink(mut root: Node, lp_arena: &mut Arena<IR>) -> Node {
// The pipelines need a final sink, we insert that here.
// this allows us to split at joins/unions and share a sink
if !matches!(lp_arena.get(root), FullAccessIR::Sink { .. }) {
root = lp_arena.add(FullAccessIR::Sink {
if !matches!(lp_arena.get(root), IR::Sink { .. }) {
root = lp_arena.add(IR::Sink {
input: root,
payload: SinkType::Memory,
})
Expand All @@ -85,10 +85,10 @@ fn insert_slice(
root: Node,
offset: i64,
len: IdxSize,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
state: &mut Branch,
) {
let node = lp_arena.add(FullAccessIR::Slice {
let node = lp_arena.add(IR::Slice {
input: root,
offset,
len: len as IdxSize,
Expand All @@ -98,7 +98,7 @@ fn insert_slice(

pub(crate) fn insert_streaming_nodes(
root: Node,
lp_arena: &mut Arena<FullAccessIR>,
lp_arena: &mut Arena<IR>,
expr_arena: &mut Arena<AExpr>,
scratch: &mut Vec<Node>,
fmt: bool,
Expand Down Expand Up @@ -163,7 +163,7 @@ pub(crate) fn insert_streaming_nodes(
// keep the counter global so that the order will match traversal order
let mut execution_id = 0;

use FullAccessIR::*;
use IR::*;
while let Some(StackFrame {
node: mut root,
mut state,
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/streaming/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ pub(super) fn is_valid_tree(tree: TreeRef) -> bool {

#[cfg(debug_assertions)]
#[allow(unused)]
pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<FullAccessIR>) {
pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<IR>) {
// streamable: bool,
// sources: Vec<Node>,
// // joins seen in whole branch (we count a union as joins with multiple counts)
Expand Down Expand Up @@ -158,7 +158,7 @@ pub(super) fn dbg_branch(b: &Branch, lp_arena: &Arena<FullAccessIR>) {

#[cfg(debug_assertions)]
#[allow(unused)]
pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<FullAccessIR>, expr_arena: &Arena<AExpr>) {
pub(super) fn dbg_tree(tree: Tree, lp_arena: &Arena<IR>, expr_arena: &Arena<AExpr>) {
if tree.is_empty() {
println!("EMPTY TREE");
return;
Expand Down
19 changes: 8 additions & 11 deletions crates/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ fn cached_before_root(q: LazyFrame) {
let (mut expr_arena, mut lp_arena) = get_arenas();
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
for input in lp_arena.get(lp).get_inputs() {
assert!(matches!(lp_arena.get(input), FullAccessIR::Cache { .. }));
assert!(matches!(lp_arena.get(input), IR::Cache { .. }));
}
}

fn count_caches(q: LazyFrame) -> usize {
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena)
.iter(node)
.filter(|(_node, lp)| matches!(lp, FullAccessIR::Cache { .. }))
.filter(|(_node, lp)| matches!(lp, IR::Cache { .. }))
.count()
}

Expand Down Expand Up @@ -55,7 +55,7 @@ fn test_cse_unions() -> PolarsResult<()> {
let lp = lf.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();
let mut cache_count = 0;
assert!((&lp_arena).iter(lp).all(|(_, lp)| {
use FullAccessIR::*;
use IR::*;
match lp {
Cache { .. } => {
cache_count += 1;
Expand Down Expand Up @@ -98,7 +98,7 @@ fn test_cse_cache_union_projection_pd() -> PolarsResult<()> {
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
let mut cache_count = 0;
assert!((&lp_arena).iter(lp).all(|(_, lp)| {
use FullAccessIR::*;
use IR::*;
match lp {
Cache { .. } => {
cache_count += 1;
Expand Down Expand Up @@ -154,7 +154,7 @@ fn test_cse_union2_4925() -> PolarsResult<()> {
let cache_ids = (&lp_arena)
.iter(lp)
.flat_map(|(_, lp)| {
use FullAccessIR::*;
use IR::*;
match lp {
Cache { id, cache_hits, .. } => {
assert_eq!(*cache_hits, 1);
Expand Down Expand Up @@ -207,7 +207,7 @@ fn test_cse_joins_4954() -> PolarsResult<()> {
let cache_ids = (&lp_arena)
.iter(lp)
.flat_map(|(_, lp)| {
use FullAccessIR::*;
use IR::*;
match lp {
Cache {
id,
Expand All @@ -216,10 +216,7 @@ fn test_cse_joins_4954() -> PolarsResult<()> {
..
} => {
assert_eq!(*cache_hits, 1);
assert!(matches!(
lp_arena.get(*input),
FullAccessIR::DataFrameScan { .. }
));
assert!(matches!(lp_arena.get(*input), IR::DataFrameScan { .. }));

Some(*id)
},
Expand Down Expand Up @@ -279,7 +276,7 @@ fn test_cache_with_partial_projection() -> PolarsResult<()> {
let cache_ids = (&lp_arena)
.iter(lp)
.flat_map(|(_, lp)| {
use FullAccessIR::*;
use IR::*;
match lp {
Cache { id, .. } => Some(*id),
_ => None,
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,11 @@ fn test_scan_parquet_limit_9001() {
let q = LazyFrame::scan_parquet(path, args).unwrap().limit(3);
let (node, lp_arena, _) = q.to_alp_optimized().unwrap();
(&lp_arena).iter(node).all(|(_, lp)| match lp {
FullAccessIR::Union { options, .. } => {
IR::Union { options, .. } => {
let sliced = options.slice.unwrap();
sliced.1 == 3
},
FullAccessIR::Scan { file_options, .. } => file_options.n_rows == Some(3),
IR::Scan { file_options, .. } => file_options.n_rows == Some(3),
_ => true,
});
}
Expand Down Expand Up @@ -428,9 +428,9 @@ fn test_ipc_globbing() -> PolarsResult<()> {
Ok(())
}

fn slice_at_union(lp_arena: &Arena<FullAccessIR>, lp: Node) -> bool {
fn slice_at_union(lp_arena: &Arena<IR>, lp: Node) -> bool {
(&lp_arena).iter(lp).all(|(_, lp)| {
if let FullAccessIR::Union { options, .. } = lp {
if let IR::Union { options, .. } = lp {
options.slice.is_some()
} else {
true
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod streaming;
#[cfg(all(feature = "strings", feature = "cse"))]
mod tpch;

fn get_arenas() -> (Arena<AExpr>, Arena<FullAccessIR>) {
fn get_arenas() -> (Arena<AExpr>, Arena<IR>) {
let expr_arena = Arena::with_capacity(16);
let lp_arena = Arena::with_capacity(8);
(expr_arena, lp_arena)
Expand Down

0 comments on commit 89468f7

Please sign in to comment.