diff --git a/e2e_test/batch/catalog/has_privilege.slt.part b/e2e_test/batch/catalog/has_privilege.slt.part new file mode 100644 index 000000000000..21c12bac4559 --- /dev/null +++ b/e2e_test/batch/catalog/has_privilege.slt.part @@ -0,0 +1,228 @@ +statement ok +CREATE USER test_user; + +statement ok +CREATE SCHEMA test_schema; + +statement ok +CREATE TABLE foo (id INT, name VARCHAR); + +statement ok +CREATE VIEW foo_view AS SELECT * FROM foo; + +statement ok +CREATE INDEX foo_index ON foo(id); + +statement ok +CREATE MATERIALIZED VIEW foo_mv AS SELECT * FROM foo; + +statement ok +CREATE SOURCE foo_source (a int, b int) with ( + connector = 'datagen', + datagen.rows.per.second = '1', + datagen.split.num = '1' +); + +statement ok +CREATE TABLE bar (id INT); + +statement ok +GRANT ALL PRIVILEGES ON foo TO test_user GRANTED BY root; + +statement ok +GRANT INSERT ON bar TO test_user WITH GRANT OPTION GRANTED BY root; + +statement ok +GRANT SELECT ON ALL TABLES IN SCHEMA public TO test_user WITH GRANT OPTION GRANTED BY root; + +statement ok +GRANT SELECT ON ALL MATERIALIZED VIEWS IN SCHEMA public TO test_user WITH GRANT OPTION GRANTED BY root; + +statement ok +GRANT SELECT ON ALL SOURCES IN SCHEMA public TO test_user WITH GRANT OPTION GRANTED BY root; + +statement ok +GRANT CREATE ON SCHEMA test_schema TO test_user; + +query error Invalid parameter user: User test_user_err not found +SELECT has_table_privilege('test_user_err', 'foo', 'SELECT'); + +query error Invalid parameter name: class not found: foo_err +SELECT has_table_privilege('test_user', 'foo_err', 'SELECT'); + +query error Invalid parameter privilege: unrecognized privilege type: "SELE CT" +SELECT has_table_privilege('test_user', 'foo', 'SELE CT'); + +query error Invalid parameter privilege: unrecognized privilege type: "SELECT INSERT" +SELECT has_table_privilege('test_user', 'foo', 'SELECT INSERT'); + +query error Invalid parameter privilege +SELECT has_table_privilege('test_user', 'foo', 'SELECT, INSERT WITH GRANT OPTION'); + +query error Invalid parameter user: User test_user_err not found +SELECT has_schema_privilege('test_user_err', 'test_schema', 'CREATE'); + +query error Invalid parameter schema: schema not found: test_schema_err +SELECT has_schema_privilege('test_user', 'test_schema_err', 'CREATE'); + +query error Invalid parameter privilege: unrecognized privilege type: "INSERT" +SELECT has_schema_privilege('test_user', 'test_schema', 'INSERT'); + +query error Invalid parameter privilege: unrecognized privilege type: "DELETE" +SELECT has_any_column_privilege('test_user', 'foo_mv'::regclass, 'DELETE'); + +query I +SELECT has_table_privilege('test_user', 'foo', 'SELECT'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo', 'SELECT WITH GRANT OPTION'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo', 'INSERT WITH GRANT OPTION'); +---- +f + +query I +SELECT has_table_privilege('test_user', 'foo', 'INSERT, SELECT WITH GRANT OPTION'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo', 'DELETE, INSERT, SELECT WITH GRANT OPTION'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo', 'DELETE WITH GRANT OPTION, INSERT, SELECT WITH GRANT OPTION'); +---- +f + +# FIXME(Kexiang): Currently, RW's grant privilege on all table doesn't apply to VIEWS. +query I +SELECT has_table_privilege('test_user', 'foo_view', 'SELECT'); +---- +f + +query I +SELECT has_table_privilege('test_user', 'foo_view'::regclass, 'INSERT'); +---- +f + +query I +SELECT has_any_column_privilege('test_user', 'foo_view'::regclass, 'INSERT'); +---- +f + +query I +SELECT has_table_privilege('test_user', 'foo_mv', 'SELECT'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo_mv'::regclass, 'SELECT WITH GRANT OPTION'); +---- +t + +query I +SELECT has_any_column_privilege('test_user', 'foo_mv'::regclass, 'SELECT WITH GRANT OPTION'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo_mv', 'INSERT'); +---- +f + +query I +SELECT has_table_privilege('test_user', 'foo_source'::regclass, 'SELECT'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo_source', 'INSERT'); +---- +f + +# Indexes are granted by `GRANT SELECT ON ALL MATERIALIZED VIEWS` +query I +SELECT has_table_privilege('test_user', 'foo_index'::regclass, 'SELECT'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'foo_index', 'INSERT'); +---- +f + +query I +SELECT has_table_privilege('test_user', 'bar', 'INSERT'); +---- +t + +query I +SELECT has_table_privilege('bar', 'INSERT'); +---- +t + +query I +SELECT has_table_privilege('bar'::regclass, 'SELECT'); +---- +t + +query I +SELECT has_table_privilege('bar'::regclass, 'SELECT'); +---- +t + +query I +SELECT has_table_privilege('test_user', 'bar', 'UPDATE'); +---- +f + +query I +SELECT has_table_privilege('test_user', 'bar'::regclass, 'INSERT WITH GRANT OPTION'); +---- +t + +query I +SELECT has_schema_privilege('public', 'USAGE'); +---- +t + +query I +SELECT has_schema_privilege('test_user', 'test_schema', 'USAGE'); +---- +f + +query I +SELECT has_schema_privilege('test_user', 'test_schema', 'CREATE'); +---- +t + +statement ok +DROP SOURCE foo_source; + +statement ok +DROP MATERIALIZED VIEW foo_mv; + +statement ok +DROP INDEX foo_index; + +statement ok +DROP VIEW foo_view; + +statement ok +DROP TABLE foo; + +statement ok +DROP TABLE bar; + +statement ok +DROP SCHEMA test_schema; + +statement ok +DROP USER test_user; diff --git a/proto/expr.proto b/proto/expr.proto index c8a9887c1663..1ba1f052eb9c 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -299,6 +299,9 @@ message ExprNode { PG_INDEXES_SIZE = 2404; PG_RELATION_SIZE = 2405; PG_GET_SERIAL_SEQUENCE = 2406; + HAS_TABLE_PRIVILEGE = 2407; + HAS_ANY_COLUMN_PRIVILEGE = 2408; + HAS_SCHEMA_PRIVILEGE = 2409; // EXTERNAL ICEBERG_TRANSFORM = 2201; diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 2134e08fe8c6..393e27081572 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1303,6 +1303,51 @@ impl Binder { ("pg_get_partkeydef", raw_literal(ExprImpl::literal_null(DataType::Varchar))), ("pg_encoding_to_char", raw_literal(ExprImpl::literal_varchar("UTF8".into()))), ("has_database_privilege", raw_literal(ExprImpl::literal_bool(true))), + ("has_table_privilege", raw(|binder, mut inputs|{ + if inputs.len() == 2 { + inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone())); + } + if inputs.len() == 3 { + if inputs[1].return_type() == DataType::Varchar { + inputs[1].cast_to_regclass_mut()?; + } + Ok(FunctionCall::new(ExprType::HasTablePrivilege, inputs)?.into()) + } else { + Err(ErrorCode::ExprError( + "Too many/few arguments for pg_catalog.has_table_privilege()".into(), + ) + .into()) + } + })), + ("has_any_column_privilege", raw(|binder, mut inputs|{ + if inputs.len() == 2 { + inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone())); + } + if inputs.len() == 3 { + if inputs[1].return_type() == DataType::Varchar { + inputs[1].cast_to_regclass_mut()?; + } + Ok(FunctionCall::new(ExprType::HasAnyColumnPrivilege, inputs)?.into()) + } else { + Err(ErrorCode::ExprError( + "Too many/few arguments for pg_catalog.has_any_column_privilege()".into(), + ) + .into()) + } + })), + ("has_schema_privilege", raw(|binder, mut inputs|{ + if inputs.len() == 2 { + inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone())); + } + if inputs.len() == 3 { + Ok(FunctionCall::new(ExprType::HasSchemaPrivilege, inputs)?.into()) + } else { + Err(ErrorCode::ExprError( + "Too many/few arguments for pg_catalog.has_schema_privilege()".into(), + ) + .into()) + } + })), ("pg_stat_get_numscans", raw_literal(ExprImpl::literal_bigint(0))), ("pg_backend_pid", raw(|binder, _inputs| { // FIXME: the session id is not global unique in multi-frontend env. diff --git a/src/frontend/src/catalog/database_catalog.rs b/src/frontend/src/catalog/database_catalog.rs index ec040e309eba..c562e5fc77a8 100644 --- a/src/frontend/src/catalog/database_catalog.rs +++ b/src/frontend/src/catalog/database_catalog.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use itertools::Itertools; use risingwave_common::catalog::PG_CATALOG_SCHEMA_NAME; use risingwave_pb::catalog::{PbDatabase, PbSchema}; +use risingwave_pb::user::grant_privilege::Object; use super::OwnedByUserCatalog; use crate::catalog::schema_catalog::SchemaCatalog; @@ -99,6 +100,16 @@ impl DatabaseCatalog { .find(|schema| schema.get_table_by_id(table_id).is_some()) } + pub fn get_grant_object_by_oid(&self, oid: u32) -> Option { + for schema in self.schema_by_name.values() { + let object = schema.get_grant_object_by_oid(oid); + if object.is_some() { + return object; + } + } + None + } + pub fn update_schema(&mut self, prost: &PbSchema) { let id = prost.id; let name = prost.name.clone(); diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 56de1d743da4..20a99ad820af 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -24,6 +24,7 @@ pub use risingwave_expr::sig::*; use risingwave_pb::catalog::{ PbConnection, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbSubscription, PbTable, PbView, }; +use risingwave_pb::user::grant_privilege::Object; use super::subscription_catalog::SubscriptionCatalog; use super::{OwnedByUserCatalog, SubscriptionId}; @@ -703,6 +704,23 @@ impl SchemaCatalog { .map(|s| s.to_owned()) } + pub fn get_grant_object_by_oid(&self, oid: u32) -> Option { + #[allow(clippy::manual_map)] + if self.get_table_by_id(&TableId::new(oid)).is_some() + || self.get_index_by_id(&IndexId::new(oid)).is_some() + { + Some(Object::TableId(oid)) + } else if self.get_source_by_id(&oid).is_some() { + Some(Object::SourceId(oid)) + } else if self.get_sink_by_id(&oid).is_some() { + Some(Object::SinkId(oid)) + } else if self.get_view_by_id(&oid).is_some() { + Some(Object::ViewId(oid)) + } else { + None + } + } + pub fn id(&self) -> SchemaId { self.id } diff --git a/src/frontend/src/expr/function_impl/has_privilege.rs b/src/frontend/src/expr/function_impl/has_privilege.rs new file mode 100644 index 000000000000..735a9dd1b6d9 --- /dev/null +++ b/src/frontend/src/expr/function_impl/has_privilege.rs @@ -0,0 +1,189 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use risingwave_expr::{capture_context, function, ExprError, Result}; +use risingwave_pb::user::grant_privilege::{Action, Object}; +use thiserror_ext::AsReport; + +use super::context::{CATALOG_READER, DB_NAME, USER_INFO_READER}; +use crate::catalog::CatalogReader; +use crate::user::user_service::UserInfoReader; + +#[inline(always)] +pub fn user_not_found_err(inner_err: &str) -> ExprError { + ExprError::InvalidParam { + name: "user", + reason: inner_err.into(), + } +} + +#[function("has_table_privilege(int4, int4, varchar) -> boolean")] +fn has_table_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result { + // does user have privilege for table + let user_name = get_user_name_by_id_captured(user_id)?; + has_table_privilege_1(user_name.as_str(), table_oid, privileges) +} + +#[function("has_table_privilege(varchar, int4, varchar) -> boolean")] +fn has_table_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result { + let allowed_actions = HashSet::new(); + let actions = parse_privilege(privileges, &allowed_actions)?; + // currently, we haven't support grant for view. + has_privilege_impl_captured( + user_name, + &get_grant_object_by_oid_captured(table_oid)?, + &actions, + ) +} + +#[function("has_any_column_privilege(int4, int4, varchar) -> boolean")] +fn has_any_column_privilege(user_id: i32, table_oid: i32, privileges: &str) -> Result { + // does user have privilege for any column of table + let user_name = get_user_name_by_id_captured(user_id)?; + has_any_column_privilege_1(user_name.as_str(), table_oid, privileges) +} + +#[function("has_any_column_privilege(varchar, int4, varchar) -> boolean")] +fn has_any_column_privilege_1(user_name: &str, table_oid: i32, privileges: &str) -> Result { + let allowed_actions = HashSet::from_iter([Action::Select, Action::Insert, Action::Update]); + let actions = parse_privilege(privileges, &allowed_actions)?; + has_privilege_impl_captured( + user_name, + &get_grant_object_by_oid_captured(table_oid)?, + &actions, + ) +} + +#[function("has_schema_privilege(varchar, int4, varchar) -> boolean")] +fn has_schema_privilege(user_name: &str, schema_oid: i32, privileges: &str) -> Result { + // does user have privilege for schema + let allowed_actions = HashSet::from_iter([Action::Create, Action::Usage]); + let actions = parse_privilege(privileges, &allowed_actions)?; + has_privilege_impl_captured(user_name, &Object::SchemaId(schema_oid as u32), &actions) +} + +#[function("has_schema_privilege(int4, varchar, varchar) -> boolean")] +fn has_schema_privilege_1(user_id: i32, schema_name: &str, privileges: &str) -> Result { + let user_name = get_user_name_by_id_captured(user_id)?; + let schema_oid = get_schema_id_by_name_captured(schema_name)?; + has_schema_privilege(user_name.as_str(), schema_oid, privileges) +} + +#[function("has_schema_privilege(int4, int4, varchar) -> boolean")] +fn has_schema_privilege_2(user_id: i32, schema_oid: i32, privileges: &str) -> Result { + let user_name = get_user_name_by_id_captured(user_id)?; + has_schema_privilege(user_name.as_str(), schema_oid, privileges) +} + +#[function("has_schema_privilege(varchar, varchar, varchar) -> boolean")] +fn has_schema_privilege_3(user_name: &str, schema_name: &str, privileges: &str) -> Result { + let schema_oid = get_schema_id_by_name_captured(schema_name)?; + has_schema_privilege(user_name, schema_oid, privileges) +} + +#[capture_context(USER_INFO_READER)] +fn has_privilege_impl( + user_info_reader: &UserInfoReader, + user_name: &str, + object: &Object, + actions: &Vec<(Action, bool)>, +) -> Result { + let user_info = &user_info_reader.read_guard(); + let user_catalog = user_info + .get_user_by_name(user_name) + .ok_or(user_not_found_err( + format!("User {} not found", user_name).as_str(), + ))?; + Ok(user_catalog.check_privilege_with_grant_option(object, actions)) +} + +#[capture_context(USER_INFO_READER)] +fn get_user_name_by_id(user_info_reader: &UserInfoReader, user_id: i32) -> Result { + let user_info = &user_info_reader.read_guard(); + user_info + .get_user_name_by_id(user_id as u32) + .ok_or(user_not_found_err( + format!("User {} not found", user_id).as_str(), + )) +} + +#[capture_context(CATALOG_READER, DB_NAME)] +fn get_grant_object_by_oid( + catalog_reader: &CatalogReader, + db_name: &str, + oid: i32, +) -> Result { + catalog_reader + .read_guard() + .get_database_by_name(db_name) + .map_err(|e| ExprError::InvalidParam { + name: "oid", + reason: e.to_report_string().into(), + })? + .get_grant_object_by_oid(oid as u32) + .ok_or(ExprError::InvalidParam { + name: "oid", + reason: format!("Table {} not found", oid).as_str().into(), + }) +} + +#[capture_context(CATALOG_READER, DB_NAME)] +fn get_schema_id_by_name( + catalog_reader: &CatalogReader, + db_name: &str, + schema_name: &str, +) -> Result { + let reader = &catalog_reader.read_guard(); + Ok(reader + .get_schema_by_name(db_name, schema_name) + .map_err(|e| ExprError::InvalidParam { + name: "schema", + reason: e.to_report_string().into(), + })? + .id() as i32) +} + +fn parse_privilege( + privilege_string: &str, + allowed_actions: &HashSet, +) -> Result> { + let mut privileges = Vec::new(); + for part in privilege_string.split(',').map(str::trim) { + let (privilege_type, grant_option) = match part.rsplit_once(" WITH GRANT OPTION") { + Some((p, _)) => (p, true), + None => (part, false), + }; + match Action::from_str_name(privilege_type.to_uppercase().as_str()) { + Some(Action::Unspecified) | None => { + return Err(ExprError::InvalidParam { + name: "privilege", + reason: format!("unrecognized privilege type: \"{}\"", part).into(), + }) + } + Some(action) => { + if allowed_actions.is_empty() || allowed_actions.contains(&action) { + privileges.push((action, grant_option)) + } else { + return Err(ExprError::InvalidParam { + name: "privilege", + reason: format!("unrecognized privilege type: \"{}\"", part).into(), + }); + } + } + } + } + Ok(privileges) +} diff --git a/src/frontend/src/expr/function_impl/mod.rs b/src/frontend/src/expr/function_impl/mod.rs index ad0b3b7a853d..3a70ebf5db47 100644 --- a/src/frontend/src/expr/function_impl/mod.rs +++ b/src/frontend/src/expr/function_impl/mod.rs @@ -15,6 +15,7 @@ mod cast_regclass; mod col_description; pub mod context; +mod has_privilege; mod pg_get_indexdef; mod pg_get_userbyid; mod pg_get_viewdef; diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index bc18959e5be5..d80ef31b04fd 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -267,6 +267,9 @@ impl ExprVisitor for ImpureAnalyzer { | Type::PgIndexesSize | Type::PgRelationSize | Type::PgGetSerialSequence + | Type::HasTablePrivilege + | Type::HasAnyColumnPrivilege + | Type::HasSchemaPrivilege | Type::MakeTimestamptz => self.impure = true, } } diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs index e30cdb0b6e31..7045bb226efd 100644 --- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs +++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs @@ -301,6 +301,9 @@ impl Strong { | ExprType::PgRelationSize | ExprType::PgGetSerialSequence | ExprType::IcebergTransform + | ExprType::HasTablePrivilege + | ExprType::HasAnyColumnPrivilege + | ExprType::HasSchemaPrivilege | ExprType::InetAton | ExprType::InetNtoa => false, ExprType::Unspecified => unreachable!(), diff --git a/src/frontend/src/user/user_catalog.rs b/src/frontend/src/user/user_catalog.rs index a33e9423fc6e..319d60fc56ae 100644 --- a/src/frontend/src/user/user_catalog.rs +++ b/src/frontend/src/user/user_catalog.rs @@ -16,7 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use risingwave_common::acl::{AclMode, AclModeSet}; -use risingwave_pb::user::grant_privilege::{Object as GrantObject, Object}; +use risingwave_pb::user::grant_privilege::{Action, Object as GrantObject, Object}; use risingwave_pb::user::{PbAuthInfo, PbGrantPrivilege, PbUserInfo}; use crate::catalog::{DatabaseId, SchemaId}; @@ -168,4 +168,34 @@ impl UserCatalog { self.get_acl(object) .map_or(false, |acl_set| acl_set.has_mode(mode)) } + + pub fn check_privilege_with_grant_option( + &self, + object: &GrantObject, + actions: &Vec<(Action, bool)>, + ) -> bool { + if self.is_super { + return true; + } + let mut action_map: HashMap<_, _> = actions.iter().map(|action| (action, false)).collect(); + + for privilege in &self.grant_privileges { + if privilege.get_object().unwrap() != object { + continue; + } + for awo in &privilege.action_with_opts { + let action = awo.get_action().unwrap(); + let with_grant_option = awo.with_grant_option; + + for (&key, found) in &mut action_map { + let (required_action, required_grant_option) = *key; + + if action == required_action && (!required_grant_option | with_grant_option) { + *found = true; + } + } + } + } + action_map.values().all(|&found| found) + } }