Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: redact SqlOption in SHOW CREATE #14310

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions e2e_test/source/basic/pubsub.slt
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,47 @@ select v1, v2 FROM s2;
statement ok
DROP TABLE s2;

# redact pubsub.credentials
statement ok
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5980',
pubsub.credentials = '123456'
) FORMAT PLAIN ENCODE JSON;

# pubsub.credentials is not redacted for table owner.
query IT rowsort
SHOW CREATE TABLE s3;
----
public.s3 CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'localhost:5980', pubsub.credentials = '123456') FORMAT PLAIN ENCODE JSON

# pubsub.credentials is not redacted for table owner.
query IT rowsort
SELECT definition FROM rw_tables WHERE name='s3';
----
CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'localhost:5980', pubsub.credentials = '123456') FORMAT PLAIN ENCODE JSON

statement ok
CREATE USER other_user;

statement ok
ALTER TABLE s3 owner to other_user;

# All WITH option values, including pubsub.credentials, are redacted for users other than the table owner.
query IT rowsort
SHOW CREATE TABLE s3;
----
public.s3 CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (connector = [REDACTED], pubsub.subscription = [REDACTED], pubsub.emulator_host = [REDACTED], pubsub.credentials = [REDACTED]) FORMAT PLAIN ENCODE JSON

# All WITH option values, including pubsub.credentials, are redacted for users other than the table owner.
query IT rowsort
SELECT definition FROM rw_tables WHERE name='s3';
----
CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (connector = [REDACTED], pubsub.subscription = [REDACTED], pubsub.emulator_host = [REDACTED], pubsub.credentials = [REDACTED]) FORMAT PLAIN ENCODE JSON

statement ok
DROP TABLE s3;

statement ok
DROP USER other_user;
6 changes: 6 additions & 0 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};

use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
use crate::catalog::TableId;
use crate::error::Result;
use crate::user::UserId;
use crate::utils::redact::redact_definition;

/// This struct `SourceCatalog` is used in frontend.
/// Compared with `PbSource`, it only maintains information used during optimization.
Expand Down Expand Up @@ -53,6 +55,10 @@ impl SourceCatalog {
self.definition.clone()
}

/// Returns the stored definition of this source after passing it through
/// [`redact_definition`].
///
/// # Errors
///
/// Propagates any error that [`redact_definition`] returns for the stored
/// definition.
pub fn redacted_create_sql(&self) -> Result<String> {
    let definition: &str = &self.definition;
    redact_definition(definition)
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbSource {
PbSource {
id: self.id,
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub struct SysCatalogReaderImpl {
config: Arc<RwLock<ConfigMap>>,
// Read system params.
system_params: SystemParamsReaderRef,
user_id: UserId,
}

impl SysCatalogReaderImpl {
Expand All @@ -124,6 +125,7 @@ impl SysCatalogReaderImpl {
auth_context: Arc<AuthContext>,
config: Arc<RwLock<ConfigMap>>,
system_params: SystemParamsReaderRef,
user_id: UserId,
) -> Self {
Self {
catalog_reader,
Expand All @@ -133,6 +135,7 @@ impl SysCatalogReaderImpl {
auth_context,
config,
system_params,
user_id,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_pb::user::grant_privilege::Object;
use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl};
use crate::error::Result;
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::utils::redact::redact_definition;

#[derive(Fields)]
struct RwSink {
Expand All @@ -45,6 +46,7 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
let user_reader = reader.user_info_reader.read_guard();
let users = user_reader.get_all_users();
let username_map = user_reader.get_user_name_map();
let user_id = reader.user_id;

Ok(schemas
.flat_map(|schema| {
Expand All @@ -61,7 +63,12 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {
.to_uppercase(),
sink_type: sink.sink_type.to_proto().as_str_name().into(),
connection_id: sink.connection_id.map(|id| id.connection_id() as i32),
definition: sink.create_sql(),
definition: if sink.owner.user_id == user_id {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this (only showing raw SQL if the user is the owner) is a common behavior in other DBMS? Even so, I guess it would be based on the permission system but not simply determined by the ownership. cc @yezizp2012

In my personal opinion I would suggest adding a new column of redacted_definition. Users are allowed to decide which form to reveal or share with us.

sink.create_sql()
} else {
redact_definition(&sink.create_sql())
.unwrap_or_else(|_| "unable to redact definition".into())
},
acl: get_acl_items(
&Object::SinkId(sink.id.sink_id),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>>
let user_reader = reader.user_info_reader.read_guard();
let users = user_reader.get_all_users();
let username_map = user_reader.get_user_name_map();
let user_id = reader.user_id;

Ok(schemas
.flat_map(|schema| {
Expand Down Expand Up @@ -78,7 +79,13 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>>
.map(|row_encode| row_encode.as_str_name().into()),
append_only: source.append_only,
connection_id: source.connection_id.map(|id| id as i32),
definition: source.create_sql(),
definition: if source.owner == user_id {
source.create_sql()
} else {
source
.redacted_create_sql()
.unwrap_or_else(|_| "unable to redact definition".into())
},
acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn read_rw_table_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwTable>> {
let user_reader = reader.user_info_reader.read_guard();
let users = user_reader.get_all_users();
let username_map = user_reader.get_user_name_map();
let user_id = reader.user_id;

Ok(schemas
.flat_map(|schema| {
Expand All @@ -49,7 +50,13 @@ fn read_rw_table_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwTable>> {
name: table.name().to_string(),
schema_id: schema.id() as i32,
owner: table.owner as i32,
definition: table.create_sql(),
definition: if table.owner == user_id {
table.create_sql()
} else {
table
.redacted_create_sql()
.unwrap_or_else(|_| "unable to redact definition".into())
},
acl: get_acl_items(
&Object::TableId(table.id.table_id),
false,
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;

use super::{ColumnId, DatabaseId, FragmentId, OwnedByUserCatalog, SchemaId, SinkId};
use crate::error::{ErrorCode, RwError};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::optimizer::property::Cardinality;
use crate::user::UserId;
use crate::utils::redact::redact_definition;

/// Includes full information about a table.
///
Expand Down Expand Up @@ -396,6 +397,10 @@ impl TableCatalog {
self.definition.clone()
}

/// Returns the stored definition of this table after passing it through
/// [`redact_definition`].
///
/// # Errors
///
/// Propagates any error that [`redact_definition`] returns for the stored
/// definition.
pub fn redacted_create_sql(&self) -> Result<String> {
    let definition: &str = &self.definition;
    redact_definition(definition)
}

/// Get a reference to the table catalog's version.
pub fn version(&self) -> Option<&TableVersion> {
self.version.as_ref()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub fn alter_definition_add_column(definition: &str, column: ColumnDef) -> Resul
_ => unreachable!(),
}

Ok(stmt.to_string())
Ok(stmt.to_unredacted_string())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask how you find all references of to_string call on Statement? That is to say, how could we avoid showing an unredacted string where we should redact it in the future or vice versa?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how you find all references of to_string call on Statement?

Most of them are found by test failure 🥵

how could we avoid showing an unredacted string where we should redact it

to_string or display are redacted by default.

or vice versa

Caller is responsible for explicitly using to_unredacted_string. Similar to crdb's behavior: "everything gets redacted except for those bits which we know are safe"

}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_source_with_sr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub fn alter_definition_format_encode(
_ => unreachable!(),
}

Ok(stmt.to_string())
Ok(stmt.to_unredacted_string())
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/extended_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub fn handle_parse(
specific_param_types: Vec<Option<DataType>>,
) -> Result<PrepareStatement> {
session.clear_cancel_query_flag();
let sql: Arc<str> = Arc::from(statement.to_string());
let sql: Arc<str> = Arc::from(statement.to_unredacted_string());
let handler_args = HandlerArgs::new(session, &statement, sql)?;
match &statement {
Statement::Query(_)
Expand Down Expand Up @@ -177,7 +177,7 @@ pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result
Portal::Portal(portal) => {
session.clear_cancel_query_flag();
let _guard = session.txn_begin_implicit(); // TODO(bugen): is this behavior correct?
let sql: Arc<str> = Arc::from(portal.statement.to_string());
let sql: Arc<str> = Arc::from(portal.statement.to_unredacted_string());
let handler_args = HandlerArgs::new(session, &portal.statement, sql)?;
match &portal.statement {
Statement::Query(_)
Expand All @@ -188,7 +188,7 @@ pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result
}
}
Portal::PureStatement(stmt) => {
let sql: Arc<str> = Arc::from(stmt.to_string());
let sql: Arc<str> = Arc::from(stmt.to_unredacted_string());
handle(session, stmt, sql, vec![]).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl HandlerArgs {
}
_ => {}
}
stmt.to_string()
stmt.to_unredacted_string()
}
}

Expand Down
20 changes: 17 additions & 3 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::catalog::{CatalogError, IndexCatalog};
use crate::error::Result;
use crate::handler::HandlerArgs;
use crate::session::SessionImpl;
use crate::utils::redact::redact_definition;

pub fn get_columns_from_table(
session: &SessionImpl,
Expand Down Expand Up @@ -462,6 +463,7 @@ pub fn handle_show_create_object(
Binder::resolve_schema_qualified_name(session.database(), name.clone())?;
let schema_name = schema_name.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
let schema = catalog_reader.get_schema_by_name(session.database(), &schema_name)?;
let user_id = session.user_id();
let sql = match show_create_type {
ShowCreateType::MaterializedView => {
let mv = schema
Expand All @@ -481,20 +483,32 @@ pub fn handle_show_create_object(
.get_table_by_name(&object_name)
.filter(|t| t.is_table())
.ok_or_else(|| CatalogError::NotFound("table", name.to_string()))?;
table.create_sql()
if table.owner != user_id {
table.redacted_create_sql()?
} else {
table.create_sql()
}
}
ShowCreateType::Sink => {
let sink = schema
.get_sink_by_name(&object_name)
.ok_or_else(|| CatalogError::NotFound("sink", name.to_string()))?;
sink.create_sql()
if sink.owner.user_id != user_id {
redact_definition(&sink.create_sql())?
} else {
sink.create_sql()
}
}
ShowCreateType::Source => {
let source = schema
.get_source_by_name(&object_name)
.filter(|s| s.associated_table_id.is_none())
.ok_or_else(|| CatalogError::NotFound("source", name.to_string()))?;
source.create_sql()
if source.owner != user_id {
source.redacted_create_sql()?
} else {
source.create_sql()
}
}
ShowCreateType::Index => {
let index = schema
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl BatchTaskContext for FrontendBatchTaskContext {
self.session.auth_context(),
self.session.shared_config(),
self.session.env().system_params_manager().get_params(),
self.session.user_id(),
))
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ impl Session for SessionImpl {
stmt: Statement,
format: Format,
) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
let string = stmt.to_string();
let string = stmt.to_unredacted_string();
let sql_str = string.as_str();
let sql: Arc<str> = Arc::from(sql_str);
let rsp = handle(self, stmt, sql.clone(), vec![format])
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ mod index_set;
pub use index_set::*;
pub(crate) mod group_by;
pub mod overwrite_options;
pub mod redact;

pub use group_by::*;
pub use overwrite_options::*;

Expand Down
26 changes: 26 additions & 0 deletions src/frontend/src/utils/redact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use risingwave_sqlparser::ast::REDACT_SQL_OPTION;

use crate::error::Result;

/// Parses `definition` as SQL and renders its single statement back to a
/// string while `REDACT_SQL_OPTION` is scoped to `true`.
///
/// # Errors
///
/// Returns an error when `definition` cannot be parsed, or when it does not
/// contain exactly one statement. The latter used to panic via `unwrap()`;
/// callers already handle `Err` (e.g. by falling back to a placeholder
/// string), so it is reported as an error instead.
pub fn redact_definition(definition: &str) -> Result<String> {
    let statements = risingwave_sqlparser::parser::Parser::parse_sql(definition)
        .context("unable to parse definition")?;
    // `Vec<T> -> [T; 1]` fails unless the vector has exactly one element.
    // The rejected `Vec` is not an error type, so drop it with `ok()` and
    // attach a message through `Option::context`.
    let [stmt]: [_; 1] = statements
        .try_into()
        .ok()
        .context("definition must contain exactly one statement")?;
    // NOTE(review): `to_string()` is presumably what consults
    // `REDACT_SQL_OPTION` to decide whether option values are masked —
    // confirm in `risingwave_sqlparser::ast`.
    Ok(REDACT_SQL_OPTION.sync_scope(true, || stmt.to_string()))
}
4 changes: 2 additions & 2 deletions src/meta/src/controller/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub fn alter_relation_rename(definition: &str, new_name: &str) -> String {
_ => unreachable!(),
};

stmt.to_string()
stmt.to_unredacted_string()
}

/// `alter_relation_rename_refs` updates all references of renamed-relation in the definition of
Expand Down Expand Up @@ -113,7 +113,7 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
}
_ => unreachable!(),
};
stmt.to_string()
stmt.to_unredacted_string()
}

/// Replace the last ident in the `table_name` with the given name, the object name is ensured to be
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/manager/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn alter_relation_rename(definition: &str, new_name: &str) -> String {
_ => unreachable!(),
};

stmt.to_string()
stmt.to_unredacted_string()
}

/// `alter_relation_rename_refs` updates all references of renamed-relation in the definition of
Expand Down Expand Up @@ -141,7 +141,8 @@ pub fn alter_relation_rename_refs(definition: &str, from: &str, to: &str) -> Str
}
_ => unreachable!(),
};
stmt.to_string()

stmt.to_unredacted_string()
}

/// Replace the last ident in the `table_name` with the given name, the object name is ensured to be
Expand Down