perf: nexmark q7 become slower and slower #7244

Open · Tracked by #6640
fuyufjh opened this issue Jan 6, 2023 · 14 comments
Labels: type/bug (Something isn't working), type/perf

fuyufjh (Member) commented Jan 6, 2023

[screenshot: Q7 throughput graph]

The throughput goes down from ~100K rows/s to ~3k rows/s. Still investigating.

fuyufjh added the type/bug and type/perf labels Jan 6, 2023
fuyufjh self-assigned this Jan 6, 2023
github-actions bot added this to the release-0.1.16 milestone Jan 6, 2023
(A comment from fuyufjh was marked as outdated.)

fuyufjh changed the title from "perf: nexmark q4/q10 become slower and slower" to "perf: nexmark q4/q7 become slower and slower" Jan 6, 2023
hzxa21 (Collaborator) commented Jan 6, 2023

Previously I suspected it was caused by the increasing join state, but it's not. Interestingly, the number of cached keys in the aggregation keeps dropping. This is unreasonable because the aggregations (including an append-only max) should all be constant-sized.

[screenshot: aggregation cached-keys metric]

Is it possible that the increasing join state causes excessive eviction on the agg operator cache?

chenzl25 (Contributor) commented Jan 6, 2023

I took a look at Nexmark q4. It seems it could be optimized by GroupJoin, which can merge the Agg state and the Join state. I can do some research on GroupJoin if it would help this case.

    SELECT
        Q.category,
        AVG(Q.final) as avg
    FROM (
        SELECT MAX(B.price) AS final, A.category
        FROM auction A, bid B
        WHERE A.id = B.auction AND B.date_time BETWEEN A.date_time AND A.expires
        GROUP BY A.id, A.category
    ) Q
    GROUP BY Q.category;

fuyufjh (Member, Author) commented Jan 6, 2023

I guess the problem is in Q7, not Q4, because Q7 contains a join condition like this:

) B1 ON B.price = B1.maxprice
WHERE
B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND
AND B1.date_time;

This seems to be inefficient without interval join (risingwavelabs/rfcs#32).

I'll investigate further and post results here later.
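
For context, the join condition above is a band join: each bid only needs to be matched against max-price rows whose window end lies within 10 seconds of the bid's event time. A minimal sketch of the pattern an interval join can exploit, using hypothetical streams L and R rather than the actual Q7 tables:

    -- Sketch only: L and R are hypothetical streams, not the Nexmark tables.
    -- Because the predicate bounds the two event times to within 10 seconds of
    -- each other, an interval join can clean up state on both sides once the
    -- watermark passes the bound, instead of keeping the full join state forever.
    SELECT *
    FROM L
    JOIN R ON L.k = R.k
    WHERE L.ts BETWEEN R.ts - INTERVAL '10' SECOND AND R.ts;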

chenzl25 (Contributor) commented Jan 6, 2023

> I guess the problem is in Q7, not Q4, because Q7 contains a join condition like this:
>
>     ) B1 ON B.price = B1.maxprice
>     WHERE
>     B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND
>     AND B1.date_time;
>
> This seems to be inefficient without interval join (risingwavelabs/rfcs#32).
>
> I'll investigate further and post results here later.

Yes, q7 looks like a perfect case for interval/band join.

fuyufjh (Member, Author) commented Jan 9, 2023

I ran Q4 and everything looks fine.

[screenshot: Q4 performance graph]

So the problem is in Q7, as mentioned in #7244 (comment). I'll close this issue now. @chenzl25 Please help check Q7 after interval/band join is completed.

fuyufjh closed this as completed Jan 9, 2023
fuyufjh changed the title from "perf: nexmark q4/q7 become slower and slower" to "perf: nexmark q7 become slower and slower" Jan 9, 2023
fuyufjh (Member, Author) commented Jan 9, 2023

Well, let's leave it open to track that 🤣

st1page (Contributor) commented Jan 9, 2023

Please allow me to clarify first: I am just proposing an optimization for the special case of Nexmark q7, and I think the interval/band join is necessary anyhow.
Let's take a look at q7's SQL again.

SELECT
    B.auction,
    B.price,
    B.bidder,
    B.date_time
from
    bid B
    JOIN (
        SELECT
            MAX(price) AS maxprice,
            window_end as date_time
        FROM
            TUMBLE(bid, date_time, INTERVAL '10' SECOND)
        GROUP BY
            window_end
    ) B1 ON B.price = B1.maxprice
WHERE
    B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND
    AND B1.date_time;

We can see that the SQL writer is just racking their brains to get the records with the highest bid price in each tumble window, so we can express it directly as a GroupTopN query.

    SELECT
      B.auction,
      B.price,
      B.bidder,
      B.date_time
    FROM (
      SELECT
        auction,
        price,
        bidder,
        date_time,
        /*use rank here to express top-N with ties*/
        rank() over (partition by window_end order by price desc) as price_rank
      FROM
        TUMBLE(bid, date_time, INTERVAL '10' SECOND)
    ) B
    WHERE price_rank <= 1;

And the plan looks much better:

    StreamMaterialize { columns: [auction, price, bidder, date_time, bid._row_id(hidden)], pk_columns: [bid._row_id] }
    └─StreamExchange { dist: HashShard(bid._row_id) }
      └─StreamProject { exprs: [bid.auction, bid.price, bid.bidder, bid.date_time, bid._row_id] }
        └─StreamGroupTopN { order: "[bid.price DESC]", limit: 1, offset: 0, group_key: [9], with_ties: true }
          └─StreamExchange { dist: HashShard((TumbleStart(bid.date_time, '00:00:10':Interval) + '00:00:10':Interval)) }
            └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id, TumbleStart(bid.date_time, '00:00:10':Interval), (TumbleStart(bid.date_time, '00:00:10':Interval) + '00:00:10':Interval)] }
              └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }

I think the two queries are equivalent, and I checked the batch query with our e2e test data; it gives the correct result.
But unfortunately, when I try to run it as a streaming query, the stream GroupTopN executor panics: #7276.
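
As a quick sanity check of that equivalence on the batch side, one can diff the two result sets; a sketch, where q7_original and q7_group_topn are hypothetical views wrapping the two SELECTs above, assuming the engine supports EXCEPT:

    -- q7_original and q7_group_topn are hypothetical views wrapping the two
    -- queries above; an empty result means they return the same rows on the
    -- test data.
    (SELECT * FROM q7_original EXCEPT SELECT * FROM q7_group_topn)
    UNION ALL
    (SELECT * FROM q7_group_topn EXCEPT SELECT * FROM q7_original);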

Are they equivalent? I know it is a little tricky, but can we rewrite this pattern to GroupTopN in the optimizer? 🤔
cc @chenzl25 @fuyufjh @liurenjie1024

chenzl25 (Contributor) commented

> Are they equivalent? I know it is a little tricky, but can we rewrite this pattern to GroupTopN in the optimizer?

I think these two queries are equivalent, but the pattern seems not general enough, and it is a bit hard to rewrite this query. Let's see the performance comparison first; if the improvement is large enough, we can try to implement this rewrite rule.
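
For reference, the general shape such a rule would have to recognize is "join a relation back to its own per-group MAX on the aggregated value", which rewrites to Top-1-with-ties per group. A sketch on a hypothetical table t(g, v); q7 is a trickier variant because the grouping is expressed through the tumble window and the time-range predicate rather than an equality on the group key:

    -- Pattern to match (hypothetical table t with group key g and value v),
    -- ignoring NULL corner cases:
    SELECT t.*
    FROM t
    JOIN (SELECT g, MAX(v) AS max_v FROM t GROUP BY g) m
      ON t.g = m.g AND t.v = m.max_v;

    -- Equivalent Top-1-with-ties form that maps directly to GroupTopN:
    SELECT g, v
    FROM (
        SELECT g, v, rank() OVER (PARTITION BY g ORDER BY v DESC) AS r
        FROM t
    ) s
    WHERE r <= 1;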

BugenZhao (Member) commented

FYI, the execution plan in Flink:

Calc(select=[auction, price, bidder, date_time])
+- Join(joinType=[InnerJoin], where=[((price = maxprice) AND (date_time >= (date_time0 - 10000:INTERVAL SECOND)) AND (date_time <= date_time0))], select=[auction, bidder, price, date_time, maxprice, date_time0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
   :- Exchange(distribution=[hash[price]])
   :  +- Calc(select=[auction, bidder, price, CAST(date_time AS TIMESTAMP(3)) AS date_time])
   :     +- TableSourceScan(table=[[bz_table_store_catalog, default, bid, project=[auction, bidder, price, date_time], watermark=[-(date_time, 20000:INTERVAL SECOND)]]], fields=[auction, bidder, price, date_time])
   +- Exchange(distribution=[hash[maxprice]])
      +- Calc(select=[maxprice, date_time])
         +- GroupAggregate(groupBy=[date_time], select=[date_time, MAX(price) AS maxprice])
            +- Exchange(distribution=[hash[date_time]])
               +- Calc(select=[window_end AS date_time, price])
                  +- WindowTableFunction(window=[TUMBLE(time_col=[date_time], size=[10 s])])
                     +- TableSourceScan(table=[[bz_table_store_catalog, default, bid, project=[price, date_time], watermark=[-(date_time, 20000:INTERVAL SECOND)]]], fields=[price, date_time])

Seems no magic other than interval joins. 🥵

liurenjie1024 (Contributor) commented

Yes, they are equivalent, but if we want to compare performance with Flink, I think we should run the rewritten query on both engines.

lmatz (Contributor) commented Feb 15, 2023

[screenshot: SCR-20230215-i7a]
cc: @st1page http://risingwave-perf-test-dashboard-metabase.us-west-2.elasticbeanstalk.com/dashboard/63-nexmark-history-avg-source-output-rows-per-second-medium-1cn

lmatz (Contributor) commented Mar 15, 2023

FYI, the execution plan in Flink:

== Optimized Physical Plan ==
Calc(select=[auction, price, bidder, dateTime, extra])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=0, leftTimeIndex=3, rightTimeIndex=1], where=[AND(=(price, maxprice), >=(dateTime, -(dateTime0, 10000:INTERVAL SECOND)), <=(dateTime, dateTime0))], select=[auction, bidder, price, dateTime, extra, maxprice, dateTime0])
   :- Exchange(distribution=[hash[price]])
   :  +- Calc(select=[bid.auction AS auction, bid.bidder AS bidder, bid.price AS price, dateTime, bid.extra AS extra], where=[=(event_type, 2)])
   :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
   :        +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
   :           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
   +- Exchange(distribution=[hash[maxprice]])
      +- Calc(select=[maxprice, w$rowtime AS dateTime])
         +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX($f1) AS maxprice, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
            +- Exchange(distribution=[single])
               +- Calc(select=[dateTime, bid.price AS $f1], where=[=(event_type, 2)])
                  +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                     +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                        +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Calc(select=[auction, price, bidder, dateTime, extra])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=0, leftTimeIndex=3, rightTimeIndex=1], where=[((price = maxprice) AND (dateTime >= (dateTime0 - 10000:INTERVAL SECOND)) AND (dateTime <= dateTime0))], select=[auction, bidder, price, dateTime, extra, maxprice, dateTime0])
   :- Exchange(distribution=[hash[price]])
   :  +- Calc(select=[bid.auction AS auction, bid.bidder AS bidder, bid.price AS price, dateTime, bid.extra AS extra], where=[(event_type = 2)])
   :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])(reuse_id=[1])
   :        +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
   :           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
   +- Exchange(distribution=[hash[maxprice]])
      +- Calc(select=[maxprice, w$rowtime AS dateTime])
         +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX($f1) AS maxprice, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
            +- Exchange(distribution=[single])
               +- Calc(select=[dateTime, bid.price AS $f1], where=[(event_type = 2)])
                  +- Reused(reference_id=[1])

But according to the new Flink benchmark results, I kind of doubt whether the interval join is the right choice here.

Maybe we can force Flink to use a normal hash join and test it once.
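
One possible way to do that, as a sketch only and not verified: Flink plans an IntervalJoin only when the band predicate is over time attributes on both inputs, so stripping the time attributes (as in the regular-join plan shown earlier in this thread) should fall back to an ordinary join. Assuming a bid table with a date_time rowtime column:

    -- Sketch only, not verified: strip the time attributes so an IntervalJoin
    -- cannot be planned. CAST materializes the bid-side rowtime, and TUMBLE_END
    -- (instead of TUMBLE_ROWTIME) yields a plain timestamp on the aggregate side.
    SELECT B.auction, B.price, B.bidder, B.date_time
    FROM (
        SELECT auction, price, bidder, CAST(date_time AS TIMESTAMP(3)) AS date_time
        FROM bid
    ) B
    JOIN (
        SELECT MAX(price) AS maxprice,
               TUMBLE_END(date_time, INTERVAL '10' SECOND) AS date_time
        FROM bid
        GROUP BY TUMBLE(date_time, INTERVAL '10' SECOND)
    ) B1 ON B.price = B1.maxprice
    WHERE B.date_time BETWEEN B1.date_time - INTERVAL '10' SECOND AND B1.date_time;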

fuyufjh modified the milestones: release-0.18, release-0.19 Mar 22, 2023
lmatz (Contributor) commented Apr 12, 2023

> FYI, the execution plan in Flink:
>
> [the regular hash-join plan from the earlier comment, omitted here]
>
> Seems no magic other than interval joins. 🥵

Wait a minute, this one is a hash inner join; why does #7244 (comment) show an interval join?

Because it is a logical plan?
