Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions src/app/endpoints/streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
from utils.transcripts import store_transcript
from utils.types import TurnSummary
from utils.endpoints import validate_model_provider_override
from utils.rag_processing import (
process_rag_chunks_for_streaming,
build_referenced_documents_list_for_streaming,
)

from app.endpoints.query import (
get_rag_toolgroups,
Expand Down Expand Up @@ -94,36 +98,35 @@ def stream_start_event(conversation_id: str) -> str:
)


def stream_end_event(metadata_map: dict) -> str:
def stream_end_event(metadata_map: dict, summary: TurnSummary) -> str:
"""
Yield the end of the data stream.

Format and return the end event for a streaming response,
including referenced document metadata and placeholder token
including referenced document metadata, RAG chunks, and placeholder token
counts.

Parameters:
metadata_map (dict): A mapping containing metadata about
referenced documents.
summary (TurnSummary): Summary containing RAG chunks and other turn data.

Returns:
str: A Server-Sent Events (SSE) formatted string
representing the end of the data stream.
"""
# Process RAG chunks for streaming response
rag_chunks = process_rag_chunks_for_streaming(summary)

# Build complete referenced documents list
referenced_docs = build_referenced_documents_list_for_streaming(summary, metadata_map)

return format_stream_data(
{
"event": "end",
"data": {
"referenced_documents": [
{
"doc_url": v["docs_url"],
"doc_title": v["title"],
}
for v in filter(
lambda v: ("docs_url" in v) and ("title" in v),
metadata_map.values(),
)
],
"rag_chunks": rag_chunks,
"referenced_documents": referenced_docs,
"truncated": None, # TODO(jboos): implement truncated
"input_tokens": 0, # TODO(jboos): implement input tokens
"output_tokens": 0, # TODO(jboos): implement output tokens
Expand Down Expand Up @@ -639,7 +642,7 @@ async def response_generator(
chunk_id += 1
yield event

yield stream_end_event(metadata_map)
yield stream_end_event(metadata_map, summary)

if not is_transcripts_enabled():
logger.debug("Transcript collection is disabled in the configuration")
Expand All @@ -653,7 +656,7 @@ async def response_generator(
query=query_request.query,
query_request=query_request,
summary=summary,
rag_chunks=[], # TODO(lucasagomes): implement rag_chunks
rag_chunks=summary.rag_chunks,
truncated=False, # TODO(lucasagomes): implement truncation as part
# of quota work
attachments=query_request.attachments or [],
Expand Down Expand Up @@ -795,4 +798,4 @@ async def retrieve_response(
)
response = cast(AsyncIterator[AgentTurnResponseStreamChunk], response)

return response, conversation_id
return response, conversation_id
105 changes: 105 additions & 0 deletions src/utils/rag_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Utility functions for processing RAG chunks and referenced documents in streaming responses."""

from typing import Optional, List, Dict, Set, Tuple, Any
from utils.types import TurnSummary


def process_rag_chunks_for_streaming(summary: TurnSummary) -> Optional[List[Dict[str, Any]]]:
    """
    Process RAG chunks from TurnSummary into streaming response format.

    Each chunk is flattened to a plain dict with "content", "source" and
    "score" keys so it can be serialized directly into the SSE payload.

    Parameters:
        summary (TurnSummary): Summary containing RAG chunks data.

    Returns:
        Optional[List[Dict[str, Any]]]: List of RAG chunks in streaming
        format, or None when the summary carries no chunks.
    """
    if not summary.rag_chunks:
        return None

    # A comprehension over a non-empty list is never empty, so the former
    # trailing "if rag_chunks else None" check was redundant.
    return [
        {
            "content": chunk.content,
            "source": chunk.source,
            "score": chunk.score,
        }
        for chunk in summary.rag_chunks
    ]
Comment on lines +7 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Verify TurnSummary.rag_chunks existence and chunk field names.

These functions assume summary.rag_chunks exists and each chunk has .content, .source, .score. Please confirm the model/type defines these.

If missing, add a typed model and a default empty list on TurnSummary. I can supply a patch.


🏁 Script executed:

#!/bin/bash
echo "TurnSummary definition:"
rg -n -C3 'class\s+TurnSummary\b' src/utils/types.py || true
echo "RAG chunk model?"
rg -nP --type=python '(class|TypedDict)\s+\w*RAG\w*Chunk' src -C2 || true

Length of output: 216


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== ls src/utils ==="
ls -la src/utils || true
echo

echo "=== src/utils/types.py (1-400) ==="
sed -n '1,400p' src/utils/types.py || true
echo

echo "=== src/utils/rag_processing.py (1-200) ==="
sed -n '1,200p' src/utils/rag_processing.py || true
echo

echo "=== grep for 'rag_chunks' ==="
rg -n --hidden -S 'rag_chunks' || true
echo

echo "=== grep for classes or TypedDicts with 'Chunk' in the name ==="
rg -n --hidden -S 'class .*Chunk' || true
rg -n --hidden -S 'TypedDict.*Chunk' || true
echo

echo "=== grep for 'content', 'source', 'score' across repo ==="
rg -n --hidden -S '\bcontent\b' || true
rg -n --hidden -S '\bsource\b' || true
rg -n --hidden -S '\bscore\b' || true

Length of output: 235034


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== src/utils/transcripts.py (1-240) ==="
sed -n '1,240p' src/utils/transcripts.py || true
echo

echo "=== src/app/endpoints/streaming_query.py (600-740) ==="
sed -n '600,740p' src/app/endpoints/streaming_query.py || true
echo

echo "=== src/app/endpoints/query.py (220-520) ==="
sed -n '220,520p' src/app/endpoints/query.py || true
echo

echo "=== tests/unit/utils/test_transcripts.py (1-240) ==="
sed -n '1,240p' tests/unit/utils/test_transcripts.py || true
echo

echo "=== tests/unit/app/endpoints/test_streaming_query.py (1-260) ==="
sed -n '1,260p' tests/unit/app/endpoints/test_streaming_query.py || true
echo

echo "=== Search for 'rag_chunks =' assignments ==="
rg -n --hidden -S 'rag_chunks\s*=' || true
echo

echo "=== Search for 'rag_chunks' occurrences (context) ==="
rg -n --hidden -S -C3 '\brag_chunks\b' || true

Length of output: 42884


Add RagChunk model and rag_chunks field to TurnSummary

TurnSummary (src/utils/types.py) does not define rag_chunks and there is no RagChunk type in the repo; code in src/utils/rag_processing.py, src/app/endpoints/streaming_query.py and src/utils/transcripts.py assumes each chunk has .content, .source, .score — this is a real runtime/type bug.

  • Define a RagChunk model in src/utils/types.py (BaseModel) with fields: content: str, source: str | None = None, score: float | None = None.
  • Add rag_chunks: list[RagChunk] = Field(default_factory=list) to TurnSummary (import Field from pydantic).
  • Update src/utils/transcripts.py signature/annotation rag_chunks: list[str] -> list[RagChunk] (or Optional[List[RagChunk]]) and adjust callers/tests as needed.
🤖 Prompt for AI Agents
In src/utils/types.py around where TurnSummary is defined (add new lines near
line 1–200), add a Pydantic BaseModel RagChunk with fields content: str, source:
Optional[str] = None, score: Optional[float] = None (import Optional from
typing), import Field from pydantic, and add rag_chunks: list[RagChunk] =
Field(default_factory=list) to TurnSummary; then update src/utils/transcripts.py
function signatures/annotations that currently use list[str] for rag_chunks to
use list[RagChunk] or Optional[List[RagChunk]] and update any imports/usages in
src/utils/rag_processing.py and src/app/endpoints/streaming_query.py to import
RagChunk and the new TurnSummary field so callers/tests reflect the new type.



def _extract_referenced_documents_from_rag_chunks(summary: TurnSummary) -> Tuple[List[Dict[str, Any]], Set[str]]:
"""
Extract referenced documents from RAG chunks for streaming format.

Parameters:
summary (TurnSummary): Summary containing RAG chunks data.

Returns:
Tuple[List[Dict[str, Any]], Set[str]]: A tuple containing the list of referenced documents
and a set of document sources already processed.
"""
referenced_docs = []
doc_sources = set()

for chunk in summary.rag_chunks:
if chunk.source and chunk.source not in doc_sources:
doc_sources.add(chunk.source)
referenced_docs.append({
"doc_url": chunk.source if chunk.source.startswith("http") else None,
"doc_title": chunk.source,
"chunk_count": sum(1 for c in summary.rag_chunks if c.source == chunk.source)
})

return referenced_docs, doc_sources


def _merge_legacy_referenced_documents(metadata_map: Dict[str, Any], doc_sources: Set[str]) -> List[Dict[str, Any]]:
"""
Merge legacy referenced documents from metadata_map with existing document sources for streaming.

Parameters:
metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
doc_sources (Set[str]): Set of document sources already processed.

Returns:
List[Dict[str, Any]]: List of additional referenced documents from legacy format.
"""
legacy_docs = []

for v in filter(
lambda v: ("docs_url" in v) and ("title" in v),
metadata_map.values(),
):
if v["title"] not in doc_sources:
doc_sources.add(v["title"])
legacy_docs.append({
"doc_url": v["docs_url"],
"doc_title": v["title"],
})

return legacy_docs

Comment on lines +58 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Legacy merge uses mismatched dedup key; align with RAG dedup and unify shape.

Compare against URL when available, else title; also include chunk_count for legacy docs (0) to keep schema consistent.

@@
-    for v in filter(
-        lambda v: ("docs_url" in v) and ("title" in v),
-        metadata_map.values(),
-    ):
-        if v["title"] not in doc_sources:
-            doc_sources.add(v["title"])
-            legacy_docs.append({
-                "doc_url": v["docs_url"],
-                "doc_title": v["title"],
-            })
+    for v in filter(lambda v: ("docs_url" in v) and ("title" in v), metadata_map.values()):
+        dedup_key = v.get("docs_url") or v.get("title")
+        if dedup_key and dedup_key not in doc_sources:
+            doc_sources.add(dedup_key)
+            legacy_docs.append(
+                {
+                    "doc_url": v.get("docs_url"),
+                    "doc_title": v.get("title"),
+                    "chunk_count": 0,
+                }
+            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _merge_legacy_referenced_documents(metadata_map: Dict[str, Any], doc_sources: Set[str]) -> List[Dict[str, Any]]:
"""
Merge legacy referenced documents from metadata_map with existing document sources for streaming.
Parameters:
metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
doc_sources (Set[str]): Set of document sources already processed.
Returns:
List[Dict[str, Any]]: List of additional referenced documents from legacy format.
"""
legacy_docs = []
for v in filter(
lambda v: ("docs_url" in v) and ("title" in v),
metadata_map.values(),
):
if v["title"] not in doc_sources:
doc_sources.add(v["title"])
legacy_docs.append({
"doc_url": v["docs_url"],
"doc_title": v["title"],
})
return legacy_docs
def _merge_legacy_referenced_documents(metadata_map: Dict[str, Any], doc_sources: Set[str]) -> List[Dict[str, Any]]:
"""
Merge legacy referenced documents from metadata_map with existing document sources for streaming.
Parameters:
metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
doc_sources (Set[str]): Set of document sources already processed.
Returns:
List[Dict[str, Any]]: List of additional referenced documents from legacy format.
"""
legacy_docs = []
for v in filter(lambda v: ("docs_url" in v) and ("title" in v), metadata_map.values()):
dedup_key = v.get("docs_url") or v.get("title")
if dedup_key and dedup_key not in doc_sources:
doc_sources.add(dedup_key)
legacy_docs.append(
{
"doc_url": v.get("docs_url"),
"doc_title": v.get("title"),
"chunk_count": 0,
}
)
return legacy_docs
🤖 Prompt for AI Agents
In src/utils/rag_processing.py around lines 58 to 83, the legacy merge currently
deduplicates only by title and returns a different shape; update it to
deduplicate by URL when available else title (compute key = v.get("docs_url") or
v["title"]), check/add that key to doc_sources, and append legacy_docs entries
using the unified schema including "doc_url": v.get("docs_url"), "doc_title":
v["title"], and "chunk_count": 0 so legacy docs match the RAG shape.


def build_referenced_documents_list_for_streaming(summary: TurnSummary, metadata_map: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
    """
    Build complete list of referenced documents from both RAG chunks and legacy metadata for streaming.

    Parameters:
        summary (TurnSummary): Summary containing RAG chunks data.
        metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.

    Returns:
        Optional[List[Dict[str, Any]]]: Complete list of referenced documents, or None if empty.
    """
    # RAG-derived documents come first; doc_sources records what has been
    # emitted so the legacy merge can skip duplicates.
    combined, doc_sources = _extract_referenced_documents_from_rag_chunks(summary)
    combined.extend(_merge_legacy_referenced_documents(metadata_map, doc_sources))
    return combined or None
Loading