Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(on conflict): add with version column clause and behavior #16091

Merged
merged 23 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions e2e_test/streaming/with_version_column.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement error
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
create table t0 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict do update if not null with version column(v5);

statement error
create table t0 (v1 int, v2 int, v3 int, v4 bool, primary key(v1)) on conflict do update if not null with version column(v4);

statement error
create table t0 (v1 int, v2 int, v3 int, v4 bool, primary key(v1)) on conflict do update if not null with version column v4;

statement error
create table t0 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict ignore with version column(v4);

statement ok
create table t1 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict do update if not null with version column(v4);

statement ok
insert into t1 values (1,null,2, 4), (2,3,null, 1);

statement ok
insert into t1 values (3,null,5,2), (3,6,null, 1);

statement ok
insert into t1 values (1,5,null,5), (2,null, 6, 1);

statement ok
create materialized view mv1 as select * from t1;


query III rowsort
select v1, v2, v3, v4 from mv1;
----
1 5 2 5
2 3 6 1
3 NULL 5 2


statement ok
update t1 set v2 = 2 where v1 > 1;

statement ok
flush;

query IIII rowsort
select v1, v2, v3, v4 from mv1;
----
1 5 2 5
2 2 6 1
3 2 5 2

statement ok
drop materialized view mv1;

statement ok
drop table t1;


statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t2 (v1 int, v2 int, v3 int, v4 int, primary key(v1)) on conflict overwrite with version column(v4);

statement ok
insert into t2 values (1,null,2, 4), (2,3,null, 1);

statement ok
insert into t2 values (3,null,5,2), (3,6,null, 1);

statement ok
insert into t2 values (1,5,null,3), (2,null, 6, 1);

statement ok
create materialized view mv2 as select * from t2;


query III rowsort
select v1, v2, v3, v4 from mv2;
----
1 NULL 2 4
2 NULL 6 1
3 NULL 5 2

statement ok
drop materialized view mv2;

statement ok
drop table t2;
3 changes: 3 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ message Table {
// TTL of the record in the table, to ensure the consistency with other tables in the streaming plan, it only applies to append-only tables.
optional uint32 retention_seconds = 37;

// This field specifies the index of the column set in the "with version column" within all the columns. It is used for filtering during "on conflict" operations.
optional uint32 version_column_index = 38;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ impl TestCase {
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
wildcard_idx,
Expand All @@ -449,6 +450,7 @@ impl TestCase {
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
)
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub struct TableCatalog {
/// `No Check`.
pub conflict_behavior: ConflictBehavior,

pub version_column_index: Option<usize>,

pub read_prefix_len_hint: usize,

/// Per-table catalog version, used by schema change. `None` for internal tables and tests.
Expand Down Expand Up @@ -438,6 +440,7 @@ impl TableCatalog {
watermark_indices: self.watermark_columns.ones().map(|x| x as _).collect_vec(),
dist_key_in_pk: self.dist_key_in_pk.iter().map(|x| *x as _).collect(),
handle_pk_conflict_behavior: self.conflict_behavior.to_protobuf().into(),
version_column_index: self.version_column_index.map(|value| value as u32),
cardinality: Some(self.cardinality.to_protobuf()),
initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
Expand Down Expand Up @@ -519,6 +522,7 @@ impl From<PbTable> for TableCatalog {
let mut col_index: HashMap<i32, usize> = HashMap::new();

let conflict_behavior = ConflictBehavior::from_protobuf(&tb_conflict_behavior);
let version_column_index = tb.version_column_index.map(|value| value as usize);
let columns: Vec<ColumnCatalog> = tb.columns.into_iter().map(ColumnCatalog::from).collect();
for (idx, catalog) in columns.clone().into_iter().enumerate() {
let col_name = catalog.name();
Expand Down Expand Up @@ -558,6 +562,7 @@ impl From<PbTable> for TableCatalog {
value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
definition: tb.definition,
conflict_behavior,
version_column_index,
read_prefix_len_hint: tb.read_prefix_len_hint as usize,
version: tb.version.map(TableVersion::from_prost),
watermark_columns,
Expand Down Expand Up @@ -672,6 +677,7 @@ mod tests {
incoming_sinks: vec![],
created_at_cluster_version: None,
initialized_at_cluster_version: None,
version_column_index: None,
}
.into();

Expand Down Expand Up @@ -732,6 +738,7 @@ mod tests {
created_at_cluster_version: None,
initialized_at_cluster_version: None,
dependent_relations: vec![],
version_column_index: None,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub async fn replace_table_with_definition(
source_watermarks,
append_only,
on_conflict,
with_version_column,
wildcard_idx,
..
} = definition
Expand All @@ -70,6 +71,7 @@ pub async fn replace_table_with_definition(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ pub(crate) async fn reparse_table_for_sink(
source_watermarks,
append_only,
on_conflict,
with_version_column,
..
} = definition
else {
Expand All @@ -631,6 +632,7 @@ pub(crate) async fn reparse_table_for_sink(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)
.await?;

Expand Down
19 changes: 19 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
mut col_id_gen: ColumnIdGenerator,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
include_column_options: IncludeOption,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
if append_only
Expand Down Expand Up @@ -554,6 +555,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
watermark_descs,
append_only,
on_conflict,
with_version_column,
Some(col_id_gen.into_version()),
)
}
Expand All @@ -569,6 +571,7 @@ pub(crate) fn gen_create_table_plan(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
let definition = context.normalized_sql().to_owned();
let mut columns = bind_sql_columns(&column_defs)?;
Expand All @@ -587,6 +590,7 @@ pub(crate) fn gen_create_table_plan(
source_watermarks,
append_only,
on_conflict,
with_version_column,
Some(col_id_gen.into_version()),
)
}
Expand All @@ -603,6 +607,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
version: Option<TableVersion>,
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
ensure_table_constraints_supported(&constraints)?;
Expand Down Expand Up @@ -636,6 +641,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
watermark_descs,
append_only,
on_conflict,
with_version_column,
version,
)
}
Expand All @@ -653,6 +659,7 @@ fn gen_table_plan_inner(
watermark_descs: Vec<WatermarkDesc>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
version: Option<TableVersion>, /* TODO: this should always be `Some` if we support `ALTER
* TABLE` for `CREATE TABLE AS`. */
) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
Expand Down Expand Up @@ -749,6 +756,7 @@ fn gen_table_plan_inner(
row_id_index,
append_only,
on_conflict,
with_version_column,
watermark_descs,
version,
is_external_source,
Expand All @@ -771,6 +779,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
constraints: Vec<TableConstraint>,
mut col_id_gen: ColumnIdGenerator,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<(PlanRef, PbTable)> {
let session = context.session_ctx().clone();
let db_name = session.database();
Expand Down Expand Up @@ -873,6 +882,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_source(
None,
append_only,
on_conflict,
with_version_column,
vec![],
Some(col_id_gen.into_version()),
true,
Expand Down Expand Up @@ -940,6 +950,7 @@ pub(super) async fn handle_create_table_plan(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
include_column_options: IncludeOption,
) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
let source_schema = check_create_table_with_source(
Expand All @@ -962,6 +973,7 @@ pub(super) async fn handle_create_table_plan(
col_id_gen,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?,
Expand All @@ -977,6 +989,7 @@ pub(super) async fn handle_create_table_plan(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)?,
TableJobType::General,
),
Expand All @@ -991,6 +1004,7 @@ pub(super) async fn handle_create_table_plan(
constraints,
col_id_gen,
on_conflict,
with_version_column,
)?;

((plan, None, table), TableJobType::SharedCdcSource)
Expand All @@ -1017,6 +1031,7 @@ pub async fn handle_create_table(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
cdc_table_info: Option<CdcTableInfo>,
include_column_options: IncludeOption,
) -> Result<RwPgResponse> {
Expand Down Expand Up @@ -1049,6 +1064,7 @@ pub async fn handle_create_table(
source_watermarks,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?;
Expand Down Expand Up @@ -1112,6 +1128,7 @@ pub async fn generate_stream_graph_for_table(
source_watermarks: Vec<SourceWatermark>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<(StreamFragmentGraph, Table, Option<PbSource>)> {
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;

Expand All @@ -1129,6 +1146,7 @@ pub async fn generate_stream_graph_for_table(
col_id_gen,
append_only,
on_conflict,
with_version_column,
vec![],
)
.await?
Expand All @@ -1142,6 +1160,7 @@ pub async fn generate_stream_graph_for_table(
source_watermarks,
append_only,
on_conflict,
with_version_column,
)?,
};

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub async fn handle_create_as(
column_defs: Vec<ColumnDef>,
append_only: bool,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
) -> Result<RwPgResponse> {
if column_defs.iter().any(|column| column.data_type.is_some()) {
return Err(ErrorCode::InvalidInputSyntax(
Expand Down Expand Up @@ -106,6 +107,7 @@ pub async fn handle_create_as(
vec![], // No watermark should be defined in for `CREATE TABLE AS`
append_only,
on_conflict,
with_version_column,
Some(col_id_gen.into_version()),
)?;
let mut graph = build_graph(plan)?;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn do_handle_explain(
source_watermarks,
append_only,
on_conflict,
with_version_column,
cdc_table_info,
include_column_options,
wildcard_idx,
Expand All @@ -82,6 +83,7 @@ async fn do_handle_explain(
source_watermarks,
append_only,
on_conflict,
with_version_column,
include_column_options,
)
.await?;
Expand Down
Loading
Loading