perf: improve tpc-h q20 performance (single-topic) #14797

Open · Tracked by #15036
lmatz opened this issue Jan 25, 2024 · 18 comments

lmatz commented Jan 25, 2024

See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1

github-actions bot added this to the release-1.7 milestone on Jan 25, 2024
lmatz commented Jan 25, 2024

Query:

    create sink tpch_q20 as
    select
    	s_name,
    	s_address
    from
    	supplier,
    	nation
    where
    	s_suppkey in (
    		select
    			ps_suppkey
    		from
    			partsupp
    		where
    			ps_partkey in (
    				select
    					p_partkey
    				from
    					part
    			)
    			and ps_availqty > (
    				select
    					0.005 * sum(l_quantity)
    				from
    					lineitem
    				where
    					l_partkey = ps_partkey
    					and l_suppkey = ps_suppkey
    			)
    	)
    	and s_nationkey = n_nationkey
    --order by
    --	s_name
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Plan:

 StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10490(hidden), $expr10487(hidden)] }
 └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
   ├─StreamExchange { dist: HashShard($expr1) }
   │ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
   │   ├─StreamExchange { dist: HashShard($expr4) }
   │   │ └─StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
   │   │   └─StreamFilter { predicate: (eventType = 'supplier':Varchar) }
   │   │     └─StreamShare { id: 4 }
   │   │       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
   │   │         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
   │   │           └─StreamRowIdGen { row_id_index: 10 }
   │   │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   │   └─StreamExchange { dist: HashShard($expr5) }
   │     └─StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
   │       └─StreamFilter { predicate: (eventType = 'nation':Varchar) }
   │         └─StreamShare { id: 4 }
   │           └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
   │             └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
   │               └─StreamRowIdGen { row_id_index: 10 }
   │                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   └─StreamExchange { dist: HashShard($expr7) }
     └─StreamProject { exprs: [$expr7, _row_id, $expr6, $expr6, $expr7] }
       └─StreamFilter { predicate: ($expr10 > $expr14) }
         └─StreamHashJoin { type: Inner, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
           ├─StreamExchange { dist: HashShard($expr6, $expr7) }
           │ └─StreamProject { exprs: [$expr6, $expr7, $expr8::Decimal as $expr10, _row_id] }
           │   └─StreamShare { id: 20 }
           │     └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
           │       ├─StreamExchange { dist: HashShard($expr6) }
           │       │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
           │       │   └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
           │       │     └─StreamShare { id: 4 }
           │       │       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
           │       │         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
           │       │           └─StreamRowIdGen { row_id_index: 10 }
           │       │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
           │       └─StreamExchange { dist: HashShard($expr9) }
           │         └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
           │           └─StreamFilter { predicate: (eventType = 'part':Varchar) }
           │             └─StreamShare { id: 4 }
           │               └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
           │                 └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
           │                   └─StreamRowIdGen { row_id_index: 10 }
           │                     └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
           └─StreamProject { exprs: [$expr6, $expr7, (0.005:Decimal * sum($expr13)) as $expr14] }
             └─StreamHashAgg { group_key: [$expr6, $expr7], aggs: [sum($expr13), count] }
               └─StreamHashJoin { type: LeftOuter, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
                 ├─StreamExchange { dist: HashShard($expr6, $expr7) }
                 │ └─StreamProject { exprs: [$expr6, $expr7] }
                 │   └─StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] }
                 │     └─StreamShare { id: 20 }
                 │       └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
                 │         ├─StreamExchange { dist: HashShard($expr6) }
                 │         │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
                 │         │   └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
                 │         │     └─StreamShare { id: 4 }
                 │         │       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
                 │         │         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
                 │         │           └─StreamRowIdGen { row_id_index: 10 }
                 │         │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                 │         └─StreamExchange { dist: HashShard($expr9) }
                 │           └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
                 │             └─StreamFilter { predicate: (eventType = 'part':Varchar) }
                 │               └─StreamShare { id: 4 }
                 │                 └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
                 │                   └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
                 │                     └─StreamRowIdGen { row_id_index: 10 }
                 │                       └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                 └─StreamProject { exprs: [$expr6, $expr7, Field(lineitem, 4:Int32) as $expr13, _row_id] }
                   └─StreamHashJoin { type: Inner, predicate: $expr6 = $expr11 AND $expr7 = $expr12 }
                     ├─StreamExchange { dist: HashShard($expr6, $expr7) }
                     │ └─StreamProject { exprs: [$expr6, $expr7] }
                     │   └─StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] }
                     │     └─StreamShare { id: 20 }
                     │       └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
                     │         ├─StreamExchange { dist: HashShard($expr6) }
                     │         │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
                     │         │   └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
                     │         │     └─StreamShare { id: 4 }
                     │         │       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
                     │         │         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
                     │         │           └─StreamRowIdGen { row_id_index: 10 }
                     │         │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                     │         └─StreamExchange { dist: HashShard($expr9) }
                     │           └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
                     │             └─StreamFilter { predicate: (eventType = 'part':Varchar) }
                     │               └─StreamShare { id: 4 }
                     │                 └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
                     │                   └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
                     │                     └─StreamRowIdGen { row_id_index: 10 }
                     │                       └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
                     └─StreamExchange { dist: HashShard($expr11, $expr12) }
                       └─StreamProject { exprs: [lineitem, Field(lineitem, 1:Int32) as $expr11, Field(lineitem, 2:Int32) as $expr12, _row_id] }
                         └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
                           └─StreamShare { id: 4 }
                             └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
                               └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
                                 └─StreamRowIdGen { row_id_index: 10 }
                                   └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(99 rows)

Dist Plan:

 Fragment 0
 StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10490(hidden), $expr10487(hidden)] }
 ├── tables: [ Sink: 0 ]
 └── StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
     ├── tables: [ HashJoinLeft: 1, HashJoinDegreeLeft: 2, HashJoinRight: 3, HashJoinDegreeRight: 4 ]
     ├── StreamExchange Hash([0]) from 1
     └── StreamExchange Hash([0]) from 5
 
 Fragment 1
 StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
 ├── tables: [ HashJoinLeft: 5, HashJoinDegreeLeft: 6, HashJoinRight: 7, HashJoinDegreeRight: 8 ]
 ├── StreamExchange Hash([3]) from 2
 └── StreamExchange Hash([0]) from 4
 
 Fragment 2
 StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
 └── StreamFilter { predicate: (eventType = 'supplier':Varchar) }
     └── StreamExchange NoShuffle from 3
 
 Fragment 3
 StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
 └── StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
     └── StreamRowIdGen { row_id_index: 10 }
         └── StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 9 ] }
 
 Fragment 4
 StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
 └── StreamFilter { predicate: (eventType = 'nation':Varchar) }
     └── StreamExchange NoShuffle from 3
 
 Fragment 5
 StreamProject { exprs: [$expr7, _row_id, $expr6, $expr6, $expr7] }
 └── StreamFilter { predicate: ($expr10 > $expr14) }
     └── StreamHashJoin { type: Inner, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
         ├── tables: [ HashJoinLeft: 10, HashJoinDegreeLeft: 11, HashJoinRight: 12, HashJoinDegreeRight: 13 ]
         ├── StreamExchange Hash([0, 1]) from 6
         └── StreamProject { exprs: [$expr6, $expr7, (0.005:Decimal * sum($expr13)) as $expr14] }
             └── StreamHashAgg { group_key: [$expr6, $expr7], aggs: [sum($expr13), count] } { tables: [ HashAggState: 18 ] }
                 └── StreamHashJoin { type: LeftOuter, predicate: $expr6 IS NOT DISTINCT FROM $expr6 AND $expr7 IS NOT DISTINCT FROM $expr7 }
                     ├── tables: [ HashJoinLeft: 19, HashJoinDegreeLeft: 20, HashJoinRight: 21, HashJoinDegreeRight: 22 ]
                     ├── StreamExchange Hash([0, 1]) from 10
                     └── StreamProject { exprs: [$expr6, $expr7, Field(lineitem, 4:Int32) as $expr13, _row_id] }
                         └── StreamHashJoin { type: Inner, predicate: $expr6 = $expr11 AND $expr7 = $expr12 } { tables: [ HashJoinLeft: 24, HashJoinDegreeLeft: 25, HashJoinRight: 26, HashJoinDegreeRight: 27 ] }
                             ├── StreamExchange Hash([0, 1]) from 11
                             └── StreamExchange Hash([1, 2]) from 12
 
 Fragment 6
 StreamProject { exprs: [$expr6, $expr7, $expr8::Decimal as $expr10, _row_id] }
 └── StreamExchange NoShuffle from 7
 
 Fragment 7
 StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 } { tables: [ HashJoinLeft: 14, HashJoinDegreeLeft: 15, HashJoinRight: 16, HashJoinDegreeRight: 17 ] }
 ├── StreamExchange Hash([0]) from 8
 └── StreamExchange Hash([0]) from 9
 
 Fragment 8
 StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
 └── StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
     └── StreamExchange NoShuffle from 3
 
 Fragment 9
 StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
 └── StreamFilter { predicate: (eventType = 'part':Varchar) }
     └── StreamExchange NoShuffle from 3
 
 Fragment 10
 StreamProject { exprs: [$expr6, $expr7] }
 └── StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] } { tables: [ HashAggState: 23 ] }
     └── StreamExchange NoShuffle from 7
 
 Fragment 11
 StreamProject { exprs: [$expr6, $expr7] }
 └── StreamHashAgg { group_key: [$expr6, $expr7], aggs: [count] } { tables: [ HashAggState: 28 ] }
     └── StreamExchange NoShuffle from 7
 
 Fragment 12
 StreamProject { exprs: [lineitem, Field(lineitem, 1:Int32) as $expr11, Field(lineitem, 2:Int32) as $expr12, _row_id] }
 └── StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
     └── StreamExchange NoShuffle from 3
 
 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_row_op, s_name, s_address, _row_id, _row_id#1, $expr10490, $expr10487 ]
 ├── primary key: [ $0 ASC, $1 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8 ]
 ├── distribution key: [ 8 ]
 └── read pk prefix len hint: 2
 
 Table 1
 ├── columns: [ $expr1, $expr2, $expr3, _row_id, $expr4, _row_id_0 ]
 ├── primary key: [ $0 ASC, $3 ASC, $5 ASC, $4 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5 ]
 ├── distribution key: [ 0 ]
 └── read pk prefix len hint: 1
 
 Table 2 { columns: [ $expr1, _row_id, _row_id_0, $expr4, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 3 { columns: [ $expr7, _row_id, $expr6, $expr6_0, $expr7_0 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 4 { columns: [ $expr7, _row_id, $expr6, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 5 { columns: [ $expr1, $expr2, $expr3, $expr4, _row_id ], primary key: [ $3 ASC, $4 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 3 ], read pk prefix len hint: 1 }
 
 Table 6 { columns: [ $expr4, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 7 { columns: [ $expr5, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 8 { columns: [ $expr5, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 9 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
 Table 10 { columns: [ $expr6, $expr7, $expr10, _row_id ], primary key: [ $0 ASC, $1 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 11 { columns: [ $expr6, $expr7, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 12 { columns: [ $expr6, $expr7, $expr14 ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 13 { columns: [ $expr6, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 14 { columns: [ $expr6, $expr7, $expr8, _row_id ], primary key: [ $0 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 15 { columns: [ $expr6, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 16 { columns: [ $expr9, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 17 { columns: [ $expr9, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 18 { columns: [ $expr6, $expr7, sum($expr13), count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 19 { columns: [ $expr6, $expr7 ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 20 { columns: [ $expr6, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 21 { columns: [ $expr6, $expr7, $expr13, _row_id ], primary key: [ $0 ASC, $1 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 22 { columns: [ $expr6, $expr7, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 23 { columns: [ $expr6, $expr7, count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 2 }
 
 Table 24 { columns: [ $expr6, $expr7 ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 25 { columns: [ $expr6, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 26 { columns: [ lineitem, $expr11, $expr12, _row_id ], primary key: [ $1 ASC, $2 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 1, 2 ], read pk prefix len hint: 2 }
 
 Table 27 { columns: [ $expr11, $expr12, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 28 { columns: [ $expr6, $expr7, count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 2 }
 
(148 rows)
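For reference, a sketch of how plans like the two above are obtained (assumptions on my part: the tpch source and the TPC-H views already exist, the simplified query below stands in for the full Q20 body, and the (distsql) option name for the fragment-level plan is assumed rather than confirmed by this thread):

    -- streaming plan ("Plan" above); simplified stand-in query
    explain create sink tpch_q20_sketch as
    select s_name, s_address
    from supplier, nation
    where s_nationkey = n_nationkey
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

    -- fragment-level plan with state tables ("Dist Plan" above); option name assumed
    explain (distsql) create sink tpch_q20_sketch as
    select s_name, s_address
    from supplier, nation
    where s_nationkey = n_nationkey
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');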

lmatz commented Jan 25, 2024

Flink:

INSERT INTO tpch_q20
    select
      s_name,
      s_address
    from
      supplier,
      nation
    where
      s_suppkey in (
        select
          ps_suppkey
        from
          partsupp
        where
          ps_partkey in (
            select
              p_partkey
            from
              part
          )
          and ps_availqty > (
            select
              0.005 * sum(l_quantity)
            from
              lineitem
            where
              l_partkey = ps_partkey
              and l_suppkey = ps_suppkey
          )
      )
      and s_nationkey = n_nationkey;

Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q20], fields=[s_name, s_address])
+- Calc(select=[s_name, s_address])
   +- Join(joinType=[InnerJoin], where=[=(s_nationkey, n_nationkey)], select=[s_name, s_address, s_nationkey, n_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[hash[s_nationkey]])
      :  +- Calc(select=[s_name, s_address, s_nationkey])
      :     +- Join(joinType=[LeftSemiJoin], where=[=(s_suppkey, ps_suppkey)], select=[s_suppkey, s_name, s_address, s_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :        :- Exchange(distribution=[hash[s_suppkey]])
      :        :  +- Calc(select=[supplier.s_suppkey AS s_suppkey, supplier.s_name AS s_name, supplier.s_address AS s_address, supplier.s_nationkey AS s_nationkey], where=[=(eventType, _UTF-16LE'supplier':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
      :        :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
      :        +- Exchange(distribution=[hash[ps_suppkey]])
      :           +- Calc(select=[ps_suppkey])
      :              +- Join(joinType=[InnerJoin], where=[AND(=(ps_partkey, l_partkey), =(ps_suppkey, l_suppkey), >(ps_availqty, *(0.005:DECIMAL(4, 3), $f2)))], select=[ps_partkey, ps_suppkey, ps_availqty, l_partkey, l_suppkey, $f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :                 :- Exchange(distribution=[hash[ps_partkey, ps_suppkey]])
      :                 :  +- Join(joinType=[LeftSemiJoin], where=[=(ps_partkey, p_partkey)], select=[ps_partkey, ps_suppkey, ps_availqty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :                 :     :- Exchange(distribution=[hash[ps_partkey]])
      :                 :     :  +- Calc(select=[partsupp.ps_partkey AS ps_partkey, partsupp.ps_suppkey AS ps_suppkey, partsupp.ps_availqty AS ps_availqty], where=[=(eventType, _UTF-16LE'partsupp':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
      :                 :     :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
      :                 :     +- Exchange(distribution=[hash[p_partkey]])
      :                 :        +- Calc(select=[part.p_partkey AS p_partkey], where=[=(eventType, _UTF-16LE'part':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
      :                 :           +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
      :                 +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
      :                    +- GroupAggregate(groupBy=[l_partkey, l_suppkey], select=[l_partkey, l_suppkey, SUM(l_quantity) AS $f2])
      :                       +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
      :                          +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_suppkey AS l_suppkey, lineitem.l_quantity AS l_quantity], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT NULL(lineitem.l_partkey), IS NOT NULL(lineitem.l_suppkey))])
      :                             +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
      +- Exchange(distribution=[hash[n_nationkey]])
         +- Calc(select=[nation.n_nationkey AS n_nationkey], where=[=(eventType, _UTF-16LE'nation':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q20], fields=[s_name, s_address])
+- Calc(select=[s_name, s_address])
   +- Join(joinType=[InnerJoin], where=[(s_nationkey = n_nationkey)], select=[s_name, s_address, s_nationkey, n_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :- Exchange(distribution=[hash[s_nationkey]])
      :  +- Calc(select=[s_name, s_address, s_nationkey])
      :     +- Join(joinType=[LeftSemiJoin], where=[(s_suppkey = ps_suppkey)], select=[s_suppkey, s_name, s_address, s_nationkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :        :- Exchange(distribution=[hash[s_suppkey]])
      :        :  +- Calc(select=[supplier.s_suppkey AS s_suppkey, supplier.s_name AS s_name, supplier.s_address AS s_address, supplier.s_nationkey AS s_nationkey], where=[(eventType = 'supplier')])
      :        :     +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
      :        +- Exchange(distribution=[hash[ps_suppkey]])
      :           +- Calc(select=[ps_suppkey])
      :              +- Join(joinType=[InnerJoin], where=[((ps_partkey = l_partkey) AND (ps_suppkey = l_suppkey) AND (ps_availqty > (0.005 * $f2)))], select=[ps_partkey, ps_suppkey, ps_availqty, l_partkey, l_suppkey, $f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :                 :- Exchange(distribution=[hash[ps_partkey, ps_suppkey]])
      :                 :  +- Join(joinType=[LeftSemiJoin], where=[(ps_partkey = p_partkey)], select=[ps_partkey, ps_suppkey, ps_availqty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :                 :     :- Exchange(distribution=[hash[ps_partkey]])
      :                 :     :  +- Calc(select=[partsupp.ps_partkey AS ps_partkey, partsupp.ps_suppkey AS ps_suppkey, partsupp.ps_availqty AS ps_availqty], where=[(eventType = 'partsupp')])
      :                 :     :     +- Reused(reference_id=[1])
      :                 :     +- Exchange(distribution=[hash[p_partkey]])
      :                 :        +- Calc(select=[part.p_partkey AS p_partkey], where=[(eventType = 'part')])
      :                 :           +- Reused(reference_id=[1])
      :                 +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
      :                    +- GroupAggregate(groupBy=[l_partkey, l_suppkey], select=[l_partkey, l_suppkey, SUM(l_quantity) AS $f2])
      :                       +- Exchange(distribution=[hash[l_partkey, l_suppkey]])
      :                          +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_suppkey AS l_suppkey, lineitem.l_quantity AS l_quantity], where=[((eventType = 'lineitem') AND lineitem.l_partkey IS NOT NULL AND lineitem.l_suppkey IS NOT NULL)])
      :                             +- Reused(reference_id=[1])
      +- Exchange(distribution=[hash[n_nationkey]])
         +- Calc(select=[nation.n_nationkey AS n_nationkey], where=[(eventType = 'nation')])
            +- Reused(reference_id=[1])

lmatz commented Jan 25, 2024

We notice that there are 8 StreamHashJoin operators in RisingWave's query plan.
Even after discounting the duplicated printings of the StreamHashJoin shared via StreamShare { id: 20 } (it appears 3 times but is a single operator), we still have 8 - (3 - 1) = 6 StreamHashJoin operators:
3 Inner
2 LeftSemi
1 LeftOuter

Flink's query plan, in contrast, has only 4 joins:
2 InnerJoin
2 LeftSemiJoin

lmatz commented Jan 29, 2024

chenzl25 commented:

There are 2 issues here:

  1. Our subquery TranslateApplyRule has a special optimization for table scans but not for sources, so in the unified single-topic case it generates more joins than the table-scan test does.
  2. Even if we use a table instead of a source, we still get 5 joins instead of 4 because, in the general case, our subquery unnesting rewrite can transform one correlated apply into 2 joins (see the sketch below).
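
To illustrate point 2, here is a hand-written SQL sketch (not RisingWave's internal rewrite; the part filter is omitted for brevity) of the 2-join shape that general unnesting produces for Q20's correlated scalar subquery: build the domain of correlated keys, aggregate lineitem per key with a LEFT OUTER join so that keys with no lineitem rows survive with a NULL sum, then join the aggregate back to partsupp. This mirrors the LeftOuter join plus the extra Inner join visible in the RW plan above.

    -- general unnesting shape (2 joins); CTE and alias names are illustrative
    with key_domain as (
        -- domain of correlated keys
        select distinct ps_partkey, ps_suppkey from partsupp
    ),
    per_key_sum as (
        -- LEFT OUTER join keeps keys with no lineitem rows (NULL threshold)
        select d.ps_partkey, d.ps_suppkey, 0.005 * sum(l.l_quantity) as threshold
        from key_domain d
        left outer join lineitem l
            on l.l_partkey = d.ps_partkey and l.l_suppkey = d.ps_suppkey
        group by d.ps_partkey, d.ps_suppkey
    )
    select ps.ps_suppkey
    from partsupp ps
    join per_key_sum a
        on a.ps_partkey = ps.ps_partkey and a.ps_suppkey = ps.ps_suppkey
    where ps.ps_availqty > a.threshold;

Because the aggregate's group key equals the correlation key here, the same predicate can also be decorrelated into a single join against a pre-aggregated lineitem (the shape in Flink's plan); ps_availqty > NULL is never true, so dropping the empty groups does not change the result.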

lmatz changed the title from "perf: improve tpc-h q20 performance" to "perf: improve tpc-h q20 performance (single-topic)" on Feb 6, 2024
lmatz commented Feb 21, 2024

Now the new RW plan is:

 StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10423(hidden), $expr10420(hidden)] }
 └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
   ├─StreamExchange { dist: HashShard($expr1) }
   │ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
   │   ├─StreamExchange { dist: HashShard($expr4) }
   │   │ └─StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
   │   │   └─StreamFilter { predicate: (eventType = 'supplier':Varchar) }
   │   │     └─StreamShare { id: 4 }
   │   │       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
   │   │         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
   │   │           └─StreamRowIdGen { row_id_index: 10 }
   │   │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   │   └─StreamExchange { dist: HashShard($expr5) }
   │     └─StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
   │       └─StreamFilter { predicate: (eventType = 'nation':Varchar) }
   │         └─StreamShare { id: 4 }
   │           └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
   │             └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
   │               └─StreamRowIdGen { row_id_index: 10 }
   │                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   └─StreamExchange { dist: HashShard($expr7) }
     └─StreamProject { exprs: [$expr7, _row_id, $expr6, $expr12, $expr13] }
       └─StreamFilter { predicate: ($expr10 > $expr14) }
         └─StreamHashJoin { type: Inner, predicate: $expr6 = $expr12 AND $expr7 = $expr13 }
           ├─StreamExchange { dist: HashShard($expr6, $expr7) }
           │ └─StreamProject { exprs: [$expr6, $expr7, $expr8::Decimal as $expr10, _row_id] }
           │   └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr9 }
           │     ├─StreamExchange { dist: HashShard($expr6) }
           │     │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, Field(partsupp, 2:Int32) as $expr8, _row_id] }
           │     │   └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
           │     │     └─StreamShare { id: 4 }
           │     │       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
           │     │         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
           │     │           └─StreamRowIdGen { row_id_index: 10 }
           │     │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
           │     └─StreamExchange { dist: HashShard($expr9) }
           │       └─StreamProject { exprs: [Field(part, 0:Int32) as $expr9, _row_id] }
           │         └─StreamFilter { predicate: (eventType = 'part':Varchar) }
           │           └─StreamShare { id: 4 }
           │             └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
           │               └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
           │                 └─StreamRowIdGen { row_id_index: 10 }
           │                   └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
           └─StreamProject { exprs: [(0.005:Decimal * sum($expr11)) as $expr14, $expr12, $expr13] }
             └─StreamHashAgg [append_only] { group_key: [$expr12, $expr13], aggs: [sum($expr11), count] }
               └─StreamExchange { dist: HashShard($expr12, $expr13) }
                 └─StreamProject { exprs: [Field(lineitem, 4:Int32) as $expr11, Field(lineitem, 1:Int32) as $expr12, Field(lineitem, 2:Int32) as $expr13, _row_id] }
                   └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
                     └─StreamShare { id: 4 }
                       └─StreamProject { exprs: [eventType, lineitem, supplier, part, partsupp, nation, _row_id] }
                         └─StreamFilter { predicate: (((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) OR (eventType = 'lineitem':Varchar)) }
                           └─StreamRowIdGen { row_id_index: 10 }
                             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(53 rows)

The number of joins in RW's and Flink's query plans is now the same.

The one remaining difference, though it is unclear whether it is better or worse, is that RW's plan is bushy while Flink's plan is one-sided deep.

RW's performance is better than before, but there is still big room for improvement, as we can see from:

  1. https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1
  2. http://metabase.risingwave-cloud.xyz/question/5354-tpch-q20-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-383?start_date=2023-11-25

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1707614378000&to=1707616182000&var-namespace=tpch-1cn-affinity-weekly-20240210:

Data block cache miss ops look very high, at ~140 ops/s:
(screenshot: SCR-20240221-fed)

The join executor barrier align time and the join actor input blocking time ratio also look bad:
(screenshot: SCR-20240221-fep)

@xxchan could you help take a look?

lmatz commented Feb 21, 2024

Linking #14811, as both q20 and q4 contain LeftSemiJoin (q20 has two), and LeftSemiJoin was shown to be the bottleneck in q4.

lmatz commented Feb 22, 2024

Li0k commented Feb 22, 2024

https://buildkite.com/risingwave-test/tpch-benchmark/builds/991

Using nightly-20240217 instead of nightly-20240127 because we need the new query plan for q20.

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708218697000&to=1708220501000&var-namespace=tpch-1cn-affinity-weekly-20240217

It seems that L0 looks a lot like TPC-H q4: #14811 (comment)

(screenshot: SCR-20240222-kq1)

Analyzing the information given by Grafana:

  1. Task failures
     (screenshot)
  2. Object store timeouts
     (screenshot)

Simple conclusion: tasks are pending due to IO timeouts. Our default 8-minute timeout had a big impact on this short test.

lmatz commented Feb 22, 2024

lmatz commented Feb 26, 2024

Since Q20 is a pretty complex query, we tried removing parts of it to reveal the true bottleneck.

Therefore, we introduced three variants of Q20 (see https://github.com/risingwavelabs/kube-bench/blob/main/manifests/tpch/tpch-modified-sinks.template.yaml#L830-L905):
q20-no-greater
q20-no-greater-inner-in
q20-only-greater

Q20-NO-GREATER

Query:

    create sink tpch_q20_no_greater as
    select
    	s_name,
    	s_address
    from
    	supplier,
    	nation
    where
    	s_suppkey in (
    		select
    			ps_suppkey
    		from
    			partsupp
    		where
    			ps_partkey in (
    				select
    					p_partkey
    				from
    					part
    			)
    	)
    	and s_nationkey = n_nationkey
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Plan:

 StreamSink { type: append-only, columns: [s_name, s_address, _row_id(hidden), _row_id#1(hidden), $expr10248(hidden), $expr10245(hidden)] }
 └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr7 }
   ├─StreamExchange { dist: HashShard($expr1) }
   │ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr4 = $expr5 }
   │   ├─StreamExchange { dist: HashShard($expr4) }
   │   │ └─StreamProject { exprs: [Field(supplier, 0:Int32) as $expr1, Field(supplier, 1:Int32) as $expr2, Field(supplier, 2:Int32) as $expr3, Field(supplier, 3:Int32) as $expr4, _row_id] }
   │   │   └─StreamFilter { predicate: (eventType = 'supplier':Varchar) }
   │   │     └─StreamShare { id: 4 }
   │   │       └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
   │   │         └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
   │   │           └─StreamRowIdGen { row_id_index: 10 }
   │   │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   │   └─StreamExchange { dist: HashShard($expr5) }
   │     └─StreamProject { exprs: [Field(nation, 0:Int32) as $expr5, _row_id] }
   │       └─StreamFilter { predicate: (eventType = 'nation':Varchar) }
   │         └─StreamShare { id: 4 }
   │           └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
   │             └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
   │               └─StreamRowIdGen { row_id_index: 10 }
   │                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
   └─StreamExchange { dist: HashShard($expr7) }
     └─StreamHashJoin { type: LeftSemi, predicate: $expr6 = $expr8 }
       ├─StreamExchange { dist: HashShard($expr6) }
       │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr6, Field(partsupp, 1:Int32) as $expr7, _row_id] }
       │   └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
       │     └─StreamShare { id: 4 }
       │       └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
       │         └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
       │           └─StreamRowIdGen { row_id_index: 10 }
       │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
       └─StreamExchange { dist: HashShard($expr8) }
         └─StreamProject { exprs: [Field(part, 0:Int32) as $expr8, _row_id] }
           └─StreamFilter { predicate: (eventType = 'part':Varchar) }
             └─StreamShare { id: 4 }
               └─StreamProject { exprs: [eventType, supplier, part, partsupp, nation, _row_id] }
                 └─StreamFilter { predicate: ((((eventType = 'supplier':Varchar) OR (eventType = 'nation':Varchar)) OR (eventType = 'partsupp':Varchar)) OR (eventType = 'part':Varchar)) }
                   └─StreamRowIdGen { row_id_index: 10 }
                     └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(38 rows)

RW: http://metabase.risingwave-cloud.xyz/question/12860-tpch-q20-no-greater-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3111?start_date=2024-01-24

Flink: http://metabase.risingwave-cloud.xyz/question/12946-flink-tpch-q20-no-greater-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3108?start_date=2024-01-24

q20-no-greater performs much better than q20 on both systems; the improvement is more than 4x for RW.

Therefore, we can conclude that the removed part is likely the bottleneck:

ps_availqty > (
    select
        0.005 * sum(l_quantity)
    from
        lineitem
    where
        l_partkey = ps_partkey
        and l_suppkey = ps_suppkey
)

Therefore, let’s look at q20-only-greater.

Q20-ONLY-GREATER

Query:

    create sink tpch_q20_only_greater as
    select
    	ps_suppkey
    from
    	partsupp
    where
    	ps_availqty > (
    		select
    			0.005 * sum(l_quantity)
    		from
    			lineitem
    		where
    			l_partkey = ps_partkey
    			and l_suppkey = ps_suppkey
    	)
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
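
For reference, a hand-decorrelated equivalent of this variant (a sketch; the sink name is illustrative): the correlated scalar subquery becomes one group-by aggregation plus one equi-join, which is exactly the HashAgg + Inner join shape in the plan below. The inner join drops (ps_partkey, ps_suppkey) pairs with no lineitem rows, but those rows would fail the original ps_availqty > NULL comparison anyway, so the results match.

    create sink tpch_q20_only_greater_decorrelated as
    select
        ps_suppkey
    from
        partsupp
        join (
            -- pre-aggregate lineitem once per (l_partkey, l_suppkey)
            select
                l_partkey,
                l_suppkey,
                0.005 * sum(l_quantity) as threshold
            from lineitem
            group by l_partkey, l_suppkey
        ) agg on agg.l_partkey = ps_partkey and agg.l_suppkey = ps_suppkey
    where
        ps_availqty > agg.threshold
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');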

Plan:

 StreamSink { type: append-only, columns: [ps_suppkey, _row_id(hidden), $expr10228(hidden), $expr10236(hidden), $expr10237(hidden)] }
 └─StreamProject { exprs: [$expr2, _row_id, $expr1, $expr5, $expr6] }
   └─StreamFilter { predicate: ($expr3 > $expr7) }
     └─StreamHashJoin { type: Inner, predicate: $expr1 = $expr5 AND $expr2 = $expr6 }
       ├─StreamExchange { dist: HashShard($expr1, $expr2) }
       │ └─StreamProject { exprs: [Field(partsupp, 0:Int32) as $expr1, Field(partsupp, 1:Int32) as $expr2, Field(partsupp, 2:Int32)::Decimal as $expr3, _row_id] }
       │   └─StreamFilter { predicate: (eventType = 'partsupp':Varchar) }
       │     └─StreamShare { id: 4 }
       │       └─StreamProject { exprs: [eventType, lineitem, partsupp, _row_id] }
       │         └─StreamFilter { predicate: ((eventType = 'partsupp':Varchar) OR (eventType = 'lineitem':Varchar)) }
       │           └─StreamRowIdGen { row_id_index: 10 }
       │             └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
       └─StreamProject { exprs: [(0.005:Decimal * sum($expr4)) as $expr7, $expr5, $expr6] }
         └─StreamHashAgg [append_only] { group_key: [$expr5, $expr6], aggs: [sum($expr4), count] }
           └─StreamExchange { dist: HashShard($expr5, $expr6) }
             └─StreamProject { exprs: [Field(lineitem, 4:Int32) as $expr4, Field(lineitem, 1:Int32) as $expr5, Field(lineitem, 2:Int32) as $expr6, _row_id] }
               └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
                 └─StreamShare { id: 4 }
                   └─StreamProject { exprs: [eventType, lineitem, partsupp, _row_id] }
                     └─StreamFilter { predicate: ((eventType = 'partsupp':Varchar) OR (eventType = 'lineitem':Varchar)) }
                       └─StreamRowIdGen { row_id_index: 10 }
                         └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(22 rows)

RW: http://metabase.risingwave-cloud.xyz/question/13316-tpch-q20-only-greater-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3123?start_date=2024-01-26

Flink: http://metabase.risingwave-cloud.xyz/question/13306-flink-tpch-q20-only-greater-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3120?start_date=2024-01-26

This confirms that the removed part, i.e. q20-only-greater, is the bottleneck.

The barrier interval does not matter much for RW. We are using the nightly-20240224 image here.
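
For context, the barrier interval mentioned above is a RisingWave system parameter; a minimal sketch of inspecting and adjusting it (assuming the parameter is named barrier_interval_ms, with the value in milliseconds):

    -- list system parameters, including barrier_interval_ms
    show parameters;
    -- adjust the checkpoint barrier interval
    alter system set barrier_interval_ms to 1000;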

lmatz commented Feb 26, 2024

We suspect that the memory size, i.e. the cache size, is the bottleneck for q20 and q20-only-greater.
We will try to verify the throughput on an 8c32G machine, in contrast to the current 8c16G machine.

lmatz commented Feb 27, 2024

Following up on the suspicion above that memory (cache) size is the bottleneck for q20 and q20-only-greater, and on verifying throughput on an 8c32G machine vs. the current 8c16G machine:

We ran it once:
http://metabase.risingwave-cloud.xyz/question/13724-tpch-q20-only-greater-bs-8c32g-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3146?start_date=2024-01-28

https://buildkite.com/risingwave-test/tpch-benchmark/builds/1009

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709028167000&to=1709029750000&var-namespace=tpch-8c32g-1cn-affinity-test

Sadly, we didn't adjust the settings in kube-bench, e.g. setting the memory request and limit of the compute node to 32G; however, the memory limit was raised to 15GB from the 13GB previously used on the 8c16G machine.

And the effect is huge.

Previously: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708882589000&to=1708884401000&var-namespace=tpch-1cn-affinity-1s-0224

(screenshot: SCR-20240227-tql)

Now: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709028167000&to=1709029750000&var-namespace=tpch-8c32g-1cn-affinity-test

(screenshot: SCR-20240227-tqe)

  1. The eviction at the beginning of both tests is too aggressive.
  2. Eviction starts even when there is still plenty of memory. Previously, eviction started at 8.5GB out of 13GB in total; now it starts at 10.7GB out of 15GB. That leaves roughly 35% / 29% of total memory unused.

fuyufjh commented Feb 28, 2024

#14797 (comment)

Does this confirm that subquery unnesting is the major cause?

lmatz removed this from the release-1.7 milestone on Mar 6, 2024
lmatz commented Mar 7, 2024

#14797 (comment)

Does this confirm that subquery unnesting is the major cause?

q20-only-greater metabase:
http://metabase.risingwave-cloud.xyz/question/13316-tpch-q20-only-greater-bs-medium-1cn-af[…]-second-rows-s-history-thtb-3123?start_date=2024-01-26

Grafana:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-da[…]708884401000&var-namespace=tpch-1cn-affinity-1s-0224

(screenshot: SCR-20240307-j9q)

When the cache starts to evict aggressively:
block cache data miss ops increase,
s3 reads increase, and
join barrier alignment latency increases.

I think there is a very strong correlation.

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
