Native seem to generate larger data size than Java #22184

Open
ZacBlanco opened this issue Mar 13, 2024 · 12 comments
Labels: bug, prestissimo, Presto Native Execution

ZacBlanco commented Mar 13, 2024

Environment

These tests were performed against Presto native and Presto Java 0.287, using the Hive connector and TPC-DS SF10k on a 16-node cluster.

Issue

We originally found this issue with TPC-DS Q23: the Presto native execution took significantly longer to complete than Java. In our benchmark comparison, Baseline is native and Target is Java, and the native query's wall_ms is over 5x longer.

We traced this discrepancy in the Q23 query plan to a join whose input is significantly larger in native than in the corresponding Java execution. In the native execution, fragment 10 outputs only 519.62GB, but when RemoteSource[10] reads that data as the probe input to the inner join, it reports 5.02TB! In comparison, Java's fragment 10 outputs 733.99GB and its RemoteSource[10] reports 858.30GB.
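To put these sizes on a per-row basis, the sketch below divides each reported size by the row count, which is 28,799,864,615 for all four operators in the Q23 plans that follow. It assumes that EXPLAIN ANALYZE prints sizes in 1024-based units (1GB = 2^30 bytes); `parse_size`, `bytes_per_row`, `ROWS` and `Q23_SF10K` are hypothetical names used only for this illustration.

```python
# ASSUMPTION: EXPLAIN ANALYZE sizes are 1024-based (1GB = 2**30 bytes).
UNITS = {"B": 1, "kB": 2**10, "MB": 2**20, "GB": 2**30, "TB": 2**40, "PB": 2**50}

# Row count reported for every operator listed in Q23_SF10K below.
ROWS = 28_799_864_615

# Sizes copied from the SF10k Q23 plan excerpts.
Q23_SF10K = {
    "Java fragment 10 output": "733.99GB",
    "Java RemoteSource[10] output": "858.30GB",
    "Native fragment 10 output": "519.62GB",
    "Native RemoteSource[10] output": "5.02TB",
}


def parse_size(text: str) -> float:
    """Hypothetical helper: convert a size string like '5.02TB' to bytes."""
    # Try two-letter suffixes before the bare "B" so "GB" is not read as "B".
    for unit in sorted(UNITS, key=len, reverse=True):
        if text.endswith(unit):
            return float(text[: -len(unit)]) * UNITS[unit]
    raise ValueError(f"unrecognized size: {text!r}")


def bytes_per_row(size: str, rows: int = ROWS) -> float:
    """Average number of bytes per row implied by a reported size."""
    return parse_size(size) / rows


if __name__ == "__main__":
    for label, size in Q23_SF10K.items():
        print(f"{label:<32} {bytes_per_row(size):7.2f} bytes/row")
```

Under that assumption, native's fragment 10 writes about 19 bytes per row, yet RemoteSource[10] reports about 192 bytes per row for the same rows, roughly a 10x jump. Java goes from about 27 to about 32 bytes per row.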

Java Execution


...
 - InnerJoin[PlanNodeId 6008][("ss_customer_sk_72" = "c_customer_sk")][$hashvalue_590, $hashvalue_592] => [ss_quantity_79:integer, ss_sales_price_82:decimal(7,2), c_customer_sk:bigint]
                                            Estimates: {source: CostBasedSourceInfo, rows: 27,651,160,233 (463.54GB), cpu: 3,949,818,895,340.49, memory: 1,170,000,000.00, network: 896,846,758,984.00}
                                            CPU: 1.59h (2.47%), Scheduled: 2.28h (1.50%), Output: 27,503,916,103 rows (601.96GB)
                                            Left (probe) Input avg.: 112,499,471.15 rows, Input std.dev.: 89.31%
                                            Right (build) Input avg.: 253,906.25 rows, Input std.dev.: 0.20%
                                            Distribution: PARTITIONED
                                        - RemoteSource[10] => [ss_customer_sk_72:bigint, ss_quantity_79:integer, ss_sales_price_82:decimal(7,2), $hashvalue_590:bigint]
                                                CPU: 10.91m (0.28%), Scheduled: 18.53m (0.20%), Output: 28,799,864,615 rows (858.30GB)
                                                Input avg.: 112,499,471.15 rows, Input std.dev.: 89.31%
                                        - LocalExchange[PlanNodeId 7205][HASH][$hashvalue_592] (c_customer_sk) => [c_customer_sk:bigint, $hashvalue_592:bigint]
                                                Estimates: {source: CostBasedSourceInfo, rows: 65,000,000 (1.09GB), cpu: 4,095,000,000.00, memory: 0.00, network: 1,170,000,000.00}
                                                CPU: 2.44s (0.00%), Scheduled: 2.57s (0.00%), Output: 65,000,000 rows (1.09GB)
                                                Input avg.: 253,906.25 rows, Input std.dev.: 153.41%
                                            - RemoteSource[11] => [c_customer_sk:bigint, $hashvalue_593:bigint]
                                                    CPU: 323.00ms (0.00%), Scheduled: 340.00ms (0.00%), Output: 65,000,000 rows (1.09GB)
                                                    Input avg.: 253,906.25 rows, Input std.dev.: 153.41%

Fragment 10 [SOURCE]
    CPU: 3.00h, Scheduled: 7.09h, Input: 28,799,864,615 rows (200.62GB); per task: avg.: 1,799,991,538.44 std.dev.: 1,339,212,806.25, Output: 28,799,864,615 rows (733.99GB), 16 tasks
    Output layout: [ss_customer_sk_72, ss_quantity_79, ss_sales_price_82, $hashvalue_591]
    Output partitioning: HASH [ss_customer_sk_72][$hashvalue_591]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - ScanProject[PlanNodeId 27,7993][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf10000_parquet_varchar.store_sales{}]'}, grouped = false, projectLocality = LOCAL] => [ss_customer_sk_72:bigint, ss_quantity_79:integer, ss_sales_price_82:decimal(7,2), $hashvalue_591:bigint]
            Estimates: {source: CostBasedSourceInfo, rows: 28,799,864,615 (834.16GB), cpu: 636,477,977,449.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 28,799,864,615 (834.16GB), cpu: 1,532,154,736,433.00, memory: 0.00, network: 0.00}
            CPU: 3.00h (4.66%), Scheduled: 9.52h (6.25%), Output: 28,799,864,615 rows (734.00GB)
            Input avg.: 3,714,673.63 rows, Input std.dev.: 19.08%
            $hashvalue_591 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(ss_customer_sk_72), BIGINT'0')) (24:16)
            LAYOUT: tpcds_sf10000_parquet_varchar.store_sales{}
            ss_customer_sk_72 := ss_customer_sk:bigint:3:REGULAR (24:16)
            ss_sales_price_82 := ss_sales_price:decimal(7,2):13:REGULAR (24:16)
            ss_quantity_79 := ss_quantity:int:10:REGULAR (24:16)
            Input: 28,799,864,615 rows (200.58GB), Filtered: 0.00%

Native Execution

- InnerJoin[PlanNodeId 5900][("ss_customer_sk_72" = "c_customer_sk")] => [ss_quantity_79:integer, ss_sales_price_82:decimal(7,2), c_customer_sk:bigint]
                                        Estimates: {source: CostBasedSourceInfo, rows: 27,651,160,233 (231.77GB), cpu: 2,532,819,573,286.49, memory: 585,000,000.00, network: 637,062,977,449.00}
                                        CPU: 12.64m (0.71%), Scheduled: 12.71m (0.07%), Output: 27,503,916,103 rows (3.32TB)
                                        Distribution: PARTITIONED
                                    - RemoteSource[10] => [ss_customer_sk_72:bigint, ss_quantity_79:integer, ss_sales_price_82:decimal(7,2)]
                                            CPU: 8.64m (0.48%), Scheduled: 16.17m (0.09%), Output: 28,799,864,615 rows (5.02TB)
                                            Input avg.: 56,249,735.58 rows, Input std.dev.: 565.42%
                                    - LocalExchange[PlanNodeId 6953][HASH] (c_customer_sk) => [c_customer_sk:bigint]
                                            Estimates: {source: CostBasedSourceInfo, rows: 65,000,000 (557.90MB), cpu: 1,755,000,000.00, memory: 0.00, network: 585,000,000.00}
                                            CPU: 4.56s (0.00%), Scheduled: 6.17s (0.00%), Output: 65,000,000 rows (929.50MB)
                                            Input avg.: 126,953.13 rows, Input std.dev.: 556.78%
                                        - RemoteSource[11] => [c_customer_sk:bigint]
                                                CPU: 690.00ms (0.00%), Scheduled: 733.00ms (0.00%), Output: 65,000,000 rows (929.50MB)

Fragment 10 [SOURCE]
    CPU: 1.07h, Scheduled: 3.01h, Input: 28,799,864,615 rows (669.99GB); per task: avg.: 1,799,991,538.44 std.dev.: 1,332,162,619.44, Output: 28,799,864,615 rows (519.62GB), 16 tasks
    Output layout: [ss_customer_sk_72, ss_quantity_79, ss_sales_price_82]
    Output partitioning: HASH [ss_customer_sk_72]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - TableScan[PlanNodeId 27][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf10000_parquet_varchar.store_sales{}]'}, grouped = false] => [ss_customer_sk_72:bigint, ss_quantity_79:integer, ss_sales_price_82:decimal(7,2)]
            Estimates: {source: CostBasedSourceInfo, rows: 28,799,864,615 (592.77GB), cpu: 636,477,977,449.00, memory: 0.00, network: 0.00}
            CPU: 1.07h (3.57%), Scheduled: 3.01h (1.02%), Output: 28,799,864,615 rows (519.62GB)
            Input avg.: 56,249,735.58 rows, Input std.dev.: 696.62%
            LAYOUT: tpcds_sf10000_parquet_varchar.store_sales{}
            ss_quantity_79 := ss_quantity:int:10:REGULAR (24:16)
            ss_customer_sk_72 := ss_customer_sk:bigint:3:REGULAR (24:16)
            ss_sales_price_82 := ss_sales_price:decimal(7,2):13:REGULAR (24:16)
            Input: 28,799,864,615 rows (669.99GB), Filtered: 0.00%

We suspect that this severe increase in data size is the cause of the query slowdown. I extracted the relevant query from the plan and confirmed that the issue persists even in this simpler query:

SELECT CAST(ss_quantity AS decimal(10,0)) * ss_sales_price,
        ss_customer_sk,
        ss_sales_price,
        ss_quantity
FROM store_sales, customer
WHERE ss_customer_sk = c_customer_sk;

I ran this query on SF1k with a 2-node cluster and found that the large data transfer size persists: the Java execution reports an input size of 86.03GB for fragment 1, while native reports 335.08GB.
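A stage-by-stage comparison of the sizes in the two SF1k plans that follow makes the pattern easier to see. This sketch simply divides the native size by the Java size for each stage; the `STAGES` table and `native_to_java_ratio` helper are hypothetical names, and the numbers are copied from the plans (both in GB, so no unit conversion is needed).

```python
# Sizes in GB copied from the SF1k EXPLAIN ANALYZE output: (Java, native).
STAGES = {
    "Fragment 2 output": (73.54, 52.10),
    "RemoteSource[2] output": (85.83, 334.90),
    "Fragment 1 input": (86.03, 335.08),
    "InnerJoin output": (74.28, 329.85),
    "Fragment 1 output": (97.34, 71.26),
}


def native_to_java_ratio(java_gb: float, native_gb: float) -> float:
    """How many times larger the native-reported size is than Java's."""
    return native_gb / java_gb


if __name__ == "__main__":
    for stage, (java_gb, native_gb) in STAGES.items():
        ratio = native_to_java_ratio(java_gb, native_gb)
        print(f"{stage:<24} java={java_gb:>7.2f}GB "
              f"native={native_gb:>7.2f}GB ratio={ratio:.2f}x")
```

Native reports less data than Java where fragment 2 writes it (about 0.71x) and after the Project in fragment 1 (about 0.73x), but about 3.9x more at RemoteSource[2] and about 4.4x more at the InnerJoin output. In these reported numbers, the extra size appears once the data is read through the exchange and disappears after the Project.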

Java EXPLAIN ANALYZE

 Fragment 1 [HASH]                                                                                                                                                                                                                                
     CPU: 19.63m, Scheduled: 35.02m, Input: 2,891,987,999 rows (86.03GB); per task: avg.: 1,445,993,999.50 std.dev.: 64,709,792.50, Output: 2,750,397,233 rows (97.34GB), 2 tasks                                                                 
     Output layout: [expr, ss_customer_sk, ss_sales_price, ss_quantity]                                                                                                                                                                           
     Output partitioning: SINGLE []                                                                                                                                                                                                               
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                
     - Project[PlanNodeId 4][projectLocality = LOCAL] => [expr:decimal(17,2), ss_customer_sk:bigint, ss_sales_price:decimal(7,2), ss_quantity:integer]                                                                                            
             Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (78.78GB), cpu: 479,689,129,163.18, memory: 216,000,000.00, network: 89,783,768,320.00}                                                                                 
             CPU: 9.34m (24.71%), Scheduled: 16.64m (12.53%), Output: 2,750,397,233 rows (97.34GB)                                                                                                                                                
             Input avg.: 85,949,913.53 rows, Input std.dev.: 58.48%                                                                                                                                                                               
             expr := (CAST(ss_quantity AS decimal(10,0))) * (ss_sales_price) (1:129)                                                                                                                                                              
         - InnerJoin[PlanNodeId 359][("ss_customer_sk" = "c_customer_sk")][$hashvalue, $hashvalue_19] => [ss_customer_sk:bigint, ss_quantity:integer, ss_sales_price:decimal(7,2)]                                                                
                 Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (80.59GB), cpu: 395,097,171,901.88, memory: 216,000,000.00, network: 89,783,768,320.00}                                                                             
                 CPU: 8.84m (23.36%), Scheduled: 15.79m (11.89%), Output: 2,750,397,233 rows (74.28GB)                                                                                                                                            
                 Left (probe) Input avg.: 89,999,624.97 rows, Input std.dev.: 57.99%                                                                                                                                                              
                 Right (build) Input avg.: 375,000.00 rows, Input std.dev.: 0.16%                                                                                                                                                                 
                         Collisions avg.: 235,810.06 (100.23% est.), Collisions std.dev.: 100.01%                                                                                                                                                 
                 Distribution: PARTITIONED                                                                                                                                                                                                        
             - RemoteSource[2] => [ss_customer_sk:bigint, ss_quantity:integer, ss_sales_price:decimal(7,2), $hashvalue:bigint]                                                                                                                    
                     CPU: 1.43m (3.78%), Scheduled: 2.59m (1.95%), Output: 2,879,987,999 rows (85.83GB)                                                                                                                                           
                     Input avg.: 89,999,624.97 rows, Input std.dev.: 57.99%                                                                                                                                                                       
             - LocalExchange[PlanNodeId 435][HASH][$hashvalue_19] (c_customer_sk) => [c_customer_sk:bigint, $hashvalue_19:bigint]                                                                                                                 
                     Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (366.21MB), cpu: 756,000,000.00, memory: 0.00, network: 216,000,000.00}                                                                                            
                     CPU: 558.00ms (0.02%), Scheduled: 709.00ms (0.01%), Output: 12,000,000 rows (206MB)                                                                                                                                          
                     Input avg.: 375,000.00 rows, Input std.dev.: 268.30%                                                                                                                                                                         
                 - RemoteSource[3] => [c_customer_sk:bigint, $hashvalue_20:bigint]                                                                                                                                                                
                         CPU: 59.00ms (0.00%), Scheduled: 83.00ms (0.00%), Output: 12,000,000 rows (206MB)                                                                                                                                        
                         Input avg.: 375,000.00 rows, Input std.dev.: 268.30%                                                                                                                                                                     
                                                                                                                                                                                                                                                  
 Fragment 2 [SOURCE]                                                                                                                                                                                                                              
     CPU: 18.15m, Scheduled: 1.04h, Input: 2,879,987,999 rows (20.00GB); per task: avg.: 1,439,993,999.50 std.dev.: 82,255,679.50, Output: 2,879,987,999 rows (73.54GB), 2 tasks                                                                  
     Output layout: [ss_customer_sk, ss_quantity, ss_sales_price, $hashvalue_18]                                                                                                                                                                  
     Output partitioning: HASH [ss_customer_sk][$hashvalue_18]                                                                                                                                                                                    
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                
     - ScanProject[PlanNodeId 0,477][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1
             Estimates: {source: CostBasedSourceInfo, rows: 2,879,987,999 (83.42GB), cpu: 63,647,876,329.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 2,879,987,999 (83.42GB), cpu: 153,215,644,649.00, memory: 0.00, netw
             CPU: 18.15m (47.98%), Scheduled: 1.63h (73.46%), Output: 2,879,987,999 rows (73.54GB)                                                                                                                                                
             Input avg.: 2,742,845.71 rows, Input std.dev.: 37.70%                                                                                                                                                                                
             $hashvalue_18 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(ss_customer_sk), BIGINT'0')) (1:128)                                                                                                                           
             LAYOUT: tpcds_sf1000_parquet_varchar.store_sales{}                                                                                                                                                                                   
             ss_sales_price := ss_sales_price:decimal(7,2):13:REGULAR (1:128)                                                                                                                                                                     
             ss_quantity := ss_quantity:int:10:REGULAR (1:128)                                                                                                                                                                                    
             ss_customer_sk := ss_customer_sk:bigint:3:REGULAR (1:128)                                                                                                                                                                            
             Input: 2,879,987,999 rows (20.00GB), Filtered: 0.00%                                                                                                                                                                                 
                                                                                                                                                                                                                                                  
 Fragment 3 [SOURCE]                                                                                                                                                                                                                              
     CPU: 3.28s, Scheduled: 9.10s, Input: 12,000,000 rows (91.70MB); per task: avg.: 6,000,000.00 std.dev.: 2,952,680.00, Output: 12,000,000 rows (183.11MB), 2 tasks                                                                             
     Output layout: [c_customer_sk, $hashvalue_21]                                                                                                                                                                                                
     Output partitioning: HASH [c_customer_sk][$hashvalue_21]                                                                                                                                                                                     
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                
     - ScanProject[PlanNodeId 1,478][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000
             Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (205.99MB), cpu: 108,000,000.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 12,000,000 (205.99MB), cpu: 324,000,000.00, memory: 0.00, network: 0.00} 
             CPU: 3.28s (0.14%), Scheduled: 12.69s (0.16%), Output: 12,000,000 rows (183.11MB)                                                                                                                                                    
             Input avg.: 1,090,909.09 rows, Input std.dev.: 60.68%                                                                                                                                                                                
             $hashvalue_21 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(c_customer_sk), BIGINT'0')) (1:141)                                                                                                                            
             LAYOUT: tpcds_sf1000_parquet_varchar.customer{}                                                                                                                                                                                      
             c_customer_sk := c_customer_sk:bigint:0:REGULAR (1:141)                                                                                                                                                                              
             Input: 12,000,000 rows (91.70MB), Filtered: 0.00%                                                                                                                                                                                    

Native EXPLAIN ANALYZE

Fragment 1 [HASH]                                                                                                                                                                                                                                
     CPU: 56.06m, Scheduled: 1.47h, Input: 2,891,987,999 rows (335.08GB); per task: avg.: 1,445,993,999.50 std.dev.: 64,361,414.50, Output: 2,750,397,233 rows (71.26GB), 2 tasks                                                                 
     Output layout: [expr, ss_customer_sk, ss_sales_price, ss_quantity]                                                                                                                                                                           
     Output partitioning: SINGLE []                                                                                                                                                                                                               
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                
     - Project[PlanNodeId 4][projectLocality = LOCAL] => [expr:decimal(17,2), ss_customer_sk:bigint, ss_sales_price:decimal(7,2), ss_quantity:integer]                                                                                            
             Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (78.78GB), cpu: 337,741,576,861.18, memory: 108,000,000.00, network: 63,755,876,329.00}                                                                                 
             CPU: 53.25m (87.27%), Scheduled: 1.42h (78.92%), Output: 2,750,397,233 rows (71.26GB)                                                                                                                                                
             Input avg.: 343,799,654.13 rows, Input std.dev.: 173.21%                                                                                                                                                                             
             expr := (CAST(ss_quantity AS decimal(10,0))) * (ss_sales_price) (1:129)                                                                                                                                                              
         - InnerJoin[PlanNodeId 359][("ss_customer_sk" = "c_customer_sk")] => [ss_customer_sk:bigint, ss_quantity:integer, ss_sales_price:decimal(7,2)]                                                                                           
                 Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (80.59GB), cpu: 253,149,619,599.88, memory: 108,000,000.00, network: 63,755,876,329.00}                                                                             
                 CPU: 1.48m (2.42%), Scheduled: 1.59m (1.47%), Output: 2,750,397,233 rows (329.85GB)                                                                                                                                              
                 Distribution: PARTITIONED                                                                                                                                                                                                        
             - RemoteSource[2] => [ss_customer_sk:bigint, ss_quantity:integer, ss_sales_price:decimal(7,2)]                                                                                                                                       
                     CPU: 1.32m (2.17%), Scheduled: 1.51m (1.40%), Output: 2,879,987,999 rows (334.90GB)                                                                                                                                          
                     Input avg.: 359,998,499.88 rows, Input std.dev.: 173.44%                                                                                                                                                                     
             - LocalExchange[PlanNodeId 435][HASH] (c_customer_sk) => [c_customer_sk:bigint]                                                                                                                                                      
                     Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (366.21MB), cpu: 324,000,000.00, memory: 0.00, network: 108,000,000.00}                                                                                            
                     CPU: 295.00ms (0.01%), Scheduled: 953.00ms (0.01%), Output: 12,000,000 rows (181.48MB)                                                                                                                                       
                     Input avg.: 1,500,000.00 rows, Input std.dev.: 173.21%                                                                                                                                                                       
                 - RemoteSource[3] => [c_customer_sk:bigint]                                                                                                                                                                                      
                         CPU: 87.00ms (0.00%), Scheduled: 92.00ms (0.00%), Output: 12,000,000 rows (181.48MB)                                                                                                                                     
                         Input avg.: 1,500,000.00 rows, Input std.dev.: 173.21%                                                                                                                                                                   
                                                                                                                                                                                                                                                  
 Fragment 2 [SOURCE]                                                                                                                                                                                                                              
     CPU: 4.95m, Scheduled: 19.48m, Input: 2,879,987,999 rows (67.00GB); per task: avg.: 1,439,993,999.50 std.dev.: 145,805,473.50, Output: 2,879,987,999 rows (52.10GB), 2 tasks                                                                 
     Output layout: [ss_customer_sk, ss_quantity, ss_sales_price]                                                                                                                                                                                 
     Output partitioning: HASH [ss_customer_sk]                                                                                                                                                                                                   
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                
     - TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_va
             Estimates: {source: CostBasedSourceInfo, rows: 2,879,987,999 (59.28GB), cpu: 63,647,876,329.00, memory: 0.00, network: 0.00}                                                                                                         
             CPU: 4.95m (8.11%), Scheduled: 19.48m (18.03%), Output: 2,879,987,999 rows (52.10GB)                                                                                                                                                 
             Input avg.: 359,998,499.88 rows, Input std.dev.: 174.38%                                                                                                                                                                             
             LAYOUT: tpcds_sf1000_parquet_varchar.store_sales{}                                                                                                                                                                                   
             ss_customer_sk := ss_customer_sk:bigint:3:REGULAR (1:128)                                                                                                                                                                            
             ss_sales_price := ss_sales_price:decimal(7,2):13:REGULAR (1:128)                                                                                                                                                                     
             ss_quantity := ss_quantity:int:10:REGULAR (1:128)                                                                                                                                                                                    
             Input: 2,879,987,999 rows (67.00GB), Filtered: 0.00%                                                                                                                                                                                 
                                                                                                                                                                                                                                                  
 Fragment 3 [SOURCE]                                                                                                                                                                                                                              
     CPU: 838.71ms, Scheduled: 10.24s, Input: 12,000,000 rows (112.39MB); per task: avg.: 6,000,000.00 std.dev.: 2,952,680.00, Output: 12,000,000 rows (91.61MB), 2 tasks                                                                         
     Output layout: [c_customer_sk]                                                                                                                                                                                                               
     Output partitioning: HASH [c_customer_sk]                                                                                                                                                                                                    
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                
     - TableScan[PlanNodeId 1][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_varch
             Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (103.00MB), cpu: 108,000,000.00, memory: 0.00, network: 0.00}                                                                                                              
             CPU: 838.00ms (0.02%), Scheduled: 10.24s (0.16%), Output: 12,000,000 rows (91.61MB)                                                                                                                                                  
             Input avg.: 1,500,000.00 rows, Input std.dev.: 199.22%                                                                                                                                                                               
             LAYOUT: tpcds_sf1000_parquet_varchar.customer{}                                                                                                                                                                                      
             c_customer_sk := c_customer_sk:bigint:0:REGULAR (1:141)                                                                                                                                                                              
             Input: 12,000,000 rows (112.39MB), Filtered: 0.00%                                                                                                                                                                                   

My first thought is that something is wonky with the block encodings, since the query results are correct, but I haven't been able to confirm this.

cc: @aditi-pandit @majetideepak @yingsu00

@ZacBlanco ZacBlanco added the bug label Mar 13, 2024
@ZacBlanco ZacBlanco changed the title Native seem to generate more network traffic Native seem to generate larger data size than Java Mar 13, 2024
@ZacBlanco ZacBlanco added the prestissimo Presto Native Execution label Mar 13, 2024
@aaneja
Contributor

aaneja commented Mar 14, 2024

Experiment with and without CAST

While looking for possible causes of the latency difference between the Native and Java clusters for Q23, I observed the following with respect to the performance of the CAST operator.

On a Native cluster

Measure the read speed of the integer column ss_quantity, read as-is:

presto:tpcds_sf10000_parquet_varchar> explain analyze SELECT ss_quantity FROM store_sales;
 Query Plan
------------------------------------------------------------
 Fragment 1 [SOURCE]
     CPU: 16.68m, Scheduled: 1.19h, Input: 28,799,864,615 rows (135.44GB); per task: avg.: 1,799,991,538.44 std.dev.: 1,135,109,560.01, Output: 28,799,864,615 rows (105.94GB), 16 tasks
     Output layout: [ss_quantity]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf10000_parquet_v
             CPU: 16.68m (100.00%), Scheduled: 1.19h (100.00%), Output: 28,799,864,615 rows (105.94GB)
             Input avg.: 56,249,735.58 rows, Input std.dev.: 661.25%
             LAYOUT: tpcds_sf10000_parquet_varchar.store_sales{}
             ss_quantity := ss_quantity:int:10:REGULAR (1:41)
             Input: 28,799,864,615 rows (135.44GB), Filtered: 0.00%

(1 row)

Query 20240314_043226_00018_kvcgs, FINISHED, 17 nodes
Splits: 7,771 total, 7,771 done (100.00%)
[Latency: client-side: 1:42, server-side: 1:42] [28.8B rows, 1.49GB] [283M rows/s, 14.9MB/s]

Compare this with the read speed when the column has to be CAST to a decimal:

presto:tpcds_sf10000_parquet_varchar> explain analyze SELECT CAST(ss_quantity AS decimal(10,0)) FROM store_sales;
 Query Plan
------------------------------------------------------------
 Fragment 1 [SOURCE]
     CPU: 25.40m, Scheduled: 1.31h, Input: 28,799,864,615 rows (135.44GB); per task: avg.: 1,799,991,538.44 std.dev.: 1,500,572,958.27, Output: 28,799,864,615 rows (208.40GB), 16 tasks
     Output layout: [expr]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - ScanProject[PlanNodeId 0,1][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100
             CPU: 15.87m (62.49%), Scheduled: 16.41m (20.92%), Output: 28,799,864,615 rows (208.40GB)
             Input avg.: 56,249,735.58 rows, Input std.dev.: 729.65%
             expr := CAST(ss_quantity AS decimal(10,0)) (1:65)
             LAYOUT: tpcds_sf10000_parquet_varchar.store_sales{}
             ss_quantity := ss_quantity:int:10:REGULAR (1:64)
             Input: 28,799,864,615 rows (135.44GB), Filtered: 0.00%

(1 row)

Query 20240314_043432_00019_kvcgs, FINISHED, 17 nodes
Splits: 7,770 total, 7,770 done (100.00%)
[Latency: client-side: 3:24, server-side: 3:23] [28.8B rows, 1.51GB] [142M rows/s, 7.61MB/s]
  • Latency increased about 2x (server-side 1:42 → 3:23)
  • Read throughput decreased about 2x (283M → 142M rows/s)

On a Java cluster

Measure the read speed of the integer column ss_quantity, read as-is:

presto> use hive.tpcds_sf10000_parquet_varchar;
USE
presto:tpcds_sf10000_parquet_varchar> explain analyze SELECT ss_quantity FROM store_sales;
 Query Plan
------------------------------------------------------------
 Fragment 1 [SOURCE]
     CPU: 32.10m, Scheduled: 3.06h, Input: 28,799,864,615 rows (29.70GB); per task: avg.: 1,799,991,538.44 std.dev.: 158,762,556.69, Output: 28,799,864,615 rows (134.11GB), 16 tasks
     Output layout: [ss_quantity]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf10000_parquet_v
             CPU: 32.09m (100.00%), Scheduled: 4.51h (100.00%), Output: 28,799,864,615 rows (134.11GB)
             Input avg.: 3,714,194.56 rows, Input std.dev.: 19.14%
             LAYOUT: tpcds_sf10000_parquet_varchar.store_sales{}
             ss_quantity := ss_quantity:int:10:REGULAR (1:41)
             Input: 28,799,864,615 rows (29.69GB), Filtered: 0.00%

(1 row)

Query 20240314_045541_00011_epfyg, FINISHED, 17 nodes
Splits: 7,771 total, 7,771 done (100.00%)
[Latency: client-side: 3:41, server-side: 3:41] [28.8B rows, 29.7GB] [131M rows/s, 138MB/s]

Compare this with the read speed when the column has to be CAST to a decimal:

presto:tpcds_sf10000_parquet_varchar> explain analyze SELECT CAST(ss_quantity AS decimal(10,0)) FROM store_sales;
 Query Plan
------------------------------------------------------------
 Fragment 1 [SOURCE]
     CPU: 33.14m, Scheduled: 1.91h, Input: 28,799,864,615 rows (29.71GB); per task: avg.: 1,799,991,538.44 std.dev.: 92,245,103.31, Output: 28,799,864,615 rows (241.40GB), 16 tasks
     Output layout: [expr]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - ScanProject[PlanNodeId 0,1][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf10000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100
             CPU: 33.11m (100.00%), Scheduled: 3.13h (100.00%), Output: 28,799,864,615 rows (241.41GB)
             Input avg.: 3,714,673.63 rows, Input std.dev.: 18.99%
             expr := CAST(ss_quantity AS decimal(10,0)) (1:65)
             LAYOUT: tpcds_sf10000_parquet_varchar.store_sales{}
             ss_quantity := ss_quantity:int:10:REGULAR (1:64)
             Input: 28,799,864,615 rows (29.69GB), Filtered: 0.00%

(1 row)

Query 20240314_045950_00012_epfyg, FINISHED, 17 nodes
Splits: 7,770 total, 7,770 done (100.00%)
[Latency: client-side: 3:29, server-side: 3:28] [28.8B rows, 29.7GB] [138M rows/s, 146MB/s]
  • No impact on latency or read speed.
  • With the CAST, latency is similar to what we observe on the Native cluster.
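The ratios behind these bullets can be recomputed from the CLI summary lines of the four runs quoted above. This is a minimal sketch: the latency and rows/s values are copied from those lines, and the `RUNS` table and helper names are illustrative, not part of Presto.

```python
# Server-side latency ("m:ss") and throughput (million rows/s) copied from the
# CLI summary line of each EXPLAIN ANALYZE run above.
RUNS = {
    ("native", "plain"): ("1:42", 283),
    ("native", "cast"): ("3:23", 142),
    ("java", "plain"): ("3:41", 131),
    ("java", "cast"): ("3:28", 138),
}


def to_seconds(mmss: str) -> int:
    """Convert an 'm:ss' latency string into whole seconds."""
    minutes, seconds = mmss.split(":")
    return int(minutes) * 60 + int(seconds)


def latency_ratio(cluster: str) -> float:
    """CAST latency divided by plain-read latency on one cluster."""
    cast = to_seconds(RUNS[(cluster, "cast")][0])
    plain = to_seconds(RUNS[(cluster, "plain")][0])
    return cast / plain


def throughput_ratio(cluster: str) -> float:
    """Plain-read throughput divided by CAST throughput on one cluster."""
    return RUNS[(cluster, "plain")][1] / RUNS[(cluster, "cast")][1]


for cluster in ("native", "java"):
    print(f"{cluster}: latency x{latency_ratio(cluster):.2f}, "
          f"throughput drop x{throughput_ratio(cluster):.2f}")
```

On these numbers the native ratios are both about 1.99, while Java's are about 0.94 (latency) and 0.95 (throughput), i.e. the CAST query was marginally faster on the Java cluster.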

Possible cause(s)

  • CAST is slower on Native clusters.
  • The Native Parquet reader can read Parquet data into blocks with less overhead than the Java reader, so it is much faster for a straight read.
    However, because the second query forces a CAST on every value, we lose any benefit from the reader's better performance.

@yingsu00 @aditi-pandit @majetideepak

@aditi-pandit
Contributor

@karteekmurthys: Let's write a micro-benchmark for this code: https://github.com/facebookincubator/velox/blob/main/velox/expression/CastExpr-inl.h#L424

@majetideepak
Collaborator

@ZacBlanco, the bug seems specific to decimal types. A related bug was fixed in Velox recently: facebookincubator/velox#8859.
519GB vs. 5TB is ~10X, which is what that issue reported as well.
Do you have this fix? Thanks!

@ZacBlanco
Contributor Author

@karteekmurthys
Contributor

karteekmurthys commented Mar 14, 2024

@majetideepak, that fix is in aggregates. Zac is referring to the size of the data being read, which is too big:
Prestissimo:

             ss_quantity := ss_quantity:int:10:REGULAR (1:64)
             Input: 28,799,864,615 rows (135.44GB), Filtered: 0.00%

Presto:

             ss_quantity := ss_quantity:int:10:REGULAR (1:64)
             Input: 28,799,864,615 rows (29.69GB), Filtered: 0.00%
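To make the gap between the two Input lines above concrete, this sketch divides each reported size by the 28,799,864,615 rows scanned. It assumes the plan printer's "GB" means 2^30 bytes; the names are illustrative only.

```python
ROWS = 28_799_864_615  # rows scanned from store_sales in both runs
GIB = 2**30  # assumption: "GB" in the plan output is a binary unit

# Reported "Input" sizes for the same ss_quantity scan.
reported_input_gb = {"Prestissimo": 135.44, "Presto (Java)": 29.69}


def bytes_per_row(size_gb: float, rows: int = ROWS) -> float:
    """Average number of bytes a reported data size attributes to each row."""
    return size_gb * GIB / rows


for engine, size_gb in reported_input_gb.items():
    print(f"{engine}: {bytes_per_row(size_gb):.2f} bytes/row")

ratio = reported_input_gb["Prestissimo"] / reported_input_gb["Presto (Java)"]
print(f"Prestissimo / Java input size: {ratio:.2f}x")
```

This works out to roughly 5.05 bytes/row for Prestissimo versus about 1.11 bytes/row for Java (about 4.6x). Java's figure is below the 4-byte width of an uncompressed int, which suggests, without confirming, that the two engines are not measuring the same quantity as "Input".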

@majetideepak
Collaborator

Zac is referring to this:

RemoteSource[10] => [ss_customer_sk_72:bigint, ss_quantity_79:integer, ss_sales_price_82:decimal(7,2)]
                                             Output: 28,799,864,615 rows (5.02TB)
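A rough per-row check of the ~10X figure: the sketch compares the 519GB producer-side size and the 5.02TB RemoteSource[10] size with the fixed width of the three output columns. It assumes binary units, 8/4/8-byte widths for bigint, integer and decimal(7,2) (ignoring null flags), and that fragment 10 emitted the same 28,799,864,615 rows that RemoteSource[10] received.

```python
ROWS = 28_799_864_615  # rows received by RemoteSource[10]
GIB, TIB = 2**30, 2**40  # assumption: "GB"/"TB" in the plan are binary units

# Assumed fixed widths, ignoring null flags: bigint = 8 bytes,
# integer = 4 bytes, decimal(7,2) = 8-byte short decimal.
COLUMN_WIDTHS = {"ss_customer_sk_72": 8, "ss_quantity_79": 4, "ss_sales_price_82": 8}
fixed_width = sum(COLUMN_WIDTHS.values())

# Assumption: fragment 10 emitted the same rows that RemoteSource[10] received.
producer_per_row = 519 * GIB / ROWS
consumer_per_row = 5.02 * TIB / ROWS

print(f"fixed-width estimate: {fixed_width} bytes/row")
print(f"fragment 10 output (519GB): {producer_per_row:.1f} bytes/row")
print(f"RemoteSource[10] output (5.02TB): {consumer_per_row:.1f} bytes/row")
print(f"consumer / producer: {consumer_per_row / producer_per_row:.1f}x")
```

Under these assumptions the producer side comes to about 19 bytes/row, close to the 20-byte fixed width, while the RemoteSource reports about 192 bytes/row, roughly 10x more for the same rows.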

@kewang1024
Collaborator

kewang1024 commented Mar 18, 2024

This looks similar to a query we saw where an exchange received 15TB and turned that into 1.69PB of reported output.

Internally, we have a task tracking this issue:
It appears to be a problem in how Exchange reports output bytes.
Exchange reuses its 'result_' vector, and it may receive a different number of rows each time. If it receives 100K rows, it produces a large result_ vector. If it then receives 10K rows, it produces the same vector and ends up inflating the output bytes by 10x.
If it then receives 1K rows, it produces the same vector again, now inflating the output bytes by 100x.
Hence, if we mostly produce short vectors but sometimes produce much longer ones, we end up inflating the output bytes by a lot.

exchange->getOutput() returns a reusable vector whose size is "inaccurate". We should read and register 'allocated' instead of 'used' bytes from it.

cc: @mbasmanova
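A toy model of the Exchange mechanism described above, assuming a reused vector whose buffer only grows. `ReusableVector`, `fill`, `buffer_bytes` and `row_bytes` are hypothetical names, not Velox classes; the point is only that sizing each batch from the reused buffer, rather than from the rows delivered in that batch, reproduces the 10x and 100x inflation.

```python
from dataclasses import dataclass


@dataclass
class ReusableVector:
    """Hypothetical stand-in for a reused output vector (not Velox's API).

    Buffers only grow: once sized for a large batch, the capacity is kept
    when later, smaller batches are written into the same vector.
    """

    value_width: int = 8  # bytes per value, e.g. a bigint column
    capacity: int = 0  # rows the underlying buffer can hold
    size: int = 0  # rows present in the current batch

    def fill(self, rows: int) -> None:
        self.capacity = max(self.capacity, rows)
        self.size = rows

    def buffer_bytes(self) -> int:
        # Size measured from the reused buffer's capacity.
        return self.capacity * self.value_width

    def row_bytes(self) -> int:
        # Size measured from the rows actually delivered in this batch.
        return self.size * self.value_width


def report(batches: list[int]) -> list[tuple[int, int]]:
    """Return (buffer-based bytes, row-based bytes) for each batch."""
    vector = ReusableVector()
    sizes = []
    for rows in batches:
        vector.fill(rows)
        sizes.append((vector.buffer_bytes(), vector.row_bytes()))
    return sizes


for buffer_bytes, row_bytes in report([100_000, 10_000, 1_000]):
    print(f"buffer-based {buffer_bytes:>9,} B, row-based {row_bytes:>9,} B, "
          f"inflation {buffer_bytes / row_bytes:.0f}x")
```

The three batches report 1x, 10x and 100x inflation respectively, matching the sequence described in the comment.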

@yingsu00
Contributor

@aaneja, is the CAST performance on native clusters stable? Can you please run it a few times and pick the stable numbers? I see that the CPU time is about the same for decimal, but the elapsed time doubled. We need to rule out performance variance.

@aaneja
Contributor

aaneja commented Mar 19, 2024

@yingsu00 Yes, I could reproduce this consistently; see https://gist.github.com/aaneja/dc70f655695933b6ff11978120bbebab

@oerling

oerling commented Mar 19, 2024

We will add encoding in shuffle around April 15. We have tried preserving constants, replacing flat values with constants when all values are the same, and making string dictionaries when there are few distinct values. Together, these reduce the data on the wire by 25% on a slice of our batch workload. Adding LZ4 to that reduces it by another 25%, so we end up at half the network traffic.
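The per-column choices listed above can be sketched as a simple decision function. This is an illustration only, not the actual shuffle implementation: `choose_encoding` and the `max_distinct_fraction` threshold are hypothetical, and "few distinct values" is modeled as a fixed fraction of the batch.

```python
from collections.abc import Hashable, Sequence


def choose_encoding(values: Sequence[Hashable], max_distinct_fraction: float = 0.1) -> str:
    """Pick a wire encoding for one column of a shuffle batch (illustrative)."""
    if not values:
        return "flat"
    distinct = set(values)
    if len(distinct) == 1:
        # All rows hold the same value: send it once as a constant.
        return "constant"
    is_string = all(isinstance(v, str) for v in values)
    if is_string and len(distinct) <= max_distinct_fraction * len(values):
        # Few distinct strings: send each once, plus a small index per row.
        return "dictionary"
    # Anything else (e.g. high-cardinality fixed-width data) stays flat.
    return "flat"


print(choose_encoding(["US"] * 1000))
print(choose_encoding(["US", "CA", "MX"] * 1000))
print(choose_encoding(list(range(1000))))
```

The three calls print `constant`, `dictionary` and `flat`. The last case resembles the TPC-DS columns in this issue, where there is nothing to re-encode.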

A slightly different question is how operator output data size is reported. Presto uses retainedSize(), while Velox uses estimatedFlatSize. These mean different things, and neither is really representative. We could define a "used retained size" by taking the estimated flat size but counting non-scalar data wrapped in dictionaries once per distinct use, not once per use. So, if all rows of a column refer to the same wide value via a dictionary, the wide value should be counted once, not for every row. This is Ke’s example. The DS examples do not involve encoding opportunities; they are high-cardinality, fixed-width data.
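A minimal sketch of the proposed accounting, assuming a dictionary column made of per-row indices into a list of wide string values. `DictionaryColumn`, `flat_size` and `used_retained_size` are hypothetical names, and the 4-byte index width is an assumption; neither function is Velox's or Presto's actual size API.

```python
from dataclasses import dataclass


@dataclass
class DictionaryColumn:
    """Hypothetical dictionary-encoded column: each row points into `base`."""

    base: list[str]  # wide values referenced by the rows
    indices: list[int]  # one index into `base` per row
    index_width: int = 4  # bytes per index, assumed 32-bit


def flat_size(col: DictionaryColumn) -> int:
    """Charge every row the full width of the value it refers to."""
    return sum(len(col.base[i].encode()) for i in col.indices)


def used_retained_size(col: DictionaryColumn) -> int:
    """Charge one index per row, plus each referenced base value once."""
    referenced = set(col.indices)
    values = sum(len(col.base[i].encode()) for i in referenced)
    return len(col.indices) * col.index_width + values


# Ke's case: every row refers to the same 1,000-byte value.
col = DictionaryColumn(base=["x" * 1_000], indices=[0] * 10_000)
print(flat_size(col), used_retained_size(col))
```

Here the flat estimate charges 10,000,000 bytes, while the proposed measure charges 41,000 (10,000 four-byte indices plus the wide value once). For flat, fixed-width columns like the TPC-DS ones there is no dictionary to deduplicate, so this accounting change alone would not explain those numbers.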

The combination of re-encoding and more intuitive size accounting will resolve this. We’ll see if we get there in mid-April.

@aditi-pandit
Copy link
Contributor

@oerling: Do you have any open PRs for the improvements you are suggesting? It would be great to follow them.

@aaneja
Copy link
Contributor

aaneja commented Mar 27, 2024

Created #22346 to decouple the CAST issue from this one. cc: @karteekmurthys

Projects
Status: 🆕 Unprioritized
Status: Backlog