feat(rag): add multi-tenancy support and vision caching#285
Conversation
Add team-level data isolation in Cognee through tenant management, enabling each team to have its own isolated data context. Also add caching for Vision API results to avoid redundant API calls during image and PDF processing.
Schedule async deletion of document data from RAG service when a document is deleted from the database.
📝 WalkthroughWalkthroughThis pull request refactors the multi-tenant architecture across the platform and RAG services, replacing dataset-based identifiers with team-based identifiers throughout the codebase. Changes include: replacing Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Comment |
There was a problem hiding this comment.
Actionable comments posted: 16
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
services/platform/convex/agent_tools/rag/query_rag_context.ts (1)
138-203: Enforce team IDs requirement to maintain isolation.The comment says team IDs are required, but the request is sent without
team_idswhen empty/undefined, which can produce unscoped results. Fail fast or skip the query when team IDs are missing.🛡️ Suggested patch
- if (options?.teamIds && options.teamIds.length > 0) { - requestPayload.team_ids = options.teamIds; - } + if (!options?.teamIds || options.teamIds.length === 0) { + debugLog('RAG context skipped: no team IDs provided', { + userId: options?.userId, + }); + return undefined; + } + requestPayload.team_ids = options.teamIds;services/rag/app/services/cognee/service.py (1)
820-843: Critical: Undefined variableexplicit_userand unreachable dead code.This
elsebranch referencesexplicit_userwhich is never defined in thesearchmethod. This would cause aNameErrorat runtime.However, this branch is also unreachable because lines 718-720 return early when
not datasets. This entire block appears to be dead code from a previous refactor.🐛 Proposed fix: remove unreachable dead code
chunks_extracted += 1 logger.debug(f"Dataset '{dataset}' returned {chunks_extracted} chunks") - else: - # No datasets specified - search globally - global_search_kwargs: dict[str, Any] = { - "query_type": cognee_search_type, - "top_k": effective_top_k, - "system_prompt": system_prompt, - } - if explicit_user: - global_search_kwargs["user"] = explicit_user - - global_results = await cognee.search(query, **global_search_kwargs) - # Extract actual chunks from cognee's nested structure - for item in global_results or []: - if isinstance(item, dict) and "search_result" in item: - search_results = item.get("search_result", []) - if search_results: - # Handle both nested [[chunk]] and flat [chunk] structures - if isinstance(search_results[0], list): - raw_results.extend(search_results[0]) - else: - raw_results.extend(search_results) - else: - raw_results.append(item)
🤖 Fix all issues with AI agents
In `@services/platform/convex/documents/mutations.ts`:
- Around line 139-142: The scheduled action args are serialized and expect
documentId as a string, but we're passing args.documentId directly; update the
call to ctx.scheduler.runAfter(...) that invokes
internal.documents.actions.deleteDocumentFromRag to pass a serialized string
(e.g., String(args.documentId) or args.documentId.toString()) for the documentId
field so the action's v.string() validation always receives a string.
In `@services/platform/convex/lib/rag_prefetch/index.ts`:
- Around line 247-262: The code may call fetchRagResults with an empty teamIds
array (derived from options.userTeamIds) which can return unscoped data before
rag_search enforces teams; add a guard immediately after computing const teamIds
= options.userTeamIds to short‑circuit the prefetch when teamIds is falsy or
teamIds.length === 0 (e.g., skip fetchRagResults and return or continue without
prefetch), ensuring no call to fetchRagResults occurs with an empty teamIds;
reference teamIds, options.userTeamIds, and fetchRagResults in your change so
the early-return is obvious and preserves existing debugLog usage.
In `@services/rag/app/models.py`:
- Around line 74-99: Extract the duplicated tenant validation/sanitization into
a module-level helper (e.g., _validate_and_sanitize_tenant_ids) and update each
model_validator method (validate_and_sanitize on DocumentAddRequest,
QueryRequest, GenerateRequest) to call that helper; the helper should accept
user_id and team_ids, perform the existing checks and sanitization (including
importing sanitize_team_id), return sanitized team_ids or None, and raise the
same ValueError messages on failure, and each validator should set self.team_ids
only when the helper returns a non-None sanitized list and then return self.
In `@services/rag/app/routers/documents.py`:
- Around line 184-199: Extract the duplicated comma-separated team_id
parsing/sanitization from upload_document and delete_document into a helper
function parse_team_ids(team_ids: str | None, required: bool = False) that uses
sanitize_team_id to trim, sanitize and collect non-empty IDs, returns List[str]
or None, and when required=True raises
HTTPException(status.HTTP_400_BAD_REQUEST) with "At least one valid team_id is
required" if no valid IDs are found; replace the inline loops in upload_document
and delete_document with calls to parse_team_ids(team_ids, required=True/False)
and ensure the helper imports sanitize_team_id and HTTPException/status as
needed.
- Around line 415-430: When parsing team_ids in the router before calling
cognee_service.delete_document, detect the case where team_ids was provided but
all entries sanitized to empty (i.e., team_id_list is an empty list) and reject
it the same way upload_document does: raise an HTTPException (400/appropriate)
indicating invalid team_ids instead of passing [] into
cognee_service.delete_document. Update the logic around sanitize_team_id and
team_id_list so that if team_ids is truthy but team_id_list ends up empty you
raise the error; keep the call to cognee_service.delete_document(document_id,
mode=mode, team_ids=team_id_list) unchanged otherwise.
In `@services/rag/app/services/cognee/service.py`:
- Around line 472-478: The except block is catching the deprecated alias
asyncio.TimeoutError; update the exception handler to catch the builtin
TimeoutError instead (replace "except asyncio.TimeoutError:" with "except
TimeoutError:" in the vision pre-processing timeout handling around the block
that computes vision_elapsed and raises the more descriptive TimeoutError for
document_id), and remove any now-unused asyncio import if applicable.
- Line 305: Remove the redundant local import of sql_delete inside the function
in services/rag/app/services/cognee/service.py: the module already imports
"delete as sql_delete" at the top-level, so delete the in-function line "from
sqlalchemy import delete as sql_delete" and rely on the existing top-level
symbol (used with sql_delete(...) where present), leaving all references to
sql_delete unchanged.
In `@services/rag/app/services/cognee/tenant_manager.py`:
- Around line 66-147: The get_or_create_team_context and
get_or_create_user_for_context flows can race across processes when both miss
_get_user_by_email and attempt create_user/create_tenant; wrap the create_user
and create_tenant calls in try/except blocks that catch uniqueness/conflict
errors, and on catching such an error re-run the lookup steps (e.g., call
_get_user_by_email and/or get_user) to fetch the already-created user/tenant so
the function becomes idempotent; specifically modify get_or_create_team_context
to catch exceptions around create_user and create_tenant (referencing
create_user, create_tenant, _get_user_by_email, get_user) and similarly update
get_or_create_user_for_context (referencing the same helpers) so both functions
retry lookup on conflict and return the existing user rather than failing.
In `@services/rag/app/services/cognee/utils.py`:
- Around line 182-211: sanitize_team_id may return an empty string for inputs
made entirely of stripped characters, causing dataset collisions; update
sanitize_team_id to guard against empty results by computing a deterministic
fallback (e.g., a stable hash like sha256 hex of the original team_id, truncated
to a safe length) or by raising a ValueError; ensure the fallback is
deterministic and still matches allowed characters
(alphanumeric/underscore/hyphen), and document the behavior in the function
docstring so callers know they may receive a hashed fallback instead of an empty
string.
In `@services/rag/app/services/vision/cache.py`:
- Around line 68-83: The current LRU uses list.pop(0) and list.remove(key) in
_evict_if_needed and _update_access_order causing O(n) operations; replace the
list-based approach with collections.OrderedDict for O(1) operations: change the
caches (e.g., _ocr_cache and _description_cache in the VisionCache) to
OrderedDict[str, str], remove the access_order parameter and related helpers,
implement _evict_if_needed to call cache.popitem(last=False) until under
max_size, and update get/set methods to use cache.move_to_end(key) when
accessed/inserted; update signatures of _evict_if_needed and any callers to
accept an OrderedDict and max_size instead of (cache, access_order, max_size).
- Around line 29-47: Remove the dead lru_cache-wrapped helper functions
_get_cached_ocr and _get_cached_description: they always return None, are not
used anywhere, and conflict with the actual caching implemented in VisionCache;
delete both function definitions and any associated imports (e.g., lru_cache,
OCR_CACHE_SIZE, DESCRIPTION_CACHE_SIZE if now unused) so VisionCache remains the
single source of truth for OCR/description caching.
In `@services/rag/app/services/vision/openai_client.py`:
- Around line 143-146: Replace the asyncio-specific exception with the builtin
TimeoutError: in the exception handler that currently catches
asyncio.TimeoutError (the block raising "Vision API OCR request timed out after
{settings.vision_request_timeout}s"), change the except to catch TimeoutError
directly so the code uses the builtin alias (replace asyncio.TimeoutError with
TimeoutError) while leaving the existing error message and
settings.vision_request_timeout usage intact.
- Around line 217-220: The except block in the describe_image function is
catching asyncio.TimeoutError; change it to catch the built-in TimeoutError
(same fix as for ocr_image) so the code uses the modern builtin exception
type—update the except clause in describe_image (and any other occurrences like
ocr_image) to "except TimeoutError:" and ensure no conflicting local
TimeoutError is defined/imported so the builtin is used.
In `@services/rag/app/services/vision/pdf_extractor.py`:
- Around line 188-220: Wrap the image extraction and Vision API call in targeted
try/except blocks so a single bad image or a transient describe_image failure
doesn’t abort processing for the whole page: around the doc.extract_image(xref)
and the await vision_client.describe_image(image_bytes) inside the for img_index
loop in pdf_extractor.py, catch exceptions (log them with context including
img_index and page.number via logger.debug/error) and continue to the next
image; keep the existing semaphore usage and only append to descriptions when
the call succeeds.
- Around line 91-98: The current loop over results in the PDF extractor now
raises any page-level Exception and aborts entire PDF processing; change the
handling in the for i, result in enumerate(results): loop to log the error and
continue so partial results are preserved: when isinstance(result, Exception) do
logger.error(f"Failed to process page {i+1}: {result}") and append the page
number to a failed_pages list (do not raise), then continue; keep the existing
logic that sets page_text, page_vision_used, updates vision_used, and appends to
pages_text when page_text exists; after the loop, if failed_pages is non-empty
emit a logger.warning listing failed pages and ensure any return value or status
from the containing function (e.g., the PDF extraction function) reflects
partial success vs failures.
- Around line 168-170: The OCR try/except change causes Vision errors (including
the custom TimeoutError raised by openai_client.py) to propagate and fail
documents; update cognee/service.py to catch the custom TimeoutError in addition
to asyncio.TimeoutError where timeouts are handled (the catch around the
request/processing logic that currently handles asyncio.TimeoutError), by
importing the TimeoutError symbol from openai_client.py (or the module that
defines it) and adding it to the exception tuple (or a separate except block) so
the service falls back to the previous behavior instead of crashing the whole
document.
Trigger RAG reindex when a document's teamTags are updated to ensure team-based access control stays in sync with the RAG service.
Use explicit String() cast for cross-boundary ID serialization.
Short-circuit prefetch when no team IDs are available to prevent unscoped results leaking before rag_search enforces team isolation.
Consolidate duplicated validate_and_sanitize logic from DocumentAddRequest, QueryRequest, and GenerateRequest into _validate_and_sanitize_tenant_ids.
- Add _parse_team_ids helper to deduplicate parsing logic - Fix authorization bypass: when team_ids is provided but all values sanitize to empty, return None instead of empty list to prevent bypassing authorization checks
Remove the redundant local import of sql_delete that's already imported at module level.
asyncio.TimeoutError is a deprecated alias for the builtin TimeoutError. Using the builtin directly improves forward compatibility.
Wrap create_user and create_tenant calls in try-except blocks to handle race conditions when multiple workers try to create the same user/tenant simultaneously. On conflict, re-fetch the resource created by the other process, making the operation idempotent.
Raise ValueError when sanitize_team_id produces an empty result to prevent cross-tenant dataset collisions. Updated callers to handle the ValueError gracefully by skipping invalid team IDs.
- Remove unused @lru_cache decorated functions that always return None - Replace dict+list LRU with OrderedDict for O(1) operations via move_to_end() and popitem(last=False)
Replace deprecated asyncio.TimeoutError with builtin TimeoutError.
- Page-level errors now log and skip instead of failing entire PDF - OCR errors return empty text as fallback instead of propagating - Image description errors are caught and logged, continuing with remaining images
Summary
Test plan
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Improvements
✏️ Tip: You can customize this high-level summary in your review settings.