Add a new scheduler + exchange rules to support multiple table scans in a single stage #23476

Open
aaneja opened this issue Aug 20, 2024 · 0 comments

aaneja commented Aug 20, 2024

For use cases such as a JOIN on top of a UNION ALL, it can be beneficial to run the whole UNION ALL, including its table scans, in a single stage.

Existing behavior

For example, consider this simplified fragment of the TPCDS Q5 query:

select 1
from (
         select cs_catalog_page_sk as page_sk,
                cs_sold_date_sk as date_sk
         from catalog_sales
         union all
         select cr_catalog_page_sk as page_sk,
                cr_returned_date_sk as date_sk
         from catalog_returns
     ) salesreturns,
     date_dim,
     catalog_page
where date_sk = d_date_sk
  and d_date between cast('2001-08-04' as date)
                 and date_add('day', 14, cast('2001-08-04' as date))
  and page_sk = cp_catalog_page_sk

The distributed Presto plan for this query is:

 Fragment 0 [SINGLE]                                                                                                                                                                                                                                                                    >
     Output layout: [expr_25]                                                                                                                                                                                                                                                           >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                      >
     - Output[PlanNodeId 18][_col0] => [expr_25:integer]                                                                                                                                                                                                                                >
             Estimates: {source: CostBasedSourceInfo, rows: 10,450,222 (49.83MB), cpu: 91,476,243,213.87, memory: 420,196.00, network: 29,768,258,357.28}                                                                                                                               >
             _col0 := expr_25 (1:35)                                                                                                                                                                                                                                                    >
         - Project[PlanNodeId 470][projectLocality = LOCAL] => [expr_25:integer]                                                                                                                                                                                                        >
                 Estimates: {source: CostBasedSourceInfo, rows: 10,450,222 (49.83MB), cpu: 91,476,243,213.87, memory: 420,196.00, network: 29,768,258,357.28}                                                                                                                           >
                 expr_25 := INTEGER'1'                                                                                                                                                                                                                                                  >
             - InnerJoin[PlanNodeId 768][("cs_catalog_page_sk_12" = "cp_catalog_page_sk")][$hashvalue_38, $hashvalue_39] => []                                                                                                                                                          >
                     Estimates: {source: CostBasedSourceInfo, rows: 10,450,222 (49.83MB), cpu: 91,423,992,105.86, memory: 420,196.00, network: 29,768,258,357.28}                                                                                                                       >
                     Distribution: REPLICATED                                                                                                                                                                                                                                           >
                 - Project[PlanNodeId 1033][projectLocality = LOCAL] => [cs_catalog_page_sk_12:integer, $hashvalue_38:bigint]                                                                                                                                                           >
                         Estimates: {source: CostBasedSourceInfo, rows: 10,425,141 (49.71MB), cpu: 91,276,480,130.88, memory: 196.00, network: 29,767,838,357.28}                                                                                                                       >
                         $hashvalue_38 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cs_catalog_page_sk_12), BIGINT'0')) (1:52)                                                                                                                                               >
                     - InnerJoin[PlanNodeId 767][("cs_sold_date_sk_13" = "d_date_sk")][$hashvalue, $hashvalue_35] => [cs_catalog_page_sk_12:integer]                                                                                                                                    >
                             Estimates: {source: CostBasedSourceInfo, rows: 10,425,141 (49.71MB), cpu: 91,130,528,155.90, memory: 196.00, network: 29,767,838,357.28}                                                                                                                   >
                             Distribution: REPLICATED                                                                                                                                                                                                                                   >
                         - RemoteSource[1,2] => [cs_catalog_page_sk_12:integer, cs_sold_date_sk_13:integer, $hashvalue:bigint]                                                                                                                                                          >
                         - LocalExchange[PlanNodeId 961][HASH][$hashvalue_35] (d_date_sk) => [d_date_sk:integer, $hashvalue_35:bigint]                                                                                                                                                  >
                                 Estimates: {source: CostBasedSourceInfo, rows: 14 (70B), cpu: 1,461,372.01, memory: 0.00, network: 196.00}                                                                                                                                             >
                             - RemoteSource[3] => [d_date_sk:integer, $hashvalue_36:bigint]                                                                                                                                                                                             >
                 - LocalExchange[PlanNodeId 962][HASH][$hashvalue_39] (cp_catalog_page_sk) => [cp_catalog_page_sk:integer, $hashvalue_39:bigint]                                                                                                                                        >
                         Estimates: {source: CostBasedSourceInfo, rows: 30,000 (146.48kB), cpu: 1,140,000.00, memory: 0.00, network: 420,000.00}                                                                                                                                        >
                     - RemoteSource[4] => [cp_catalog_page_sk:integer, $hashvalue_40:bigint]                                                                                                                                                                                            >
                                                                                                                                                                                                                                                                                        >
 Fragment 1 [SOURCE]                                                                                                                                                                                                                                                                    >
     Output layout: [cs_catalog_page_sk, cs_sold_date_sk, $hashvalue_33]                                                                                                                                                                                                                >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                      >
     - ScanFilterProject[PlanNodeId 0,865,1031][table = TableHandle {connectorId='local_hms', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_v2, tableName=catalog_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_v2.catalog_sa>
             Estimates: {source: CostBasedSourceInfo, rows: 1,439,980,416 (25.43GB), cpu: 14,342,194,728.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 1,425,614,070 (25.23GB), cpu: 28,684,389,456.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo>
             $hashvalue_33 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cs_sold_date_sk), BIGINT'0')) (1:82)                                                                                                                                                                 >
             LAYOUT: tpcds_sf1000_parquet_v2.catalog_sales{domains={cs_sold_date_sk=[ [(<min>, <max>)] ], cs_catalog_page_sk=[ [(<min>, <max>)] ]}}                                                                                                                                     >
             cs_catalog_page_sk := cs_catalog_page_sk:int:12:REGULAR (1:114)                                                                                                                                                                                                            >
             cs_sold_date_sk := cs_sold_date_sk:int:0:REGULAR (1:114)                                                                                                                                                                                                                   >
                                                                                                                                                                                                                                                                                        >
 Fragment 2 [SOURCE]                                                                                                                                                                                                                                                                    >
     Output layout: [cr_catalog_page_sk, cr_returned_date_sk, $hashvalue_34]                                                                                                                                                                                                            >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                      >
     - ScanFilterProject[PlanNodeId 3,866,1032][table = TableHandle {connectorId='local_hms', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_v2, tableName=catalog_returns, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_v2.catalog_>
             Estimates: {source: CostBasedSourceInfo, rows: 143,996,756 (2.54GB), cpu: 1,428,437,552.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 141,114,254 (2.50GB), cpu: 2,856,875,104.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: >
             $hashvalue_34 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cr_returned_date_sk), BIGINT'0')) (1:176)                                                                                                                                                            >
             LAYOUT: tpcds_sf1000_parquet_v2.catalog_returns{domains={cr_catalog_page_sk=[ [(<min>, <max>)] ], cr_returned_date_sk=[ [(<min>, <max>)] ]}}                                                                                                                               >
             cr_catalog_page_sk := cr_catalog_page_sk:int:12:REGULAR (1:212)                                                                                                                                                                                                            >
             cr_returned_date_sk := cr_returned_date_sk:int:0:REGULAR (1:212)   
...

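The plan shows large data transfers: Fragments 1 and 2 send their scan output (estimated at roughly 1.4 billion and 140 million rows) to Fragment 0 to perform the UNION ALL, only for the very selective JOIN that executes next to discard most of those rows (the join with date_dim is estimated to keep about 10.4 million).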

If we could instead execute the table scans of catalog_sales and catalog_returns in a single stage, together with the JOIN, we would save the network cost of that data transfer.
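As a rough back-of-envelope check of that claim, the row estimates in the Presto plan above can be plugged into a few lines of Python. This is only a sketch: it assumes the scan-level estimates approximate what Fragments 1 and 2 actually send (the post-filter numbers are slightly lower), and the constant names are made up for illustration.

```python
# Row estimates copied from the Presto plan above.
# Assumption: scan-level estimates stand in for the rows each fragment ships.
CATALOG_SALES_ROWS = 1_439_980_416    # Fragment 1 (catalog_sales scan)
CATALOG_RETURNS_ROWS = 143_996_756    # Fragment 2 (catalog_returns scan)
ROWS_AFTER_DATE_JOIN = 10_425_141     # output of the join with date_dim

# Rows that cross the network into Fragment 0 before any join runs.
rows_shipped = CATALOG_SALES_ROWS + CATALOG_RETURNS_ROWS

# Share of the shipped rows that survive the first (date_dim) join.
surviving_fraction = ROWS_AFTER_DATE_JOIN / rows_shipped

print(f"rows shipped to Fragment 0: {rows_shipped:,}")
print(f"fraction kept by the date_dim join: {surviving_fraction:.4%}")
```

Under these estimates, well under 1% of the shipped rows survive the first join, which is the transfer a single-stage plan would avoid.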

Proposed behavior

Trino added a MultiSourcePartitionedScheduler in trinodb/trino#17265, which allows multiple source-partitioned table scans to run in a single stage. Together with changes to how exchanges are added, this significantly improved the performance of TPCDS Q02 and Q05.
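To make the idea concrete, here is a toy model of scheduling several split sources into one stage. It is not Trino's MultiSourcePartitionedScheduler and does not reflect its actual logic; the function name, the `Split` representation, the batch size, and the round-robin placement are all hypothetical choices for illustration.

```python
from collections import deque
from itertools import islice
from typing import Dict, Iterable, List, Tuple

# Hypothetical split representation: (table name, split index).
Split = Tuple[str, int]


def schedule_multi_source(
    sources: Dict[str, Iterable[Split]],
    workers: List[str],
    batch_size: int = 2,
) -> Dict[str, List[Split]]:
    """Toy scheduler: feed splits from several scans to the tasks of ONE stage."""
    assignments: Dict[str, List[Split]] = {w: [] for w in workers}
    # Queue of (source name, iterator) pairs that still may have splits.
    pending = deque((name, iter(splits)) for name, splits in sources.items())
    next_worker = 0
    while pending:
        name, split_iter = pending.popleft()
        batch = list(islice(split_iter, batch_size))
        if not batch:
            continue  # source exhausted: drop it from the rotation
        for split in batch:
            # Round-robin placement across the stage's workers (an assumption).
            assignments[workers[next_worker]].append(split)
            next_worker = (next_worker + 1) % len(workers)
        pending.append((name, split_iter))  # revisit after the other sources
    return assignments


sales = [("catalog_sales", i) for i in range(5)]
returns = [("catalog_returns", i) for i in range(2)]
plan = schedule_multi_source(
    {"catalog_sales": sales, "catalog_returns": returns},
    ["worker-1", "worker-2"],
)
for worker, splits in plan.items():
    print(worker, splits)
```

The point of the sketch is that every worker ends up with splits from both tables, so the UNION ALL and the joins above it can run locally without a remote exchange between the scans and the join.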

The Trino distributed plan for the above query is:

Trino version: 453                                                                                                                                                                            
 Fragment 0 [SOURCE]                                                                                                                                                                           
     Output layout: [expr]                                                                                                                                                                     
     Output partitioning: SINGLE []                                                                                                                                                            
     Output[columnNames = [_col0]]                                                                                                                                                             
     │   Layout: [expr:integer]                                                                                                                                                                
     │   Estimates: {rows: 10450287 (49.83MB), cpu: 0, memory: 0B, network: 0B}                                                                                                                
     │   _col0 := expr                                                                                                                                                                         
     └─ Project[]                                                                                                                                                                              
        │   Layout: [expr:integer]                                                                                                                                                             
        │   Estimates: {rows: 10450287 (49.83MB), cpu: 49.83M, memory: 0B, network: 0B}                                                                                                        
        │   expr := integer '1'                                                                                                                                                                
        └─ InnerJoin[criteria = (page_sk = cp_catalog_page_sk), distribution = REPLICATED]                                                                                                     
           │   Layout: []                                                                                                                                                                      
           │   Estimates: {rows: 10450287 (0B), cpu: 49.92M, memory: 146.48kB, network: 0B}                                                                                                    
           │   Distribution: REPLICATED                                                                                                                                                        
           │   dynamicFilterAssignments = {cp_catalog_page_sk -> #df_705}                                                                                                                      
           ├─ InnerJoin[criteria = (date_sk = d_date_sk), distribution = REPLICATED]                                                                                                           
           │  │   Layout: [page_sk:integer]                                                                                                                                                    
           │  │   Estimates: {rows: 10491985 (49.77MB), cpu: 14.74G, memory: 70B, network: 0B}                                                                                                 
           │  │   Distribution: REPLICATED                                                                                                                                                     
           │  │   dynamicFilterAssignments = {d_date_sk -> #df_706}                                                                                                                            
           │  ├─ LocalExchange[partitioning = ROUND_ROBIN]                                                                                                                                     
           │  │  │   Layout: [page_sk:integer, date_sk:integer]                                                                                                                                
           │  │  │   Estimates: {rows: 1583977172 (14.69GB), cpu: 14.69G, memory: 0B, network: 0B}                                                                                             
           │  │  ├─ ScanFilter[table = local_hms:tpcds_sf1000_parquet_v2:catalog_sales, dynamicFilters = {cs_catalog_page_sk = #df_705, cs_sold_date_sk = #df_706}]                            
           │  │  │      Layout: [cs_sold_date_sk:integer, cs_catalog_page_sk:integer]                                                                                                          
           │  │  │      Estimates: {rows: 1439980416 (13.36GB), cpu: 13.36G, memory: 0B, network: 0B}/{rows: 1439980416 (13.36GB), cpu: 13.36G, memory: 0B, network: 0B}                       
           │  │  │      cs_sold_date_sk := cs_sold_date_sk:int:REGULAR                                                                                                                         
           │  │  │      cs_catalog_page_sk := cs_catalog_page_sk:int:REGULAR                                                                                                                   
           │  │  └─ ScanFilter[table = local_hms:tpcds_sf1000_parquet_v2:catalog_returns, dynamicFilters = {cr_catalog_page_sk = #df_705, cr_returned_date_sk = #df_706}]                      
           │  │         Layout: [cr_returned_date_sk:integer, cr_catalog_page_sk:integer]                                                                                                      
           │  │         Estimates: {rows: 143996756 (1.33GB), cpu: 1.33G, memory: 0B, network: 0B}/{rows: 143996756 (1.33GB), cpu: 1.33G, memory: 0B, network: 0B}                             
           │  │         cr_catalog_page_sk := cr_catalog_page_sk:int:REGULAR                                                                                                                   
           │  │         cr_returned_date_sk := cr_returned_date_sk:int:REGULAR                                                                                                                 
           │  └─ LocalExchange[partitioning = SINGLE]                                                                                                                                          
           │     │   Layout: [d_date_sk:integer]                                                                                                                                               
           │     │   Estimates: {rows: 14 (70B), cpu: 0, memory: 0B, network: 0B}                                                                                                              
           │     └─ RemoteSource[sourceFragmentIds = [1]]                                                                                                                                      
           │            Layout: [d_date_sk:integer]                                                                                                                                            
           └─ LocalExchange[partitioning = SINGLE]                                                                                                                                             
              │   Layout: [cp_catalog_page_sk:integer]                                                                                                                                         
              │   Estimates: {rows: 30000 (146.48kB), cpu: 0, memory: 0B, network: 0B}                                                                                                         
              └─ RemoteSource[sourceFragmentIds = [2]]                                                                                                                                         
                     Layout: [cp_catalog_page_sk:integer] 
...

We can experiment with this approach to see whether it is feasible to implement in Presto as well.
