Skip to content

Commit

Permalink
Ready for review
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Apr 29, 2024
1 parent 213119d commit 16d5e5e
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 7 deletions.
14 changes: 7 additions & 7 deletions datafusion/core/src/physical_optimizer/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3368,13 +3368,12 @@ impl PhysicalOptimizerRule for OptimizeProjections {
// optimized plan satisfies the initial schema order.
optimized = optimized
.map_data(|node| satisfy_initial_schema(node, initial_requirements))?;

let new_child = optimized.data.plan;

if is_plan_schema_determinant(&plan) {
Ok(new_child)
} else {
let x = update_children(plan, new_child)?;
Ok(x)
update_children(plan, new_child)
}
}

Expand All @@ -3387,6 +3386,8 @@ impl PhysicalOptimizerRule for OptimizeProjections {
}
}

/// Returns true if the schema of the plan can differ from its input schema;
/// otherwise, returns false.
fn is_plan_schema_determinant(plan: &Arc<dyn ExecutionPlan>) -> bool {
let plan_any = plan.as_any();

Expand All @@ -3403,6 +3404,7 @@ fn is_plan_schema_determinant(plan: &Arc<dyn ExecutionPlan>) -> bool {
| plan_any.downcast_ref::<InterleaveExec>().is_some()
}

/// Given a plan, the function returns the closest node to the root which updates the schema.
fn find_final_schema_determinant(
plan: &Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Expand All @@ -3416,6 +3418,8 @@ fn find_final_schema_determinant(
}
}

/// Given a plan and a child plan, the function updates the child of the node
/// that is closest to the root and modifies the schema.
fn update_children(
plan: Arc<dyn ExecutionPlan>,
new_child: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -5266,7 +5270,6 @@ mod tests {
],
DataType::Int32,
None,
false,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -5335,7 +5338,6 @@ mod tests {
],
DataType::Int32,
None,
false,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
Expand Down Expand Up @@ -5407,7 +5409,6 @@ mod tests {
],
DataType::Int32,
None,
false,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -5476,7 +5477,6 @@ mod tests {
],
DataType::Int32,
None,
false,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),
Expand Down
1 change: 1 addition & 0 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,7 @@ physical_plan
04)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true


statement ok
drop table annotated_data_finite2;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/tpch/q3.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ physical_plan
31)----------------------------FilterExec: l_shipdate@3 > 9204
32)------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_orderkey, l_extendedprice, l_discount, l_shipdate], has_header=false



query IRDI
select
l_orderkey,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/tpch/q8.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ physical_plan
71)------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
72)--------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/region.tbl]]}, projection=[r_regionkey, r_name], has_header=false



query RR
select
o_year,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/tpch/q9.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ physical_plan
50)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
51)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/nation.tbl]]}, projection=[n_nationkey, n_name], has_header=false



query TRR
select
nation,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,7 @@ physical_plan
05)--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@1 DESC]
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], has_header=true


query III
SELECT
c9,
Expand Down Expand Up @@ -1704,6 +1705,7 @@ physical_plan
09)----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c4, c9], has_header=true


query III
SELECT c3,
SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
Expand Down Expand Up @@ -1804,6 +1806,7 @@ physical_plan
12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3, c9], has_header=true



query III
SELECT c3,
SUM(c9) OVER(ORDER BY c3 DESC, c9 DESC, c2 ASC) as sum1,
Expand Down Expand Up @@ -2090,6 +2093,7 @@ physical_plan
10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c8, c9], has_header=true



query IIIII
SELECT c9,
SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1,
Expand Down

0 comments on commit 16d5e5e

Please sign in to comment.