Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): support alter rate limit in mv, source, table with source #16399

Merged
merged 26 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ risedev slt './e2e_test/source/basic/*.slt'
risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt'
risedev slt './e2e_test/source/basic/alter/kafka.slt'

echo "--- e2e, kafka alter source rate limit"
risedev slt './e2e_test/source/basic/alter/rate_limit_source_kafka.slt'
risedev slt './e2e_test/source/basic/alter/rate_limit_table_kafka.slt'

echo "--- e2e, kafka alter source"
chmod +x ./scripts/source/prepare_data_after_alter.sh
./scripts/source/prepare_data_after_alter.sh 2
Expand Down
130 changes: 130 additions & 0 deletions e2e_test/source/basic/alter/rate_limit_source_kafka.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_source',
type = 'append-only',
force_append_only='true',
connector = 'kafka'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to create
skipif in-memory
sleep 5s

statement ok
create source kafka_source (v1 int) with (
connector = 'kafka',
topic = 'kafka_source',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

statement ok
SET STREAMING_RATE_LIMIT=0;

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

statement ok
create materialized view rl_mv3 as select count(*) from kafka_source;

statement ok
SET STREAMING_RATE_LIMIT=default;

############## MVs should have 0 records, since source has (rate_limit = 0)

statement ok
flush;

query I
select * from rl_mv1;
----
0

query I
select * from rl_mv2;
----
0

query I
select * from rl_mv3;
----
0

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to 1000;

skipif in-memory
query I
alter source kafka_source set streaming_rate_limit to default;

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 0 from rl_mv1;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv2;
----
t

skipif in-memory
query I
select count(*) > 0 from rl_mv3;
----
t

############## Cleanup

statement ok
drop materialized view rl_mv1;

statement ok
drop materialized view rl_mv2;

statement ok
drop materialized view rl_mv3;

statement ok
drop source kafka_source;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;
88 changes: 88 additions & 0 deletions e2e_test/source/basic/alter/rate_limit_table_kafka.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_source',
type = 'append-only',
force_append_only='true',
connector = 'kafka'
);

############## Source from kafka (rate_limit = 0)

statement ok
create table kafka_source (v1 int) with (
connector = 'kafka',
topic = 'kafka_source',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
streaming_rate_limit = 0
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) from kafka_source;
----
0

############## Can still insert data when rate limit = 0

statement ok
insert into kafka_source values(1);

statement ok
flush;

query I
select count(*) from kafka_source;
----
1

############## Alter source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to 1000;

skipif in-memory
query I
alter table kafka_source set streaming_rate_limit to default;

skipif in-memory
sleep 3s

skipif in-memory
query I
select count(*) > 1 from kafka_source;
----
t

############## Cleanup

statement ok
drop table kafka_source;

statement ok
drop sink kafka_sink;

statement ok
drop table kafka_seed_data;
35 changes: 35 additions & 0 deletions e2e_test/streaming/rate_limit/alter_rate_limit.slt.ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- This test is ignored until alter mv rate limit is fully supported.

statement ok
CREATE TABLE t (v1 int);

statement ok
INSERT INTO t VALUES (1);

statement ok
flush;

statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE MATERIALIZED VIEW streaming_rate_limit_0 with ( streaming_rate_limit = 0 ) AS SELECT * FROM t;

skipif in-memory
sleep 1s

query I
select progress from rw_ddl_progress;
----
0.00%

statement ok
ALTER MATERIALIZED VIEW streaming_rate_limit_0 SET STREAMING_RATE_LIMIT = 1;

statement ok
wait;

query I
select * from streaming_rate_limit_0;
----
1
1 change: 1 addition & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ enum ThrottleTarget {
THROTTLE_TARGET_UNSPECIFIED = 0;
SOURCE = 1;
MV = 2;
TABLE_WITH_SOURCE = 3;
}

message ApplyThrottleRequest {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ impl Catalog {
))
}

/// Used to get `TableCatalog` for Materialized Views, Tables and Indexes.
pub fn get_table_by_name<'a>(
&self,
db_name: &str,
Expand Down
89 changes: 89 additions & 0 deletions src/frontend/src/handler/alter_streaming_rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_pb::meta::PbThrottleTarget;
use risingwave_sqlparser::ast::ObjectName;

use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::Binder;

pub async fn handle_alter_streaming_rate_limit(
handler_args: HandlerArgs,
kind: PbThrottleTarget,
table_name: ObjectName,
rate_limit: i32,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let (schema_name, real_table_name) =
Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (stmt_type, id) = match kind {
PbThrottleTarget::Mv => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_table_by_name(db_name, schema_path, &real_table_name)?;
if table.table_type != TableType::MaterializedView {
return Err(ErrorCode::InvalidInputSyntax(format!(
"\"{table_name}\" is not a materialized view",
))
.into());
}
session.check_privilege_for_drop_alter(schema_name, &**table)?;
(StatementType::ALTER_MATERIALIZED_VIEW, table.id.table_id)
}
PbThrottleTarget::Source => {
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
(StatementType::ALTER_SOURCE, source.id)
}
PbThrottleTarget::TableWithSource => {
let reader = session.env().catalog_reader().read_guard();
let (table, schema_name) =
reader.get_table_by_name(db_name, schema_path, &real_table_name)?;
session.check_privilege_for_drop_alter(schema_name, &**table)?;
// Get the corresponding source catalog.
let source_id = if let Some(id) = table.associated_source_id {
id.table_id()
} else {
bail!("ALTER STREAMING_RATE_LIMIT is not for table without source")
};
(StatementType::ALTER_SOURCE, source_id)
}
_ => bail!("Unsupported throttle target: {:?}", kind),
};

let meta_client = session.env().meta_client();

let rate_limit = if rate_limit < 0 {
None
} else {
Some(rate_limit as u32)
};

meta_client.apply_throttle(kind, id, rate_limit).await?;

Ok(PgResponse::empty_result(stmt_type))
}
Loading
Loading