diff --git a/backend/crates/kalamdb-session/src/permissions.rs b/backend/crates/kalamdb-session/src/permissions.rs index 504d36c8..3a4fc1ef 100644 --- a/backend/crates/kalamdb-session/src/permissions.rs +++ b/backend/crates/kalamdb-session/src/permissions.rs @@ -118,17 +118,22 @@ pub fn can_downgrade_shared_to_user(role: Role) -> bool { /// Check if a role can access a user table. /// /// # Access Rules -/// - **System/Dba**: Full access -/// - **Service/User**: Allowed (row-level security restricts data visibility) +/// - **System/Dba/Service/User**: Allowed +/// - Reads remain scoped to the session subject for all roles. +/// - Cross-user access must use an explicit impersonation flow. #[inline] pub fn can_access_user_table(role: Role) -> bool { matches!(role, Role::System | Role::Dba | Role::Service | Role::User) } -/// Check if a role can read all user rows (RLS bypass). +/// User tables never bypass subject scoping implicitly. +/// +/// Cross-user reads must be explicit via impersonation so independently +/// authenticated users can never share the same user-table view by role alone. #[inline] pub fn can_read_all_users(role: Role) -> bool { - matches!(role, Role::System | Role::Dba | Role::Service) + let _ = role; + false } /// Check if a role can execute DML statements. @@ -324,4 +329,12 @@ mod tests { assert!(can_write_shared_table(TableAccess::Public, Role::Service)); assert!(!can_write_shared_table(TableAccess::Public, Role::User)); } + + #[test] + fn test_user_table_reads_never_bypass_subject_scope() { + assert!(!can_read_all_users(Role::System)); + assert!(!can_read_all_users(Role::Dba)); + assert!(!can_read_all_users(Role::Service)); + assert!(!can_read_all_users(Role::User)); + } } diff --git a/backend/crates/kalamdb-tables/tests/provider_source_models.rs b/backend/crates/kalamdb-tables/tests/provider_source_models.rs index 55fae917..fbc0e612 100644 --- a/backend/crates/kalamdb-tables/tests/provider_source_models.rs +++ b/backend/crates/kalamdb-tables/tests/provider_source_models.rs @@ -8,7 +8,7 @@ use std::{ use async_trait::async_trait; use datafusion::{ - arrow::{datatypes::SchemaRef, record_batch::RecordBatch}, + arrow::{array::StringArray, datatypes::SchemaRef, record_batch::RecordBatch}, datasource::TableProvider, execution::context::SessionContext, physical_plan::collect, @@ -294,16 +294,20 @@ struct OwnedServices { _temp_dir: TempDir, } -fn session_with_user(user_id: &UserId) -> SessionContext { +fn session_with_role(user_id: &UserId, role: Role) -> SessionContext { let mut state = SessionContext::new().state().clone(); state.config_mut().options_mut().extensions.insert(SessionUserContext::new( user_id.clone(), - Role::Dba, + role, ReadContext::Internal, )); SessionContext::new_with_state(state) } +fn session_with_user(user_id: &UserId) -> SessionContext { + session_with_role(user_id, Role::Dba) +} + fn session_with_transaction( user_id: &UserId, tx_context: TransactionQueryContext, @@ -589,6 +593,166 @@ async fn user_provider_scan_uses_deferred_batch_exec_and_returns_rows() { assert_eq!(total_rows(&batches), 1); } +#[tokio::test] +async fn user_provider_dba_session_reads_only_subject_rows() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + let table_id = TableId::new(NamespaceId::new("app"), TableName::new("users_exec_scoped")); + let table_def = build_user_table_definition(&table_id); + let services = build_services(Arc::clone(&table_def), Arc::clone(&backend)); + let store = Arc::new(new_indexed_user_table_store(Arc::clone(&backend), &table_id, "id")); + let provider = UserTableProvider::new( + Arc::new(TableProviderCore::new( + table_def, + Arc::clone(&services.services), + "id".to_string(), + Arc::clone(&services.schema), + HashMap::new(), + )), + Arc::clone(&store), + ); + + let root_user = UserId::new("root"); + let dba_user = UserId::new("jamal-dba"); + + store + .insert( + &kalamdb_commons::ids::UserTableRowId::new(root_user.clone(), 1.into()), + &UserTableRow { + user_id: root_user.clone(), + _seq: 1.into(), + _commit_seq: 1, + _deleted: false, + fields: row(vec![ + ("id", ScalarValue::Int64(Some(1))), + ("name", ScalarValue::Utf8(Some("root-row".to_string()))), + ]), + }, + ) + .expect("seed root row"); + store + .insert( + &kalamdb_commons::ids::UserTableRowId::new(dba_user.clone(), 2.into()), + &UserTableRow { + user_id: dba_user.clone(), + _seq: 2.into(), + _commit_seq: 2, + _deleted: false, + fields: row(vec![ + ("id", ScalarValue::Int64(Some(2))), + ("name", ScalarValue::Utf8(Some("jamal-row".to_string()))), + ]), + }, + ) + .expect("seed dba row"); + + let ctx = session_with_role(&dba_user, Role::Dba); + let state = ctx.state(); + let plan = provider.scan(&state, None, &[], None).await.expect("build user plan"); + let batches = collect(plan, state.task_ctx()).await.expect("collect user plan"); + + assert_eq!(total_rows(&batches), 1); + + let batch = batches.first().expect("one batch"); + let names = batch + .column_by_name("name") + .expect("name column") + .as_any() + .downcast_ref::() + .expect("utf8 name array"); + assert_eq!(names.value(0), "jamal-row"); +} + +#[tokio::test] +async fn user_provider_delete_only_tombstones_subject_row() { + let backend: Arc = Arc::new(InMemoryBackend::new()); + let table_id = TableId::new(NamespaceId::new("app"), TableName::new("users_exec_delete_scoped")); + let table_def = build_user_table_definition(&table_id); + let services = build_services(Arc::clone(&table_def), Arc::clone(&backend)); + let store = Arc::new(new_indexed_user_table_store(Arc::clone(&backend), &table_id, "id")); + let provider = UserTableProvider::new( + Arc::new(TableProviderCore::new( + table_def, + Arc::clone(&services.services), + "id".to_string(), + Arc::clone(&services.schema), + HashMap::new(), + )), + Arc::clone(&store), + ); + + let root_user = UserId::new("root"); + let dba_user = UserId::new("jamal-dba"); + + store + .insert( + &kalamdb_commons::ids::UserTableRowId::new(root_user.clone(), 1.into()), + &UserTableRow { + user_id: root_user.clone(), + _seq: 1.into(), + _commit_seq: 1, + _deleted: false, + fields: row(vec![ + ("id", ScalarValue::Int64(Some(1))), + ("name", ScalarValue::Utf8(Some("root-row".to_string()))), + ]), + }, + ) + .expect("seed root row"); + store + .insert( + &kalamdb_commons::ids::UserTableRowId::new(dba_user.clone(), 2.into()), + &UserTableRow { + user_id: dba_user.clone(), + _seq: 2.into(), + _commit_seq: 2, + _deleted: false, + fields: row(vec![ + ("id", ScalarValue::Int64(Some(1))), + ("name", ScalarValue::Utf8(Some("jamal-row".to_string()))), + ]), + }, + ) + .expect("seed dba row"); + + let (deleted_row_key, _) = provider + .delete_by_pk_value_deferred(&dba_user, "1", 3) + .await + .expect("delete dba row") + .expect("delete produced tombstone"); + assert_eq!(deleted_row_key.user_id, dba_user); + + let root_ctx = session_with_role(&root_user, Role::System); + let root_state = root_ctx.state(); + let root_plan = provider + .scan(&root_state, None, &[], None) + .await + .expect("build root scan"); + let root_batches = collect(root_plan, root_state.task_ctx()) + .await + .expect("collect root rows"); + + assert_eq!(total_rows(&root_batches), 1); + let root_names = root_batches[0] + .column_by_name("name") + .expect("root name column") + .as_any() + .downcast_ref::() + .expect("root utf8 name array"); + assert_eq!(root_names.value(0), "root-row"); + + let dba_ctx = session_with_role(&dba_user, Role::Dba); + let dba_state = dba_ctx.state(); + let dba_plan = provider + .scan(&dba_state, None, &[], None) + .await + .expect("build dba scan"); + let dba_batches = collect(dba_plan, dba_state.task_ctx()) + .await + .expect("collect dba rows"); + + assert_eq!(total_rows(&dba_batches), 0); +} + #[tokio::test] async fn user_provider_scan_with_overlay_uses_transaction_overlay_exec() { let backend: Arc = Arc::new(InMemoryBackend::new());