Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta, frontend): support drop cascade for tables, mviews, views, indexes, sources and sinks #11250

Merged
merged 11 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions e2e_test/ddl/drop/drop_index_cascade.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
statement ok
set rw_streaming_enable_delta_join = true;

statement ok
create table a (a1 int, a2 int);

statement ok
create index i_a1 on a(a1);

statement ok
create table b (b1 int, b2 int);

statement ok
create index i_b1 on b(b1);

statement ok
create materialized view v as select * from a join b on a.a1 = b.b1;

statement ok
drop index i_a1 cascade;

statement ok
drop table a;

statement ok
drop table b;

statement ok
set rw_streaming_enable_delta_join = false;
16 changes: 16 additions & 0 deletions e2e_test/ddl/drop/drop_mv_cascade.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
statement ok
create materialized view mv as select 1 a

statement ok
CREATE view v AS select * from mv;

statement ok
CREATE view mv2 AS select * from v;

statement ok
CREATE SINK my_sink AS select * from mv2 WITH (
connector = 'blackhole'
);

statement ok
drop materialized view mv cascade;
21 changes: 21 additions & 0 deletions e2e_test/ddl/drop/drop_source_cascade.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
statement ok
create source my_source (a int, b int) with (
connector = 'datagen',
datagen.rows.per.second = '15',
datagen.split.num = '1'
);

statement ok
create view v as select * from my_source;

statement ok
create materialized view mv as select * from v;

statement ok
CREATE SINK my_sink AS select * from mv WITH (
connector = 'blackhole'
);

statement ok
drop source my_source cascade;

17 changes: 17 additions & 0 deletions e2e_test/ddl/drop/drop_table_cascade.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
statement ok
create table t (a int, b int);

statement ok
create view v as select * from t;

statement ok
create materialized view mv as select * from v;

statement ok
CREATE SINK my_sink AS select * from mv WITH (
connector = 'blackhole'
);

statement ok
drop table t cascade;

14 changes: 14 additions & 0 deletions e2e_test/ddl/drop/drop_view_cascade.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
statement ok
create view v as select 1;

statement ok
create materialized view mv as select * from v;

statement ok
CREATE SINK my_sink AS select * from mv WITH (
connector = 'blackhole'
);

statement ok
drop view v cascade;

6 changes: 6 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ message CreateSourceResponse {

message DropSourceRequest {
uint32 source_id = 1;
bool cascade = 2;
}

message DropSourceResponse {
Expand All @@ -79,6 +80,7 @@ message CreateSinkResponse {

message DropSinkRequest {
uint32 sink_id = 1;
bool cascade = 2;
}

message DropSinkResponse {
Expand All @@ -99,6 +101,7 @@ message CreateMaterializedViewResponse {

message DropMaterializedViewRequest {
uint32 table_id = 1;
bool cascade = 2;
}

message DropMaterializedViewResponse {
Expand All @@ -118,6 +121,7 @@ message CreateViewResponse {

message DropViewRequest {
uint32 view_id = 1;
bool cascade = 2;
}

message DropViewResponse {
Expand Down Expand Up @@ -179,6 +183,7 @@ message DropTableRequest {
uint32 id = 1;
}
uint32 table_id = 2;
bool cascade = 3;
}

message DropTableResponse {
Expand Down Expand Up @@ -208,6 +213,7 @@ message CreateIndexResponse {

message DropIndexRequest {
uint32 index_id = 1;
bool cascade = 2;
}

message DropIndexResponse {
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use risingwave_frontend::{
build_graph, explain_stream_graph, Binder, Explain, FrontendOpts, OptimizerContext,
OptimizerContextRef, PlanRef, Planner, WithOptions,
};
use risingwave_sqlparser::ast::{EmitMode, ExplainOptions, ObjectName, Statement};
use risingwave_sqlparser::ast::{
AstOption, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -508,6 +510,7 @@ impl TestCase {
handler_args,
drop_statement.object_name,
drop_statement.if_exists,
matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
)
.await?;
}
Expand Down
52 changes: 34 additions & 18 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,26 @@ pub trait CatalogWriter: Send + Sync {
connection: create_connection_request::Payload,
) -> Result<()>;

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()>;
async fn drop_table(
&self,
source_id: Option<u32>,
table_id: TableId,
cascade: bool,
) -> Result<()>;

async fn drop_materialized_view(&self, table_id: TableId) -> Result<()>;
async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()>;

async fn drop_view(&self, view_id: u32) -> Result<()>;
async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()>;

async fn drop_source(&self, source_id: u32) -> Result<()>;
async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()>;

async fn drop_sink(&self, sink_id: u32) -> Result<()>;
async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()>;

async fn drop_database(&self, database_id: u32) -> Result<()>;

async fn drop_schema(&self, schema_id: u32) -> Result<()>;

async fn drop_index(&self, index_id: IndexId) -> Result<()>;
async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()>;

async fn drop_function(&self, function_id: FunctionId) -> Result<()>;

Expand Down Expand Up @@ -264,33 +269,44 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn drop_table(&self, source_id: Option<u32>, table_id: TableId) -> Result<()> {
let version = self.meta_client.drop_table(source_id, table_id).await?;
async fn drop_table(
&self,
source_id: Option<u32>,
table_id: TableId,
cascade: bool,
) -> Result<()> {
let version = self
.meta_client
.drop_table(source_id, table_id, cascade)
.await?;
self.wait_version(version).await
}

async fn drop_materialized_view(&self, table_id: TableId) -> Result<()> {
let version = self.meta_client.drop_materialized_view(table_id).await?;
async fn drop_materialized_view(&self, table_id: TableId, cascade: bool) -> Result<()> {
let version = self
.meta_client
.drop_materialized_view(table_id, cascade)
.await?;
self.wait_version(version).await
}

async fn drop_view(&self, view_id: u32) -> Result<()> {
let version = self.meta_client.drop_view(view_id).await?;
async fn drop_view(&self, view_id: u32, cascade: bool) -> Result<()> {
let version = self.meta_client.drop_view(view_id, cascade).await?;
self.wait_version(version).await
}

async fn drop_source(&self, source_id: u32) -> Result<()> {
let version = self.meta_client.drop_source(source_id).await?;
async fn drop_source(&self, source_id: u32, cascade: bool) -> Result<()> {
let version = self.meta_client.drop_source(source_id, cascade).await?;
self.wait_version(version).await
}

async fn drop_sink(&self, sink_id: u32) -> Result<()> {
let version = self.meta_client.drop_sink(sink_id).await?;
async fn drop_sink(&self, sink_id: u32, cascade: bool) -> Result<()> {
let version = self.meta_client.drop_sink(sink_id, cascade).await?;
self.wait_version(version).await
}

async fn drop_index(&self, index_id: IndexId) -> Result<()> {
let version = self.meta_client.drop_index(index_id).await?;
async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<()> {
let version = self.meta_client.drop_index(index_id, cascade).await?;
self.wait_version(version).await
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/drop_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub async fn handle_drop_index(
handler_args: HandlerArgs,
index_name: ObjectName,
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -81,7 +82,7 @@ pub async fn handle_drop_index(
};

let catalog_writer = session.catalog_writer()?;
catalog_writer.drop_index(index_id).await?;
catalog_writer.drop_index(index_id, cascade).await?;

Ok(PgResponse::empty_result(StatementType::DROP_INDEX))
}
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/handler/drop_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub async fn handle_drop_mv(
handler_args: HandlerArgs,
table_name: ObjectName,
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -71,7 +72,9 @@ pub async fn handle_drop_mv(
};

let catalog_writer = session.catalog_writer()?;
catalog_writer.drop_materialized_view(table_id).await?;
catalog_writer
.drop_materialized_view(table_id, cascade)
.await?;

Ok(PgResponse::empty_result(
StatementType::DROP_MATERIALIZED_VIEW,
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub async fn handle_drop_sink(
handler_args: HandlerArgs,
sink_name: ObjectName,
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -55,7 +56,7 @@ pub async fn handle_drop_sink(
};

let catalog_writer = session.catalog_writer()?;
catalog_writer.drop_sink(sink_id.sink_id).await?;
catalog_writer.drop_sink(sink_id.sink_id, cascade).await?;

Ok(PgResponse::empty_result(StatementType::DROP_SINK))
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/drop_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub async fn handle_drop_source(
handler_args: HandlerArgs,
name: ObjectName,
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -62,7 +63,7 @@ pub async fn handle_drop_source(
session.check_privilege_for_drop_alter(schema_name, &*source)?;

let catalog_writer = session.catalog_writer()?;
catalog_writer.drop_source(source.id).await?;
catalog_writer.drop_source(source.id, cascade).await?;

Ok(PgResponse::empty_result(StatementType::DROP_SOURCE))
}
3 changes: 2 additions & 1 deletion src/frontend/src/handler/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub async fn handle_drop_table(
handler_args: HandlerArgs,
table_name: ObjectName,
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -62,7 +63,7 @@ pub async fn handle_drop_table(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.drop_table(source_id.map(|id| id.table_id), table_id)
.drop_table(source_id.map(|id| id.table_id), table_id, cascade)
.await?;

Ok(PgResponse::empty_result(StatementType::DROP_TABLE))
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/drop_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub async fn handle_drop_view(
handler_args: HandlerArgs,
table_name: ObjectName,
if_exists: bool,
cascade: bool,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
Expand Down Expand Up @@ -56,7 +57,7 @@ pub async fn handle_drop_view(
};

let catalog_writer = session.catalog_writer()?;
catalog_writer.drop_view(view_id).await?;
catalog_writer.drop_view(view_id, cascade).await?;

Ok(PgResponse::empty_result(StatementType::DROP_VIEW))
}
Loading
Loading