Skip to content

Commit

Permalink
feat(agg,over_window): support first_value, last_value and refact…
Browse files Browse the repository at this point in the history
…or LogicalAgg (#10740)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc committed Jul 5, 2023
1 parent 6933c98 commit aea2da0
Show file tree
Hide file tree
Showing 19 changed files with 546 additions and 242 deletions.
21 changes: 21 additions & 0 deletions e2e_test/batch/aggregate/first_last_value.slt.part
@@ -0,0 +1,21 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t(v1 int, v2 int);

statement ok
insert into t values (1, 2), (5, null), (4, 0);

query iiii
select
first_value(v1 order by v1)
, first_value(v1 order by v2 desc)
, last_value(v1 order by v1)
, last_value(v1 order by v2 asc nulls first)
from t;
----
1 5 5 1

statement ok
drop table t;
26 changes: 26 additions & 0 deletions e2e_test/streaming/basic_agg.slt
Expand Up @@ -99,3 +99,29 @@ drop materialized view v2;

statement ok
drop table t1;

statement ok
create table t(v1 int, v2 int);

statement ok
create materialized view mv as
select
first_value(v1 order by v1) as o1
, first_value(v1 order by v2 desc) as o2
, last_value(v1 order by v1) as o3
, last_value(v1 order by v2 asc nulls first) as o4
from t;

statement ok
insert into t values (1, 2), (5, null), (4, 0);

query iiii
select * from mv;
----
1 5 5 1

statement ok
drop materialized view mv;

statement ok
drop table t;
1 change: 1 addition & 0 deletions proto/expr.proto
Expand Up @@ -317,6 +317,7 @@ message AggCall {
PERCENTILE_CONT = 22;
PERCENTILE_DISC = 23;
MODE = 24;
LAST_VALUE = 25;
}
Type type = 1;
repeated InputRef args = 2;
Expand Down
24 changes: 24 additions & 0 deletions src/common/src/util/sort_util.rs
Expand Up @@ -55,6 +55,15 @@ impl Direction {
}
}

impl Direction {
fn reverse(self) -> Self {
match self {
Self::Ascending => Self::Descending,
Self::Descending => Self::Ascending,
}
}
}

/// Nulls are largest/smallest.
#[derive(PartialEq, Eq, Hash, Copy, Clone, Debug, Display, Default)]
enum NullsAre {
Expand Down Expand Up @@ -201,6 +210,10 @@ impl OrderType {
pub fn nulls_are_last(&self) -> bool {
!self.nulls_are_first()
}

/// Returns this order type with the sort direction flipped.
///
/// Only the direction is reversed; `nulls_are` is carried over unchanged. Since nulls keep
/// the same relative magnitude, their position flips together with the direction, e.g.
/// `ascending_nulls_first().reverse()` equals `descending_nulls_last()`.
pub fn reverse(self) -> Self {
    let flipped = self.direction.reverse();
    Self::new(flipped, self.nulls_are)
}
}

impl fmt::Display for OrderType {
Expand Down Expand Up @@ -590,6 +603,17 @@ mod tests {
assert!(OrderType::descending_nulls_last().is_descending());
assert!(OrderType::descending_nulls_last().nulls_are_smallest());
assert!(OrderType::descending_nulls_last().nulls_are_last());

assert_eq!(OrderType::ascending().reverse(), OrderType::descending());
assert_eq!(OrderType::descending().reverse(), OrderType::ascending());
assert_eq!(
OrderType::ascending_nulls_first().reverse(),
OrderType::descending_nulls_last()
);
assert_eq!(
OrderType::ascending_nulls_last().reverse(),
OrderType::descending_nulls_first()
);
}

#[test]
Expand Down
28 changes: 18 additions & 10 deletions src/common/src/util/stream_graph_visitor.rs
Expand Up @@ -97,11 +97,15 @@ fn visit_stream_node_tables_inner<F>(
NodeBody::HashAgg(node) => {
assert_eq!(node.agg_call_states.len(), node.agg_calls.len());
always!(node.result_table, "HashAggResult");
for state in &mut node.agg_call_states {
if let agg_call_state::Inner::MaterializedInputState(s) =
state.inner.as_mut().unwrap()
{
always!(s.table, "HashAgg");
for (call_idx, state) in node.agg_call_states.iter_mut().enumerate() {
match state.inner.as_mut().unwrap() {
agg_call_state::Inner::ResultValueState(_) => {}
agg_call_state::Inner::TableState(s) => {
always!(s.table, &format!("HashAggCall{}", call_idx));
}
agg_call_state::Inner::MaterializedInputState(s) => {
always!(s.table, &format!("HashAggCall{}", call_idx));
}
}
}
for (distinct_col, dedup_table) in &mut node.distinct_dedup_tables {
Expand All @@ -111,11 +115,15 @@ fn visit_stream_node_tables_inner<F>(
NodeBody::SimpleAgg(node) => {
assert_eq!(node.agg_call_states.len(), node.agg_calls.len());
always!(node.result_table, "SimpleAggResult");
for state in &mut node.agg_call_states {
if let agg_call_state::Inner::MaterializedInputState(s) =
state.inner.as_mut().unwrap()
{
always!(s.table, "SimpleAgg");
for (call_idx, state) in node.agg_call_states.iter_mut().enumerate() {
match state.inner.as_mut().unwrap() {
agg_call_state::Inner::ResultValueState(_) => {}
agg_call_state::Inner::TableState(s) => {
always!(s.table, &format!("SimpleAggCall{}", call_idx));
}
agg_call_state::Inner::MaterializedInputState(s) => {
always!(s.table, &format!("SimpleAggCall{}", call_idx));
}
}
}
for (distinct_col, dedup_table) in &mut node.distinct_dedup_tables {
Expand Down
3 changes: 3 additions & 0 deletions src/expr/src/agg/def.rs
Expand Up @@ -228,6 +228,7 @@ pub enum AggKind {
JsonbAgg,
JsonbObjectAgg,
FirstValue,
LastValue,
VarPop,
VarSamp,
StddevPop,
Expand Down Expand Up @@ -257,6 +258,7 @@ impl AggKind {
PbType::JsonbAgg => Ok(AggKind::JsonbAgg),
PbType::JsonbObjectAgg => Ok(AggKind::JsonbObjectAgg),
PbType::FirstValue => Ok(AggKind::FirstValue),
PbType::LastValue => Ok(AggKind::LastValue),
PbType::StddevPop => Ok(AggKind::StddevPop),
PbType::StddevSamp => Ok(AggKind::StddevSamp),
PbType::VarPop => Ok(AggKind::VarPop),
Expand Down Expand Up @@ -287,6 +289,7 @@ impl AggKind {
Self::JsonbAgg => PbType::JsonbAgg,
Self::JsonbObjectAgg => PbType::JsonbObjectAgg,
Self::FirstValue => PbType::FirstValue,
Self::LastValue => PbType::LastValue,
Self::StddevPop => PbType::StddevPop,
Self::StddevSamp => PbType::StddevSamp,
Self::VarPop => PbType::VarPop,
Expand Down
7 changes: 6 additions & 1 deletion src/expr/src/agg/general.rs
Expand Up @@ -74,10 +74,15 @@ where
}

#[aggregate("first_value(*) -> auto")]
fn first<T>(state: T, _: T) -> T {
fn first_value<T>(state: T, _: T) -> T {
state
}

/// Step function registered for the `last_value` aggregate.
///
/// Ignores its first argument (presumably the accumulated state, by analogy with
/// `first_value` — confirm) and returns the newly supplied `input`, so each call
/// replaces the result with the latest value.
#[aggregate("last_value(*) -> auto")]
fn last_value<T>(_: T, input: T) -> T {
input
}

/// Note the following corner cases:
///
/// ```slt
Expand Down
25 changes: 24 additions & 1 deletion src/frontend/planner_test/tests/testdata/input/agg.yaml
Expand Up @@ -862,4 +862,27 @@
create table t (x int, y varchar);
select mode(1) within group (order by y desc) from t;
expected_outputs:
- binder_error
- binder_error

- sql: |
create table t (x int, y int);
select first_value(x) from t;
expected_outputs:
- planner_error
- sql: |
create table t (x int, y int);
select last_value(x) from t;
expected_outputs:
- planner_error
- sql: |
create table t (x int, y int);
select first_value(x order by y asc) from t;
expected_outputs:
- batch_plan
- stream_plan
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
expected_outputs:
- batch_plan
- stream_plan
34 changes: 34 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Expand Up @@ -1508,3 +1508,37 @@
Caused by:
Invalid input syntax: no arguments are expected in mode agg
- sql: |
create table t (x int, y int);
select first_value(x) from t;
planner_error: 'Invalid input syntax: Aggregation function first_value requires ORDER BY clause'
- sql: |
create table t (x int, y int);
select last_value(x) from t;
planner_error: 'Invalid input syntax: Aggregation function last_value requires ORDER BY clause'
- sql: |
create table t (x int, y int);
select first_value(x order by y asc) from t;
batch_plan: |-
BatchSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC))] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [first_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [first_value(t.x order_by(t.y ASC))] }
└─StreamSimpleAgg { aggs: [first_value(t.x order_by(t.y ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t (x int, y int);
select last_value(x order by y desc nulls last) from t;
batch_plan: |-
BatchSimpleAgg { aggs: [last_value(t.x order_by(t.y DESC NULLS LAST))] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.x, t.y], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [last_value], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [last_value(t.x order_by(t.y DESC NULLS LAST))] }
└─StreamSimpleAgg { aggs: [last_value(t.x order_by(t.y DESC NULLS LAST)), count] }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
33 changes: 18 additions & 15 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml
Expand Up @@ -1651,12 +1651,13 @@
│ │ └─BatchScan { table: auction, columns: [auction.id, auction.item_name], distribution: UpstreamHashShard(auction.id) }
│ └─BatchExchange { order: [], dist: HashShard(bid.auction) }
│ └─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard }
└─BatchProject { exprs: [(sum0(count) / count(bid.auction)) as $expr1] }
└─BatchSimpleAgg { aggs: [sum0(count), count(bid.auction)] }
└─BatchProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] }
└─BatchSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction))] }
└─BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [bid.auction], aggs: [count] }
└─BatchExchange { order: [], dist: HashShard(bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard }
└─BatchSimpleAgg { aggs: [sum0(count), count(bid.auction)] }
└─BatchHashAgg { group_key: [bid.auction], aggs: [count] }
└─BatchExchange { order: [], dist: HashShard(bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (count(bid.auction) >= $expr1), output: [auction.id, auction.item_name, count(bid.auction)] }
Expand All @@ -1668,12 +1669,13 @@
│ └─StreamExchange { dist: HashShard(bid.auction) }
│ └─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(sum0(count) / count(bid.auction)) as $expr1] }
└─StreamSimpleAgg { aggs: [sum0(count), count(bid.auction), count] }
└─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] }
└─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] }
└─StreamExchange { dist: HashShard(bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
└─StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] }
└─StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] }
└─StreamExchange { dist: HashShard(bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck }
Expand All @@ -1697,13 +1699,14 @@
└── BatchPlanNode
Fragment 3
StreamProject { exprs: [(sum0(count) / count(bid.auction)) as $expr1] }
└── StreamSimpleAgg { aggs: [sum0(count), count(bid.auction), count] } { result table: 9, state tables: [], distinct tables: [] }
StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(bid.auction))) as $expr1] }
└── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(bid.auction)), count] } { result table: 9, state tables: [], distinct tables: [] }
└── StreamExchange Single from 4
Fragment 4
StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 5
StreamStatelessSimpleAgg { aggs: [sum0(count), count(bid.auction)] }
└── StreamHashAgg [append_only] { group_key: [bid.auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 5
Fragment 5
Chain { table: bid, columns: [bid.auction, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) } { state table: 11 }
Expand Down Expand Up @@ -1744,7 +1747,7 @@
├── read pk prefix len hint: 1
└── vnode column idx: 0
Table 9 { columns: [ sum0(count), count(bid_auction), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 }
Table 9 { columns: [ sum0(sum0(count)), sum0(count(bid_auction)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 }
Table 10 { columns: [ bid_auction, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Expand Down
39 changes: 21 additions & 18 deletions src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml
Expand Up @@ -1629,12 +1629,13 @@
│ │ └─BatchSource { source: auction, columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id], filter: (None, None) }
│ └─BatchExchange { order: [], dist: HashShard(auction) }
│ └─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
└─BatchProject { exprs: [(sum0(count) / count(auction)) as $expr1] }
└─BatchSimpleAgg { aggs: [sum0(count), count(auction)] }
└─BatchProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] }
└─BatchSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction))] }
└─BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [auction], aggs: [count] }
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
└─BatchSimpleAgg { aggs: [sum0(count), count(auction)] }
└─BatchHashAgg { group_key: [auction], aggs: [count] }
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id], filter: (None, None) }
stream_plan: |-
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck }
└─StreamDynamicFilter { predicate: (count(auction) >= $expr1), output: [id, item_name, count(auction)] }
Expand All @@ -1650,15 +1651,16 @@
│ └─StreamRowIdGen { row_id_index: 7 }
│ └─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(sum0(count) / count(auction)) as $expr1] }
└─StreamSimpleAgg { aggs: [sum0(count), count(auction), count] }
└─StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] }
└─StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg [append_only] { group_key: [auction], aggs: [count] }
└─StreamExchange { dist: HashShard(auction) }
└─StreamShare { id: 5 }
└─StreamProject { exprs: [auction, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─StreamStatelessSimpleAgg { aggs: [sum0(count), count(auction)] }
└─StreamHashAgg [append_only] { group_key: [auction], aggs: [count] }
└─StreamExchange { dist: HashShard(auction) }
└─StreamShare { id: 5 }
└─StreamProject { exprs: [auction, _row_id] }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction_id, auction_item_name, bid_count], stream_key: [auction_id, auction_item_name], pk_columns: [auction_id, auction_item_name], pk_conflict: NoCheck }
Expand Down Expand Up @@ -1689,13 +1691,14 @@
└── StreamSource { source: bid, columns: [auction, bidder, price, channel, url, date_time, extra, _row_id] } { source state table: 8 }
Fragment 4
StreamProject { exprs: [(sum0(count) / count(auction)) as $expr1] }
└── StreamSimpleAgg { aggs: [sum0(count), count(auction), count] } { result table: 9, state tables: [], distinct tables: [] }
StreamProject { exprs: [(sum0(sum0(count)) / sum0(count(auction))) as $expr1] }
└── StreamSimpleAgg { aggs: [sum0(sum0(count)), sum0(count(auction)), count] } { result table: 9, state tables: [], distinct tables: [] }
└── StreamExchange Single from 5
Fragment 5
StreamHashAgg [append_only] { group_key: [auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 6
StreamStatelessSimpleAgg { aggs: [sum0(count), count(auction)] }
└── StreamHashAgg [append_only] { group_key: [auction], aggs: [count] } { result table: 10, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 6
Fragment 6
StreamNoOp
Expand Down Expand Up @@ -1724,7 +1727,7 @@
Table 8 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
Table 9 { columns: [ sum0(count), count(auction), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 }
Table 9 { columns: [ sum0(sum0(count)), sum0(count(auction)), count ], primary key: [], value indices: [ 0, 1, 2 ], distribution key: [], read pk prefix len hint: 0 }
Table 10 { columns: [ auction, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Expand Down

0 comments on commit aea2da0

Please sign in to comment.