Added RAG chunks in streaming_query response #556
Conversation
Walkthrough

Adds RAG-aware data to the streaming end event. Updates the end-event function signature to accept a TurnSummary, computes rag_chunks and referenced_documents via new utilities, and includes available_quotas at the top level. Introduces a new rag_processing module and updates call sites to pass summary and store rag_chunks.
Sequence Diagram(s)

sequenceDiagram
autonumber
participant C as Client
participant E as StreamingQueryEndpoint
participant U as rag_processing utils
C->>E: Request (streaming query)
activate E
E-->>C: Stream tokens (partial)
E->>U: process_rag_chunks_for_streaming(summary)
U-->>E: rag_chunks (or None)
E->>U: build_referenced_documents_list_for_streaming(summary, metadata_map)
U-->>E: referenced_documents (or None)
E-->>C: End event { data: { rag_chunks, referenced_documents, ... }, available_quotas }
deactivate E
note over C,E: End-event structure now includes precomputed RAG data
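As a rough illustration of the diagram above, the end event could carry a payload like the following; only rag_chunks, referenced_documents, and top-level available_quotas are taken from the walkthrough, and every concrete value and remaining field name is an assumption:

```python
# Hypothetical end-event payload; the "event" key and all literal values
# are illustrative, not taken from the actual implementation.
end_event = {
    "event": "end",
    "data": {
        "rag_chunks": [
            {"content": "chunk text", "source": "https://docs.example.com/a", "score": 0.87},
        ],
        "referenced_documents": [
            {"doc_url": "https://docs.example.com/a",
             "doc_title": "https://docs.example.com/a",
             "chunk_count": 1},
        ],
    },
    "available_quotas": {},  # top level, per the walkthrough
}
```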
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests
Tip

👮 Agentic pre-merge checks are now available in preview! Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example:

reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/endpoints/streaming_query.py (1)
659-663: Avoid AttributeError: use safe default for rag_chunks when storing transcripts

TurnSummary doesn't declare rag_chunks; accessing summary.rag_chunks can raise at runtime — pass an empty list if missing.
Location: src/app/endpoints/streaming_query.py (around lines 654–663)
-            rag_chunks=summary.rag_chunks,
+            rag_chunks=(getattr(summary, "rag_chunks", None) or []),
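The getattr-with-fallback pattern above keeps the call site safe whether the attribute is missing or explicitly None; a minimal sketch (SimpleNamespace stands in for TurnSummary):

```python
from types import SimpleNamespace

# summary_without never defines rag_chunks; summary_with sets it to None.
summary_without = SimpleNamespace(llm_response="...")
summary_with = SimpleNamespace(llm_response="...", rag_chunks=None)

# Safe default: a missing attribute and an explicit None both collapse to [].
chunks_a = getattr(summary_without, "rag_chunks", None) or []
chunks_b = getattr(summary_with, "rag_chunks", None) or []
```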
🧹 Nitpick comments (3)
src/utils/rag_processing.py (3)
32-56: Duplicate detection and O(n²) counting; normalize keys and pre-count.
- The dedup set compares RAG chunk.source vs. the legacy title later, causing duplicates.
- chunk_count recomputes counts per source inside the loop (O(n²)).

Refactor to pre-count sources and use a unified dedup key (URL if present, else title).

@@ def _extract_referenced_documents_from_rag_chunks(summary: TurnSummary) -> Tuple[List[Dict[str, Any]], Set[str]]:
-    referenced_docs = []
-    doc_sources = set()
-
-    for chunk in summary.rag_chunks:
-        if chunk.source and chunk.source not in doc_sources:
-            doc_sources.add(chunk.source)
-            referenced_docs.append({
-                "doc_url": chunk.source if chunk.source.startswith("http") else None,
-                "doc_title": chunk.source,
-                "chunk_count": sum(1 for c in summary.rag_chunks if c.source == chunk.source)
-            })
+    referenced_docs: List[Dict[str, Any]] = []
+    doc_sources: Set[str] = set()
+    rag_chunks = getattr(summary, "rag_chunks", []) or []
+    # Pre-count occurrences per source
+    from collections import Counter
+    counts = Counter(c.source for c in rag_chunks if getattr(c, "source", None))
+    for source, cnt in counts.items():
+        key = str(source)  # normalized dedup key (prefer URL if provided)
+        doc_sources.add(key)
+        referenced_docs.append(
+            {
+                "doc_url": key if key.startswith("http") else None,
+                "doc_title": key,
+                "chunk_count": cnt,
+            }
+        )

     return referenced_docs, doc_sources
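The pre-count idea in the refactor above is plain collections.Counter; a standalone sketch with made-up chunk objects (only .source matters for counting):

```python
from collections import Counter
from types import SimpleNamespace

# Hypothetical chunks; the URL and title values are illustrative.
chunks = [
    SimpleNamespace(source="https://docs.example.com/a"),
    SimpleNamespace(source="https://docs.example.com/a"),
    SimpleNamespace(source="Install Guide"),
    SimpleNamespace(source=None),  # falsy source: skipped
]

# One O(n) pass replaces the per-chunk sum(...) that made the loop O(n^2).
counts = Counter(c.source for c in chunks if getattr(c, "source", None))

docs = [
    {
        "doc_url": src if src.startswith("http") else None,
        "doc_title": src,
        "chunk_count": cnt,
    }
    for src, cnt in counts.items()
]
```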
85-106: Optionally sort referenced documents for stability.

Stable ordering (e.g., by chunk_count desc, then title) improves deterministic UIs/tests.

-    all_docs = rag_docs + legacy_docs
+    all_docs = rag_docs + legacy_docs
+    # Stable order: higher chunk_count first, then title
+    all_docs.sort(key=lambda d: (-int(d.get("chunk_count", 0)), str(d.get("doc_title") or "")))
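The tuple sort key suggested above can be checked in isolation; a sketch with made-up documents, including a legacy entry that lacks chunk_count entirely:

```python
# Hypothetical merged list; sorting by (-chunk_count, title) yields a stable,
# deterministic order: higher counts first, ties broken alphabetically.
all_docs = [
    {"doc_title": "b-guide", "chunk_count": 1},
    {"doc_title": "a-guide", "chunk_count": 3},
    {"doc_title": "a-guide-2"},  # legacy doc without chunk_count, treated as 0
]
all_docs.sort(key=lambda d: (-int(d.get("chunk_count", 0)), str(d.get("doc_title") or "")))
```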
1-5: Minor typing/style nit (optional).

You can use built-in generics (list, dict, set) in Python 3.11+ to reduce imports from typing. Non-blocking.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/app/endpoints/streaming_query.py (5 hunks)
src/utils/rag_processing.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/app/endpoints/streaming_query.py (2)
src/utils/rag_processing.py (2)
process_rag_chunks_for_streaming (7-29)
build_referenced_documents_list_for_streaming (85-105)

src/utils/types.py (1)
TurnSummary (59-78)
src/utils/rag_processing.py (1)
src/utils/types.py (1)
TurnSummary(59-78)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: e2e_tests
🔇 Additional comments (6)
src/app/endpoints/streaming_query.py (6)
645-646: Call site updated correctly.
801-801: LGTM. Return value unchanged; no concerns.
101-117: Public API change: stream_end_event now requires TurnSummary — verified single call site.
Repository search found only the function definition and a single call at src/app/endpoints/streaming_query.py:645; no other usages located.
118-123: RAG chunks missing from TurnSummary — verify and populate.

process_rag_chunks_for_streaming(summary) expects summary.rag_chunks, but TurnSummary (src/utils/types.py, ~line 59) contains llm_response and no rag_chunks; a repo search returned "unrecognized file type: python" so presence of rag_chunks elsewhere couldn't be confirmed. Either add rag_chunks to TurnSummary and populate it from tool responses, or derive/attach rag_chunks here from metadata_map/tool outputs. I can draft the patch once you confirm the preferred source.
38-41: Import OK — utils package present.

src/utils/__init__.py exists and src/utils/rag_processing.py is present; importing via from utils.rag_processing import ... is valid without re-exporting in __init__.py, and other files already import from utils, so no change required.
128-135: Stabilize end-event schema — emit empty lists for RAG/referenced docsFile: src/app/endpoints/streaming_query.py (lines 128–135). process_rag_chunks_for_streaming / build_referenced_documents_list_for_streaming can return None while other code/tests expect lists; replace with empty arrays to avoid client conditionals.
-        "rag_chunks": rag_chunks,
-        "referenced_documents": referenced_docs,
+        "rag_chunks": rag_chunks or [],
+        "referenced_documents": referenced_docs or [],

Confirm whether "available_quotas" should be at the top level (no other usages found in repo).
def process_rag_chunks_for_streaming(summary: TurnSummary) -> Optional[List[Dict[str, Any]]]:
    """
    Process RAG chunks from TurnSummary into streaming response format.

    Parameters:
        summary (TurnSummary): Summary containing RAG chunks data.

    Returns:
        Optional[List[Dict[str, Any]]]: List of RAG chunks in streaming format, or None if empty.
    """
    if not summary.rag_chunks:
        return None

    rag_chunks = [
        {
            "content": chunk.content,
            "source": chunk.source,
            "score": chunk.score
        }
        for chunk in summary.rag_chunks
    ]

    return rag_chunks if rag_chunks else None
🧩 Analysis chain
Verify TurnSummary.rag_chunks existence and chunk field names.
These functions assume summary.rag_chunks exists and each chunk has .content, .source, .score. Please confirm the model/type defines these.
If missing, add a typed model and a default empty list on TurnSummary. I can supply a patch.
🏁 Script executed:
#!/bin/bash
echo "TurnSummary definition:"
rg -n -C3 'class\s+TurnSummary\b' src/utils/types.py || true
echo "RAG chunk model?"
rg -nP --type=python '(class|TypedDict)\s+\w*RAG\w*Chunk' src -C2 || true

Length of output: 216
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== ls src/utils ==="
ls -la src/utils || true
echo
echo "=== src/utils/types.py (1-400) ==="
sed -n '1,400p' src/utils/types.py || true
echo
echo "=== src/utils/rag_processing.py (1-200) ==="
sed -n '1,200p' src/utils/rag_processing.py || true
echo
echo "=== grep for 'rag_chunks' ==="
rg -n --hidden -S 'rag_chunks' || true
echo
echo "=== grep for classes or TypedDicts with 'Chunk' in the name ==="
rg -n --hidden -S 'class .*Chunk' || true
rg -n --hidden -S 'TypedDict.*Chunk' || true
echo
echo "=== grep for 'content', 'source', 'score' across repo ==="
rg -n --hidden -S '\bcontent\b' || true
rg -n --hidden -S '\bsource\b' || true
rg -n --hidden -S '\bscore\b' || true

Length of output: 235034
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== src/utils/transcripts.py (1-240) ==="
sed -n '1,240p' src/utils/transcripts.py || true
echo
echo "=== src/app/endpoints/streaming_query.py (600-740) ==="
sed -n '600,740p' src/app/endpoints/streaming_query.py || true
echo
echo "=== src/app/endpoints/query.py (220-520) ==="
sed -n '220,520p' src/app/endpoints/query.py || true
echo
echo "=== tests/unit/utils/test_transcripts.py (1-240) ==="
sed -n '1,240p' tests/unit/utils/test_transcripts.py || true
echo
echo "=== tests/unit/app/endpoints/test_streaming_query.py (1-260) ==="
sed -n '1,260p' tests/unit/app/endpoints/test_streaming_query.py || true
echo
echo "=== Search for 'rag_chunks =' assignments ==="
rg -n --hidden -S 'rag_chunks\s*=' || true
echo
echo "=== Search for 'rag_chunks' occurrences (context) ==="
rg -n --hidden -S -C3 '\brag_chunks\b' || true

Length of output: 42884
Add RagChunk model and rag_chunks field to TurnSummary
TurnSummary (src/utils/types.py) does not define rag_chunks and there is no RagChunk type in the repo; code in src/utils/rag_processing.py, src/app/endpoints/streaming_query.py and src/utils/transcripts.py assumes each chunk has .content, .source, .score — this is a real runtime/type bug.
- Define a RagChunk model in src/utils/types.py (BaseModel) with fields: content: str, source: str | None = None, score: float | None = None.
- Add rag_chunks: list[RagChunk] = Field(default_factory=list) to TurnSummary (import Field from pydantic).
- Update src/utils/transcripts.py signature/annotation rag_chunks: list[str] -> list[RagChunk] (or Optional[List[RagChunk]]) and adjust callers/tests as needed.
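The shape proposed above can be sketched without pulling in Pydantic; the dataclasses below are a dependency-free stand-in for the suggested BaseModel (field names match the review; Field(default_factory=list) becomes field(default_factory=list), and llm_response is the one TurnSummary field the review confirms):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RagChunk:
    # Fields per the review: content is required, source/score optional.
    content: str
    source: Optional[str] = None
    score: Optional[float] = None

@dataclass
class TurnSummary:
    llm_response: str = ""
    # Default empty list so summary.rag_chunks never raises AttributeError.
    rag_chunks: list[RagChunk] = field(default_factory=list)

summary = TurnSummary(llm_response="hello")
summary.rag_chunks.append(RagChunk(content="chunk text", source="doc", score=0.5))
```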
🤖 Prompt for AI Agents
In src/utils/types.py around where TurnSummary is defined (add new lines near
line 1–200), add a Pydantic BaseModel RagChunk with fields content: str, source:
Optional[str] = None, score: Optional[float] = None (import Optional from
typing), import Field from pydantic, and add rag_chunks: list[RagChunk] =
Field(default_factory=list) to TurnSummary; then update src/utils/transcripts.py
function signatures/annotations that currently use list[str] for rag_chunks to
use list[RagChunk] or Optional[List[RagChunk]] and update any imports/usages in
src/utils/rag_processing.py and src/app/endpoints/streaming_query.py to import
RagChunk and the new TurnSummary field so callers/tests reflect the new type.
def _merge_legacy_referenced_documents(metadata_map: Dict[str, Any], doc_sources: Set[str]) -> List[Dict[str, Any]]:
    """
    Merge legacy referenced documents from metadata_map with existing document sources for streaming.

    Parameters:
        metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
        doc_sources (Set[str]): Set of document sources already processed.

    Returns:
        List[Dict[str, Any]]: List of additional referenced documents from legacy format.
    """
    legacy_docs = []

    for v in filter(
        lambda v: ("docs_url" in v) and ("title" in v),
        metadata_map.values(),
    ):
        if v["title"] not in doc_sources:
            doc_sources.add(v["title"])
            legacy_docs.append({
                "doc_url": v["docs_url"],
                "doc_title": v["title"],
            })

    return legacy_docs
Legacy merge uses mismatched dedup key; align with RAG dedup and unify shape.
Compare against URL when available, else title; also include chunk_count for legacy docs (0) to keep schema consistent.
@@
- for v in filter(
- lambda v: ("docs_url" in v) and ("title" in v),
- metadata_map.values(),
- ):
- if v["title"] not in doc_sources:
- doc_sources.add(v["title"])
- legacy_docs.append({
- "doc_url": v["docs_url"],
- "doc_title": v["title"],
- })
+ for v in filter(lambda v: ("docs_url" in v) and ("title" in v), metadata_map.values()):
+ dedup_key = v.get("docs_url") or v.get("title")
+ if dedup_key and dedup_key not in doc_sources:
+ doc_sources.add(dedup_key)
+ legacy_docs.append(
+ {
+ "doc_url": v.get("docs_url"),
+ "doc_title": v.get("title"),
+ "chunk_count": 0,
+            )

📝 Committable suggestion
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _merge_legacy_referenced_documents(metadata_map: Dict[str, Any], doc_sources: Set[str]) -> List[Dict[str, Any]]:
    """
    Merge legacy referenced documents from metadata_map with existing document sources for streaming.

    Parameters:
        metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
        doc_sources (Set[str]): Set of document sources already processed.

    Returns:
        List[Dict[str, Any]]: List of additional referenced documents from legacy format.
    """
    legacy_docs = []

    for v in filter(lambda v: ("docs_url" in v) and ("title" in v), metadata_map.values()):
        dedup_key = v.get("docs_url") or v.get("title")
        if dedup_key and dedup_key not in doc_sources:
            doc_sources.add(dedup_key)
            legacy_docs.append(
                {
                    "doc_url": v.get("docs_url"),
                    "doc_title": v.get("title"),
                    "chunk_count": 0,
                }
            )

    return legacy_docs
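The effect of the unified dedup key can be seen in isolation; a sketch with made-up metadata values, where two entries share a docs_url but carry different titles (title-only dedup would keep both):

```python
metadata_map = {
    "m1": {"docs_url": "https://docs.example.com/a", "title": "Guide A"},
    "m2": {"docs_url": "https://docs.example.com/a", "title": "Guide A (mirror)"},
    "m3": {"docs_url": "https://docs.example.com/b", "title": "Guide B"},
}

doc_sources = set()
legacy_docs = []
for v in filter(lambda v: ("docs_url" in v) and ("title" in v), metadata_map.values()):
    # Unified dedup key: URL when available, else title.
    dedup_key = v.get("docs_url") or v.get("title")
    if dedup_key and dedup_key not in doc_sources:
        doc_sources.add(dedup_key)
        legacy_docs.append(
            {"doc_url": v.get("docs_url"), "doc_title": v.get("title"), "chunk_count": 0}
        )
```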
🤖 Prompt for AI Agents
In src/utils/rag_processing.py around lines 58 to 83, the legacy merge currently
deduplicates only by title and returns a different shape; update it to
deduplicate by URL when available else title (compute key = v.get("docs_url") or
v["title"]), check/add that key to doc_sources, and append legacy_docs entries
using the unified schema including "doc_url": v.get("docs_url"), "doc_title":
v["title"], and "chunk_count": 0 so legacy docs match the RAG shape.
I need to update this file as per the changes required in #550
Description
Type of change
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
New Features
Refactor