perf: improve nexmark q102 scaling up performance #15004

Closed · Tracked by #14448
lmatz opened this issue Feb 5, 2024 · 2 comments
Labels: help wanted, type/perf


lmatz commented Feb 5, 2024

nightly-20240127

4X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2921
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706396338000&to=1706398141000&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127

1X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2925
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706413292000&to=1706415095000&var-namespace=nexmark-1cn-affinity-weekly-20240127

RW 4X: 358K
RW 1X: 187K
RW 4X/1X Ratio: 1.91

Flink: http://metabase.risingwave-cloud.xyz/question/9549-nexmark-rw-vs-flink-avg-source-throughput-all-testbeds?rw_tag=nightly-20240127&flink_tag=v1.16.0&flink_label=flink-medium-1tm-test-20230104,flink-4x-medium-1tm-test-20240104&flink_metrics=avg-job-throughput-per-second

Flink 4X/1X Ratio: 2.72
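
As a sanity check on the arithmetic: 358K / 187K ≈ 1.91, against an ideal ratio of 4.0 for perfectly linear scaling; with Flink at 2.72, RW realizes only about 70% (1.91 / 2.72) of Flink's scaling factor on the same workload.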

RW:

CREATE SINK nexmark_q102 AS
    SELECT
        a.id AS auction_id,
        a.item_name AS auction_item_name,
        COUNT(b.auction) AS bid_count
    FROM auction a
    JOIN bid b ON a.id = b.auction
    GROUP BY a.id, a.item_name
    HAVING COUNT(b.auction) >= (
        SELECT COUNT(*) / COUNT(DISTINCT auction) FROM bid
    )
    WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
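
For context, the HAVING threshold is just the overall average number of bids per auction; run standalone against the same bid source, the subquery would be:

-- The threshold q102 compares against: average bids per auction.
SELECT COUNT(*) / COUNT(DISTINCT auction) AS avg_bids_per_auction
FROM bid;

So the sink keeps exactly those auctions whose bid count is at least that average.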

Query Plan:

 StreamSink { type: append-only, columns: [auction_id, auction_item_name, bid_count] }
 └─StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: [$expr2, $expr3, count($expr4)] }
   ├─StreamProject { exprs: [$expr2, $expr3, count($expr4)] }
   │ └─StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [count($expr4), count] }
   │   └─StreamHashJoin [append_only] { type: Inner, predicate: $expr2 = $expr4 }
   │     ├─StreamExchange { dist: HashShard($expr2) }
   │     │ └─StreamProject { exprs: [Field(auction, 0:Int32) as $expr2, Field(auction, 1:Int32) as $expr3, _row_id] }
   │     │   └─StreamFilter { predicate: (event_type = 1:Int32) }
   │     │     └─StreamShare { id: 6 }
   │     │       └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   │     │         └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
   │     │           └─StreamRowIdGen { row_id_index: 6 }
   │     │             └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
   │     │               └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
   │     │                 └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
   │     └─StreamExchange { dist: HashShard($expr4) }
   │       └─StreamShare { id: 12 }
   │         └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, _row_id] }
   │           └─StreamFilter { predicate: (event_type = 2:Int32) }
   │             └─StreamShare { id: 6 }
   │               └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   │                 └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
   │                   └─StreamRowIdGen { row_id_index: 6 }
   │                     └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
   │                       └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
   │                         └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
   └─StreamExchange { dist: Broadcast }
     └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] }
       └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] }
         └─StreamExchange { dist: Single }
           └─StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr4)] }
             └─StreamHashAgg [append_only] { group_key: [$expr4], aggs: [count] }
               └─StreamExchange { dist: HashShard($expr4) }
                 └─StreamShare { id: 12 }
                   └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, _row_id] }
                     └─StreamFilter { predicate: (event_type = 2:Int32) }
                       └─StreamShare { id: 6 }
                         └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
                           └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
                             └─StreamRowIdGen { row_id_index: 6 }
                               └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                                 └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                                   └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(43 rows)

Dist Query Plan:

  Fragment 0
 StreamSink { type: append-only, columns: [auction_id, auction_item_name, bid_count] }
 ├── tables: [ Sink: 0 ]
 └── StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: [$expr2, $expr3, count($expr4)] }
     ├── tables: [ DynamicFilterLeft: 1, DynamicFilterRight: 2 ]
     ├── StreamProject { exprs: [$expr2, $expr3, count($expr4)] }
     │   └── StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [count($expr4), count] }
     │       ├── tables: [ HashAggState: 3 ]
     │       └── StreamHashJoin [append_only] { type: Inner, predicate: $expr2 = $expr4 }
     │           ├── tables:
     │           │   ┌── HashJoinLeft: 4
     │           │   ├── HashJoinDegreeLeft: 5
     │           │   ├── HashJoinRight: 6
     │           │   └── HashJoinDegreeRight: 7
     │           ├── StreamExchange Hash([0]) from 1
     │           └── StreamExchange Hash([0]) from 3
     └── StreamExchange Broadcast from 5
 
 Fragment 1
 StreamProject { exprs: [Field(auction, 0:Int32) as $expr2, Field(auction, 1:Int32) as $expr3, _row_id] }
 └── StreamFilter { predicate: (event_type = 1:Int32) }
     └── StreamExchange NoShuffle from 2
 
 Fragment 2
 StreamProject { exprs: [event_type, auction, bid, _row_id] }
 └── StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
     └── StreamRowIdGen { row_id_index: 6 }
         └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 8 ] }
             └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 9 ] }
 
 Fragment 3
 StreamNoOp
 └── StreamExchange NoShuffle from 4
 
 Fragment 4
 StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, _row_id] }
 └── StreamFilter { predicate: (event_type = 2:Int32) }
     └── StreamExchange NoShuffle from 2
 
 Fragment 5
 StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] }
 └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } { tables: [ SimpleAggState: 10 ] }
     └── StreamExchange Single from 6
 
 Fragment 6
 StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr4)] }
 └── StreamHashAgg [append_only] { group_key: [$expr4], aggs: [count] } { tables: [ HashAggState: 11 ] }
     └── StreamExchange Hash([0]) from 7
 
 Fragment 7
 StreamNoOp
 └── StreamExchange NoShuffle from 4
 
 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_vnode, kv_log_store_row_op, auction_id, auction_item_name, bid_count ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5, 6 ]
 ├── distribution key: [ 4 ]
 ├── read pk prefix len hint: 3
 └── vnode column idx: 2
 
 Table 1 { columns: [ $expr2, $expr3, count($expr4) ], primary key: [ $2 ASC, $0 ASC, $1 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 2 { columns: [ $expr5 ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 }
 
 Table 3 { columns: [ $expr2, $expr3, count($expr4), count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 2 }
 
 Table 4 { columns: [ $expr2, $expr3, _row_id ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 5 { columns: [ $expr2, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 6 { columns: [ $expr4, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 7 { columns: [ $expr4, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 8 { columns: [ vnode, offset ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }
 
 Table 9 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
 Table 10 { columns: [ sum0(sum0(count)), sum0(count($expr4)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 }
 
 Table 11 { columns: [ $expr4, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
(84 rows)

Flink:

INSERT INTO nexmark_q102
    SELECT
        a.id,
        a.itemName,
        COUNT(b.auction) AS bid_count
    FROM auction a
    JOIN bid b ON a.id = b.auction
    GROUP BY a.id, a.itemName
    HAVING COUNT(b.auction) >= (
        SELECT COUNT(*) / COUNT(DISTINCT auction) FROM bid
    );

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q102], fields=[id, itemName, bid_count])
+- Calc(select=[id, itemName, bid_count])
   +- Join(joinType=[InnerJoin], where=[>=(bid_count, $f0)], select=[id, itemName, bid_count, $f0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[single])
      :  +- GroupAggregate(groupBy=[id, itemName], select=[id, itemName, COUNT(auction) AS bid_count])
      :     +- Exchange(distribution=[hash[id, itemName]])
      :        +- Join(joinType=[InnerJoin], where=[=(id, auction)], select=[id, itemName, auction], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :           :- Exchange(distribution=[hash[id]])
      :           :  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[=(event_type, 1)])
      :           :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
      :           :        +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
      :           :           +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      :           +- Exchange(distribution=[hash[auction]])
      :              +- Calc(select=[bid.auction AS auction], where=[=(event_type, 2)])
      :                 +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
      :                    +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
      :                       +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      +- Exchange(distribution=[single])
         +- GroupAggregate(select=[SINGLE_VALUE_RETRACT(EXPR$0) AS $f0])
            +- Exchange(distribution=[single])
               +- Calc(select=[/($f0, $f1) AS EXPR$0])
                  +- GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f1_0) AS $f0, $SUM0_RETRACT($f2) AS $f1])
                     +- Exchange(distribution=[single])
                        +- GroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(*) AS $f1_0, COUNT(DISTINCT auction) AS $f2])
                           +- Exchange(distribution=[hash[$f1]])
                              +- Calc(select=[bid.auction AS auction, MOD(HASH_CODE(bid.auction), 1024) AS $f1], where=[=(event_type, 2)])
                                 +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                                    +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                                       +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q102], fields=[id, itemName, bid_count])
+- Calc(select=[id, itemName, bid_count])
   +- Join(joinType=[InnerJoin], where=[(bid_count >= $f0)], select=[id, itemName, bid_count, $f0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[single])
      :  +- GroupAggregate(groupBy=[id, itemName], select=[id, itemName, COUNT(auction) AS bid_count])
      :     +- Exchange(distribution=[hash[id, itemName]])
      :        +- Join(joinType=[InnerJoin], where=[(id = auction)], select=[id, itemName, auction], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      :           :- Exchange(distribution=[hash[id]])
      :           :  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[(event_type = 1)])
      :           :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])(reuse_id=[1])
      :           :        +- Calc(select=[event_type, person, auction, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
      :           :           +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      :           +- Exchange(distribution=[hash[auction]])
      :              +- Calc(select=[bid.auction AS auction], where=[(event_type = 2)])
      :                 +- Reused(reference_id=[1])
      +- Exchange(distribution=[single])
         +- GroupAggregate(select=[SINGLE_VALUE_RETRACT(EXPR$0) AS $f0])
            +- Exchange(distribution=[single])
               +- Calc(select=[($f0 / $f1) AS EXPR$0])
                  +- GroupAggregate(partialFinalType=[FINAL], select=[$SUM0_RETRACT($f1_0) AS $f0, $SUM0_RETRACT($f2) AS $f1])
                     +- Exchange(distribution=[single])
                        +- GroupAggregate(groupBy=[$f1], partialFinalType=[PARTIAL], select=[$f1, COUNT(*) AS $f1_0, COUNT(DISTINCT auction) AS $f2])
                           +- Exchange(distribution=[hash[$f1]])
                              +- Calc(select=[bid.auction AS auction, MOD(HASH_CODE(bid.auction), 1024) AS $f1], where=[(event_type = 2)])
                                 +- Reused(reference_id=[1])
github-actions bot added this to the release-1.7 milestone Feb 5, 2024
lmatz added the help wanted and type/perf labels Feb 5, 2024

lmatz commented Feb 5, 2024

Some notable phenomena:

1. Join Executor Barrier Align is much higher when scaling up

4X:
[screenshot: SCR-20240205-smg]

1X:
[screenshot: SCR-20240205-smj]

2. HashJoin executor's throughput is much worse when scaling up

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706396338000&orgId=1&to=1706398141000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-sqm]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706413292000&to=1706415095000&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-sqj]

3. StatelessSimpleAgg executor's throughput is much worse when scaling up

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706396338000&orgId=1&to=1706398141000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-sv7]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706413292000&to=1706415095000&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-sva]

Unsurprisingly, SimpleAgg is bad too, since it sits on the other side of the exchange downstream of StatelessSimpleAgg.
4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706396338000&orgId=1&to=1706398141000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-szk]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706413292000&to=1706415095000&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-szn]

4. HashAgg executor's throughput is worse when scaling up

There are two HashAggs in the query plan, but since both get worse, it does not matter which is which (though how would one tell the two apart?)
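
(For what it's worth, the dist plan above does distinguish them: the HashAgg in Fragment 0 groups by [$expr2, $expr3], i.e. auction id and item name, while the one in Fragment 6 groups by $expr4, i.e. bid.auction.)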

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706396338000&orgId=1&to=1706398141000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-t3z]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706413292000&to=1706415095000&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-t48]

5. Dynamic filter executor's throughput is better when scaling up

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706396338000&orgId=1&to=1706398141000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot: SCR-20240205-t9u]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706413292000&to=1706415095000&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63

[screenshot: SCR-20240205-t9x]


lmatz commented Feb 5, 2024

Since:

  1. The bottom HashAgg is the first stateful operator in its branch of the subplan, so it does not make sense for its performance to decrease when scaling up; the drop is probably due to back-pressure.
  2. StatelessSimpleAgg and SimpleAgg are both too simple to be the root cause (though they may still be worth a look); their slowdown is probably also due to back-pressure.
  3. The dynamic filter executor's throughput gets better when scaling up.

I suppose HashJoin and Exchange may need to be investigated first?
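
A hypothetical way to narrow this down (not something run in this issue) would be to materialize only the join + aggregation subplan, dropping the dynamic-filter branch, and compare its 4X/1X scaling in isolation; the sink name below is made up for illustration:

-- Hypothetical isolation experiment: q102 without the HAVING threshold branch.
CREATE SINK nexmark_q102_join_only AS
    SELECT
        a.id AS auction_id,
        a.item_name AS auction_item_name,
        COUNT(b.auction) AS bid_count
    FROM auction a
    JOIN bid b ON a.id = b.auction
    GROUP BY a.id, a.item_name
    WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

If this stripped-down sink reproduces the ~1.91 ratio, that would point at the HashJoin/Exchange path as the bottleneck independent of the dynamic filter and the threshold subplan.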

lmatz removed this from the release-1.7 milestone Mar 6, 2024
lmatz closed this as completed Mar 18, 2024