feat(management): add application services with SpiceDB authorization (AIHCM-183)#299
Conversation
Add KnowledgeGraphServiceProbe and DataSourceServiceProbe following the Domain-Oriented Observability pattern established in IAM. Each includes a Protocol interface and DefaultXxxProbe implementation using structlog with _get_context_kwargs for kwarg collision avoidance. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add KnowledgeGraphService application service implementing CRUD operations with SpiceDB permission checks (EDIT, VIEW, MANAGE). Includes cascade delete of DataSources, read_relationships for workspace-scoped listing, and IntegrityError-to-domain exception mapping. Also adds UnauthorizedError to management ports. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add DataSourceService application service implementing CRUD + trigger_sync operations with SpiceDB permission checks. Handles credential storage via ISecretStoreRepository, KG existence/tenant validation, sync run creation with DataSourceSyncRequested event emission. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add FastAPI dependency injection factories for KnowledgeGraphService and DataSourceService, wiring repositories, secret store, authz, and tenant scoping from CurrentUser. Update architecture test to exclude management.dependencies from cross-context isolation checks since DI wiring is a presentation-layer concern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sulate sync request

Security fixes:
- Add tenant_id verification in KG get/update/delete methods
- Add tenant_id verification in DS get/update/delete/trigger_sync methods
- Return None on permission denied in get() to prevent existence leakage

DDD fix:
- Add DataSource.request_sync() domain method instead of directly accessing _pending_events from the application service

Tests:
- Add tenant scoping unit tests for all affected methods
- Add get() returns None on permission denied tests
- Add DataSource.request_sync() unit tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings, or use the review commands to manage reviews.
Walkthrough

Adds a Management application layer: two application services (DataSourceService, KnowledgeGraphService) with tenant scoping, authorization checks, transactional persistence, secret-store credential handling, and observability probes. Introduces observability probes and default implementations, FastAPI dependency providers that wire services, a DataSource.request_sync domain method emitting a sync event, an UnauthorizedError exception, and extensive unit tests for services and domain behavior.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant DataSourceService
    participant AuthzProvider
    participant KnowledgeGraphRepo
    participant DataSourceRepo
    participant SecretStore
    participant Probe
    User->>DataSourceService: create(user_id, kg_id, name, config, credentials)
    DataSourceService->>AuthzProvider: check EDIT permission on KG
    alt Permission Denied
        AuthzProvider-->>DataSourceService: denied
        DataSourceService->>Probe: permission_denied(user_id, kg_id, EDIT)
        DataSourceService-->>User: UnauthorizedError
    else Permission Granted
        AuthzProvider-->>DataSourceService: allowed
        DataSourceService->>KnowledgeGraphRepo: verify KG exists & tenant
        DataSourceService->>DataSourceRepo: create DataSource aggregate
        alt Raw Credentials Provided
            DataSourceService->>SecretStore: store encrypted credentials
        end
        DataSourceService->>DataSourceRepo: persist DataSource
        DataSourceService->>Probe: data_source_created(ds_id, kg_id, tenant_id, name)
        DataSourceService-->>User: DataSource
    end
```
```mermaid
sequenceDiagram
    participant User
    participant DataSourceService
    participant AuthzProvider
    participant DataSourceRepo
    participant SecretStore
    participant Probe
    User->>DataSourceService: delete(user_id, ds_id)
    DataSourceService->>AuthzProvider: check MANAGE permission on DS
    alt Permission Denied
        AuthzProvider-->>DataSourceService: denied
        DataSourceService->>Probe: permission_denied(user_id, ds_id, MANAGE)
        DataSourceService-->>User: UnauthorizedError
    else Permission Granted
        AuthzProvider-->>DataSourceService: allowed
        DataSourceService->>DataSourceRepo: fetch DataSource
        DataSourceService->>DataSourceRepo: validate tenant
        alt Credentials Stored
            DataSourceService->>SecretStore: delete credentials
        end
        DataSourceService->>DataSourceRepo: mark & persist deletion
        DataSourceService->>Probe: data_source_deleted(ds_id)
        DataSourceService-->>User: success
    end
```
```mermaid
sequenceDiagram
    participant User
    participant KnowledgeGraphService
    participant AuthzProvider
    participant KnowledgeGraphRepo
    participant DataSourceRepo
    participant Probe
    User->>KnowledgeGraphService: delete(user_id, kg_id)
    KnowledgeGraphService->>AuthzProvider: check MANAGE permission on KG
    alt Permission Denied
        AuthzProvider-->>KnowledgeGraphService: denied
        KnowledgeGraphService->>Probe: permission_denied(user_id, kg_id, MANAGE)
        KnowledgeGraphService-->>User: UnauthorizedError
    else Permission Granted
        AuthzProvider-->>KnowledgeGraphService: allowed
        KnowledgeGraphService->>KnowledgeGraphRepo: fetch & validate tenant
        alt DataSourceRepository available
            KnowledgeGraphService->>DataSourceRepo: list related data sources
            loop For each data source
                KnowledgeGraphService->>DataSourceRepo: mark & delete data source
            end
        end
        KnowledgeGraphService->>KnowledgeGraphRepo: delete knowledge graph
        KnowledgeGraphService->>Probe: knowledge_graph_deleted(kg_id)
        KnowledgeGraphService-->>User: success
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks: ✅ 3 passed

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 3
🧹 Nitpick comments (4)
src/api/management/ports/exceptions.py (1)
31-39: Keep `UnauthorizedError` out of the port-exception module. This file is documented as repository/port failures handled by the application layer, but `UnauthorizedError` is an application/API concern and its docstring now mentions HTTP 403. Moving it to an application/common exceptions module would keep the boundary clear.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/ports/exceptions.py` around lines 31 - 39, UnauthorizedError is an application-level/API concern and should be removed from the port-level exceptions; delete the UnauthorizedError class from this port exceptions module and add it to the application/common exceptions module (create or update a common exceptions file) with its HTTP-403-focused docstring. Update any imports referencing UnauthorizedError to point to the new application/common exceptions module, and ensure the original port exceptions module only contains repository/port-related exception classes.

src/api/management/application/observability/data_source_service_probe.py (1)
33-38: Prefer stable failure codes over raw `error` strings. These probe methods accept arbitrary strings and log them verbatim. On create/delete paths that now touch persistence and the secret store, that makes it too easy to push backend details into logs and it creates high-cardinality telemetry. Prefer structured fields like `error_code`/`error_type`, and capture stack traces at the catch site only when needed. The same contract exists in src/api/management/application/observability/knowledge_graph_service_probe.py.

Also applies to: 64-68, 155-169, 209-221
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/application/observability/data_source_service_probe.py` around lines 33 - 38, The probe methods (e.g., data_source_creation_failed) currently accept raw error strings; change their signature to accept a stable structured identifier (e.g., error_code: str or error_type: str) and an optional sanitized message flag instead of raw error text, update the body to log only the structured field and any safe, low-cardinality metadata, and remove/avoid logging stack traces here; apply the same change to the other probe methods mentioned (the similar create/delete/secret-store failure methods referenced in this file and the corresponding methods in knowledge_graph_service_probe.py), and update all callers/tests to pass an error_code (and only capture full stacktrace at the catch site if needed).

src/api/management/application/services/data_source_service.py (1)
436-448: Move ULID import to module level. The `from ulid import ULID` import on Line 437 is placed inside the function body. This should be moved to the top of the file with other imports for consistency and a slight performance benefit (avoiding repeated import lookups).

♻️ Proposed fix

Add at the top of the file with other imports:

```python
from ulid import ULID
```

Then remove Line 437 and update usage:

```diff
 async with self._session.begin():
-    from ulid import ULID
-
     sync_run = DataSourceSyncRun(
         id=str(ULID()),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/application/services/data_source_service.py` around lines 436 - 448, Move the local import "from ulid import ULID" out of the function and add it to the module-level imports; remove the in-function import inside the async with block that creates DataSourceSyncRun and keep using ULID() when building the DataSourceSyncRun (id=str(ULID())) before calling self._sync_run_repo.save(sync_run) so behavior is unchanged but the import is at the top of the file.

src/api/management/application/services/knowledge_graph_service.py (1)
254-259: Consider batch fetching KGs to avoid N+1 queries. The current implementation fetches each KG individually in a loop. For workspaces with many knowledge graphs, this could result in performance degradation. Consider adding a batch fetch method to the repository if this becomes a concern in production.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/application/services/knowledge_graph_service.py` around lines 254 - 259, The loop that calls self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each kg_id causes N+1 queries; add and use a batch fetch on the repository (e.g., get_by_ids or get_many) to retrieve all KnowledgeGraph objects in one call, convert the kg_ids into the appropriate KnowledgeGraphId list, then filter the returned list by tenant_id == self._scope_to_tenant and assign to kgs instead of appending in a loop; update the repository interface and usages of _kg_repo.get_by_id accordingly (keep KnowledgeGraphId and _scope_to_tenant as the identity/tenant checks).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 194-198: The docstring for DataSourceService.get is inconsistent:
it claims it raises UnauthorizedError but the implementation returns None to
avoid leaking existence; update the docstring (for the method named get on
DataSourceService) to reflect that it returns None when the caller lacks
permission or the resource doesn't exist and remove or replace the "Raises:
UnauthorizedError" section accordingly, mirroring the style used in
KnowledgeGraphService.get.
In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 176-179: Update the docstring on the method in
KnowledgeGraphService that currently claims it raises UnauthorizedError: change
the "Raises" section to instead state that the method returns None when the
caller lacks VIEW permission (to avoid existence leakage) and clarify the return
contract as "KnowledgeGraph aggregate, or None if not found or if the caller
lacks VIEW permission"; reference the UnauthorizedError symbol and the method in
KnowledgeGraphService so the implementer updates the docstring to match the
existing permission-check behavior that returns None.
In `@src/api/management/domain/aggregates/data_source.py`:
- Around line 215-236: The DataSourceSyncRequested event emitted by request_sync
lacks the sync-run identifier which prevents consumers from correlating
overlapping runs; update the event payload and call site to include sync_run_id:
extend the DataSourceSyncRequested event definition to accept sync_run_id,
change the request_sync method in the DataSource aggregate to obtain the
currently pending DataSourceSyncRun id (or accept sync_run_id as an explicit
parameter) and include it when appending the DataSourceSyncRequested event, and
then propagate this change through the trigger_sync flow, the service that
invokes request_sync, and affected tests to assert the sync_run_id is present.
---
Nitpick comments:
In `@src/api/management/application/observability/data_source_service_probe.py`:
- Around line 33-38: The probe methods (e.g., data_source_creation_failed)
currently accept raw error strings; change their signature to accept a stable
structured identifier (e.g., error_code: str or error_type: str) and an optional
sanitized message flag instead of raw error text, update the body to log only
the structured field and any safe, low-cardinality metadata, and remove/avoid
logging stack traces here; apply the same change to the other probe methods
mentioned (the similar create/delete/secret-store failure methods referenced in
this file and the corresponding methods in knowledge_graph_service_probe.py),
and update all callers/tests to pass an error_code (and only capture full
stacktrace at the catch site if needed).
In `@src/api/management/application/services/data_source_service.py`:
- Around line 436-448: Move the local import "from ulid import ULID" out of the
function and add it to the module-level imports; remove the in-function import
inside the async with block that creates DataSourceSyncRun and keep using ULID()
when building the DataSourceSyncRun (id=str(ULID())) before calling
self._sync_run_repo.save(sync_run) so behavior is unchanged but the import is at
the top of the file.
In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 254-259: The loop that calls
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each kg_id causes N+1
queries; add and use a batch fetch on the repository (e.g., get_by_ids or
get_many) to retrieve all KnowledgeGraph objects in one call, convert the kg_ids
into the appropriate KnowledgeGraphId list, then filter the returned list by
tenant_id == self._scope_to_tenant and assign to kgs instead of appending in a
loop; update the repository interface and usages of _kg_repo.get_by_id
accordingly (keep KnowledgeGraphId and _scope_to_tenant as the identity/tenant
checks).
In `@src/api/management/ports/exceptions.py`:
- Around line 31-39: UnauthorizedError is an application-level/API concern and
should be removed from the port-level exceptions; delete the UnauthorizedError
class from this port exceptions module and add it to the application/common
exceptions module (create or update a common exceptions file) with its
HTTP-403-focused docstring. Update any imports referencing UnauthorizedError to
point to the new application/common exceptions module, and ensure the original
port exceptions module only contains repository/port-related exception classes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: da219060-93a1-45dc-8842-854445b3d163
📒 Files selected for processing (17)
- src/api/management/application/__init__.py
- src/api/management/application/observability/__init__.py
- src/api/management/application/observability/data_source_service_probe.py
- src/api/management/application/observability/knowledge_graph_service_probe.py
- src/api/management/application/services/__init__.py
- src/api/management/application/services/data_source_service.py
- src/api/management/application/services/knowledge_graph_service.py
- src/api/management/dependencies/__init__.py
- src/api/management/dependencies/data_source.py
- src/api/management/dependencies/knowledge_graph.py
- src/api/management/domain/aggregates/data_source.py
- src/api/management/ports/exceptions.py
- src/api/tests/unit/management/application/__init__.py
- src/api/tests/unit/management/application/test_data_source_service.py
- src/api/tests/unit/management/application/test_knowledge_graph_service.py
- src/api/tests/unit/management/test_architecture.py
- src/api/tests/unit/management/test_data_source.py
src/api/management/application/services/knowledge_graph_service.py
```python
def request_sync(self, *, requested_by: str | None = None) -> None:
    """Request a sync for this data source.

    Records a DataSourceSyncRequested event.

    Args:
        requested_by: The user who requested the sync (optional)

    Raises:
        AggregateDeletedError: If the data source has been marked for deletion
    """
    if self._deleted:
        raise AggregateDeletedError("Cannot request sync on a deleted data source")
    self._pending_events.append(
        DataSourceSyncRequested(
            data_source_id=self.id.value,
            knowledge_graph_id=self.knowledge_graph_id,
            tenant_id=self.tenant_id,
            occurred_at=datetime.now(UTC),
            requested_by=requested_by,
        )
    )
```
Include the sync-run identifier in the emitted event.
Per the new trigger_sync flow, a pending DataSourceSyncRun exists before this event is raised, but the payload only carries data_source_id. If two requests for the same source overlap, downstream consumers cannot tell which run they should reconcile. Add sync_run_id to DataSourceSyncRequested, or enforce a one-pending-run invariant before emitting it.
Possible direction

```diff
-    def request_sync(self, *, requested_by: str | None = None) -> None:
+    def request_sync(
+        self,
+        *,
+        sync_run_id: str,
+        requested_by: str | None = None,
+    ) -> None:
         """Request a sync for this data source."""
         if self._deleted:
             raise AggregateDeletedError("Cannot request sync on a deleted data source")
         self._pending_events.append(
             DataSourceSyncRequested(
                 data_source_id=self.id.value,
+                sync_run_id=sync_run_id,
                 knowledge_graph_id=self.knowledge_graph_id,
                 tenant_id=self.tenant_id,
                 occurred_at=datetime.now(UTC),
                 requested_by=requested_by,
             )
         )
```

This also needs to be threaded through the event definition, service call site, and tests.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/domain/aggregates/data_source.py` around lines 215 - 236,
The DataSourceSyncRequested event emitted by request_sync lacks the sync-run
identifier which prevents consumers from correlating overlapping runs; update
the event payload and call site to include sync_run_id: extend the
DataSourceSyncRequested event definition to accept sync_run_id, change the
request_sync method in the DataSource aggregate to obtain the currently pending
DataSourceSyncRun id (or accept sync_run_id as an explicit parameter) and
include it when appending the DataSourceSyncRequested event, and then propagate
this change through the trigger_sync flow, the service that invokes
request_sync, and affected tests to assert the sync_run_id is present.
…level

- Update get() docstrings in both services to document None return on permission denied (removed stale UnauthorizedError raises section)
- Move ULID import from local scope to module level in DataSourceService

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/api/management/application/services/knowledge_graph_service.py (1)
254-257: Consider batch fetching to avoid N+1 queries. The loop fetches each KnowledgeGraph individually, resulting in N database queries. For workspaces with many knowledge graphs, consider adding a batch method like `get_by_ids()` to the repository.

♻️ Suggested approach

```diff
-        # Fetch each KG from repo and filter by tenant
-        kgs: list[KnowledgeGraph] = []
-        for kg_id in kg_ids:
-            kg = await self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id))
-            if kg is not None and kg.tenant_id == self._scope_to_tenant:
-                kgs.append(kg)
+        # Batch fetch all KGs and filter by tenant
+        all_kgs = await self._kg_repo.get_by_ids(
+            [KnowledgeGraphId(value=kg_id) for kg_id in kg_ids]
+        )
+        kgs = [kg for kg in all_kgs if kg.tenant_id == self._scope_to_tenant]
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/management/application/services/knowledge_graph_service.py` around lines 254 - 257, The current loop over kg_ids calls self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each id causing N+1 queries; add a batch repository method (e.g., get_by_ids(ids: List[KnowledgeGraphId]) or get_by_ids_raw(ids: List[str])) on the KG repo and replace the loop with a single call that returns all KGs, then filter returned items by tenant_id == self._scope_to_tenant and extend kgs. Ensure you use the existing KnowledgeGraphId type when constructing ids for the batch call and update any callers/tests accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/management/application/services/data_source_service.py`:
- Around line 153-172: The secret-store calls (self._secret_store.store and
delete) are executed inside the DB transaction begun with async with
self._session.begin(), risking divergence if DB commit fails; refactor so the DB
save (self._ds_repo.save / DataSource.create flow) completes and the transaction
is committed first, then perform secret-store.store/delete afterwards and, on
secret-store failure, apply a compensating DB update or enqueue an outbox
message to reconcile; locate usages around the async with self._session.begin()
block that creates/saves DataSource and move credential persistence out of that
block, ensuring ds.credentials_path is updated only after successful secret
storage or handled via compensating action/outbox if storage fails.
- Around line 310-314: The call to ds.update_connection uses truthy fallbacks
that ignore intentional falsy values; change the arguments to use explicit None
checks so empty-but-valid values are passed through—e.g., replace name=name or
ds.name with name=name if name is not None else ds.name and
connection_config=connection_config if connection_config is not None else
ds.connection_config (leave credentials_path as-is), and keep the outer
conditional that checks whether name is not None or connection_config is not
None.
- Line 252: Add an explicit tenant ownership check in list_for_knowledge_graph
before calling self._ds_repo.find_by_knowledge_graph(kg_id): load the KG via
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) and if it is None or
kg.tenant_id != self._scope_to_tenant raise UnauthorizedError; only then call
_ds_repo.find_by_knowledge_graph to ensure the knowledge graph belongs to the
current tenant (defense-in-depth consistent with create()).
---
Nitpick comments:
In `@src/api/management/application/services/knowledge_graph_service.py`:
- Around line 254-257: The current loop over kg_ids calls
self._kg_repo.get_by_id(KnowledgeGraphId(value=kg_id)) for each id causing N+1
queries; add a batch repository method (e.g., get_by_ids(ids:
List[KnowledgeGraphId]) or get_by_ids_raw(ids: List[str])) on the KG repo and
replace the loop with a single call that returns all KGs, then filter returned
items by tenant_id == self._scope_to_tenant and extend kgs. Ensure you use the
existing KnowledgeGraphId type when constructing ids for the batch call and
update any callers/tests accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2c1e79f1-9b95-4a87-8a82-7d98a58c7c58
📒 Files selected for processing (2)
- src/api/management/application/services/data_source_service.py
- src/api/management/application/services/knowledge_graph_service.py
```python
async with self._session.begin():
    ds = DataSource.create(
        knowledge_graph_id=kg_id,
        tenant_id=self._scope_to_tenant,
        name=name,
        adapter_type=adapter_type,
        connection_config=connection_config,
        created_by=user_id,
    )

    if raw_credentials is not None:
        cred_path = f"datasource/{ds.id.value}/credentials"
        await self._secret_store.store(
            path=cred_path,
            tenant_id=self._scope_to_tenant,
            credentials=raw_credentials,
        )
        ds.credentials_path = cred_path

    await self._ds_repo.save(ds)
```
Non-atomic secret-store side effects can desynchronize credentials from DB state.
`self._secret_store.store/delete(...)` is executed inside `async with self._session.begin()` (Line 165, Line 320, Line 379). If DB commit fails after a secret operation succeeds, DB and secret store diverge (orphaned/deleted credentials with opposite DB state). This is a critical consistency risk.
Please isolate secret-store side effects from DB transaction boundaries (e.g., post-commit with compensating actions, or transactional outbox workflow).
Also applies to: 318-327, 377-386
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/api/management/application/services/data_source_service.py` around lines
153 - 172, The secret-store calls (self._secret_store.store and delete) are
executed inside the DB transaction begun with async with self._session.begin(),
risking divergence if DB commit fails; refactor so the DB save
(self._ds_repo.save / DataSource.create flow) completes and the transaction is
committed first, then perform secret-store.store/delete afterwards and, on
secret-store failure, apply a compensating DB update or enqueue an outbox
message to reconcile; locate usages around the async with self._session.begin()
block that creates/saves DataSource and move credential persistence out of that
block, ensuring ds.credentials_path is updated only after successful secret
storage or handled via compensating action/outbox if storage fails.
…eck in list

- Use `if x is not None` instead of `x or default` in update_connection to preserve intentional falsy values (e.g., empty dict for connection_config)
- Add tenant ownership verification in list_for_knowledge_graph as defense-in-depth, consistent with create()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🧹 Nitpick comments (1)
src/api/tests/unit/management/application/test_data_source_service.py (1)
368-424: Consider adding a test for cross-tenant KG rejection in list_for_knowledge_graph. The service now includes a defense-in-depth tenant check (lines 252-255 in the service), but there's no test verifying `UnauthorizedError` is raised when the KG belongs to a different tenant after the permission check passes.

💡 Suggested test to add

```python
@pytest.mark.asyncio
async def test_list_raises_unauthorized_for_different_tenant_kg(
    self, service, mock_authz, mock_kg_repo, user_id, kg_id
):
    """list_for_knowledge_graph() raises UnauthorizedError when KG belongs to different tenant."""
    mock_authz.check_permission.return_value = True
    mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id="other-tenant")

    with pytest.raises(UnauthorizedError, match="not accessible"):
        await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/tests/unit/management/application/test_data_source_service.py` around lines 368 - 424, Add a unit test to verify list_for_knowledge_graph enforces the defense-in-depth tenant check: create a pytest async test (e.g., test_list_raises_unauthorized_for_different_tenant_kg) that sets mock_authz.check_permission.return_value = True, sets mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id="other-tenant"), then asserts that awaiting service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) raises UnauthorizedError (optionally matching "not accessible"); ensure the test references list_for_knowledge_graph, mock_authz, mock_kg_repo and _make_kg so it fails if the tenant rejection is not enforced.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/api/tests/unit/management/application/test_data_source_service.py`:
- Around line 368-424: Add a unit test to verify list_for_knowledge_graph
enforces the defense-in-depth tenant check: create a pytest async test (e.g.,
test_list_raises_unauthorized_for_different_tenant_kg) that sets
mock_authz.check_permission.return_value = True, sets
mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id,
tenant_id="other-tenant"), then asserts that awaiting
service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id) raises
UnauthorizedError (optionally matching "not accessible"); ensure the test
references list_for_knowledge_graph, mock_authz, mock_kg_repo and _make_kg so it
fails if the tenant rejection is not enforced.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5b121947-39cd-4187-80c1-92b3da94a178
⛔ Files ignored due to path filters (1)
src/api/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
- src/api/management/application/services/data_source_service.py
- src/api/tests/unit/management/application/test_data_source_service.py
Verify that list_for_knowledge_graph raises UnauthorizedError when the knowledge graph belongs to a different tenant (defense-in-depth check). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/api/tests/unit/management/application/test_data_source_service.py`:
- Around line 368-437: Add a test to TestDataSourceServiceListForKnowledgeGraph
covering the "KG not found" case: ensure mock_authz.check_permission returns
True and mock_kg_repo.get_by_id returns None, then call
DataSourceService.list_for_knowledge_graph and assert it raises a ValueError
(match "not found"); reference the existing pattern used in other tests (e.g.,
test_list_raises_unauthorized_for_different_tenant_kg) and use the same fixtures
(service, mock_authz, mock_kg_repo, user_id, kg_id).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b43ede58-aa59-4eb4-ac2c-8a634c66e8ac
📒 Files selected for processing (1)
src/api/tests/unit/management/application/test_data_source_service.py
```python
class TestDataSourceServiceListForKnowledgeGraph:
    """Tests for DataSourceService.list_for_knowledge_graph."""

    @pytest.mark.asyncio
    async def test_list_checks_view_permission_on_kg(
        self, service, mock_authz, mock_ds_repo, mock_kg_repo, user_id, kg_id, tenant_id
    ):
        """list_for_knowledge_graph() checks VIEW on the KG."""
        mock_authz.check_permission.return_value = True
        mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id)
        mock_ds_repo.find_by_knowledge_graph.return_value = []

        await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

        mock_authz.check_permission.assert_called_once_with(
            resource=f"knowledge_graph:{kg_id}",
            permission=Permission.VIEW,
            subject=f"user:{user_id}",
        )

    @pytest.mark.asyncio
    async def test_list_raises_unauthorized_when_denied(
        self, service, mock_authz, user_id, kg_id
    ):
        """list_for_knowledge_graph() raises UnauthorizedError when denied."""
        mock_authz.check_permission.return_value = False

        with pytest.raises(UnauthorizedError):
            await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

    @pytest.mark.asyncio
    async def test_list_raises_unauthorized_for_different_tenant_kg(
        self, service, mock_authz, mock_kg_repo, user_id, kg_id
    ):
        """list_for_knowledge_graph() rejects KG belonging to different tenant."""
        mock_authz.check_permission.return_value = True
        mock_kg_repo.get_by_id.return_value = _make_kg(
            kg_id=kg_id, tenant_id="other-tenant"
        )

        with pytest.raises(UnauthorizedError, match="not accessible"):
            await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

    @pytest.mark.asyncio
    async def test_list_returns_data_sources(
        self,
        service,
        mock_authz,
        mock_ds_repo,
        mock_kg_repo,
        mock_probe,
        user_id,
        kg_id,
        tenant_id,
    ):
        """list_for_knowledge_graph() returns data sources from repo."""
        mock_authz.check_permission.return_value = True
        mock_kg_repo.get_by_id.return_value = _make_kg(kg_id=kg_id, tenant_id=tenant_id)
        ds1 = _make_ds(ds_id="ds-001")
        ds2 = _make_ds(ds_id="ds-002")
        mock_ds_repo.find_by_knowledge_graph.return_value = [ds1, ds2]

        result = await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)

        assert len(result) == 2
        mock_probe.data_sources_listed.assert_called_once_with(
            kg_id=kg_id,
            count=2,
        )
```
Missing test case: KG not found scenario.
The test class covers authorization denial and cross-tenant rejection, but doesn't test the case where the knowledge graph doesn't exist at all (kg_repo.get_by_id returns None). The create tests have this coverage (test_create_verifies_kg_exists_and_belongs_to_tenant), but it's missing here.
Consider adding a test like:
@pytest.mark.asyncio
async def test_list_raises_value_error_when_kg_not_found(
    self, service, mock_authz, mock_kg_repo, user_id, kg_id
):
    """list_for_knowledge_graph() raises ValueError when KG not found."""
    mock_authz.check_permission.return_value = True
    mock_kg_repo.get_by_id.return_value = None

    with pytest.raises(ValueError, match="not found"):
        await service.list_for_knowledge_graph(user_id=user_id, kg_id=kg_id)
Verify that list_for_knowledge_graph raises UnauthorizedError (not ValueError) when the KG doesn't exist, preventing existence leakage. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🧹 Nitpick comments (1)
src/api/tests/unit/management/application/test_data_source_service.py (1)
236-266: Strengthen secret-store write assertions with tenant scoping checks

These tests verify that credentials are stored, but they don't assert that the tenant_id is passed to secret_store.store. Adding that assertion in both create/update credential tests would better protect tenant-isolation guarantees from regressions.

Also applies to: 524-540
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/tests/unit/management/application/test_data_source_service.py` around lines 236 - 266, Add assertions to the existing tests (e.g., test_create_stores_credentials_when_provided and the corresponding update test) to verify the tenant scoping is passed to the secret store: after calling service.create/service.update, inspect mock_secret_store.store.call_args and assert that the tenant_id appears in the call (either in kwargs like call_args.kwargs.get("tenant_id") or in args) in addition to the existing "datasource/" path check; reference mock_secret_store.store and the test function names to locate where to add the extra assertion.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3147f813-26d0-4a1b-af35-d8724b73ff19
📒 Files selected for processing (1)
src/api/tests/unit/management/application/test_data_source_service.py
Verify that create() and update() pass the correct tenant_id and credentials to the secret store, not just the path prefix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
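The assertion style the nitpick asks for can be sketched with an AsyncMock secret store. `store_credentials` and the `store(path=, value=, tenant_id=)` signature below are illustrative assumptions, not the PR's actual API; the point is inspecting `call_args` for tenant scoping rather than only the path prefix.

```python
import asyncio
from unittest.mock import AsyncMock


# Hypothetical stand-in for the credential-storage step inside
# DataSourceService.create(); the real signature in the PR may differ.
async def store_credentials(secret_store, *, tenant_id, ds_id, credentials):
    path = f"datasource/{ds_id}"
    await secret_store.store(path=path, value=credentials, tenant_id=tenant_id)
    return path


async def exercise():
    secret_store = AsyncMock()
    await store_credentials(
        secret_store,
        tenant_id="t-001",
        ds_id="ds-001",
        credentials={"token": "secret"},
    )
    # The suggested assertions: check tenant scoping, not just the path prefix.
    call = secret_store.store.call_args
    assert call.kwargs["path"].startswith("datasource/")
    assert call.kwargs["tenant_id"] == "t-001"
    return call.kwargs


recorded = asyncio.run(exercise())
```

Because `AsyncMock.call_args` records keyword arguments, the test fails if a refactor drops or renames the tenant parameter, which is exactly the regression the nitpick guards against.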
Summary
- KnowledgeGraphService and DataSourceService with SpiceDB authorization on all CRUD operations
- credentials_path — all in same transaction
- trigger_sync creates DataSourceSyncRun (status=pending) and emits DataSourceSyncRequested event via new DataSource.request_sync() domain method
- get() returns None for both not-found and access-denied (prevents resource existence leakage)

Test plan
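The DataSource.request_sync() domain method mentioned above can be sketched as follows. This is a hedged illustration assuming a pending-events aggregate pattern; the event fields and the collect_events name are assumptions, not the PR's actual code.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class DataSourceSyncRequested:
    """Domain event emitted when a sync is requested for a data source."""
    data_source_id: str
    sync_run_id: str


@dataclass
class DataSource:
    id: str
    _pending_events: list = field(default_factory=list)

    def request_sync(self, sync_run_id: str) -> None:
        # The aggregate records its own event; the application service never
        # touches _pending_events directly.
        self._pending_events.append(
            DataSourceSyncRequested(data_source_id=self.id, sync_run_id=sync_run_id)
        )

    def collect_events(self) -> list:
        # Drain pending events so each is dispatched exactly once.
        events, self._pending_events = self._pending_events, []
        return events
```

In this shape, trigger_sync would create the pending sync run, call `ds.request_sync(run.id)`, and let the outbox or event dispatcher drain `collect_events()` in the same transaction.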
🤖 Generated with Claude Code
Summary by CodeRabbit
- New Features
- Tests
- Chores