feat: alter column for table with connector #12164

Merged · 7 commits · Sep 12, 2023
Changes from 4 commits
4 changes: 4 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -141,6 +141,10 @@ chmod +x ./scripts/source/prepare_data_after_alter.sh
./scripts/source/prepare_data_after_alter.sh 2
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data.slt'

echo "--- e2e, kafka alter source again"
./scripts/source/prepare_data_after_alter.sh 3
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data_2.slt'

echo "--- Run CH-benCHmark"
./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
./risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/streaming/*.slt'
28 changes: 25 additions & 3 deletions e2e_test/source/basic/alter/kafka.slt
@@ -14,13 +14,22 @@ CREATE SOURCE s2 (v2 varchar) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE t (v1 int) with (
connector = 'kafka',
topic = 'kafka_alter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;


statement ok
create materialized view mv1 as select * from s1;

statement ok
create materialized view mv2 as select * from s2;

sleep 10s
sleep 5s

statement ok
flush;
@@ -35,6 +44,11 @@ select * from s2;
----
11

query I
select * from t;
----
1

# alter source
statement ok
alter source s1 add column v2 varchar;
@@ -49,7 +63,10 @@ create materialized view mv3 as select * from s1;
statement ok
create materialized view mv4 as select * from s2;

sleep 10s
statement ok
alter table t add column v2 varchar;

sleep 5s

statement ok
flush;
@@ -84,14 +101,19 @@ select * from mv4
----
11 NULL

query IT
select * from t
----
1 NULL

# alter source again
statement ok
alter source s1 add column v3 int;

statement ok
create materialized view mv5 as select * from s1;

sleep 10s
sleep 5s

statement ok
flush;
15 changes: 15 additions & 0 deletions e2e_test/source/basic/alter/kafka_after_new_data.slt
@@ -45,6 +45,21 @@ select * from mv5
1 11 111
2 22 222

query IT rowsort
select * from t
----
1 NULL
2 22

statement ok
alter table t add column v3 int;

query ITI rowsort
select * from t
----
1 NULL NULL
2 22 NULL

statement ok
drop materialized view mv1

14 changes: 14 additions & 0 deletions e2e_test/source/basic/alter/kafka_after_new_data_2.slt
@@ -0,0 +1,14 @@
sleep 5s

statement ok
flush;

query ITI rowsort
select * from t
----
1 NULL NULL
2 22 NULL
3 33 333

statement ok
drop table t;
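Taken together, the three test stages pin down the intended semantics: a column added to a connector-backed table reads as NULL for rows ingested before the ALTER, while rows arriving afterwards (such as the kafka_alter.3 payload added in this PR) populate it. A minimal Rust sketch of that widening behavior, using stand-in types rather than RisingWave's actual row representation:

```rust
// Stand-in types for illustration only; RisingWave's real row and datum
// types are more involved than this.
type Datum = Option<String>;

/// Widen a row decoded under the pre-ALTER schema to the new column count,
/// filling the added trailing columns with NULL (None).
fn widen_row(mut row: Vec<Datum>, new_width: usize) -> Vec<Datum> {
    assert!(row.len() <= new_width);
    row.resize(new_width, None);
    row
}

fn main() {
    // Row {"v1": 1} was ingested before `ALTER TABLE t ADD COLUMN v2 varchar`.
    let old = vec![Some("1".to_owned())];
    // After the ALTER it reads back as `1 NULL`, matching the slt output above.
    assert_eq!(widen_row(old, 2), vec![Some("1".to_owned()), None]);
}
```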
2 changes: 2 additions & 0 deletions proto/ddl_service.proto
@@ -239,6 +239,8 @@ message ReplaceTablePlanRequest {
stream_plan.StreamFragmentGraph fragment_graph = 2;
// The mapping from the old columns to the new columns of the table.
catalog.ColIndexMapping table_col_index_mapping = 3;
// The source catalog of the table's associated source.
catalog.Source source = 4;
}

message ReplaceTablePlanResponse {
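As a rough sketch of the widened RPC payload, here are simplified stand-ins for the prost-generated types; only fields 2–4 are confirmed by the hunk above, and the rest is assumed:

```rust
// Simplified stand-ins for the prost-generated message types; these unit
// structs exist only to make the example compile.
struct Table;
struct StreamFragmentGraph;
struct ColIndexMapping;
struct Source;

#[derive(Default)]
struct ReplaceTablePlanRequest {
    table: Option<Table>,                             // assumed field 1
    fragment_graph: Option<StreamFragmentGraph>,      // field 2
    table_col_index_mapping: Option<ColIndexMapping>, // field 3
    source: Option<Source>,                           // field 4, new in this PR
}

fn main() {
    // For a plain table (no connector) the new field simply stays unset,
    // so the existing ALTER TABLE path is unchanged on the wire.
    let req = ReplaceTablePlanRequest::default();
    assert!(req.source.is_none());
}
```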
1 change: 1 addition & 0 deletions scripts/source/alter_data/kafka_alter.3
@@ -0,0 +1 @@
{"v1": 3, "v2": "33", "v3": 333}
4 changes: 3 additions & 1 deletion src/frontend/src/catalog/catalog_service.rs
@@ -81,6 +81,7 @@ pub trait CatalogWriter: Send + Sync {

async fn replace_table(
&self,
source: Option<PbSource>,
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
@@ -229,13 +230,14 @@ impl CatalogWriter for CatalogWriterImpl {

async fn replace_table(
&self,
source: Option<PbSource>,
table: PbTable,
graph: StreamFragmentGraph,
mapping: ColIndexMapping,
) -> Result<()> {
let version = self
.meta_client
.replace_table(table, graph, mapping)
.replace_table(source, table, graph, mapping)
.await?;
self.wait_version(version).await
}
6 changes: 6 additions & 0 deletions src/frontend/src/handler/alter_source_column.rs
@@ -68,6 +68,12 @@ pub async fn handle_alter_source_column(
None.into(),
)));
}
SourceEncode::Json if catalog.info.use_schema_registry => {
return Err(RwError::from(ErrorCode::NotImplemented(
"Alter source with schema registry".into(),
None.into(),
)));
}
SourceEncode::Invalid | SourceEncode::Native => {
return Err(RwError::from(ErrorCode::NotSupported(
format!("Alter source with encode {:?}", encode),
84 changes: 60 additions & 24 deletions src/frontend/src/handler/alter_table_column.rs
@@ -17,16 +17,21 @@ use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{AlterTableOperation, ColumnOption, ObjectName, Statement};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnOption, Encode, ObjectName, SourceSchemaV2, Statement,
};
use risingwave_sqlparser::parser::Parser;

use super::create_source::get_json_schema_location;
use super::create_table::{gen_create_table_plan, ColumnIdGenerator};
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::handler::create_table::gen_create_table_plan_with_source;
use crate::{build_graph, Binder, OptimizerContext, TableCatalog};

/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
@@ -51,13 +56,6 @@ pub async fn handle_alter_table_column(
reader.get_table_by_name(db_name, schema_path, &real_table_name)?;

match table.table_type() {
// Do not allow altering a table with a connector. It should be done passively according
// to the messages from the connector.
TableType::Table if table.has_associated_source() => {
Err(ErrorCode::InvalidInputSyntax(format!(
"cannot alter table \"{table_name}\" because it has a connector"
)))?
}
TableType::Table => {}

_ => Err(ErrorCode::InvalidInputSyntax(format!(
@@ -82,9 +80,21 @@
.context("unable to parse original table definition")?
.try_into()
.unwrap();
let Statement::CreateTable { columns, .. } = &mut definition else {
let Statement::CreateTable { columns, source_schema, .. } = &mut definition else {
panic!("unexpected statement: {:?}", definition);
};
let source_schema = source_schema
.clone()
.map(|source_schema| source_schema.into_source_schema_v2());

if let Some(source_schema) = &source_schema {
if schema_has_schema_registry(source_schema) {
return Err(RwError::from(ErrorCode::NotImplemented(
"Alter table with source having schema registry".into(),
None.into(),
)));
}
}

match operation {
AlterTableOperation::AddColumn {
@@ -170,20 +180,32 @@
panic!("unexpected statement type: {:?}", definition);
};

let (graph, table) = {
let (graph, table, source) = {
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, source, table) = gen_create_table_plan(
context,
table_name,
columns,
constraints,
col_id_gen,
source_watermarks,
append_only,
)?;

// We should already have rejected the case where the table has a connector.
assert!(source.is_none());
let (plan, source, table) = match source_schema {
Some(source_schema) => {
gen_create_table_plan_with_source(
context,
table_name,
columns,
constraints,
source_schema,
source_watermarks,
col_id_gen,
append_only,
)
.await?
}
None => gen_create_table_plan(
context,
table_name,
columns,
constraints,
col_id_gen,
source_watermarks,
append_only,
)?,
};

// TODO: avoid this backward conversion.
if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
@@ -203,10 +225,13 @@
// Fill the original table ID.
let table = Table {
id: original_catalog.id().table_id(),
optional_associated_source_id: original_catalog
.associated_source_id()
.map(|source_id| OptionalAssociatedSourceId::AssociatedSourceId(source_id.into())),
..table
};

(graph, table)
(graph, table, source)
};

// Calculate the mapping from the original columns to the new columns.
@@ -226,12 +251,23 @@
let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(table, graph, col_index_mapping)
.replace_table(source, table, graph, col_index_mapping)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

pub fn schema_has_schema_registry(schema: &SourceSchemaV2) -> bool {
match schema.row_encode {
Encode::Avro | Encode::Protobuf => true,
Encode::Json => {
let mut options = schema.gen_options().unwrap();
matches!(get_json_schema_location(&mut options), Ok(Some(_)))
}
_ => false,
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
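The col_index_mapping built above tells the meta service how original column indices line up with the new schema. A self-contained toy version of those semantics (not RisingWave's real ColIndexMapping implementation):

```rust
use std::cmp::Ordering;

// For ADD COLUMN, every original column keeps its index and the new column
// has no preimage; for DROP COLUMN, columns after the dropped one shift left
// by one. The identity prefix is what lets downstream fragments keep reading
// the surviving columns unchanged.
fn add_column_mapping(old_len: usize) -> Vec<Option<usize>> {
    (0..old_len).map(Some).collect()
}

fn drop_column_mapping(old_len: usize, dropped: usize) -> Vec<Option<usize>> {
    (0..old_len)
        .map(|i| match i.cmp(&dropped) {
            Ordering::Less => Some(i),
            Ordering::Equal => None, // the dropped column has no target
            Ordering::Greater => Some(i - 1),
        })
        .collect()
}

fn main() {
    // t(v1, v2) + ADD COLUMN v3: old indices 0 and 1 map to themselves.
    assert_eq!(add_column_mapping(2), vec![Some(0), Some(1)]);
    // t(v1, v2, v3) - DROP COLUMN v2: v3 moves from index 2 to index 1.
    assert_eq!(drop_column_mapping(3, 1), vec![Some(0), None, Some(1)]);
}
```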
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
@@ -252,7 +252,7 @@ fn consume_string_from_options(
))))
}

fn get_json_schema_location(
pub fn get_json_schema_location(
row_options: &mut BTreeMap<String, String>,
) -> Result<Option<(AstString, bool)>> {
let schema_location = try_consume_string_from_options(row_options, "schema.location");
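Making get_json_schema_location pub lets schema_has_schema_registry reuse it for the JSON case. A hedged, self-contained illustration of that check — the real helper also consumes the option and reports whether it came from a registry:

```rust
use std::collections::BTreeMap;

// Simplified stand-in for get_json_schema_location: a JSON encode only
// counts as schema-registry-backed when the row options carry a
// schema.location entry (per try_consume_string_from_options above).
fn json_has_schema_location(row_options: &BTreeMap<String, String>) -> bool {
    row_options.contains_key("schema.location")
}

fn main() {
    let mut opts = BTreeMap::new();
    opts.insert("topic".to_owned(), "kafka_alter".to_owned());
    assert!(!json_has_schema_location(&opts)); // plain JSON: ALTER is allowed

    opts.insert("schema.location".to_owned(), "file:///schema.json".to_owned());
    assert!(json_has_schema_location(&opts)); // registry-backed: rejected
}
```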
1 change: 1 addition & 0 deletions src/frontend/src/handler/util.rs
@@ -256,6 +256,7 @@ pub fn get_connection_name(with_properties: &BTreeMap<String, String>) -> Option
.get(CONNECTION_NAME_KEY)
.map(|s| s.to_lowercase())
}

#[cfg(test)]
mod tests {
use bytes::BytesMut;
1 change: 1 addition & 0 deletions src/frontend/src/test_utils.rs
@@ -263,6 +263,7 @@ impl CatalogWriter for MockCatalogWriter {

async fn replace_table(
&self,
_source: Option<PbSource>,
table: PbTable,
_graph: StreamFragmentGraph,
_mapping: ColIndexMapping,
9 changes: 9 additions & 0 deletions src/meta/src/barrier/command.rs
@@ -139,6 +139,7 @@ pub enum Command {
new_table_fragments: TableFragments,
merge_updates: Vec<MergeUpdate>,
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: SplitAssignment,
},

/// `SourceSplitAssignment` generates Plain(Mutation::Splits) for pushing initialized splits or
@@ -352,6 +353,7 @@ impl CommandContext {
old_table_fragments,
merge_updates,
dispatchers,
init_split_assignment,
..
} => {
let dropped_actors = old_table_fragments.actor_ids();
@@ -368,10 +370,16 @@
})
.collect();

let actor_splits = init_split_assignment
.values()
.flat_map(build_actor_connector_splits)
.collect();

Some(Mutation::Update(UpdateMutation {
actor_new_dispatchers,
merge_update: merge_updates.clone(),
dropped_actors,
actor_splits,
..Default::default()
}))
}
@@ -761,6 +769,7 @@ impl CommandContext {
new_table_fragments,
merge_updates,
dispatchers,
..
} => {
let table_ids = HashSet::from_iter(std::iter::once(old_table_fragments.table_id()));

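The new init_split_assignment carries the connector splits that the replaced table's source executors should resume from, and the Update mutation needs them keyed by actor. A minimal sketch of that flattening with stand-in types (the real SplitAssignment maps fragments to actor splits of a richer split type):

```rust
use std::collections::HashMap;

type FragmentId = u32;
type ActorId = u32;
type Split = String; // stand-in for the real connector split type

/// Flatten fragment -> (actor -> splits) into one actor -> splits map,
/// mirroring what `values().flat_map(build_actor_connector_splits)` produces
/// for the mutation above.
fn flatten_assignment(
    assignment: &HashMap<FragmentId, HashMap<ActorId, Vec<Split>>>,
) -> HashMap<ActorId, Vec<Split>> {
    assignment
        .values()
        .flat_map(|actors| actors.iter().map(|(id, splits)| (*id, splits.clone())))
        .collect()
}

fn main() {
    let assignment = HashMap::from([(
        1u32,
        HashMap::from([(100u32, vec!["kafka_alter:0".to_owned()])]),
    )]);
    let actor_splits = flatten_assignment(&assignment);
    assert_eq!(actor_splits[&100], vec!["kafka_alter:0".to_owned()]);
}
```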