Skip to content

Commit

Permalink
fix(stream): stream hop window ignore the null time (#8146)
Browse files Browse the repository at this point in the history
fix #8130

Approved-By: liurenjie1024
  • Loading branch information
st1page committed Feb 23, 2023
1 parent 6222696 commit b39ae68
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 94 deletions.
11 changes: 7 additions & 4 deletions src/frontend/planner_test/tests/testdata/column_pruning.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,18 @@
logical_plan: |
LogicalProject { exprs: [t1.a, window_end] }
└─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] }
└─LogicalFilter { predicate: IsNotNull(t1.created_at) }
└─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.created_at, t1._row_id] }
optimized_logical_plan: |
LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] }
└─LogicalScan { table: t1, columns: [t1.a, t1.created_at] }
└─LogicalScan { table: t1, columns: [t1.a, t1.created_at], predicate: IsNotNull(t1.created_at) }
batch_plan: |
BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(t1.created_at) }
└─BatchScan { table: t1, columns: [t1.a, t1.created_at], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_end], pk_conflict: "no check" }
└─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.a, window_end, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamFilter { predicate: IsNotNull(t1.created_at) }
└─StreamTableScan { table: t1, columns: [t1.a, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
18 changes: 11 additions & 7 deletions src/frontend/planner_test/tests/testdata/distribution_derive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -815,25 +815,29 @@
logical_plan: |
LogicalProject { exprs: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end] }
└─LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] }
└─LogicalFilter { predicate: IsNotNull(t1.created_at) }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id] }
optimized_logical_plan: |
LogicalHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at] }
└─LogicalScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], predicate: IsNotNull(t1.created_at) }
batch_plan: |
BatchHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: all }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(t1.created_at) }
└─BatchScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end], pk_conflict: "no check" }
└─StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] }
└─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamFilter { predicate: IsNotNull(t1.created_at) }
└─StreamTableScan { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id(hidden)], pk_columns: [t1._row_id, window_start, window_end], pk_conflict: "no check" }
materialized table: 4294967294
StreamHopWindow { time_col: t1.created_at, slide: 00:15:00, size: 00:30:00, output: [t1.row_id, t1.uid, t1.v, t1.created_at, window_start, window_end, t1._row_id] }
Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(t1.created_at) }
Chain { table: t1, columns: [t1.row_id, t1.uid, t1.v, t1.created_at, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
Table 4294967294 { columns: [row_id, uid, v, created_at, window_start, window_end, t1._row_id], primary key: [$6 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [6] }
26 changes: 16 additions & 10 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,16 @@
| └─BatchHashAgg { group_key: [window_start, bid.auction], aggs: [count] }
| └─BatchHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start] }
| └─BatchExchange { order: [], dist: HashShard(bid.auction) }
| └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
| └─BatchFilter { predicate: IsNotNull(bid.date_time) }
| └─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
└─BatchProject { exprs: [max(count), window_start] }
└─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
└─BatchExchange { order: [], dist: HashShard(window_start) }
└─BatchHashAgg { group_key: [bid.auction, window_start], aggs: [count] }
└─BatchHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start] }
└─BatchExchange { order: [], dist: HashShard(bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
└─BatchFilter { predicate: IsNotNull(bid.date_time) }
└─BatchScan { table: bid, columns: [bid.auction, bid.date_time], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1], pk_conflict: "no check" }
└─StreamProject { exprs: [bid.auction, count, window_start, window_start] }
Expand All @@ -297,15 +299,17 @@
| └─StreamAppendOnlyHashAgg { group_key: [window_start, bid.auction], aggs: [count, count] }
| └─StreamExchange { dist: HashShard(bid.auction, window_start) }
| └─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
| └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
| └─StreamFilter { predicate: IsNotNull(bid.date_time) }
| └─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamProject { exprs: [max(count), window_start] }
└─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
└─StreamExchange { dist: HashShard(window_start) }
└─StreamProject { exprs: [bid.auction, window_start, count] }
└─StreamAppendOnlyHashAgg { group_key: [bid.auction, window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(bid.auction, window_start) }
└─StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamFilter { predicate: IsNotNull(bid.date_time) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1], pk_conflict: "no check" }
Expand All @@ -328,9 +332,10 @@
Fragment 2
StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(bid.date_time) }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
Fragment 3
StreamProject { exprs: [bid.auction, window_start, count] }
Expand All @@ -340,9 +345,10 @@
Fragment 4
StreamHopWindow { time_col: bid.date_time, slide: 00:00:02, size: 00:00:10, output: [bid.auction, window_start, bid._row_id] }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
StreamFilter { predicate: IsNotNull(bid.date_time) }
Chain { table: bid, columns: [bid.auction, bid.date_time, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Upstream
BatchPlanNode
Table 0 { columns: [bid_auction, count, window_start], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [2] }
Table 1 { columns: [window_start, bid_auction, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Expand Down
53 changes: 32 additions & 21 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,20 @@
| └─BatchHashAgg { group_key: [window_start, auction], aggs: [count] }
| └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
| └─BatchExchange { order: [], dist: HashShard(auction) }
| └─BatchProject { exprs: [auction, date_time] }
| └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
| └─BatchFilter { predicate: IsNotNull(date_time) }
| └─BatchProject { exprs: [auction, date_time] }
| └─BatchFilter { predicate: IsNotNull(date_time) }
| └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
└─BatchProject { exprs: [max(count), window_start] }
└─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
└─BatchExchange { order: [], dist: HashShard(window_start) }
└─BatchHashAgg { group_key: [auction, window_start], aggs: [count] }
└─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchProject { exprs: [auction, date_time] }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
└─BatchFilter { predicate: IsNotNull(date_time) }
└─BatchProject { exprs: [auction, date_time] }
└─BatchFilter { predicate: IsNotNull(date_time) }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1], pk_conflict: "no check" }
└─StreamProject { exprs: [auction, count, window_start, window_start] }
Expand All @@ -349,23 +353,27 @@
| └─StreamAppendOnlyHashAgg { group_key: [window_start, auction], aggs: [count, count] }
| └─StreamExchange { dist: HashShard(auction, window_start) }
| └─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamShare { id = 1066 }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamRowIdGen { row_id_index: 7 }
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
| └─StreamFilter { predicate: IsNotNull(date_time) }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamShare { id = 1313 }
| └─StreamProject { exprs: [auction, date_time, _row_id] }
| └─StreamFilter { predicate: IsNotNull(date_time) }
| └─StreamRowIdGen { row_id_index: 7 }
| └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
└─StreamProject { exprs: [max(count), window_start] }
└─StreamHashAgg { group_key: [window_start], aggs: [count, max(count)] }
└─StreamExchange { dist: HashShard(window_start) }
└─StreamProject { exprs: [auction, window_start, count] }
└─StreamAppendOnlyHashAgg { group_key: [auction, window_start], aggs: [count, count] }
└─StreamExchange { dist: HashShard(auction, window_start) }
└─StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamShare { id = 1066 }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
└─StreamFilter { predicate: IsNotNull(date_time) }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamShare { id = 1313 }
└─StreamProject { exprs: [auction, date_time, _row_id] }
└─StreamFilter { predicate: IsNotNull(date_time) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1], pk_conflict: "no check" }
Expand All @@ -388,14 +396,16 @@
Fragment 2
StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
StreamProject { exprs: [auction, date_time, _row_id] }
StreamExchange Hash([2]) from 3
StreamFilter { predicate: IsNotNull(date_time) }
StreamProject { exprs: [auction, date_time, _row_id] }
StreamExchange Hash([2]) from 3
Fragment 3
StreamProject { exprs: [auction, date_time, _row_id] }
StreamRowIdGen { row_id_index: 7 }
StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
source state table: 5
StreamFilter { predicate: IsNotNull(date_time) }
StreamRowIdGen { row_id_index: 7 }
StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
source state table: 5
Fragment 4
StreamProject { exprs: [auction, window_start, count] }
Expand All @@ -405,8 +415,9 @@
Fragment 5
StreamHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start, _row_id] }
StreamProject { exprs: [auction, date_time, _row_id] }
StreamExchange Hash([2]) from 3
StreamFilter { predicate: IsNotNull(date_time) }
StreamProject { exprs: [auction, date_time, _row_id] }
StreamExchange Hash([2]) from 3
Table 0 { columns: [auction, count, window_start], primary key: [$2 ASC, $0 ASC], value indices: [0, 1, 2], distribution key: [2] }
Table 1 { columns: [window_start, auction, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Expand Down

0 comments on commit b39ae68

Please sign in to comment.