Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions backend/crates/kalamdb-session/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,22 @@ pub fn can_downgrade_shared_to_user(role: Role) -> bool {
/// Check if a role can access a user table.
///
/// # Access Rules
/// - **System/Dba/Service/User**: Allowed
/// - Reads remain scoped to the session subject for all roles.
/// - Cross-user access must use an explicit impersonation flow.
#[inline]
pub fn can_access_user_table(role: Role) -> bool {
    matches!(role, Role::System | Role::Dba | Role::Service | Role::User)
}

/// Check if a role can read all user rows (RLS bypass).
/// User tables never bypass subject scoping implicitly.
///
/// Cross-user reads must be explicit via impersonation so independently
/// authenticated users can never share the same user-table view by role alone.
#[inline]
pub fn can_read_all_users(role: Role) -> bool {
matches!(role, Role::System | Role::Dba | Role::Service)
let _ = role;
false
}

/// Check if a role can execute DML statements.
Expand Down Expand Up @@ -324,4 +329,12 @@ mod tests {
assert!(can_write_shared_table(TableAccess::Public, Role::Service));
assert!(!can_write_shared_table(TableAccess::Public, Role::User));
}

/// User-table reads must stay scoped to the session subject regardless of
/// role: no role is granted an implicit RLS bypass.
#[test]
fn test_user_table_reads_never_bypass_subject_scope() {
    for role in [Role::System, Role::Dba, Role::Service, Role::User] {
        assert!(!can_read_all_users(role));
    }
}
}
170 changes: 167 additions & 3 deletions backend/crates/kalamdb-tables/tests/provider_source_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use async_trait::async_trait;
use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
arrow::{array::StringArray, datatypes::SchemaRef, record_batch::RecordBatch},
datasource::TableProvider,
execution::context::SessionContext,
physical_plan::collect,
Expand Down Expand Up @@ -294,16 +294,20 @@ struct OwnedServices {
_temp_dir: TempDir,
}

fn session_with_user(user_id: &UserId) -> SessionContext {
/// Build a `SessionContext` whose session extension carries the given
/// subject (`user_id`) and `role`, with an internal read context.
///
/// Used by tests to simulate sessions of different roles against the same
/// table provider.
fn session_with_role(user_id: &UserId, role: Role) -> SessionContext {
    let mut state = SessionContext::new().state().clone();
    state.config_mut().options_mut().extensions.insert(SessionUserContext::new(
        user_id.clone(),
        role,
        ReadContext::Internal,
    ));
    SessionContext::new_with_state(state)
}

/// Backward-compatible helper for pre-existing tests: builds a session for
/// `user_id` with the role these tests historically used (`Role::Dba`).
fn session_with_user(user_id: &UserId) -> SessionContext {
session_with_role(user_id, Role::Dba)
}

fn session_with_transaction(
user_id: &UserId,
tx_context: TransactionQueryContext,
Expand Down Expand Up @@ -589,6 +593,166 @@ async fn user_provider_scan_uses_deferred_batch_exec_and_returns_rows() {
assert_eq!(total_rows(&batches), 1);
}

// Verifies that a Dba-role session scanning a user table sees only the rows
// owned by the session subject: rows for two distinct users are seeded, and
// a scan under the dba subject must return exactly that subject's row —
// i.e. the Dba role gets no implicit RLS bypass on user tables.
#[tokio::test]
async fn user_provider_dba_session_reads_only_subject_rows() {
// In-memory backend + a user table keyed on "id" for this test only.
let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::new());
let table_id = TableId::new(NamespaceId::new("app"), TableName::new("users_exec_scoped"));
let table_def = build_user_table_definition(&table_id);
let services = build_services(Arc::clone(&table_def), Arc::clone(&backend));
let store = Arc::new(new_indexed_user_table_store(Arc::clone(&backend), &table_id, "id"));
let provider = UserTableProvider::new(
Arc::new(TableProviderCore::new(
table_def,
Arc::clone(&services.services),
"id".to_string(),
Arc::clone(&services.schema),
HashMap::new(),
)),
Arc::clone(&store),
);

// Two independently authenticated subjects.
let root_user = UserId::new("root");
let dba_user = UserId::new("jamal-dba");

// Seed one row owned by root (pk id=1, name "root-row").
store
.insert(
&kalamdb_commons::ids::UserTableRowId::new(root_user.clone(), 1.into()),
&UserTableRow {
user_id: root_user.clone(),
_seq: 1.into(),
_commit_seq: 1,
_deleted: false,
fields: row(vec![
("id", ScalarValue::Int64(Some(1))),
("name", ScalarValue::Utf8(Some("root-row".to_string()))),
]),
},
)
.expect("seed root row");
// Seed one row owned by the dba subject (pk id=2, name "jamal-row").
store
.insert(
&kalamdb_commons::ids::UserTableRowId::new(dba_user.clone(), 2.into()),
&UserTableRow {
user_id: dba_user.clone(),
_seq: 2.into(),
_commit_seq: 2,
_deleted: false,
fields: row(vec![
("id", ScalarValue::Int64(Some(2))),
("name", ScalarValue::Utf8(Some("jamal-row".to_string()))),
]),
},
)
.expect("seed dba row");

// Scan as the dba subject with the Dba role.
let ctx = session_with_role(&dba_user, Role::Dba);
let state = ctx.state();
let plan = provider.scan(&state, None, &[], None).await.expect("build user plan");
let batches = collect(plan, state.task_ctx()).await.expect("collect user plan");

// Only the subject's own row comes back, not root's.
assert_eq!(total_rows(&batches), 1);

// Confirm it is specifically the dba subject's row by its "name" value.
let batch = batches.first().expect("one batch");
let names = batch
.column_by_name("name")
.expect("name column")
.as_any()
.downcast_ref::<StringArray>()
.expect("utf8 name array");
assert_eq!(names.value(0), "jamal-row");
}

// Verifies that deleting by primary-key value only tombstones the row owned
// by the acting subject. Both users deliberately share the same pk value
// (id=1), so the delete must disambiguate by subject: the dba's row is
// removed while root's row with the identical pk survives.
#[tokio::test]
async fn user_provider_delete_only_tombstones_subject_row() {
// In-memory backend + a user table keyed on "id" for this test only.
let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::new());
let table_id = TableId::new(NamespaceId::new("app"), TableName::new("users_exec_delete_scoped"));
let table_def = build_user_table_definition(&table_id);
let services = build_services(Arc::clone(&table_def), Arc::clone(&backend));
let store = Arc::new(new_indexed_user_table_store(Arc::clone(&backend), &table_id, "id"));
let provider = UserTableProvider::new(
Arc::new(TableProviderCore::new(
table_def,
Arc::clone(&services.services),
"id".to_string(),
Arc::clone(&services.schema),
HashMap::new(),
)),
Arc::clone(&store),
);

let root_user = UserId::new("root");
let dba_user = UserId::new("jamal-dba");

// Root's row: pk id=1, name "root-row".
store
.insert(
&kalamdb_commons::ids::UserTableRowId::new(root_user.clone(), 1.into()),
&UserTableRow {
user_id: root_user.clone(),
_seq: 1.into(),
_commit_seq: 1,
_deleted: false,
fields: row(vec![
("id", ScalarValue::Int64(Some(1))),
("name", ScalarValue::Utf8(Some("root-row".to_string()))),
]),
},
)
.expect("seed root row");
// Dba's row: row key seq is 2, but the pk FIELD is also id=1 on purpose —
// it collides with root's pk value so a non-subject-scoped delete would
// be able to hit the wrong user's row.
store
.insert(
&kalamdb_commons::ids::UserTableRowId::new(dba_user.clone(), 2.into()),
&UserTableRow {
user_id: dba_user.clone(),
_seq: 2.into(),
_commit_seq: 2,
_deleted: false,
fields: row(vec![
("id", ScalarValue::Int64(Some(1))),
("name", ScalarValue::Utf8(Some("jamal-row".to_string()))),
]),
},
)
.expect("seed dba row");

// Delete pk "1" as the dba subject; the returned tombstone key must be
// scoped to the dba subject, not root.
let (deleted_row_key, _) = provider
.delete_by_pk_value_deferred(&dba_user, "1", 3)
.await
.expect("delete dba row")
.expect("delete produced tombstone");
assert_eq!(deleted_row_key.user_id, dba_user);

// Scan as root (System role — even System reads are subject-scoped here):
// root's identically-keyed row must still be present and untouched.
let root_ctx = session_with_role(&root_user, Role::System);
let root_state = root_ctx.state();
let root_plan = provider
.scan(&root_state, None, &[], None)
.await
.expect("build root scan");
let root_batches = collect(root_plan, root_state.task_ctx())
.await
.expect("collect root rows");

assert_eq!(total_rows(&root_batches), 1);
let root_names = root_batches[0]
.column_by_name("name")
.expect("root name column")
.as_any()
.downcast_ref::<StringArray>()
.expect("root utf8 name array");
assert_eq!(root_names.value(0), "root-row");

// Scan as the dba subject: its only row was tombstoned, so zero rows.
let dba_ctx = session_with_role(&dba_user, Role::Dba);
let dba_state = dba_ctx.state();
let dba_plan = provider
.scan(&dba_state, None, &[], None)
.await
.expect("build dba scan");
let dba_batches = collect(dba_plan, dba_state.task_ctx())
.await
.expect("collect dba rows");

assert_eq!(total_rows(&dba_batches), 0);
}

#[tokio::test]
async fn user_provider_scan_with_overlay_uses_transaction_overlay_exec() {
let backend: Arc<dyn StorageBackend> = Arc::new(InMemoryBackend::new());
Expand Down