
perf: improve nexmark q16 scaling up performance #14985

Closed
Tracked by #14448
lmatz opened this issue Feb 4, 2024 · 4 comments
Labels: help wanted, type/perf

lmatz commented Feb 4, 2024

nightly-20240127
all default configuration

4X:

  1. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706385538000&orgId=1&to=1706387341000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127
  2. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2925

1X:

  1. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706401160000&orgId=1&to=1706402963000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-weekly-20240127
  2. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2921

RW 4X: 145K
RW 1X: 102K

4X/1X Ratio: 1.42

Flink: http://metabase.risingwave-cloud.xyz/question/9549-nexmark-rw-vs-flink-avg-source-throughput-all-testbeds?rw_tag=nightly-20240127&flink_tag=v1.16.0&flink_label=flink-medium-1tm-test-20230104,flink-4x-medium-1tm-test-20240104&flink_metrics=avg-job-throughput-per-second

Flink 4X/1X Ratio: 2.36

RW:

CREATE SINK nexmark_q16 AS
SELECT channel,
       to_char(date_time, 'YYYY-MM-DD')                                          as "day",
       max(to_char(date_time, 'HH:mm'))                                          as "minute",
       count(*)                                                                  AS total_bids,
       count(*) filter (where price < 10000)                                     AS rank1_bids,
       count(*) filter (where price >= 10000 and price < 1000000)                AS rank2_bids,
       count(*) filter (where price >= 1000000)                                  AS rank3_bids,
       count(distinct bidder)                                                    AS total_bidders,
       count(distinct bidder) filter (where price < 10000)                       AS rank1_bidders,
       count(distinct bidder) filter (where price >= 10000 and price < 1000000)  AS rank2_bidders,
       count(distinct bidder) filter (where price >= 1000000)                    AS rank3_bidders,
       count(distinct auction)                                                   AS total_auctions,
       count(distinct auction) filter (where price < 10000)                      AS rank1_auctions,
       count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
       count(distinct auction) filter (where price >= 1000000)                   AS rank3_auctions
FROM bid
GROUP BY to_char(date_time, 'YYYY-MM-DD'), channel
WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

One critical choice is whether we want to enable partial aggregation for q16.

We know that date_time is monotonically increasing, and since it is formatted by to_char at day (DD) granularity, the to_char(date_time, 'YYYY-MM-DD') value of every input record stays the same over long stretches of time.
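As a quick illustration (a hypothetical example, assuming Postgres-style to_char semantics), any two timestamps within the same day map to the same "day" group-key value:

SELECT to_char(timestamp '2024-01-27 10:15:30', 'YYYY-MM-DD');  -- '2024-01-27'
SELECT to_char(timestamp '2024-01-27 23:59:59', 'YYYY-MM-DD');  -- '2024-01-27'
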
For the channel column, we can refer to the data generator:

  1. https://github.com/risingwavelabs/nexmark-bench/blob/main/src/generator/nexmark/event.rs#L340-L348
  2. https://github.com/risingwavelabs/nexmark-bench/blob/3bee2aaad26713087d0465a4ec43e3a3b2cec07a/src/generator/nexmark/config.rs#L161-L162
  3. https://github.com/risingwavelabs/nexmark-bench/blob/3bee2aaad26713087d0465a4ec43e3a3b2cec07a/src/generator/nexmark/config.rs#L7
  4. https://github.com/risingwavelabs/nexmark-bench/blob/3bee2aaad26713087d0465a4ec43e3a3b2cec07a/src/generator/nexmark/config.rs#L128

In essence, with 50% probability the channel is chosen from 4 hot channel candidates, and with the other 50% probability it is chosen from 10,000 non-hot channel candidates.

By default, RW_FORCE_TWO_PHASE_AGG and RW_FORCE_SPLIT_DISTINCT_AGG are both set to false.
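For reference, both switches are session variables and can be toggled per session. A minimal sketch (assuming SET and EXPLAIN CREATE SINK behave as in current RisingWave releases) to compare the resulting plans:

SET RW_FORCE_TWO_PHASE_AGG = true;
SET RW_FORCE_SPLIT_DISTINCT_AGG = true;
-- then re-run EXPLAIN on the same statement as above to see the
-- two-phase / split-distinct plan:
EXPLAIN CREATE SINK nexmark_q16 AS ... ;  -- identical to the q16 query above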

query plan:

 StreamSink { type: append-only, columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
 └─StreamProject { exprs: [$expr3, $expr2, max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count(distinct $expr6), count(distinct $expr6) filter(($expr5 < 10000:Int32)), count(distinct $expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr6) filter(($expr5 >= 1000000:Int32)), count(distinct $expr7), count(distinct $expr7) filter(($expr5 < 10000:Int32)), count(distinct $expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr7) filter(($expr5 >= 1000000:Int32))] }
   └─StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count(distinct $expr6), count(distinct $expr6) filter(($expr5 < 10000:Int32)), count(distinct $expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr6) filter(($expr5 >= 1000000:Int32)), count(distinct $expr7), count(distinct $expr7) filter(($expr5 < 10000:Int32)), count(distinct $expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr7) filter(($expr5 >= 1000000:Int32))] }
     └─StreamExchange { dist: HashShard($expr2, $expr3) }
       └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 3:Int32) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7, _row_id] }
         └─StreamFilter { predicate: (event_type = 2:Int32) }
           └─StreamRowIdGen { row_id_index: 6 }
             └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
               └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(10 rows)

Dist query plan:

 Fragment 0
 StreamSink { type: append-only, columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] } { tables: [ Sink: 0 ] }
 └── StreamProject { exprs: [$expr3, $expr2, max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count(distinct $expr6), count(distinct $expr6) filter(($expr5 < 10000:Int32)), count(distinct $expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr6) filter(($expr5 >= 1000000:Int32)), count(distinct $expr7), count(distinct $expr7) filter(($expr5 < 10000:Int32)), count(distinct $expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr7) filter(($expr5 >= 1000000:Int32))] }
     └── StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count(distinct $expr6), count(distinct $expr6) filter(($expr5 < 10000:Int32)), count(distinct $expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr6) filter(($expr5 >= 1000000:Int32)), count(distinct $expr7), count(distinct $expr7) filter(($expr5 < 10000:Int32)), count(distinct $expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr7) filter(($expr5 >= 1000000:Int32))] }
         ├── tables: [ HashAggState: 1, HashAggDedupForCol4: 2, HashAggDedupForCol5: 3 ]
         └── StreamExchange Hash([0, 1]) from 1
 
 Fragment 1
 StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 3:Int32) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7, _row_id] }
 └── StreamFilter { predicate: (event_type = 2:Int32) }
     └── StreamRowIdGen { row_id_index: 6 }
         └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 4 ] }
             └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 5 ] }
 
 Table 0 { columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_vnode, kv_log_store_row_op, channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18 ], distribution key: [ 5, 4 ], read pk prefix len hint: 3, vnode column idx: 2 }
 
 Table 1
 ├── columns: [ $expr2, $expr3, max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count(distinct $expr6), count(distinct $expr6) filter(($expr5 < 10000:Int32)), count(distinct $expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr6) filter(($expr5 >= 1000000:Int32)), count(distinct $expr7), count(distinct $expr7) filter(($expr5 < 10000:Int32)), count(distinct $expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count(distinct $expr7) filter(($expr5 >= 1000000:Int32)) ]
 ├── primary key: [ $0 ASC, $1 ASC ]
 ├── value indices: [ 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 ]
 ├── distribution key: [ 0, 1 ]
 └── read pk prefix len hint: 2
 
 Table 2 { columns: [ $expr2, $expr3, $expr6, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4, 5, 6 ], distribution key: [ 0, 1 ], read pk prefix len hint: 3 }
 
 Table 3 { columns: [ $expr2, $expr3, $expr7, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4, 5, 6 ], distribution key: [ 0, 1 ], read pk prefix len hint: 3 }
 
 Table 4 { columns: [ vnode, offset ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }
 
 Table 5 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
(32 rows)

If we set both RW_FORCE_TWO_PHASE_AGG and RW_FORCE_SPLIT_DISTINCT_AGG to true, the plans change as follows.

query plan:

 StreamSink { type: append-only, columns: [auction, day, total_bids, rank1_bids, rank2_bids, rank3_bids, min_price, max_price, avg_price, sum_price] }
 └─StreamProject { exprs: [$expr3, $expr2, sum0(count), sum0(count filter(($expr4 < 10000:Int32))), sum0(count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32))), sum0(count filter(($expr4 >= 1000000:Int32))), min(min($expr4)), max(max($expr4)), (sum(sum($expr4)) / sum0(count($expr4))::Decimal) as $expr6, sum(sum($expr4))] }
   └─StreamHashAgg { group_key: [$expr2, $expr3], aggs: [sum0(count), sum0(count filter(($expr4 < 10000:Int32))), sum0(count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32))), sum0(count filter(($expr4 >= 1000000:Int32))), min(min($expr4)), max(max($expr4)), sum(sum($expr4)), sum0(count($expr4)), count] }
     └─StreamExchange { dist: HashShard($expr2, $expr3) }
       └─StreamHashAgg [append_only] { group_key: [$expr2, $expr3, $expr5], aggs: [count, count filter(($expr4 < 10000:Int32)), count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32)), count filter(($expr4 >= 1000000:Int32)), min($expr4), max($expr4), sum($expr4), count($expr4)] }
         └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id, Vnode(_row_id) as $expr5] }
           └─StreamFilter { predicate: (event_type = 2:Int32) }
             └─StreamRowIdGen { row_id_index: 6 }
               └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                 └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                   └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(11 rows)

Dist query plan:

 Fragment 0
 StreamSink { type: append-only, columns: [auction, day, total_bids, rank1_bids, rank2_bids, rank3_bids, min_price, max_price, avg_price, sum_price] } { tables: [ Sink: 0 ] }
 └── StreamProject { exprs: [$expr3, $expr2, sum0(count), sum0(count filter(($expr4 < 10000:Int32))), sum0(count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32))), sum0(count filter(($expr4 >= 1000000:Int32))), min(min($expr4)), max(max($expr4)), (sum(sum($expr4)) / sum0(count($expr4))::Decimal) as $expr6, sum(sum($expr4))] }
     └── StreamHashAgg { group_key: [$expr2, $expr3], aggs: [sum0(count), sum0(count filter(($expr4 < 10000:Int32))), sum0(count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32))), sum0(count filter(($expr4 >= 1000000:Int32))), min(min($expr4)), max(max($expr4)), sum(sum($expr4)), sum0(count($expr4)), count] }
         ├── tables: [ HashAggState: 3, HashAggCall4: 1, HashAggCall5: 2 ]
         └── StreamExchange Hash([0, 1]) from 1
 
 Fragment 1
 StreamHashAgg [append_only] { group_key: [$expr2, $expr3, $expr5], aggs: [count, count filter(($expr4 < 10000:Int32)), count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32)), count filter(($expr4 >= 1000000:Int32)), min($expr4), max($expr4), sum($expr4), count($expr4)] } { tables: [ HashAggState: 4 ] }
 └── StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 0:Int32) as $expr3, Field(bid, 2:Int32) as $expr4, _row_id, Vnode(_row_id) as $expr5] }
     └── StreamFilter { predicate: (event_type = 2:Int32) }
         └── StreamRowIdGen { row_id_index: 6 }
             └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 5 ] }
                 └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                     └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 6 ] }
 
 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_vnode, kv_log_store_row_op, auction, day, total_bids, rank1_bids, rank2_bids, rank3_bids, min_price, max_price, avg_price, sum_price ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ]
 ├── distribution key: [ 5, 4 ]
 ├── read pk prefix len hint: 3
 └── vnode column idx: 2
 
 Table 1 { columns: [ $expr2, $expr3, min($expr4), $expr5 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 2 { columns: [ $expr2, $expr3, max($expr4), $expr5 ], primary key: [ $0 ASC, $1 ASC, $2 DESC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }
 
 Table 3
 ├── columns: [ $expr2, $expr3, sum0(count), sum0(count filter(($expr4 < 10000:Int32))), sum0(count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32))), sum0(count filter(($expr4 >= 1000000:Int32))), min(min($expr4)), max(max($expr4)), sum(sum($expr4)), sum0(count($expr4)), count ]
 ├── primary key: [ $0 ASC, $1 ASC ]
 ├── value indices: [ 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
 ├── distribution key: [ 0, 1 ]
 └── read pk prefix len hint: 2
 
 Table 4
 ├── columns: [ $expr2, $expr3, $expr5, count, count filter(($expr4 < 10000:Int32)), count filter(($expr4 >= 10000:Int32) AND ($expr4 < 1000000:Int32)), count filter(($expr4 >= 1000000:Int32)), min($expr4), max($expr4), sum($expr4), count($expr4) ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
 ├── value indices: [ 3, 4, 5, 6, 7, 8, 9, 10 ]
 ├── distribution key: []
 ├── read pk prefix len hint: 3
 └── vnode column idx: 2
 
 Table 5 { columns: [ vnode, offset ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }
 
 Table 6 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
 
(47 rows)

Flink:

CREATE TABLE nexmark_q16 (
        channel VARCHAR,
        `day` VARCHAR,
        `minute` VARCHAR,
        total_bids BIGINT,
        rank1_bids BIGINT,
        rank2_bids BIGINT,
        rank3_bids BIGINT,
        total_bidders BIGINT,
        rank1_bidders BIGINT,
        rank2_bidders BIGINT,
        rank3_bidders BIGINT,
        total_auctions BIGINT,
        rank1_auctions BIGINT,
        rank2_auctions BIGINT,
        rank3_auctions BIGINT
    ) WITH (
        'connector' = 'blackhole'
    );

    INSERT INTO nexmark_q16
    SELECT
        channel,
        DATE_FORMAT(dateTime, 'yyyy-MM-dd') as `day`,
        max(DATE_FORMAT(dateTime, 'HH:mm')) as `minute`,
        count(*) AS total_bids,
        count(*) filter (where price < 10000) AS rank1_bids,
        count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
        count(*) filter (where price >= 1000000) AS rank3_bids,
        count(distinct bidder) AS total_bidders,
        count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
        count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
        count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
        count(distinct auction) AS total_auctions,
        count(distinct auction) filter (where price < 10000) AS rank1_auctions,
        count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
        count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
    FROM bid
    GROUP BY channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd');

Query plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q16], fields=[channel, day, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, $f13, $f14])
+- GlobalGroupAggregate(groupBy=[channel, day], partialFinalType=[FINAL], select=[channel, day, MAX(max$0) AS $f2, $SUM0(count1$1) AS $f3, $SUM0(count1$2) AS $f4, $SUM0(count1$3) AS $f5, $SUM0(count1$4) AS $f6, $SUM0(count$5) AS $f7, $SUM0(count$6) AS $f8, $SUM0(count$7) AS $f9, $SUM0(count$8) AS $f10, $SUM0(count$9) AS $f11, $SUM0(count$10) AS $f12, $SUM0(count$11) AS $f13, $SUM0(count$12) AS $f14])
   +- Exchange(distribution=[hash[channel, day]])
      +- IncrementalGroupAggregate(partialAggGrouping=[channel, day, $f8, $f9, $f10], finalAggGrouping=[channel, day], select=[channel, day, MAX(max$0) AS max$0, COUNT(count1$1) AS count1$1, COUNT(count1$2) AS count1$2, COUNT(count1$3) AS count1$3, COUNT(count1$4) AS count1$4, COUNT(distinct$0 count$5) AS count$5, COUNT(distinct$0 count$6) AS count$6, COUNT(distinct$0 count$7) AS count$7, COUNT(distinct$0 count$8) AS count$8, COUNT(distinct$1 count$9) AS count$9, COUNT(distinct$1 count$10) AS count$10, COUNT(distinct$1 count$11) AS count$11, COUNT(distinct$1 count$12) AS count$12])
         +- Exchange(distribution=[hash[channel, day, $f8, $f9, $f10]])
            +- LocalGroupAggregate(groupBy=[channel, day, $f8, $f9, $f10], partialFinalType=[PARTIAL], select=[channel, day, $f8, $f9, $f10, MAX($f2) FILTER $g_3 AS max$0, COUNT(*) FILTER $g_7 AS count1$1, COUNT(*) FILTER $g_70 AS count1$2, COUNT(*) FILTER $g_71 AS count1$3, COUNT(*) FILTER $g_72 AS count1$4, COUNT(distinct$0 bidder) FILTER $g_5 AS count$5, COUNT(distinct$0 bidder) FILTER $g_50 AS count$6, COUNT(distinct$0 bidder) FILTER $g_51 AS count$7, COUNT(distinct$0 bidder) FILTER $g_52 AS count$8, COUNT(distinct$1 auction) FILTER $g_6 AS count$9, COUNT(distinct$1 auction) FILTER $g_60 AS count$10, COUNT(distinct$1 auction) FILTER $g_61 AS count$11, COUNT(distinct$1 auction) FILTER $g_62 AS count$12, DISTINCT(bidder) AS distinct$0, DISTINCT(auction) AS distinct$1])
               +- Calc(select=[channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, $f9, $f10, =($e, 3) AS $g_3, =($e, 7) AS $g_7, AND(=($e, 7), $f3) AS $g_70, AND(=($e, 7), $f4) AS $g_71, AND(=($e, 7), $f5) AS $g_72, =($e, 5) AS $g_5, AND(=($e, 5), $f3) AS $g_50, AND(=($e, 5), $f4) AS $g_51, AND(=($e, 5), $f5) AS $g_52, =($e, 6) AS $g_6, AND(=($e, 6), $f3) AS $g_60, AND(=($e, 6), $f4) AS $g_61, AND(=($e, 6), $f5) AS $g_62])
                  +- Expand(projects=[{channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, null AS $f9, null AS $f10, 3 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, $f9, null AS $f10, 5 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, $f10, 6 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, null AS $f10, 7 AS $e}])
                     +- Calc(select=[bid.channel AS channel, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'HH:mm') AS $f2, IS TRUE(<(bid.price, 10000)) AS $f3, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f4, IS TRUE(>=(bid.price, 1000000)) AS $f5, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'HH:mm')), 1024) AS $f8, MOD(HASH_CODE(bid.bidder), 1024) AS $f9, MOD(HASH_CODE(bid.auction), 1024) AS $f10], where=[=(event_type, 2)])
                        +- MiniBatchAssigner(interval=[2000ms], mode=[ProcTime])
                           +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                              +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                                 +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])


== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q16], fields=[channel, day, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, $f13, $f14])
+- GlobalGroupAggregate(groupBy=[channel, day], partialFinalType=[FINAL], select=[channel, day, MAX(max$0) AS $f2, $SUM0(count1$1) AS $f3, $SUM0(count1$2) AS $f4, $SUM0(count1$3) AS $f5, $SUM0(count1$4) AS $f6, $SUM0(count$5) AS $f7, $SUM0(count$6) AS $f8, $SUM0(count$7) AS $f9, $SUM0(count$8) AS $f10, $SUM0(count$9) AS $f11, $SUM0(count$10) AS $f12, $SUM0(count$11) AS $f13, $SUM0(count$12) AS $f14])
   +- Exchange(distribution=[hash[channel, day]])
      +- IncrementalGroupAggregate(partialAggGrouping=[channel, day, $f8, $f9, $f10], finalAggGrouping=[channel, day], select=[channel, day, MAX(max$0) AS max$0, COUNT(count1$1) AS count1$1, COUNT(count1$2) AS count1$2, COUNT(count1$3) AS count1$3, COUNT(count1$4) AS count1$4, COUNT(distinct$0 count$5) AS count$5, COUNT(distinct$0 count$6) AS count$6, COUNT(distinct$0 count$7) AS count$7, COUNT(distinct$0 count$8) AS count$8, COUNT(distinct$1 count$9) AS count$9, COUNT(distinct$1 count$10) AS count$10, COUNT(distinct$1 count$11) AS count$11, COUNT(distinct$1 count$12) AS count$12])
         +- Exchange(distribution=[hash[channel, day, $f8, $f9, $f10]])
            +- LocalGroupAggregate(groupBy=[channel, day, $f8, $f9, $f10], partialFinalType=[PARTIAL], select=[channel, day, $f8, $f9, $f10, MAX($f2) FILTER $g_3 AS max$0, COUNT(*) FILTER $g_7 AS count1$1, COUNT(*) FILTER $g_70 AS count1$2, COUNT(*) FILTER $g_71 AS count1$3, COUNT(*) FILTER $g_72 AS count1$4, COUNT(distinct$0 bidder) FILTER $g_5 AS count$5, COUNT(distinct$0 bidder) FILTER $g_50 AS count$6, COUNT(distinct$0 bidder) FILTER $g_51 AS count$7, COUNT(distinct$0 bidder) FILTER $g_52 AS count$8, COUNT(distinct$1 auction) FILTER $g_6 AS count$9, COUNT(distinct$1 auction) FILTER $g_60 AS count$10, COUNT(distinct$1 auction) FILTER $g_61 AS count$11, COUNT(distinct$1 auction) FILTER $g_62 AS count$12, DISTINCT(bidder) AS distinct$0, DISTINCT(auction) AS distinct$1])
               +- Calc(select=[channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, $f9, $f10, ($e = 3) AS $g_3, ($e = 7) AS $g_7, (($e = 7) AND $f3) AS $g_70, (($e = 7) AND $f4) AS $g_71, (($e = 7) AND $f5) AS $g_72, ($e = 5) AS $g_5, (($e = 5) AND $f3) AS $g_50, (($e = 5) AND $f4) AS $g_51, (($e = 5) AND $f5) AS $g_52, ($e = 6) AS $g_6, (($e = 6) AND $f3) AS $g_60, (($e = 6) AND $f4) AS $g_61, (($e = 6) AND $f5) AS $g_62])
                  +- Expand(projects=[{channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, null AS $f9, null AS $f10, 3 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, $f9, null AS $f10, 5 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, $f10, 6 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, null AS $f10, 7 AS $e}])
                     +- Calc(select=[bid.channel AS channel, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'HH:mm') AS $f2, (bid.price < 10000) IS TRUE AS $f3, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f4, (bid.price >= 1000000) IS TRUE AS $f5, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'HH:mm')), 1024) AS $f8, MOD(HASH_CODE(bid.bidder), 1024) AS $f9, MOD(HASH_CODE(bid.auction), 1024) AS $f10], where=[(event_type = 2)])
                        +- MiniBatchAssigner(interval=[2000ms], mode=[ProcTime])
                           +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                              +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                                 +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
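
Note the MOD(HASH_CODE(...), 1024) expressions: this is Flink's split-distinct optimization. Distinct keys are first bucketed, partial (distinct) counts are computed per (channel, day, bucket), and the buckets are then summed back per (channel, day). A hand-written sketch of the same idea in Flink SQL (illustrative only; it covers just count(*) and one distinct aggregate, and buckets directly on bidder instead of HASH_CODE(bidder)) would be:

SELECT channel,
       `day`,
       SUM(cnt)     AS total_bids,
       SUM(bidders) AS total_bidders
FROM (
    SELECT channel,
           DATE_FORMAT(dateTime, 'yyyy-MM-dd') AS `day`,
           MOD(bidder, 1024)                   AS bucket,
           COUNT(*)                            AS cnt,
           -- within one bucket each bidder is counted once; buckets are
           -- disjoint, so summing per-bucket distinct counts is exact
           COUNT(DISTINCT bidder)              AS bidders
    FROM bid
    GROUP BY channel, DATE_FORMAT(dateTime, 'yyyy-MM-dd'), MOD(bidder, 1024)
) t
GROUP BY channel, `day`;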
@github-actions github-actions bot added this to the release-1.7 milestone Feb 4, 2024
lmatz commented Feb 4, 2024

Some notable phenomena:

1. Mem table spill frequency.

4X’s mem table spill frequency is higher than 1X’s. This seems unreasonable because 4X has 4 times the memory of 1X; the actual memory utilization is about 31 GB versus 7.1 GB, a difference of more than 4x.

4X:
SCR-20240203-1kb

1X:
SCR-20240203-1k7

2. Different compaction patterns

4X:
SCR-20240203-ii

1X:
SCR-20240203-in

3. Skewness

4X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706385538000&orgId=1&to=1706387341000&var-datasource=Prometheus: test-useast1-eks-a&var-namespace=nexmark-lt-4x-1cn-affinity-weekly-20240127&editPanel=63
SCR-20240203-1j5

1X:
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706401160000&orgId=1&to=1706402963000&var-datasource=Prometheus: test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-weekly-20240127&editPanel=63
SCR-20240203-1j2

4. Distinct Agg Cache miss ratio is the same.

4X:
SCR-20240203-1pl

1X:
SCR-20240203-1pi

5. More than 5X the look-up throughput, yet no cache misses in the Agg operator? This seems inconsistent with (4).

4X:
SCR-20240203-1rb

1X:
SCR-20240203-1r6

6. Different block cache miss rate

4X:
SCR-20240203-1tx

4X is worse, i.e. its miss rate is higher.

1X:
SCR-20240203-1u0

@lmatz lmatz added the help wanted and type/perf labels Feb 4, 2024
lmatz commented Feb 22, 2024

Try running q16_no_distinct, i.e. the same q16 but without the distinct keyword in the count aggregates:

query:

CREATE SINK nexmark_q16_no_distinct AS
     SELECT channel,
            to_char(date_time, 'YYYY-MM-DD')                                          as "day",
            max(to_char(date_time, 'HH:mm'))                                          as "minute",
            count(*)                                                                  AS total_bids,
            count(*) filter (where price < 10000)                                     AS rank1_bids,
            count(*) filter (where price >= 10000 and price < 1000000)                AS rank2_bids,
            count(*) filter (where price >= 1000000)                                  AS rank3_bids,
            count(bidder)                                                    AS total_bidders,
            count(bidder) filter (where price < 10000)                       AS rank1_bidders,
            count(bidder) filter (where price >= 10000 and price < 1000000)  AS rank2_bidders,
            count(bidder) filter (where price >= 1000000)                    AS rank3_bidders,
            count(auction)                                                   AS total_auctions,
            count(auction) filter (where price < 10000)                      AS rank1_auctions,
            count(auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
            count(auction) filter (where price >= 1000000)                   AS rank3_auctions
     FROM bid
     GROUP BY to_char(date_time, 'YYYY-MM-DD'), channel
     WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

without 2-phase aggregation
plan:

 StreamSink { type: append-only, columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
 └─StreamProject { exprs: [$expr3, $expr2, max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count($expr6), count($expr6) filter(($expr5 < 10000:Int32)), count($expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count($expr6) filter(($expr5 >= 1000000:Int32)), count($expr7), count($expr7) filter(($expr5 < 10000:Int32)), count($expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count($expr7) filter(($expr5 >= 1000000:Int32))] }
   └─StreamHashAgg [append_only] { group_key: [$expr2, $expr3], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count($expr6), count($expr6) filter(($expr5 < 10000:Int32)), count($expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count($expr6) filter(($expr5 >= 1000000:Int32)), count($expr7), count($expr7) filter(($expr5 < 10000:Int32)), count($expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count($expr7) filter(($expr5 >= 1000000:Int32))] }
     └─StreamExchange { dist: HashShard($expr2, $expr3) }
       └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 3:Int32) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7, _row_id] }
         └─StreamFilter { predicate: (event_type = 2:Int32) }
           └─StreamRowIdGen { row_id_index: 6 }
             └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
               └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(10 rows)

with 2-phase aggregation
plan:

 StreamSink { type: append-only, columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
 └─StreamProject { exprs: [$expr3, $expr2, max(max($expr4)), sum0(count), sum0(count filter(($expr5 < 10000:Int32))), sum0(count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))), sum0(count filter(($expr5 >= 1000000:Int32))), sum0(count($expr6)), sum0(count($expr6) filter(($expr5 < 10000:Int32))), sum0(count($expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))), sum0(count($expr6) filter(($expr5 >= 1000000:Int32))), sum0(count($expr7)), sum0(count($expr7) filter(($expr5 < 10000:Int32))), sum0(count($expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))), sum0(count($expr7) filter(($expr5 >= 1000000:Int32)))] }
   └─StreamHashAgg { group_key: [$expr2, $expr3], aggs: [max(max($expr4)), sum0(count), sum0(count filter(($expr5 < 10000:Int32))), sum0(count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))), sum0(count filter(($expr5 >= 1000000:Int32))), sum0(count($expr6)), sum0(count($expr6) filter(($expr5 < 10000:Int32))), sum0(count($expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))), sum0(count($expr6) filter(($expr5 >= 1000000:Int32))), sum0(count($expr7)), sum0(count($expr7) filter(($expr5 < 10000:Int32))), sum0(count($expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))), sum0(count($expr7) filter(($expr5 >= 1000000:Int32))), count] }
     └─StreamExchange { dist: HashShard($expr2, $expr3) }
       └─StreamHashAgg [append_only] { group_key: [$expr2, $expr3, $expr8], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count($expr6), count($expr6) filter(($expr5 < 10000:Int32)), count($expr6) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count($expr6) filter(($expr5 >= 1000000:Int32)), count($expr7), count($expr7) filter(($expr5 < 10000:Int32)), count($expr7) filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count($expr7) filter(($expr5 >= 1000000:Int32))] }
         └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 3:Int32) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7, _row_id, Vnode(_row_id) as $expr8] }
           └─StreamFilter { predicate: (event_type = 2:Int32) }
             └─StreamRowIdGen { row_id_index: 6 }
               └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                 └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                   └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(11 rows)

RW Session Variable:

  1. https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L116-L117
  2. https://github.com/risingwavelabs/kube-bench/blob/main/manifests/nexmark/nexmark-sinks.template.yaml#L673

RW

4X:

metabase: http://metabase.risingwave-cloud.xyz/question/12618-nexmark-q16-no-distinct-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3095?start_date=2024-01-23

1X:

metabase: http://metabase.risingwave-cloud.xyz/question/12678-nexmark-q16-no-distinct-blackhole-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3093?start_date=2024-01-24

Flink

4X:

metabase: http://metabase.risingwave-cloud.xyz/question/13498-flink-nexmark-q16-no-distinct-flink-4x-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3053?start_date=2024-01-27

1X:

metabase: http://metabase.risingwave-cloud.xyz/question/13488-flink-nexmark-q16-no-distinct-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3052?start_date=2024-01-27

Under both the 1X and 4X settings, RW is not as good as Flink.
The gap between RW and Flink under the 1X setting is about 15%.
The gap between RW and Flink under the 4X setting is about 40%.

In theory, increasing the aggregation dirty heap size limit could improve performance, but as shown in #15251 (comment), increasing it does not help much.

We note that q16 requires very little memory, and there are no data block miss ops, i.e. no cache misses.

Therefore, it is strange that RW cannot outperform Flink.

@lmatz lmatz removed this from the release-1.7 milestone Mar 6, 2024
lmatz commented Mar 18, 2024

Thanks to #15696, performance is good enough now, so let's close this for now. We can reopen it later if needed.

@lmatz lmatz closed this as completed Mar 18, 2024