From aea2da04f042d5c7c7dafec7f41d666d359c7452 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 5 Jul 2023 02:24:03 -0700 Subject: [PATCH] feat(agg,over_window): support `first_value`, `last_value` and refactor LogicalAgg (#10740) Signed-off-by: Richard Chien --- .../batch/aggregate/first_last_value.slt.part | 21 ++ e2e_test/streaming/basic_agg.slt | 26 ++ proto/expr.proto | 1 + src/common/src/util/sort_util.rs | 24 ++ src/common/src/util/stream_graph_visitor.rs | 28 +- src/expr/src/agg/def.rs | 3 + src/expr/src/agg/general.rs | 7 +- .../tests/testdata/input/agg.yaml | 25 +- .../tests/testdata/output/agg.yaml | 34 ++ .../tests/testdata/output/nexmark.yaml | 33 +- .../tests/testdata/output/nexmark_source.yaml | 39 +- .../testdata/output/nexmark_watermark.yaml | 57 +-- .../testdata/output/stream_dist_agg.yaml | 60 +++- src/frontend/src/expr/agg_call.rs | 4 +- .../src/optimizer/plan_node/generic/agg.rs | 338 ++++++++++++------ .../src/optimizer/plan_node/logical_agg.rs | 59 ++- .../src/optimizer/rule/distinct_agg_rule.rs | 1 + src/stream/src/executor/aggregation/minput.rs | 13 +- src/tests/sqlsmith/src/sql_gen/agg.rs | 15 +- 19 files changed, 546 insertions(+), 242 deletions(-) create mode 100644 e2e_test/batch/aggregate/first_last_value.slt.part diff --git a/e2e_test/batch/aggregate/first_last_value.slt.part b/e2e_test/batch/aggregate/first_last_value.slt.part new file mode 100644 index 000000000000..f649fd408fad --- /dev/null +++ b/e2e_test/batch/aggregate/first_last_value.slt.part @@ -0,0 +1,21 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table t(v1 int, v2 int); + +statement ok +insert into t values (1, 2), (5, null), (4, 0); + +query iiii +select + first_value(v1 order by v1) + , first_value(v1 order by v2 desc) + , last_value(v1 order by v1) + , last_value(v1 order by v2 asc nulls first) +from t; +---- +1 5 5 1 + +statement ok +drop table t; diff --git a/e2e_test/streaming/basic_agg.slt b/e2e_test/streaming/basic_agg.slt index 24167237b009..7a00df5f1233 100644 --- a/e2e_test/streaming/basic_agg.slt +++ b/e2e_test/streaming/basic_agg.slt @@ -99,3 +99,29 @@ drop materialized view v2; statement ok drop table t1; + +statement ok +create table t(v1 int, v2 int); + +statement ok +create materialized view mv as +select + first_value(v1 order by v1) as o1 + , first_value(v1 order by v2 desc) as o2 + , last_value(v1 order by v1) as o3 + , last_value(v1 order by v2 asc nulls first) as o4 +from t; + +statement ok +insert into t values (1, 2), (5, null), (4, 0); + +query iiii +select * from mv; +---- +1 5 5 1 + +statement ok +drop materialized view mv; + +statement ok +drop table t; diff --git a/proto/expr.proto b/proto/expr.proto index bebc59e48d55..62d5be91c679 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -317,6 +317,7 @@ message AggCall { PERCENTILE_CONT = 22; PERCENTILE_DISC = 23; MODE = 24; + LAST_VALUE = 25; } Type type = 1; repeated InputRef args = 2; diff --git a/src/common/src/util/sort_util.rs b/src/common/src/util/sort_util.rs index f3b52beb7edc..2d5969c58e4d 100644 --- a/src/common/src/util/sort_util.rs +++ b/src/common/src/util/sort_util.rs @@ -55,6 +55,15 @@ impl Direction { } } +impl Direction { + fn reverse(self) -> Self { + match self { + Self::Ascending => Self::Descending, + Self::Descending => Self::Ascending, + } + } +} + /// Nulls are largest/smallest. 
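The `reverse` just added to `Direction` is the primitive that `OrderType::reverse` (added further down in this same file) builds on: flipping the direction while keeping the `NullsAre` flag fixed turns `ASC NULLS FIRST` into `DESC NULLS LAST`, which is exactly what the new unit tests below assert. A self-contained sketch of that involution, using stand-in mirror types rather than code from this patch:

```rust
/// Stand-in mirror of `sort_util::Direction`, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Direction {
    Ascending,
    Descending,
}

impl Direction {
    fn reverse(self) -> Self {
        match self {
            Self::Ascending => Self::Descending,
            Self::Descending => Self::Ascending,
        }
    }
}

fn main() {
    let d = Direction::Ascending;
    assert_eq!(d.reverse(), Direction::Descending);
    // Reversing twice is the identity, which is what lets `last_value`
    // reuse `first_value`'s machinery by flipping the order both ways.
    assert_eq!(d.reverse().reverse(), d);
}
```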
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Display, Default)] enum NullsAre { @@ -201,6 +210,10 @@ impl OrderType { pub fn nulls_are_last(&self) -> bool { !self.nulls_are_first() } + + pub fn reverse(self) -> Self { + Self::new(self.direction.reverse(), self.nulls_are) + } } impl fmt::Display for OrderType { @@ -590,6 +603,17 @@ mod tests { assert!(OrderType::descending_nulls_last().is_descending()); assert!(OrderType::descending_nulls_last().nulls_are_smallest()); assert!(OrderType::descending_nulls_last().nulls_are_last()); + + assert_eq!(OrderType::ascending().reverse(), OrderType::descending()); + assert_eq!(OrderType::descending().reverse(), OrderType::ascending()); + assert_eq!( + OrderType::ascending_nulls_first().reverse(), + OrderType::descending_nulls_last() + ); + assert_eq!( + OrderType::ascending_nulls_last().reverse(), + OrderType::descending_nulls_first() + ); } #[test] diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index a81828da5d78..8623ee8edeed 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -97,11 +97,15 @@ fn visit_stream_node_tables_inner( NodeBody::HashAgg(node) => { assert_eq!(node.agg_call_states.len(), node.agg_calls.len()); always!(node.result_table, "HashAggResult"); - for state in &mut node.agg_call_states { - if let agg_call_state::Inner::MaterializedInputState(s) = - state.inner.as_mut().unwrap() - { - always!(s.table, "HashAgg"); + for (call_idx, state) in node.agg_call_states.iter_mut().enumerate() { + match state.inner.as_mut().unwrap() { + agg_call_state::Inner::ResultValueState(_) => {} + agg_call_state::Inner::TableState(s) => { + always!(s.table, &format!("HashAggCall{}", call_idx)); + } + agg_call_state::Inner::MaterializedInputState(s) => { + always!(s.table, &format!("HashAggCall{}", call_idx)); + } } } for (distinct_col, dedup_table) in &mut node.distinct_dedup_tables { @@ -111,11 +115,15 @@ fn visit_stream_node_tables_inner( NodeBody::SimpleAgg(node) => { assert_eq!(node.agg_call_states.len(), node.agg_calls.len()); always!(node.result_table, "SimpleAggResult"); - for state in &mut node.agg_call_states { - if let agg_call_state::Inner::MaterializedInputState(s) = - state.inner.as_mut().unwrap() - { - always!(s.table, "SimpleAgg"); + for (call_idx, state) in node.agg_call_states.iter_mut().enumerate() { + match state.inner.as_mut().unwrap() { + agg_call_state::Inner::ResultValueState(_) => {} + agg_call_state::Inner::TableState(s) => { + always!(s.table, &format!("SimpleAggCall{}", call_idx)); + } + agg_call_state::Inner::MaterializedInputState(s) => { + always!(s.table, &format!("SimpleAggCall{}", call_idx)); + } } } for (distinct_col, dedup_table) in &mut node.distinct_dedup_tables { diff --git a/src/expr/src/agg/def.rs b/src/expr/src/agg/def.rs index 7ab85be93100..963bd0ec5221 100644 --- a/src/expr/src/agg/def.rs +++ b/src/expr/src/agg/def.rs @@ -228,6 +228,7 @@ pub enum AggKind { JsonbAgg, JsonbObjectAgg, FirstValue, + LastValue, VarPop, VarSamp, StddevPop, @@ -257,6 +258,7 @@ impl AggKind { PbType::JsonbAgg => Ok(AggKind::JsonbAgg), PbType::JsonbObjectAgg => Ok(AggKind::JsonbObjectAgg), PbType::FirstValue => Ok(AggKind::FirstValue), + PbType::LastValue => Ok(AggKind::LastValue), PbType::StddevPop => Ok(AggKind::StddevPop), PbType::StddevSamp => Ok(AggKind::StddevSamp), PbType::VarPop => Ok(AggKind::VarPop), @@ -287,6 +289,7 @@ impl AggKind { Self::JsonbAgg => PbType::JsonbAgg, Self::JsonbObjectAgg => 
PbType::JsonbObjectAgg, Self::FirstValue => PbType::FirstValue, + Self::LastValue => PbType::LastValue, Self::StddevPop => PbType::StddevPop, Self::StddevSamp => PbType::StddevSamp, Self::VarPop => PbType::VarPop, diff --git a/src/expr/src/agg/general.rs b/src/expr/src/agg/general.rs index 0da9268cacf2..db057101a93d 100644 --- a/src/expr/src/agg/general.rs +++ b/src/expr/src/agg/general.rs @@ -74,10 +74,15 @@ where } #[aggregate("first_value(*) -> auto")] -fn first(state: T, _: T) -> T { +fn first_value(state: T, _: T) -> T { state } +#[aggregate("last_value(*) -> auto")] +fn last_value(_: T, input: T) -> T { + input +} + /// Note the following corner cases: /// /// ```slt diff --git a/src/frontend/planner_test/tests/testdata/input/agg.yaml b/src/frontend/planner_test/tests/testdata/input/agg.yaml index aed77e5b7b46..01ee1785181a 100644 --- a/src/frontend/planner_test/tests/testdata/input/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/input/agg.yaml @@ -862,4 +862,27 @@ create table t (x int, y varchar); select mode(1) within group (order by y desc) from t; expected_outputs: - - binder_error \ No newline at end of file + - binder_error + +- sql: | + create table t (x int, y int); + select first_value(x) from t; + expected_outputs: + - planner_error +- sql: | + create table t (x int, y int); + select last_value(x) from t; + expected_outputs: + - planner_error +- sql: | + create table t (x int, y int); + select first_value(x order by y asc) from t; + expected_outputs: + - batch_plan + - stream_plan +- sql: | + create table t (x int, y int); + select last_value(x order by y desc nulls last) from t; + expected_outputs: + - batch_plan + - stream_plan diff --git a/src/frontend/planner_test/tests/testdata/output/agg.yaml b/src/frontend/planner_test/tests/testdata/output/agg.yaml index 50826ff589bc..81f693b28d1f 100644 --- a/src/frontend/planner_test/tests/testdata/output/agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/agg.yaml @@ -1508,3 +1508,37 @@ Caused by: Invalid input syntax: no arguments are expected in mode agg +- sql: | + create table t (x int, y int); + select first_value(x) from t; + planner_error: 'Invalid input syntax: Aggregation function first_value requires ORDER BY clause' +- sql: | + create table t (x int, y int); + select last_value(x) from t; + planner_error: 'Invalid input syntax: Aggregation function last_value requires ORDER BY clause' +- sql: | + create table t (x int, y int); + select first_value(x order by y asc) from t; + batch_plan: |- + BatchSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC))] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + └─StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] } + └─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] } + └─StreamExchange { dist: Single } + └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- sql: | + create table t (x int, y int); + select last_value(x order by y desc nulls last) from t; + batch_plan: |- + BatchSimpleAgg { aggs: [last_value(t.x order_by(t.y DESC NULLS LAST))] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [last_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck } + 
└─StreamProject { exprs: [last_value(t.x order_by(t.y DESC NULLS LAST))] } + └─StreamSimpleAgg { aggs: [last_value(t.x order_by(t.y DESC NULLS LAST)), count] } + └─StreamExchange { dist: Single } + └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 85baa6bd2e4a..049dffb4ac2e 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1651,12 +1651,13 @@ │ │ └─BatchScan { table: auction, columns: [auction.id, auction.item_name], distribution: UpstreamHashShard(auction.id) } │ └─BatchExchange { order: [], dist: HashShard(bid.auction) } │ └─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard } - └─BatchProject { exprs: [(sum0(count) / count(bid.auction)) as $expr1] } - └─BatchSimpleAgg { aggs: [sum0(count), count(bid.auction)] } + └─BatchProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } + └─BatchSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction))] } └─BatchExchange { order: [], dist: Single } - └─BatchHashAgg { group_key: [bid.auction], aggs: [count] } - └─BatchExchange { order: [], dist: HashShard(bid.auction) } - └─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard } + └─BatchSimpleAgg { aggs: [sum0(count), count(bid.auction)] } + └─BatchHashAgg { group_key: [bid.auction], aggs: [count] } + └─BatchExchange { order: [], dist: HashShard(bid.auction) } + └─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } └─StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] } @@ -1668,12 +1669,13 @@ │ └─StreamExchange { dist: HashShard(bid.auction) } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(count) / count(bid.auction)) as $expr1] } - └─StreamSimpleAgg { aggs: [sum0(count), count(bid.auction), count] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } + └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } └─StreamExchange { dist: Single } - └─StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } - └─StreamExchange { dist: HashShard(bid.auction) } - └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } + └─StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] } + └─StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } + └─StreamExchange { dist: HashShard(bid.auction) } + └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } @@ -1697,13 +1699,14 @@ └── BatchPlanNode Fragment 3 - StreamProject { exprs: [(sum0(count) / 
count(bid.auction)) as $expr1] } - └── StreamSimpleAgg { aggs: [sum0(count), count(bid.auction), count] } { result table: 9, state tables: [], distinct tables: [] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] } + └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } { result table: 9, state tables: [], distinct tables: [] } └── StreamExchange Single from 4 Fragment 4 - StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] } - └── StreamExchange Hash([0]) from 5 + StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] } + └── StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] } + └── StreamExchange Hash([0]) from 5 Fragment 5 Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 11 } @@ -1744,7 +1747,7 @@ ├── read pk prefix len hint: 1 └── vnode column idx: 0 - Table 9 { columns: [ sum0(count), count(bid_auction), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } + Table 9 { columns: [ sum0(sum0(count)), sum0(count(bid_auction)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } Table 10 { columns: [ bid_auction, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml index 34c300a9b46b..0607897adce7 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml @@ -1629,12 +1629,13 @@ │ │ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) } │ └─BatchExchange { order: [], dist: HashShard(auction) } │ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) } - └─BatchProject { exprs: [(sum0(count) / count(auction)) as $expr1] } - └─BatchSimpleAgg { aggs: [sum0(count), count(auction)] } + └─BatchProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } + └─BatchSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction))] } └─BatchExchange { order: [], dist: Single } - └─BatchHashAgg { group_key: [auction], aggs: [count] } - └─BatchExchange { order: [], dist: HashShard(auction) } - └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) } + └─BatchSimpleAgg { aggs: [sum0(count), count(auction)] } + └─BatchHashAgg { group_key: [auction], aggs: [count] } + └─BatchExchange { order: [], dist: HashShard(auction) } + └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) } stream_plan: |- StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } └─StreamDynamicFilter { predicate: (count(auction) >= $expr1), output: [id, item_name, count(auction)] } @@ -1650,15 +1651,16 @@ │ └─StreamRowIdGen { row_id_index: 7 } │ └─StreamSource { 
source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(count) / count(auction)) as $expr1] } - └─StreamSimpleAgg { aggs: [sum0(count), count(auction), count] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } + └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } └─StreamExchange { dist: Single } - └─StreamHashAgg [append_only] { group_key: [auction], aggs: [count] } - └─StreamExchange { dist: HashShard(auction) } - └─StreamShare { id: 5 } - └─StreamProject { exprs: [auction, _row_id] } - └─StreamRowIdGen { row_id_index: 7 } - └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } + └─StreamStatelessSimpleAgg { aggs: [sum0(count), count(auction)] } + └─StreamHashAgg [append_only] { group_key: [auction], aggs: [count] } + └─StreamExchange { dist: HashShard(auction) } + └─StreamShare { id: 5 } + └─StreamProject { exprs: [auction, _row_id] } + └─StreamRowIdGen { row_id_index: 7 } + └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } @@ -1689,13 +1691,14 @@ └── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { source state table: 8 } Fragment 4 - StreamProject { exprs: [(sum0(count) / count(auction)) as $expr1] } - └── StreamSimpleAgg { aggs: [sum0(count), count(auction), count] } { result table: 9, state tables: [], distinct tables: [] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] } + └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } { result table: 9, state tables: [], distinct tables: [] } └── StreamExchange Single from 5 Fragment 5 - StreamHashAgg [append_only] { group_key: [auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] } - └── StreamExchange Hash([0]) from 6 + StreamStatelessSimpleAgg { aggs: [sum0(count), count(auction)] } + └── StreamHashAgg [append_only] { group_key: [auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] } + └── StreamExchange Hash([0]) from 6 Fragment 6 StreamNoOp @@ -1724,7 +1727,7 @@ Table 8 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 } - Table 9 { columns: [ sum0(count), count(auction), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } + Table 9 { columns: [ sum0(sum0(count)), sum0(count(auction)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } Table 10 { columns: [ auction, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml index e8d83f8a9f6f..9bb64cb8d5cb 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml @@ -1785,15 +1785,16 @@ │ └─BatchFilter { predicate: (event_type = 
2:Int32) } │ └─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr4, _row_id] } │ └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) } - └─BatchProject { exprs: [(sum0(count) / count($expr7)) as $expr8] } - └─BatchSimpleAgg { aggs: [sum0(count), count($expr7)] } + └─BatchProject { exprs: [(sum0(sum0(count)) / sum0(count($expr7))) as $expr8] } + └─BatchSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr7))] } └─BatchExchange { order: [], dist: Single } - └─BatchHashAgg { group_key: [$expr7], aggs: [count] } - └─BatchExchange { order: [], dist: HashShard($expr7) } - └─BatchProject { exprs: [Field(bid, 0:Int32) as $expr7] } - └─BatchFilter { predicate: (event_type = 2:Int32) } - └─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr6, _row_id] } - └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) } + └─BatchSimpleAgg { aggs: [sum0(count), count($expr7)] } + └─BatchHashAgg { group_key: [$expr7], aggs: [count] } + └─BatchExchange { order: [], dist: HashShard($expr7) } + └─BatchProject { exprs: [Field(bid, 0:Int32) as $expr7] } + └─BatchFilter { predicate: (event_type = 2:Int32) } + └─BatchProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr6, _row_id] } + └─BatchSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id], filter: (None, None) } stream_plan: |- StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } └─StreamDynamicFilter { predicate: (count($expr4) >= $expr5), output: [$expr2, $expr3, count($expr4)] } @@ -1822,21 +1823,22 @@ │ └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id], output_watermarks: [_row_id] } │ └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum0(count) / count($expr4)) as $expr5] } - └─StreamSimpleAgg { aggs: [sum0(count), count($expr4), count] } + └─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] } + └─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } └─StreamExchange { dist: Single } - └─StreamHashAgg [append_only] { group_key: [$expr4], aggs: [count] } - └─StreamExchange { dist: HashShard($expr4) } - └─StreamShare { id: 12 } - └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, _row_id] } - └─StreamFilter { predicate: (event_type = 2:Int32) } - └─StreamShare { id: 6 } - └─StreamProject { exprs: [event_type, auction, bid, _row_id] } - └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) } - └─StreamRowIdGen { row_id_index: 5 } - └─StreamWatermarkFilter { watermark_descs: [Desc { idx: 4, expr: ($expr1 - '00:00:04':Interval) }] } - └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), 
Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id], output_watermarks: [_row_id] } - └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } + └─StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr4)] } + └─StreamHashAgg [append_only] { group_key: [$expr4], aggs: [count] } + └─StreamExchange { dist: HashShard($expr4) } + └─StreamShare { id: 12 } + └─StreamProject { exprs: [Field(bid, 0:Int32) as $expr4, _row_id] } + └─StreamFilter { predicate: (event_type = 2:Int32) } + └─StreamShare { id: 6 } + └─StreamProject { exprs: [event_type, auction, bid, _row_id] } + └─StreamFilter { predicate: ((event_type = 1:Int32) OR (event_type = 2:Int32)) } + └─StreamRowIdGen { row_id_index: 5 } + └─StreamWatermarkFilter { watermark_descs: [Desc { idx: 4, expr: ($expr1 - '00:00:04':Interval) }] } + └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _row_id], output_watermarks: [_row_id] } + └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _row_id] } stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck } @@ -1876,13 +1878,14 @@ └── StreamExchange NoShuffle from 2 Fragment 5 - StreamProject { exprs: [(sum0(count) / count($expr4)) as $expr5] } - └── StreamSimpleAgg { aggs: [sum0(count), count($expr4), count] } { result table: 9, state tables: [], distinct tables: [] } + StreamProject { exprs: [(sum0(sum0(count)) / sum0(count($expr4))) as $expr5] } + └── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count($expr4)), count] } { result table: 9, state tables: [], distinct tables: [] } └── StreamExchange Single from 6 Fragment 6 - StreamHashAgg [append_only] { group_key: [$expr4], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] } - └── StreamExchange Hash([0]) from 7 + StreamStatelessSimpleAgg { aggs: [sum0(count), count($expr4)] } + └── StreamHashAgg [append_only] { group_key: [$expr4], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] } + └── StreamExchange Hash([0]) from 7 Fragment 7 StreamNoOp @@ -1904,7 +1907,7 @@ Table 8 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 } - Table 9 { columns: [ sum0(count), count($expr4), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } + Table 9 { columns: [ sum0(sum0(count)), sum0(count($expr4)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 } Table 10 { columns: [ $expr4, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } diff --git a/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml b/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml index 2fcfc73de357..03b1629b931f 100644 --- a/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml +++ b/src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml @@ -660,26 +660,33 @@ ├── materialized table: 4294967294 └── StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └── StreamSimpleAgg [append_only] { aggs: [string_agg(ao.s, ',':Varchar order_by(ao.o 
ASC)), count] } - ├── result table: 0 - ├── state tables: [] + ├── result table: 1 + ├── state tables: [ 0 ] ├── distinct tables: [] └── StreamExchange Single from 1 Fragment 1 StreamProject { exprs: [ao.s, ',':Varchar, ao.o, ao._row_id] } └── Chain { table: ao, columns: [ao.o, ao.s, ao._row_id], pk: [ao._row_id], dist: UpstreamHashShard(ao._row_id) } - ├── state table: 1 + ├── state table: 2 ├── Upstream └── BatchPlanNode Table 0 + ├── columns: [ ao_o, ao__row_id, ao_s, ',':Varchar ] + ├── primary key: [ $0 ASC, $1 ASC ] + ├── value indices: [ 0, 1, 2, 3 ] + ├── distribution key: [] + └── read pk prefix len hint: 0 + + Table 1 ├── columns: [ string_agg(ao_s, ',':Varchar order_by(ao_o ASC)), count ] ├── primary key: [] ├── value indices: [ 0, 1 ] ├── distribution key: [] └── read pk prefix len hint: 0 - Table 1 + Table 2 ├── columns: [ vnode, _row_id, ao_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -913,26 +920,33 @@ ├── materialized table: 4294967294 └── StreamProject { exprs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └── StreamSimpleAgg [append_only] { aggs: [count(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } - ├── result table: 0 - ├── state tables: [] + ├── result table: 1 + ├── state tables: [ 0 ] ├── distinct tables: [] └── StreamExchange Single from 1 Fragment 1 StreamProject { exprs: [ao.v, ao.s, ',':Varchar, ao.o, ao._row_id] } └── Chain { table: ao, columns: [ao.v, ao.o, ao.s, ao._row_id], pk: [ao._row_id], dist: UpstreamHashShard(ao._row_id) } - ├── state table: 1 + ├── state table: 2 ├── Upstream └── BatchPlanNode Table 0 + ├── columns: [ ao_o, ao__row_id, ao_s, ',':Varchar ] + ├── primary key: [ $0 ASC, $1 ASC ] + ├── value indices: [ 0, 1, 2, 3 ] + ├── distribution key: [] + └── read pk prefix len hint: 0 + + Table 1 ├── columns: [ count(ao_v), string_agg(ao_s, ',':Varchar order_by(ao_o ASC)), count ] ├── primary key: [] ├── value indices: [ 0, 1, 2 ] ├── distribution key: [] └── read pk prefix len hint: 0 - Table 1 + Table 2 ├── columns: [ vnode, _row_id, ao_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -1036,26 +1050,33 @@ ├── materialized table: 4294967294 └── StreamProject { exprs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC))] } └── StreamSimpleAgg [append_only] { aggs: [max(ao.v), string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } - ├── result table: 0 - ├── state tables: [] + ├── result table: 1 + ├── state tables: [ 0 ] ├── distinct tables: [] └── StreamExchange Single from 1 Fragment 1 StreamProject { exprs: [ao.v, ao.s, ',':Varchar, ao.o, ao._row_id] } └── Chain { table: ao, columns: [ao.v, ao.o, ao.s, ao._row_id], pk: [ao._row_id], dist: UpstreamHashShard(ao._row_id) } - ├── state table: 1 + ├── state table: 2 ├── Upstream └── BatchPlanNode Table 0 + ├── columns: [ ao_o, ao__row_id, ao_s, ',':Varchar ] + ├── primary key: [ $0 ASC, $1 ASC ] + ├── value indices: [ 0, 1, 2, 3 ] + ├── distribution key: [] + └── read pk prefix len hint: 0 + + Table 1 ├── columns: [ max(ao_v), string_agg(ao_s, ',':Varchar order_by(ao_o ASC)), count ] ├── primary key: [] ├── value indices: [ 0, 1, 2 ] ├── distribution key: [] └── read pk prefix len hint: 0 - Table 1 + Table 2 ├── columns: [ vnode, _row_id, ao_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] @@ -1941,26 +1962,33 @@ ├── materialized table: 4294967294 └── StreamProject { exprs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), ao.k] } └── StreamHashAgg 
[append_only] { group_key: [ao.k], aggs: [string_agg(ao.s, ',':Varchar order_by(ao.o ASC)), count] } - ├── result table: 0 - ├── state tables: [] + ├── result table: 1 + ├── state tables: [ 0 ] ├── distinct tables: [] └── StreamExchange Hash([0]) from 1 Fragment 1 StreamProject { exprs: [ao.k, ao.s, ',':Varchar, ao.o, ao._row_id] } └── Chain { table: ao, columns: [ao.k, ao.o, ao.s, ao._row_id], pk: [ao._row_id], dist: UpstreamHashShard(ao._row_id) } - ├── state table: 1 + ├── state table: 2 ├── Upstream └── BatchPlanNode Table 0 + ├── columns: [ ao_k, ao_o, ao__row_id, ao_s, ',':Varchar ] + ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ] + ├── value indices: [ 1, 2, 3, 4 ] + ├── distribution key: [ 0 ] + └── read pk prefix len hint: 1 + + Table 1 ├── columns: [ ao_k, string_agg(ao_s, ',':Varchar order_by(ao_o ASC)), count ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] ├── distribution key: [ 0 ] └── read pk prefix len hint: 1 - Table 1 + Table 2 ├── columns: [ vnode, _row_id, ao_backfill_finished ] ├── primary key: [ $0 ASC ] ├── value indices: [ 1, 2 ] diff --git a/src/frontend/src/expr/agg_call.rs b/src/frontend/src/expr/agg_call.rs index 6760376de1d8..44d8060ac319 100644 --- a/src/frontend/src/expr/agg_call.rs +++ b/src/frontend/src/expr/agg_call.rs @@ -69,7 +69,9 @@ impl AggCall { // XXX: some special cases that can not be handled by signature map. // may return list or struct type - (AggKind::Min | AggKind::Max | AggKind::FirstValue, [input]) => input.clone(), + (AggKind::Min | AggKind::Max | AggKind::FirstValue | AggKind::LastValue, [input]) => { + input.clone() + } (AggKind::ArrayAgg, [input]) => List(Box::new(input.clone())), // functions that are rewritten in the frontend and don't exist in the expr crate (AggKind::Avg, [input]) => match input { diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index a586d46de737..b178c3704832 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -39,6 +39,125 @@ use crate::utils::{ }; use crate::TableCatalog; +/// Macros to generate match arms for [`AggKind`]. +/// IMPORTANT: These macros must be carefully maintained especially when adding new [`AggKind`] +/// variants. +pub(crate) mod agg_kinds { + /// [`AggKind`]s that are currently not supported in streaming mode. + macro_rules! unimplemented_in_stream { + () => { + AggKind::BitAnd + | AggKind::BitOr + | AggKind::BoolAnd + | AggKind::BoolOr + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg + | AggKind::PercentileCont + | AggKind::PercentileDisc + | AggKind::Mode + }; + } + pub(crate) use unimplemented_in_stream; + + /// [`AggKind`]s that should've been rewritten to other kinds. These kinds should not appear + /// when generating physical plan nodes. + macro_rules! rewritten { + () => { + AggKind::Avg + | AggKind::StddevPop + | AggKind::StddevSamp + | AggKind::VarPop + | AggKind::VarSamp + }; + } + pub(crate) use rewritten; + + /// [`AggKind`]s of which the aggregate results are not affected by the user given ORDER BY + /// clause. + macro_rules! 
result_unaffected_by_order_by { + () => { + AggKind::BitAnd + | AggKind::BitOr + | AggKind::BitXor // XOR is commutative and associative + | AggKind::BoolAnd + | AggKind::BoolOr + | AggKind::Min + | AggKind::Max + | AggKind::Sum + | AggKind::Sum0 + | AggKind::Count + | AggKind::Avg + | AggKind::ApproxCountDistinct + | AggKind::VarPop + | AggKind::VarSamp + | AggKind::StddevPop + | AggKind::StddevSamp + }; + } + pub(crate) use result_unaffected_by_order_by; + + /// [`AggKind`]s that must be called with ORDER BY clause. These are slightly different from + /// variants not in [`result_unaffected_by_order_by`], in that variants returned by this macro + /// should be banned while the others should just be warned. + macro_rules! must_have_order_by { + () => { + AggKind::FirstValue + | AggKind::LastValue + | AggKind::PercentileCont + | AggKind::PercentileDisc + | AggKind::Mode + }; + } + pub(crate) use must_have_order_by; + + /// [`AggKind`]s of which the aggregate results are not affected by the user given DISTINCT + /// keyword. + macro_rules! result_unaffected_by_distinct { + () => { + AggKind::BitAnd + | AggKind::BitOr + | AggKind::BoolAnd + | AggKind::BoolOr + | AggKind::Min + | AggKind::Max + | AggKind::ApproxCountDistinct + }; + } + pub(crate) use result_unaffected_by_distinct; + + /// [`AggKind`]s that are simply cannot 2-phased. + macro_rules! simply_cannot_two_phase { + () => { + AggKind::StringAgg + | AggKind::ApproxCountDistinct + | AggKind::ArrayAgg + | AggKind::JsonbAgg + | AggKind::JsonbObjectAgg + | AggKind::PercentileCont + | AggKind::PercentileDisc + | AggKind::Mode + }; + } + pub(crate) use simply_cannot_two_phase; + + /// [`AggKind`]s that are implemented with a single value state (so-called stateless). + macro_rules! single_value_state { + () => { + AggKind::Sum | AggKind::Sum0 | AggKind::Count | AggKind::BitXor + }; + } + pub(crate) use single_value_state; + + /// [`AggKind`]s that are implemented with a single value state (so-called stateless) iff the + /// input is append-only. + macro_rules! single_value_state_iff_in_append_only { + () => { + AggKind::Max | AggKind::Min + }; + } + pub(crate) use single_value_state_iff_in_append_only; +} + /// [`Agg`] groups input data by their group key and computes aggregation functions. /// /// It corresponds to the `GROUP BY` operator in a SQL query statement together with the aggregate @@ -82,17 +201,6 @@ impl Agg { ColIndexMapping::with_target_size(map, self.output_len()) } - pub(crate) fn can_two_phase_agg(&self) -> bool { - self.call_support_two_phase() - && !self.is_agg_result_affected_by_order() - && self.two_phase_agg_enabled() - } - - /// Must try two phase agg iff we are forced to, and we satisfy the constraints. 
- pub(crate) fn must_try_two_phase_agg(&self) -> bool { - self.two_phase_agg_forced() && self.can_two_phase_agg() - } - fn two_phase_agg_forced(&self) -> bool { self.ctx().session_ctx().config().get_force_two_phase_agg() } @@ -101,6 +209,25 @@ impl Agg { self.ctx().session_ctx().config().get_enable_two_phase_agg() } + pub(crate) fn can_two_phase_agg(&self) -> bool { + self.two_phase_agg_enabled() + && !self.agg_calls.is_empty() + && self.agg_calls.iter().all(|call| { + let agg_kind_ok = !matches!(call.agg_kind, agg_kinds::simply_cannot_two_phase!()); + let order_ok = matches!(call.agg_kind, agg_kinds::result_unaffected_by_order_by!()) + || call.order_by.is_empty(); + let distinct_ok = + matches!(call.agg_kind, agg_kinds::result_unaffected_by_distinct!()) + || !call.distinct; + agg_kind_ok && order_ok && distinct_ok + }) + } + + /// Must try two phase agg iff we are forced to, and we satisfy the constraints. + pub(crate) fn must_try_two_phase_agg(&self) -> bool { + self.two_phase_agg_forced() && self.can_two_phase_agg() + } + /// Generally used by two phase hash agg. /// If input dist already satisfies hash agg distribution, /// it will be more expensive to do two phase agg, should just do shuffle agg. @@ -112,21 +239,12 @@ impl Agg { input_dist.satisfies(&required_dist) } - fn call_support_two_phase(&self) -> bool { - !self.agg_calls.is_empty() - && self.agg_calls.iter().all(|call| { - matches!( - call.agg_kind, - AggKind::Min | AggKind::Max | AggKind::Sum | AggKind::Count - ) && !call.distinct - }) - } - - /// Check if the aggregation result will be affected by order by clause, if any. - pub(crate) fn is_agg_result_affected_by_order(&self) -> bool { - self.agg_calls - .iter() - .any(|call| matches!(call.agg_kind, AggKind::StringAgg | AggKind::ArrayAgg)) + /// See if all stream aggregation calls have a stateless local agg counterpart. + pub(crate) fn all_local_aggs_are_stateless(&self, stream_input_append_only: bool) -> bool { + self.agg_calls.iter().all(|c| { + matches!(c.agg_kind, agg_kinds::single_value_state!()) + || (matches!(c.agg_kind, agg_kinds::single_value_state_iff_in_append_only!() if stream_input_append_only)) + }) } pub(crate) fn watermark_group_key(&self, input_watermark_columns: &FixedBitSet) -> Vec { @@ -395,23 +513,14 @@ impl Agg { } }; - let gen_table_state = |agg_kind: AggKind| -> TableState { + let gen_table_state = |fields: Vec| -> TableState { let (mut table_builder, included_upstream_indices, _) = self.create_table_builder(me.ctx(), window_col_idx); let read_prefix_len_hint = table_builder.get_current_pk_len(); - match agg_kind { - AggKind::ApproxCountDistinct => { - // Add register column. 
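For the rewritten `can_two_phase_agg` earlier in this hunk, the per-call check decomposes into three independent conditions: the kind must be 2-phasable at all, ORDER BY must be either irrelevant to the result or absent, and likewise for DISTINCT. A toy model of that predicate with stand-in types and deliberately shortened kind lists:

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
enum AggKind {
    Sum,        // order-insensitive, but distinct-sensitive
    StringAgg,  // simply cannot be 2-phased
    FirstValue, // order-sensitive
}

struct AggCall {
    kind: AggKind,
    has_order_by: bool,
    distinct: bool,
}

fn call_can_two_phase(call: &AggCall) -> bool {
    // 1. the kind itself must support 2-phase evaluation
    let kind_ok = !matches!(call.kind, AggKind::StringAgg);
    // 2. ORDER BY must not affect the result, or must be absent
    let order_ok = matches!(call.kind, AggKind::Sum) || !call.has_order_by;
    // 3. DISTINCT must not affect the result, or must be absent
    //    (simplified: none of the toy kinds is distinct-insensitive)
    let distinct_ok = !call.distinct;
    kind_ok && order_ok && distinct_ok
}

fn main() {
    let sum = AggCall { kind: AggKind::Sum, has_order_by: true, distinct: false };
    let first = AggCall { kind: AggKind::FirstValue, has_order_by: true, distinct: false };
    assert!(call_can_two_phase(&sum));    // ORDER BY is ignorable for sum
    assert!(!call_can_two_phase(&first)); // first_value depends on the order
}
```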
- table_builder.add_column(&Field { - data_type: DataType::List(Box::new(DataType::Int64)), - name: String::from("registers"), - sub_fields: vec![], - type_name: String::default(), - }); - } - _ => panic!("state of agg kind `{agg_kind}` is not supposed to be `TableState`"), - } + fields.iter().for_each(|field| { + table_builder.add_column(field); + }); let mapping = ColIndexMapping::with_included_columns(&included_upstream_indices, in_fields.len()); @@ -427,78 +536,85 @@ impl Agg { self.agg_calls .iter() .map(|agg_call| match agg_call.agg_kind { + agg_kinds::single_value_state_iff_in_append_only!() if in_append_only => { + AggCallState::ResultValue + } + agg_kinds::single_value_state!() => AggCallState::ResultValue, AggKind::Min | AggKind::Max + | AggKind::FirstValue + | AggKind::LastValue | AggKind::StringAgg - | AggKind::ArrayAgg - | AggKind::JsonbAgg - | AggKind::JsonbObjectAgg - | AggKind::FirstValue => { - if !in_append_only { - // columns with order requirement in state table - let sort_keys = { - match agg_call.agg_kind { - AggKind::Min => { - vec![(OrderType::ascending(), agg_call.inputs[0].index)] - } - AggKind::Max => { - vec![(OrderType::descending(), agg_call.inputs[0].index)] + | AggKind::ArrayAgg => { + // columns with order requirement in state table + let sort_keys = { + match agg_call.agg_kind { + AggKind::Min => { + vec![(OrderType::ascending(), agg_call.inputs[0].index)] + } + AggKind::Max => { + vec![(OrderType::descending(), agg_call.inputs[0].index)] + } + AggKind::FirstValue + | AggKind::LastValue + | AggKind::StringAgg + | AggKind::ArrayAgg => { + if agg_call.order_by.is_empty() { + me.ctx().warn_to_user(format!( + "{} without ORDER BY may produce non-deterministic result", + agg_call.agg_kind, + )); } - AggKind::StringAgg - | AggKind::ArrayAgg - | AggKind::JsonbAgg - | AggKind::JsonbObjectAgg => agg_call + agg_call .order_by .iter() - .map(|o| (o.order_type, o.column_index)) - .collect(), - _ => unreachable!(), - } - }; - // other columns that should be contained in state table - let include_keys = match agg_call.agg_kind { - AggKind::StringAgg - | AggKind::ArrayAgg - | AggKind::JsonbAgg - | AggKind::JsonbObjectAgg => { - agg_call.inputs.iter().map(|i| i.index).collect() + .map(|o| { + ( + if agg_call.agg_kind == AggKind::LastValue { + o.order_type.reverse() + } else { + o.order_type + }, + o.column_index, + ) + }) + .collect() } - _ => vec![], - }; - let state = gen_materialized_input_state(sort_keys, include_keys); - AggCallState::MaterializedInput(Box::new(state)) - } else { - AggCallState::ResultValue - } + _ => unreachable!(), + } + }; + // other columns that should be contained in state table + let include_keys = match agg_call.agg_kind { + AggKind::FirstValue + | AggKind::LastValue + | AggKind::StringAgg + | AggKind::ArrayAgg => agg_call.inputs.iter().map(|i| i.index).collect(), + _ => vec![], + }; + let state = gen_materialized_input_state(sort_keys, include_keys); + AggCallState::MaterializedInput(Box::new(state)) } - AggKind::BitXor - | AggKind::Sum - | AggKind::Sum0 - | AggKind::Count - | AggKind::Avg - | AggKind::StddevPop - | AggKind::StddevSamp - | AggKind::VarPop - | AggKind::VarSamp => AggCallState::ResultValue, AggKind::ApproxCountDistinct => { - if !in_append_only { - // FIXME: now the approx count distinct on a non-append-only stream does not - // really has state and can handle failover or scale-out correctly - AggCallState::ResultValue - } else { - let state = gen_table_state(agg_call.agg_kind); + // NOTE(rc): This is quite confusing, in that 
the append-only version has table + // state while updatable version has value state. The latter one may be + // incorrect. + if in_append_only { + let state = gen_table_state(vec![Field { + data_type: DataType::List(Box::new(DataType::Int64)), + name: String::from("registers"), + sub_fields: vec![], + type_name: String::default(), + }]); AggCallState::Table(Box::new(state)) + } else { + AggCallState::ResultValue } } - // TODO: is its state a Table? - AggKind::BitAnd - | AggKind::BitOr - | AggKind::BoolAnd - | AggKind::BoolOr - | AggKind::PercentileCont - | AggKind::PercentileDisc - | AggKind::Mode => { - unimplemented!() + agg_kinds::rewritten!() => { + unreachable!("should have been rewritten") + } + agg_kinds::unimplemented_in_stream!() => { + unreachable!("should have been banned") } }) .collect() @@ -728,21 +844,21 @@ impl PlanAggCall { | AggKind::BoolOr | AggKind::Min | AggKind::Max - | AggKind::StringAgg | AggKind::FirstValue - | AggKind::PercentileCont - | AggKind::PercentileDisc - | AggKind::Mode => self.agg_kind, - AggKind::Count | AggKind::ApproxCountDistinct | AggKind::Sum0 => AggKind::Sum0, + | AggKind::LastValue => self.agg_kind, AggKind::Sum => AggKind::Sum, - AggKind::Avg => { - panic!("Avg aggregation should have been rewritten to Sum+Count") - } - AggKind::ArrayAgg | AggKind::JsonbAgg | AggKind::JsonbObjectAgg => { - panic!("2-phase {} is not supported yet", self.agg_kind) + AggKind::Sum0 | AggKind::Count => AggKind::Sum0, + agg_kinds::simply_cannot_two_phase!() => { + unreachable!( + "{} aggregation cannot be converted to 2-phase", + self.agg_kind + ) } - AggKind::StddevPop | AggKind::StddevSamp | AggKind::VarPop | AggKind::VarSamp => { - panic!("Stddev/Var aggregation should have been rewritten to Sum, Count and Case") + agg_kinds::rewritten!() => { + unreachable!( + "{} aggregation should have been rewritten to Sum, Count and Case", + self.agg_kind + ) } }; PlanAggCall { diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index b11450a3c62c..e5d12b14532d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -19,7 +19,7 @@ use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_expr::agg::AggKind; -use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder}; +use super::generic::{self, agg_kinds, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder}; use super::utils::impl_distill_by_unit; use super::{ BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, PlanBase, PlanRef, @@ -179,14 +179,6 @@ impl LogicalAgg { Ok(new_stream_hash_agg(logical, None).into()) } - /// See if all stream aggregation calls have a stateless local agg counterpart. - fn all_local_aggs_are_stateless(&self, stream_input_append_only: bool) -> bool { - self.agg_calls().iter().all(|c| { - matches!(c.agg_kind, AggKind::Sum | AggKind::Count) - || (matches!(c.agg_kind, AggKind::Min | AggKind::Max) && stream_input_append_only) - }) - } - /// Generates distributed stream plan. 
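The `partial_to_total_agg_call` rewrite above is what produced the plan-test churn earlier in this patch: in a 2-phase plan the global aggregate combines local partial results, so a local `count` is summed up by a global `sum0` (hence `sum0(count(..))` in the updated nexmark plans), while min/max/first_value/last_value keep their own kind. A toy version of the mapping with a stand-in enum, not the planner's types:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggKind {
    Sum,
    Sum0,
    Count,
    LastValue,
}

/// Maps the kind of a local (partial) agg call to the kind of the
/// global (total) call that combines the partial results.
fn partial_to_total(kind: AggKind) -> AggKind {
    match kind {
        AggKind::Sum => AggKind::Sum,
        // counts are combined by summing; sum0 treats NULL as 0
        AggKind::Sum0 | AggKind::Count => AggKind::Sum0,
        AggKind::LastValue => AggKind::LastValue,
    }
}

fn main() {
    assert_eq!(partial_to_total(AggKind::Count), AggKind::Sum0);
    assert_eq!(partial_to_total(AggKind::LastValue), AggKind::LastValue);
}
```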
fn gen_dist_stream_agg_plan(&self, stream_input: PlanRef) -> Result { let input_dist = stream_input.distribution(); @@ -215,7 +207,9 @@ impl LogicalAgg { // can be applied on stateless simple agg calls, // with input distributed by [`Distribution::AnyShard`] if self.group_key().is_empty() - && self.all_local_aggs_are_stateless(stream_input.append_only()) + && self + .core + .all_local_aggs_are_stateless(stream_input.append_only()) && input_dist.satisfies(&RequiredDist::AnyShard) { return self.gen_stateless_two_phase_streaming_agg_plan(stream_input); @@ -421,25 +415,21 @@ impl LogicalAggBuilder { let return_type = agg_call.return_type(); let (agg_kind, inputs, mut distinct, mut order_by, filter, direct_args) = agg_call.decompose(); - match &agg_kind { - AggKind::Min | AggKind::Max => { - distinct = false; - order_by = OrderBy::any(); - } - AggKind::Sum - | AggKind::Count - | AggKind::Avg - | AggKind::ApproxCountDistinct - | AggKind::StddevSamp - | AggKind::StddevPop - | AggKind::VarPop - | AggKind::VarSamp => { - order_by = OrderBy::any(); - } - _ => { - // To be conservative, we just treat newly added AggKind in the future as not - // rewritable. - } + + if matches!(agg_kind, agg_kinds::must_have_order_by!()) && order_by.sort_exprs.is_empty() { + return Err(ErrorCode::InvalidInputSyntax(format!( + "Aggregation function {} requires ORDER BY clause", + agg_kind + ))); + } + + // try ignore ORDER BY if it doesn't affect the result + if matches!(agg_kind, agg_kinds::result_unaffected_by_order_by!()) { + order_by = OrderBy::any(); + } + // try ignore DISTINCT if it doesn't affect the result + if matches!(agg_kind, agg_kinds::result_unaffected_by_distinct!()) { + distinct = false; } self.is_in_filter_clause = true; @@ -1062,16 +1052,7 @@ fn new_stream_hash_agg(logical: Agg, vnode_col_idx: Option) -> S impl ToStream for LogicalAgg { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { for agg_call in self.agg_calls() { - if matches!( - agg_call.agg_kind, - AggKind::BitAnd - | AggKind::BitOr - | AggKind::BoolAnd - | AggKind::BoolOr - | AggKind::PercentileCont - | AggKind::PercentileDisc - | AggKind::Mode - ) { + if matches!(agg_call.agg_kind, agg_kinds::unimplemented_in_stream!()) { return Err(ErrorCode::NotImplemented( format!("{} aggregation in materialized view", agg_call.agg_kind), None.into(), diff --git a/src/frontend/src/optimizer/rule/distinct_agg_rule.rs b/src/frontend/src/optimizer/rule/distinct_agg_rule.rs index b392d142e1d0..4b0c4ac25f23 100644 --- a/src/frontend/src/optimizer/rule/distinct_agg_rule.rs +++ b/src/frontend/src/optimizer/rule/distinct_agg_rule.rs @@ -291,6 +291,7 @@ impl DistinctAggRule { | AggKind::JsonbAgg | AggKind::JsonbObjectAgg | AggKind::FirstValue + | AggKind::LastValue | AggKind::StddevPop | AggKind::StddevSamp | AggKind::VarPop diff --git a/src/stream/src/executor/aggregation/minput.rs b/src/stream/src/executor/aggregation/minput.rs index 79bc284f02fd..808689ab3f68 100644 --- a/src/stream/src/executor/aggregation/minput.rs +++ b/src/stream/src/executor/aggregation/minput.rs @@ -93,7 +93,16 @@ impl MaterializedInputState { agg_call .column_orders .iter() - .map(|p| (p.column_index, p.order_type)) + .map(|p| { + ( + p.column_index, + if agg_call.kind == AggKind::LastValue { + p.order_type.reverse() + } else { + p.order_type + }, + ) + }) .unzip() }; @@ -128,7 +137,7 @@ impl MaterializedInputState { let cache_key_serializer = OrderedRowSerde::new(cache_key_data_types, order_types); let cache: Box = match agg_call.kind { - AggKind::Min | AggKind::Max | 
AggKind::FirstValue => Box::new( + AggKind::Min | AggKind::Max | AggKind::FirstValue | AggKind::LastValue => Box::new( GenericAggStateCache::new(TopNStateCache::new(extreme_cache_size), ExtremeAgg), ), AggKind::StringAgg => Box::new(GenericAggStateCache::new( diff --git a/src/tests/sqlsmith/src/sql_gen/agg.rs b/src/tests/sqlsmith/src/sql_gen/agg.rs index a05c6f75ee9d..9ec3c9e9f670 100644 --- a/src/tests/sqlsmith/src/sql_gen/agg.rs +++ b/src/tests/sqlsmith/src/sql_gen/agg.rs @@ -90,7 +90,20 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { ))) } } - A::FirstValue => None, + kind @ (A::FirstValue | A::LastValue) => { + if order_by.is_empty() { + // `first/last_value` only works when ORDER BY is provided + None + } else { + Some(Expr::Function(make_agg_func( + &kind.to_string(), + exprs, + distinct, + filter, + order_by, + ))) + } + } A::ApproxCountDistinct => { if self.is_distinct_allowed { None
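Taken together, `last_value` needs no new state machinery: `general.rs` defines it as the fold that keeps the latest input (where `first_value` keeps the accumulated state), and `minput.rs`/`agg.rs` maintain its materialized input state with every ORDER BY key flipped via `OrderType::reverse`, so the answer is always the first row of the state, exactly where `first_value` already reads. A runnable sketch of that reversal trick, using a plain `Vec` as a stand-in for the state table:

```rust
// Emulates a materialized-input state ordered by the (possibly reversed)
// sort key; both aggregates then just read the first row.
fn first_by_key<K: Ord + Copy, T: Copy>(rows: &[(K, T)], reverse: bool) -> Option<T> {
    let mut state: Vec<(K, T)> = rows.to_vec();
    state.sort_by_key(|(k, _)| *k);
    if reverse {
        // what `OrderType::reverse` achieves for last_value
        state.reverse();
    }
    state.first().map(|(_, v)| *v)
}

fn main() {
    let rows = [(2, "b"), (1, "a"), (3, "c")];
    assert_eq!(first_by_key(&rows, false), Some("a")); // first_value(v ORDER BY k)
    assert_eq!(first_by_key(&rows, true), Some("c"));  // last_value(v ORDER BY k)
}
```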