Skip to content

Commit

Permalink
Investigation
Browse files Browse the repository at this point in the history
Projections are preferred to be on top of
1) GlobalLimitExec
2) LocalLimitExec
3) CoalesceBatchesExec
Only slt tests are modified. Unit test results have not been updated.
  • Loading branch information
berkaysynnada committed Apr 29, 2024
1 parent 16d5e5e commit c87dbde
Show file tree
Hide file tree
Showing 35 changed files with 894 additions and 828 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4730,10 +4730,10 @@ fn rewrite_bounded_window_aggregate(
/// the order with projections without any further adaptation.
fn is_plan_schema_agnostic(plan: &Arc<dyn ExecutionPlan>) -> bool {
let plan_any = plan.as_any();
plan_any.downcast_ref::<GlobalLimitExec>().is_some()
|| plan_any.downcast_ref::<LocalLimitExec>().is_some()
|| plan_any.downcast_ref::<CoalesceBatchesExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
// plan_any.downcast_ref::<GlobalLimitExec>().is_some()
// || plan_any.downcast_ref::<LocalLimitExec>().is_some()
// || plan_any.downcast_ref::<CoalesceBatchesExec>().is_some()||
plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
}

/// Checks if the given expression is trivial.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,8 @@ physical_plan
04)--ProjectionExec: expr=[2 as val]
05)----CrossJoinExec
06)------CoalescePartitionsExec
07)--------CoalesceBatchesExec: target_batch_size=8182
08)----------ProjectionExec: expr=[]
07)--------ProjectionExec: expr=[]
08)----------CoalesceBatchesExec: target_batch_size=8182
09)------------FilterExec: val@0 < 2
10)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)----------------WorkTableExec: name=recursive_cte
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ logical_plan
02)--Filter: aggregate_test_100.c2 > Int8(10)
03)----TableScan: aggregate_test_100 projection=[c1, c2], partial_filters=[aggregate_test_100.c2 > Int8(10)]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--ProjectionExec: expr=[c1@0 as c1]
01)ProjectionExec: expr=[c1@0 as c1]
02)--CoalesceBatchesExec: target_batch_size=8192
03)----FilterExec: c2@1 > 10
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2], has_header=true
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/expr.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1897,7 +1897,7 @@ SELECT substring('alphabet' for 1);
----
a

# The 'from' and 'for' parameters don't support string types, because they should be treated as
# The 'from' and 'for' parameters don't support string types, because they should be treated as
# regular expressions, which we have not implemented yet.
query error DataFusion error: Error during planning: No function matches the given name and argument types
SELECT substring('alphabet' FROM '3')
Expand Down
1 change: 0 additions & 1 deletion datafusion/sqllogictest/test_files/functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1158,4 +1158,3 @@ drop table uuid_table

statement ok
drop table t

25 changes: 13 additions & 12 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2875,9 +2875,9 @@ physical_plan
01)SortExec: expr=[sn@2 ASC NULLS LAST]
02)--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------ProjectionExec: expr=[zip_code@2 as zip_code, country@3 as country, sn@4 as sn, ts@5 as ts, currency@6 as currency, sn@0 as sn, amount@1 as amount]
06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1, projection=[sn@0, amount@3, zip_code@4, country@5, sn@6, ts@7, currency@8]
04)------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1
07)------------MemoryExec: partitions=1, partition_sizes=[1]
08)------------MemoryExec: partitions=1, partition_sizes=[1]

Expand Down Expand Up @@ -3871,12 +3871,13 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as amount_usd]
02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, row_n@4]
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
06)--------ProjectionExec: expr=[a@0 as a, d@1 as d, ROW_NUMBER() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as row_n]
07)----------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
08)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
03)----ProjectionExec: expr=[a@0 as a, d@1 as d, row_n@4 as row_n]
04)------CoalesceBatchesExec: target_batch_size=2
05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
07)----------ProjectionExec: expr=[a@0 as a, d@1 as d, ROW_NUMBER() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as row_n]
08)------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
09)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true

# reset partition number to 8.
statement ok
Expand Down Expand Up @@ -4026,9 +4027,9 @@ logical_plan
09)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
10)----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
physical_plan
01)CoalesceBatchesExec: target_batch_size=2
02)--ProjectionExec: expr=[c@0 as c, c@2 as c, sum1@1 as sum1, sum1@3 as sum1]
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)], projection=[c@0, sum1@2, c@3, sum1@5]
01)ProjectionExec: expr=[c@0 as c, c@3 as c, sum1@2 as sum1, sum1@5 as sum1]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)]
04)------ProjectionExec: expr=[c@0 as c, b@1 as b, SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
05)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ logical_plan
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--SortPreservingMergeExec: [a@0 ASC NULLS LAST], fetch=5
03)----CoalesceBatchesExec: target_batch_size=8192
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)], projection=[a@1]
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true
06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
03)----ProjectionExec: expr=[a@1 as a]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], has_header=true
07)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true

# preserve_inner_join
query IIII nosort
Expand Down Expand Up @@ -99,9 +100,9 @@ logical_plan
physical_plan
01)GlobalLimitExec: skip=0, fetch=10
02)--SortPreservingMergeExec: [a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10
03)----CoalesceBatchesExec: target_batch_size=8192
04)------ProjectionExec: expr=[a@0 as a2, b@1 as b]
05)--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
03)----ProjectionExec: expr=[a@0 as a2, b@1 as b]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)]
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], has_header=true
07)----------CoalesceBatchesExec: target_batch_size=8192
08)------------FilterExec: d@3 = 3
Expand Down
Loading

0 comments on commit c87dbde

Please sign in to comment.