From f01bf9a248cecf9d0bb3d10dad6cc5ba9aa01b51 Mon Sep 17 00:00:00 2001 From: bsatapat Date: Wed, 17 Sep 2025 16:14:16 +0530 Subject: [PATCH] Added RAG chunks in streaming_query response --- src/app/endpoints/streaming_query.py | 33 +++++---- src/utils/rag_processing.py | 105 +++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 15 deletions(-) create mode 100644 src/utils/rag_processing.py diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 60d9d4d6..f960fe93 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -35,6 +35,10 @@ from utils.transcripts import store_transcript from utils.types import TurnSummary from utils.endpoints import validate_model_provider_override +from utils.rag_processing import ( + process_rag_chunks_for_streaming, + build_referenced_documents_list_for_streaming, +) from app.endpoints.query import ( get_rag_toolgroups, @@ -94,36 +98,35 @@ def stream_start_event(conversation_id: str) -> str: ) -def stream_end_event(metadata_map: dict) -> str: +def stream_end_event(metadata_map: dict, summary: TurnSummary) -> str: """ Yield the end of the data stream. Format and return the end event for a streaming response, - including referenced document metadata and placeholder token + including referenced document metadata, RAG chunks, and placeholder token counts. Parameters: metadata_map (dict): A mapping containing metadata about referenced documents. + summary (TurnSummary): Summary containing RAG chunks and other turn data. Returns: str: A Server-Sent Events (SSE) formatted string representing the end of the data stream. 
     """
+    # Process RAG chunks for streaming response
+    rag_chunks = process_rag_chunks_for_streaming(summary)
+
+    # Build complete referenced documents list
+    referenced_docs = build_referenced_documents_list_for_streaming(summary, metadata_map)
+
     return format_stream_data(
         {
             "event": "end",
             "data": {
-                "referenced_documents": [
-                    {
-                        "doc_url": v["docs_url"],
-                        "doc_title": v["title"],
-                    }
-                    for v in filter(
-                        lambda v: ("docs_url" in v) and ("title" in v),
-                        metadata_map.values(),
-                    )
-                ],
+                "rag_chunks": rag_chunks,
+                "referenced_documents": referenced_docs,
                 "truncated": None,  # TODO(jboos): implement truncated
                 "input_tokens": 0,  # TODO(jboos): implement input tokens
                 "output_tokens": 0,  # TODO(jboos): implement output tokens
@@ -639,7 +642,7 @@ async def response_generator(
             chunk_id += 1
             yield event
 
-    yield stream_end_event(metadata_map)
+    yield stream_end_event(metadata_map, summary)
 
     if not is_transcripts_enabled():
         logger.debug("Transcript collection is disabled in the configuration")
@@ -653,7 +656,7 @@
             query=query_request.query,
             query_request=query_request,
             summary=summary,
-            rag_chunks=[],  # TODO(lucasagomes): implement rag_chunks
+            rag_chunks=summary.rag_chunks,
             truncated=False,  # TODO(lucasagomes): implement truncation as part
             # of quota work
             attachments=query_request.attachments or [],
diff --git a/src/utils/rag_processing.py b/src/utils/rag_processing.py
new file mode 100644
index 00000000..a630d65a
--- /dev/null
+++ b/src/utils/rag_processing.py
@@ -0,0 +1,105 @@
+"""Utility functions for processing RAG chunks and referenced documents in streaming responses."""
+
+from typing import Optional, List, Dict, Set, Tuple, Any
+from utils.types import TurnSummary
+
+
+def process_rag_chunks_for_streaming(summary: TurnSummary) -> List[Dict[str, Any]]:
+    """
+    Process RAG chunks from TurnSummary into streaming response format.
+
+    Parameters:
+        summary (TurnSummary): Summary containing RAG chunks data.
+
+    Returns:
+        List[Dict[str, Any]]: List of RAG chunks in streaming format (empty if no chunks).
+    """
+    if not summary.rag_chunks:
+        return []
+
+    rag_chunks = [
+        {
+            "content": chunk.content,
+            "source": chunk.source,
+            "score": chunk.score
+        }
+        for chunk in summary.rag_chunks
+    ]
+
+    return rag_chunks
+
+
+def _extract_referenced_documents_from_rag_chunks(summary: TurnSummary) -> Tuple[List[Dict[str, Any]], Set[str]]:
+    """
+    Extract referenced documents from RAG chunks for streaming format.
+
+    Parameters:
+        summary (TurnSummary): Summary containing RAG chunks data.
+
+    Returns:
+        Tuple[List[Dict[str, Any]], Set[str]]: A tuple containing the list of referenced documents
+        and a set of document sources already processed.
+    """
+    referenced_docs = []
+    doc_sources = set()
+
+    for chunk in summary.rag_chunks:
+        if chunk.source and chunk.source not in doc_sources:
+            doc_sources.add(chunk.source)
+            referenced_docs.append({
+                "doc_url": chunk.source if chunk.source.startswith("http") else None,
+                "doc_title": chunk.source,
+                "chunk_count": sum(1 for c in summary.rag_chunks if c.source == chunk.source)
+            })
+
+    return referenced_docs, doc_sources
+
+
+def _merge_legacy_referenced_documents(metadata_map: Dict[str, Any], doc_sources: Set[str]) -> List[Dict[str, Any]]:
+    """
+    Merge legacy referenced documents from metadata_map with existing document sources for streaming.
+
+    Parameters:
+        metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
+        doc_sources (Set[str]): Set of document sources already processed.
+
+    Returns:
+        List[Dict[str, Any]]: List of additional referenced documents from legacy format.
+    """
+    legacy_docs = []
+
+    for v in filter(
+        lambda v: ("docs_url" in v) and ("title" in v),
+        metadata_map.values(),
+    ):
+        if v["title"] not in doc_sources:
+            doc_sources.add(v["title"])
+            legacy_docs.append({
+                "doc_url": v["docs_url"],
+                "doc_title": v["title"],
+            })
+
+    return legacy_docs
+
+
+def build_referenced_documents_list_for_streaming(summary: TurnSummary, metadata_map: Dict[str, Any]) -> List[Dict[str, Any]]:
+    """
+    Build complete list of referenced documents from both RAG chunks and legacy metadata for streaming.
+
+    Parameters:
+        summary (TurnSummary): Summary containing RAG chunks data.
+        metadata_map (Dict[str, Any]): A mapping containing metadata about referenced documents.
+
+    Returns:
+        List[Dict[str, Any]]: Complete list of referenced documents (empty if none).
+    """
+    # Extract documents from RAG chunks
+    rag_docs, doc_sources = _extract_referenced_documents_from_rag_chunks(summary)
+
+    # Merge with legacy documents
+    legacy_docs = _merge_legacy_referenced_documents(metadata_map, doc_sources)
+
+    # Combine all documents
+    all_docs = rag_docs + legacy_docs
+
+    return all_docs