perf: improve nexmark q101 scaling up performance #14987

Closed · Tracked by #14448
lmatz opened this issue Feb 4, 2024 · 2 comments
Labels: help wanted, type/perf

lmatz commented Feb 4, 2024

nightly-20240127

4X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2921
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706394676000&to=1706396059000&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127

1X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2925
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706411270000&to=1706413073000&var-namespace=nexmark-1cn-affinity-weekly-20240127

RW 4X: 716K
RW 1X: 384K
4X/1X Ratio: 1.86

Flink: http://metabase.risingwave-cloud.xyz/question/9549-nexmark-rw-vs-flink-avg-source-throughput-all-testbeds?rw_tag=nightly-20240127&flink_tag=v1.16.0&flink_label=flink-medium-1tm-test-20230104,flink-4x-medium-1tm-test-20240104&flink_metrics=avg-job-throughput-per-second

Flink 4X/1X Ratio: 2.81
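For a quick sense of scaling efficiency, the ratios above can be normalized against the ideal 4X speedup. A minimal sketch (the throughput numbers are the averages reported above; the rest is plain arithmetic):

# Scaling efficiency = measured speedup / ideal speedup (4X here).
# Throughput numbers are the avg source throughputs reported above.
rw_4x, rw_1x = 716_000, 384_000
ideal = 4

rw_ratio = rw_4x / rw_1x                 # ~1.86
flink_ratio = 2.81                       # from the Metabase link above

print(f"RW:    {rw_ratio:.2f}x speedup, {rw_ratio / ideal:.0%} of linear")
print(f"Flink: {flink_ratio:.2f}x speedup, {flink_ratio / ideal:.0%} of linear")

So RW reaches roughly 47% of linear scaling on this query, versus roughly 70% for Flink.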

RW:

CREATE SINK nexmark_q101 AS
    SELECT
        a.id AS auction_id,
        a.item_name AS auction_item_name,
        b.max_price AS current_highest_bid
    FROM auction a
    LEFT OUTER JOIN (
        SELECT
            b1.auction,
            MAX(b1.price) max_price
        FROM bid b1
        GROUP BY b1.auction
    ) b ON a.id = b.auction
    WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
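For intuition, q101 emits each auction together with the current highest bid on it, or NULL if the auction has no bids yet. A toy Python sketch of the same logic over made-up data (the auction/bid values are purely illustrative):

# Toy reimplementation of q101's logic: LEFT OUTER JOIN of auctions with
# the per-auction MAX(price) aggregate over bids.
auctions = [(1, "chair"), (2, "lamp"), (3, "desk")]   # (id, item_name)
bids = [(1, 100), (1, 250), (2, 80)]                  # (auction, price)

# SELECT b1.auction, MAX(b1.price) FROM bid b1 GROUP BY b1.auction
max_price = {}
for auction, price in bids:
    max_price[auction] = max(max_price.get(auction, price), price)

# LEFT OUTER JOIN: auctions without bids get current_highest_bid = None
for auction_id, item_name in auctions:
    print(auction_id, item_name, max_price.get(auction_id))
# -> 1 chair 250 / 2 lamp 100 / 3 desk None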

Query Plan:

 StreamSink { type: append-only, columns: [auction_id, auction_item_name, current_highest_bid, _row_id(hidden), $expr10134(hidden)] }
 └─StreamHashJoin { type: LeftOuter, predicate: $expr2 = $expr4 }
   ├─StreamExchange { dist: HashShard($expr2) }
   │ └─StreamProject { exprs: [Field(auction, 0:Int32) as $expr2, Field(auction, 1:Int32) as $expr3, _row_id] }
   │   └─StreamFilter { predicate: (event_type = 1:Int32) }
   │     └─StreamShare { id: 6 }
   │       └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
   │         └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
   │           └─StreamRowIdGen { row_id_index: 6 }
   │             └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
   │               └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
   │                 └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
   └─StreamProject { exprs: [$expr4, max($expr5)] }
     └─StreamHashAgg [append_only] { group_key: [$expr4], aggs: [max($expr5), count] }
       └─StreamExchange { dist: HashShard($expr4) }
         └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, Field(bid, 2:Int32) as $expr5, _row_id] }
           └─StreamFilter { predicate: (event_type = 2:Int32) }
             └─StreamShare { id: 6 }
               └─StreamProject { exprs: [event_type, auction, bid, _row_id] }
                 └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
                   └─StreamRowIdGen { row_id_index: 6 }
                     └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                       └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                         └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(24 rows)

Dist Query Plan:

 Fragment 0
 StreamSink { type: append-only, columns: [auction_id, auction_item_name, current_highest_bid, _row_id(hidden), $expr10134(hidden)] }
 ├── tables: [ Sink: 0 ]
 └── StreamHashJoin { type: LeftOuter, predicate: $expr2 = $expr4 }
     ├── tables: [ HashJoinLeft: 1, HashJoinDegreeLeft: 2, HashJoinRight: 3, HashJoinDegreeRight: 4 ]
     ├── StreamExchange Hash([0]) from 1
     └── StreamProject { exprs: [$expr4, max($expr5)] }
         └── StreamHashAgg [append_only] { group_key: [$expr4], aggs: [max($expr5), count] } { tables: [ HashAggState: 7 ] }
             └── StreamExchange Hash([0]) from 3
 
 Fragment 1
 StreamProject { exprs: [Field(auction, 0:Int32) as $expr2, Field(auction, 1:Int32) as $expr3, _row_id] }
 └── StreamFilter { predicate: (event_type = 1:Int32) }
     └── StreamExchange NoShuffle from 2
 
 Fragment 2
 StreamProject { exprs: [event_type, auction, bid, _row_id] }
 └── StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) }
     └── StreamRowIdGen { row_id_index: 6 }
         └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 5 ] }
             └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 6 ] }
 
 Fragment 3
 StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, Field(bid, 2:Int32) as $expr5, _row_id] }
 └── StreamFilter { predicate: (event_type = 2:Int32) }
     └── StreamExchange NoShuffle from 2
 
 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_vnode, kv_log_store_row_op, auction_id, auction_item_name, current_highest_bid, _row_id, $expr10134 ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8 ]
 ├── distribution key: [ 4 ]
 ├── read pk prefix len hint: 3
 └── vnode column idx: 2
 
 Table 1 { columns: [ $expr2, $expr3, _row_id ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 2 { columns: [ $expr2, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 3 { columns: [ $expr4, max($expr5) ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 4 { columns: [ $expr4, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
 Table 5 { columns: [ vnode, offset ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }
 
 Table 6 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
 Table 7 { columns: [ $expr4, max($expr5), count ], primary key: [ $0 ASC ], value indices: [ 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
 
(50 rows)

Flink:

INSERT INTO nexmark_q101
    SELECT
        a.id,
        a.itemName,
        b.max_price
    FROM auction a
    LEFT OUTER JOIN (
        SELECT
            b1.auction,
            MAX(b1.price) max_price
        FROM bid b1
        GROUP BY b1.auction
    ) b ON a.id = b.auction;

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q101], fields=[id, itemName, max_price])
+- Calc(select=[id, itemName, max_price])
   +- Join(joinType=[LeftOuterJoin], where=[=(id, auction)], select=[id, itemName, auction, max_price], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[id]])
      :  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[=(event_type, 1)])
      :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
      :        +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
      :           +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      +- Exchange(distribution=[hash[auction]])
         +- GroupAggregate(groupBy=[auction], select=[auction, MAX(price) AS max_price])
            +- Exchange(distribution=[hash[auction]])
               +- Calc(select=[bid.auction AS auction, bid.price AS price], where=[=(event_type, 2)])
                  +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                     +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                        +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q101], fields=[id, itemName, max_price])
+- Calc(select=[id, itemName, max_price])
   +- Join(joinType=[LeftOuterJoin], where=[(id = auction)], select=[id, itemName, auction, max_price], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[id]])
      :  +- Calc(select=[auction.id AS id, auction.itemName AS itemName], where=[(event_type = 1)])
      :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])(reuse_id=[1])
      :        +- Calc(select=[event_type, person, auction, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
      :           +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      +- Exchange(distribution=[hash[auction]])
         +- GroupAggregate(groupBy=[auction], select=[auction, MAX(price) AS max_price])
            +- Exchange(distribution=[hash[auction]])
               +- Calc(select=[bid.auction AS auction, bid.price AS price], where=[(event_type = 2)])
                  +- Reused(reference_id=[1])
lmatz commented Feb 4, 2024

Some notable phenomena:

1. The compute node's CPU usage cannot keep up when scaling up

4X:
[screenshot SCR-20240205-1q9]

1X:
[screenshot SCR-20240205-1qe]

CPU usage only grows about 3 times despite 4X the resources.

2. Join executor barrier alignment time is significantly higher at 4X (a simplified model of this effect is sketched after this list)

4X:
[screenshot SCR-20240205-1sd]

1X:
[screenshot SCR-20240205-1sg]

3. HashAgg executor throughput seems to scale acceptably in general

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706394676000&to=1706396059000&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot SCR-20240205-nu8]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706411270000&orgId=1&to=1706413073000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
[screenshot SCR-20240205-nuc]

4. HashJoin (LeftOuter) executor throughput scales much worse

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706394676000&to=1706396059000&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
[screenshot SCR-20240205-nwi]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706411270000&orgId=1&to=1706413073000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
[screenshot SCR-20240205-nwm]

This is also odd because the executor cache miss ratio shown above is generally lower at 4X, which suggests that cache misses are not the bottleneck when scaling up.

5. Strangely, the 4X run's data block cache miss rate is also lower

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706394676000&to=1706396059000&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=99
[screenshot SCR-20240205-o2q]

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706411270000&orgId=1&to=1706413073000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=99
[screenshot SCR-20240205-o2u]
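A simplified model of how barrier alignment cost can grow with parallelism, as mentioned in phenomenon 2 above. This is only an assumption-level sketch of generic barrier alignment with exponentially distributed per-channel jitter, not RisingWave's actual merge/join implementation: an operator can forward a barrier only after it has arrived on all of its upstream channels, so each alignment waits for the slowest channel, and that wait grows with fan-in.

import random

# A barrier can be forwarded only once it has arrived on ALL upstream
# channels, so each alignment waits for max(per-channel delays).
def mean_align_wait(num_channels, trials=10_000):
    total = 0.0
    for _ in range(trials):
        delays = [random.expovariate(1.0) for _ in range(num_channels)]
        total += max(delays)  # alignment waits for the slowest channel
    return total / trials

for fan_in in (4, 16):  # e.g. upstream actor counts at 1X vs 4X
    print(fan_in, round(mean_align_wait(fan_in), 2))
# The max of N i.i.d. exponential delays grows like the harmonic number
# H_N (~2.08 for N=4, ~3.38 for N=16), so alignment slows as fan-in grows.

Under this model, scaling up makes every barrier wait on more channels, which would show up as higher join barrier alignment time even while the caches behave better.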

lmatz commented Feb 5, 2024

Given that:

  1. the cache/storage statistics favor 4X over 1X, and
  2. HashAgg does not output less when scaling up,

I suppose this is more likely an issue in the LeftOuter join operator and/or the Exchange.
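To make the Exchange suspicion concrete, a back-of-the-envelope sketch, assuming the usual hash-exchange wiring where every upstream actor keeps a channel to every downstream actor (the parallelism values 4 and 16 are illustrative, not taken from the runs): channel count grows quadratically with parallelism while each channel carries proportionally smaller chunks, which inflates both per-chunk overhead and the barrier fan-in modeled above.

# Hash exchange fan-out: every upstream actor keeps a channel to every
# downstream actor, so channels grow quadratically with parallelism.
# Parallelism values below are assumed for illustration only.
for scale, parallelism in (("1X", 4), ("4X", 16)):
    channels = parallelism * parallelism
    print(f"{scale}: {parallelism}x{parallelism} actors -> {channels} channels")
# 1X: 4x4 actors   -> 16 channels
# 4X: 16x16 actors -> 256 channels (16x the channels for 4x the compute)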

lmatz closed this as completed Mar 18, 2024