diff --git a/docs/byok_guide.md b/docs/byok_guide.md new file mode 100644 index 00000000..d81b6b6b --- /dev/null +++ b/docs/byok_guide.md @@ -0,0 +1,438 @@ +# BYOK (Bring Your Own Knowledge) Feature Documentation + +## Overview + +The BYOK (Bring Your Own Knowledge) feature in Lightspeed Core enables users to integrate their own knowledge sources into the AI system through Retrieval-Augmented Generation (RAG) functionality. This feature allows the AI to access and utilize custom knowledge bases to provide more accurate, contextual, and domain-specific responses. + +--- + +## Table of Contents + +* [What is BYOK?](#what-is-byok) +* [How BYOK Works](#how-byok-works) +* [Prerequisites](#prerequisites) +* [Configuration Guide](#configuration-guide) + * [Step 1: Prepare Your Knowledge Sources](#step-1-prepare-your-knowledge-sources) + * [Step 2: Create Vector Database](#step-2-create-vector-database) + * [Step 3: Configure Embedding Model](#step-3-configure-embedding-model) + * [Step 4: Configure Llama Stack](#step-4-configure-llama-stack) + * [Step 5: Enable RAG Tools](#step-5-enable-rag-tools) +* [Supported Vector Database Types](#supported-vector-database-types) +* [Configuration Examples](#configuration-examples) +* [Conclusion](#conclusion) + +--- + +## What is BYOK? + +BYOK (Bring Your Own Knowledge) is Lightspeed Core's implementation of Retrieval-Augmented Generation (RAG) that allows you to: + +- **Integrate custom knowledge sources**: Add your organization's documentation, manuals, FAQs, or any text-based knowledge +- **Enhance AI responses**: Provide contextual, accurate answers based on your specific domain knowledge +- **Maintain data control**: Keep your knowledge sources within your infrastructure +- **Improve relevance**: Get responses that are tailored to your organization's context and terminology + +## How BYOK Works + +The BYOK system operates through a sophisticated chain of components: + +1. **Agent Orchestrator**: The AI agent acts as the central coordinator, using the LLM as its reasoning engine +2. **Knowledge Search**: When the agent needs external information, it queries your custom vector database +3. **Vector Database**: Your indexed knowledge sources, stored as vector embeddings for semantic search +4. **Embedding Model**: Converts queries and documents into vector representations for similarity matching +5. **Context Integration**: Retrieved knowledge is integrated into the AI's response generation process + +```mermaid +graph TD + A[User Query] --> B[AI Agent] + B --> C{Need External Knowledge?} + C -->|Yes| D[Knowledge Search Tool] + C -->|No| E[Generate Response] + D --> F[Vector Database] + F --> G[Retrieve Relevant Context] + G --> H[Integrate Context] + H --> E + E --> I[Response to User] +``` + +--- + +## Prerequisites + +Before implementing BYOK, ensure you have: + +### Required Tools +- **rag-content tool**: For creating compatible vector databases + - Repository: https://github.com/lightspeed-core/rag-content + - Used for indexing your knowledge sources + +### System Requirements +- **Llama Stack**: Compatible vector database backend +- **Embedding Model**: Local or downloadable embedding model +- **LLM Provider**: OpenAI, vLLM, or other supported inference provider + +### Knowledge Sources +- Text-based documents (PDFs, Markdown, TXT, etc.) +- Structured data that can be converted to text +- Documentation, manuals, FAQs, knowledge bases + +--- + +## Configuration Guide + +### Step 1: Prepare Your Knowledge Sources + +1. 
**Collect your documents**: Gather all text-based knowledge sources you want to include
+2. **Organize content**: Structure your documents for optimal indexing
+3. **Format validation**: Ensure documents are in supported formats (PDF, TXT, MD, etc.)
+
+### Step 2: Create Vector Database
+
+Use the `rag-content` tool to create a compatible vector database.
+Refer to https://github.com/lightspeed-core/rag-content for instructions on creating your vector database.
+
+**Important Notes:**
+- The vector database must be compatible with Llama Stack
+- You can generate the vector database with any of the following backends:
+  - Llama-Index Faiss Vector Store
+  - Llama-Index Postgres (PGVector) Vector Store
+  - Llama-Stack Faiss Vector-IO
+  - Llama-Stack SQLite-vec Vector-IO
+- The same embedding model must be used for both creation and querying
+
+### Step 3: Configure Embedding Model
+
+Download and configure your embedding model:
+Follow the embedding model download step described in the rag-content repository. For example:
+```bash
+mkdir ./embeddings_model
+pdm run python ./scripts/download_embeddings_model.py -l ./embeddings_model/ -r sentence-transformers/all-mpnet-base-v2
+```
+
+### Step 4: Configure Llama Stack
+
+Edit your `run.yaml` file to include the BYOK configuration:
+
+```yaml
+version: 2
+image_name: byok-configuration
+
+# Required APIs for BYOK
+apis:
+- agents
+- inference
+- vector_io
+- tool_runtime
+- safety
+
+models:
+  # Your LLM model
+  - model_id: your-llm-model
+    provider_id: openai # or your preferred provider
+    model_type: llm
+    provider_model_id: gpt-4o-mini
+
+  # Embedding model for BYOK
+  - model_id: sentence-transformers/all-mpnet-base-v2
+    metadata:
+      embedding_dimension: 768
+    model_type: embedding
+    provider_id: sentence-transformers
+    provider_model_id: /path/to/embedding_models/all-mpnet-base-v2
+
+providers:
+  inference:
+    # Embedding model provider
+    - provider_id: sentence-transformers
+      provider_type: inline::sentence-transformers
+      config: {}
+
+    # LLM provider (example: OpenAI)
+    - provider_id: openai
+      provider_type: remote::openai
+      config:
+        api_key: ${env.OPENAI_API_KEY}
+
+  agents:
+    - provider_id: meta-reference
+      provider_type: inline::meta-reference
+      config:
+        persistence_store:
+          type: sqlite
+          db_path: .llama/distributions/ollama/agents_store.db
+        responses_store:
+          type: sqlite
+          db_path: .llama/distributions/ollama/responses_store.db
+
+  safety:
+    - provider_id: llama-guard
+      provider_type: inline::llama-guard
+      config:
+        excluded_categories: []
+
+  # Vector database configuration
+  vector_io:
+    - provider_id: your-knowledge-base
+      provider_type: inline::faiss # or remote::pgvector
+      config:
+        kvstore:
+          type: sqlite
+          db_path: /path/to/vector_db/faiss_store.db
+          namespace: null
+
+  tool_runtime:
+    - provider_id: rag-runtime
+      provider_type: inline::rag-runtime
+      config: {}
+
+# Enable RAG tools
+tool_groups:
+- provider_id: rag-runtime
+  toolgroup_id: builtin::rag
+
+# Vector database configuration
+vector_dbs:
+- embedding_dimension: 768
+  embedding_model: sentence-transformers/all-mpnet-base-v2
+  provider_id: your-knowledge-base
+  vector_db_id: your-index-id # ID used during index generation
+```
+
+### Step 5: Enable RAG Tools
+
+The configuration above automatically enables the RAG tools. The system will:
+
+1. **Detect RAG availability**: Automatically identify when knowledge search is available
+2. **Enhance prompts**: Encourage the AI to use knowledge search tools
+3.
**Force knowledge usage**: Modify queries to ensure knowledge base consultation + +--- + +## Supported Vector Database Types + +### 1. FAISS (Recommended) +- **Type**: Local vector database with SQLite metadata +- **Best for**: Small to medium-sized knowledge bases +- **Configuration**: `inline::faiss` +- **Storage**: SQLite database file + +```yaml +vector_io: +- provider_id: faiss-knowledge + provider_type: inline::faiss + config: + kvstore: + type: sqlite + db_path: /path/to/faiss_store.db + namespace: null +``` + +### 2. pgvector (PostgreSQL) +- **Type**: PostgreSQL with pgvector extension +- **Best for**: Large-scale deployments, shared knowledge bases +- **Configuration**: `remote::pgvector` +- **Requirements**: PostgreSQL with pgvector extension + +```yaml +vector_io: +- provider_id: pgvector-knowledge + provider_type: remote::pgvector + config: + host: localhost + port: 5432 + db: knowledge_db + user: lightspeed_user + password: ${env.DB_PASSWORD} + kvstore: + type: sqlite + db_path: .llama/distributions/pgvector/registry.db +``` + +**pgvector Table Schema:** +- `id` (text): UUID identifier of the chunk +- `document` (jsonb): JSON containing content and metadata +- `embedding` (vector(n)): The embedding vector (n = embedding dimension) + +--- + +## Configuration Examples + +### Example 1: OpenAI + FAISS +Complete configuration for OpenAI LLM with local FAISS knowledge base: + +```yaml +version: 2 +image_name: openai-faiss-byok + +apis: +- agents +- inference +- vector_io +- tool_runtime +- safety + +models: +- model_id: gpt-4o-mini + provider_id: openai + model_type: llm + provider_model_id: gpt-4o-mini + +- model_id: sentence-transformers/all-mpnet-base-v2 + metadata: + embedding_dimension: 768 + model_type: embedding + provider_id: sentence-transformers + provider_model_id: /home/user/embedding_models/all-mpnet-base-v2 + +providers: + inference: + - provider_id: sentence-transformers + provider_type: inline::sentence-transformers + config: {} + - provider_id: openai + provider_type: remote::openai + config: + api_key: ${env.OPENAI_API_KEY} + + agents: + - provider_id: meta-reference + provider_type: inline::meta-reference + config: + persistence_store: + type: sqlite + db_path: .llama/distributions/ollama/agents_store.db + responses_store: + type: sqlite + db_path: .llama/distributions/ollama/responses_store.db + + safety: + - provider_id: llama-guard + provider_type: inline::llama-guard + config: + excluded_categories: [] + + vector_io: + - provider_id: company-docs + provider_type: inline::faiss + config: + kvstore: + type: sqlite + db_path: /home/user/vector_dbs/company_docs/faiss_store.db + namespace: null + + tool_runtime: + - provider_id: rag-runtime + provider_type: inline::rag-runtime + config: {} + +tool_groups: +- provider_id: rag-runtime + toolgroup_id: builtin::rag + +vector_dbs: +- embedding_dimension: 768 + embedding_model: sentence-transformers/all-mpnet-base-v2 + provider_id: company-docs + vector_db_id: company-knowledge-index +``` + +### Example 2: vLLM + pgvector +Configuration for local vLLM inference with PostgreSQL knowledge base: + +```yaml +version: 2 +image_name: vllm-pgvector-byok + +apis: +- agents +- inference +- vector_io +- tool_runtime +- safety + +models: +- model_id: meta-llama/Llama-3.1-8B-Instruct + provider_id: vllm + model_type: llm + provider_model_id: null + +- model_id: sentence-transformers/all-mpnet-base-v2 + metadata: + embedding_dimension: 768 + model_type: embedding + provider_id: sentence-transformers + provider_model_id: 
sentence-transformers/all-mpnet-base-v2 + +providers: + inference: + - provider_id: sentence-transformers + provider_type: inline::sentence-transformers + config: {} + - provider_id: vllm + provider_type: remote::vllm + config: + url: http://localhost:8000/v1/ + api_token: your-token-here + + agents: + - provider_id: meta-reference + provider_type: inline::meta-reference + config: + persistence_store: + type: sqlite + db_path: .llama/distributions/ollama/agents_store.db + responses_store: + type: sqlite + db_path: .llama/distributions/ollama/responses_store.db + + safety: + - provider_id: llama-guard + provider_type: inline::llama-guard + config: + excluded_categories: [] + + vector_io: + - provider_id: enterprise-knowledge + provider_type: remote::pgvector + config: + host: postgres.company.com + port: 5432 + db: enterprise_kb + user: rag_user + password: ${env.POSTGRES_PASSWORD} + kvstore: + type: sqlite + db_path: .llama/distributions/pgvector/registry.db + + tool_runtime: + - provider_id: rag-runtime + provider_type: inline::rag-runtime + config: {} + +tool_groups: +- provider_id: rag-runtime + toolgroup_id: builtin::rag + args: null + mcp_endpoint: null + +vector_dbs: +- embedding_dimension: 768 + embedding_model: sentence-transformers/all-mpnet-base-v2 + provider_id: enterprise-knowledge + vector_db_id: enterprise-docs +``` + +--- + +## Conclusion + +The BYOK (Bring Your Own Knowledge) feature in Lightspeed Core provides powerful capabilities for integrating custom knowledge sources through RAG technology. By following this guide, you can successfully implement and configure BYOK to enhance your AI system with domain-specific knowledge. + +For additional support and advanced configurations, refer to: +- [RAG Configuration Guide](rag_guide.md) +- [Llama Stack Documentation](https://llama-stack.readthedocs.io/) +- [rag-content Tool Repository](https://github.com/lightspeed-core/rag-content) + +Remember to regularly update your knowledge sources and monitor system performance to maintain optimal BYOK functionality. 
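+
+As a quick sanity check after configuration, you can confirm that Llama Stack registered your knowledge base. The sketch below is illustrative only: it assumes the Llama Stack server is reachable at `http://localhost:8321` and that the `llama_stack_client` Python package is installed; attribute names on the returned objects can vary between Llama Stack releases.
+
+```python
+from llama_stack_client import LlamaStackClient
+
+# Adjust the base_url to match your deployment.
+client = LlamaStackClient(base_url="http://localhost:8321")
+
+# Each registered vector database should report the vector_db_id used during
+# index generation (for example "your-index-id" from Step 4).
+for vector_db in client.vector_dbs.list():
+    print(vector_db.identifier)
+
+# The builtin::rag tool group must be present for knowledge search to work.
+for toolgroup in client.toolgroups.list():
+    print(getattr(toolgroup, "identifier", toolgroup))
+```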
\ No newline at end of file diff --git a/src/app/endpoints/streaming_query.py b/src/app/endpoints/streaming_query.py index 60d9d4d6..db9023fe 100644 --- a/src/app/endpoints/streaming_query.py +++ b/src/app/endpoints/streaming_query.py @@ -9,11 +9,9 @@ from llama_stack_client import APIConnectionError from llama_stack_client import AsyncLlamaStackClient # type: ignore from llama_stack_client.types import UserMessage # type: ignore +from llama_stack_client.types.agents import AgentTurnResponseStreamChunk # type: ignore from llama_stack_client.lib.agents.event_logger import interleaved_content_as_str -from llama_stack_client.types.agents.agent_turn_response_stream_chunk import ( - AgentTurnResponseStreamChunk, -) from llama_stack_client.types.shared import ToolCall from llama_stack_client.types.shared.interleaved_content_item import TextContentItem @@ -26,26 +24,25 @@ from client import AsyncLlamaStackClientHolder from configuration import configuration import metrics -from metrics.utils import update_llm_token_count_from_turn from models.config import Action -from models.requests import QueryRequest from models.database.conversations import UserConversation -from utils.endpoints import check_configuration_loaded, get_agent, get_system_prompt +from models.requests import QueryRequest +from utils.endpoints import check_configuration_loaded, get_agent, get_system_prompt, validate_conversation_ownership, validate_model_provider_override from utils.mcp_headers import mcp_headers_dependency, handle_mcp_headers_with_toolgroups -from utils.transcripts import store_transcript from utils.types import TurnSummary -from utils.endpoints import validate_model_provider_override +from metrics.utils import update_llm_token_count_from_turn +from models.responses import RAGChunk, ReferencedDocument from app.endpoints.query import ( + evaluate_model_hints, get_rag_toolgroups, is_input_shield, is_output_shield, is_transcripts_enabled, + store_transcript, select_model_and_provider_id, validate_attachments_metadata, - validate_conversation_ownership, persist_user_conversation_details, - evaluate_model_hints, ) logger = logging.getLogger("app.endpoints.handlers") @@ -94,7 +91,7 @@ def stream_start_event(conversation_id: str) -> str: ) -def stream_end_event(metadata_map: dict) -> str: +def stream_end_event(metadata_map: dict, summary: TurnSummary) -> str: """ Yield the end of the data stream. @@ -110,20 +107,44 @@ def stream_end_event(metadata_map: dict) -> str: str: A Server-Sent Events (SSE) formatted string representing the end of the data stream. 
""" + # Process RAG chunks + rag_chunks = [ + { + "content": chunk.content, + "source": chunk.source, + "score": chunk.score + } + for chunk in summary.rag_chunks + ] + + # Extract referenced documents from RAG chunks + referenced_docs = [] + doc_sources = set() + for chunk in summary.rag_chunks: + if chunk.source and chunk.source not in doc_sources: + doc_sources.add(chunk.source) + referenced_docs.append({ + "doc_url": chunk.source if chunk.source.startswith("http") else None, + "doc_title": chunk.source.split("/")[-1] if chunk.source else None, + }) + + # Add any additional referenced documents from metadata_map + for v in filter( + lambda v: ("docs_url" in v) and ("title" in v), + metadata_map.values(), + ): + if v["docs_url"] not in doc_sources: + referenced_docs.append({ + "doc_url": v["docs_url"], + "doc_title": v["title"], + }) + return format_stream_data( { "event": "end", "data": { - "referenced_documents": [ - { - "doc_url": v["docs_url"], - "doc_title": v["title"], - } - for v in filter( - lambda v: ("docs_url" in v) and ("title" in v), - metadata_map.values(), - ) - ], + "rag_chunks": rag_chunks, + "referenced_documents": referenced_docs, "truncated": None, # TODO(jboos): implement truncated "input_tokens": 0, # TODO(jboos): implement input tokens "output_tokens": 0, # TODO(jboos): implement output tokens @@ -154,28 +175,23 @@ def stream_build_event(chunk: Any, chunk_id: int, metadata_map: dict) -> Iterato """ if hasattr(chunk, "error"): yield from _handle_error_event(chunk, chunk_id) + return event_type = chunk.event.payload.event_type step_type = getattr(chunk.event.payload, "step_type", None) - match (event_type, step_type): - case (("turn_start" | "turn_awaiting_input"), _): - yield from _handle_turn_start_event(chunk_id) - case ("turn_complete", _): - yield from _handle_turn_complete_event(chunk, chunk_id) - case (_, "shield_call"): - yield from _handle_shield_event(chunk, chunk_id) - case (_, "inference"): - yield from _handle_inference_event(chunk, chunk_id) - case (_, "tool_execution"): - yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map) - case _: - logger.debug( - "Unhandled event combo: event_type=%s, step_type=%s", - event_type, - step_type, - ) - yield from _handle_heartbeat_event(chunk_id) + if event_type in {"turn_start", "turn_awaiting_input"}: + yield from _handle_turn_start_event(chunk_id) + elif event_type == "turn_complete": + yield from _handle_turn_complete_event(chunk, chunk_id) + elif step_type == "shield_call": + yield from _handle_shield_event(chunk, chunk_id) + elif step_type == "inference": + yield from _handle_inference_event(chunk, chunk_id) + elif step_type == "tool_execution": + yield from _handle_tool_execution_event(chunk, chunk_id, metadata_map) + else: + yield from _handle_heartbeat_event(chunk_id) # ----------------------------------- @@ -611,6 +627,9 @@ async def response_generator( complete response for transcript storage if enabled. 
""" chunk_id = 0 + complete_response = "No response from the model" + + # Initialize TurnSummary to collect RAG chunks and tool calls summary = TurnSummary( llm_response="No response from the model", tool_calls=[] ) @@ -623,7 +642,7 @@ async def response_generator( if p.event_type == "turn_complete": summary.llm_response = interleaved_content_as_str( p.turn.output_message.content - ) + ) if hasattr(p.turn, 'output_message') and hasattr(p.turn.output_message, 'content') else complete_response system_prompt = get_system_prompt(query_request, configuration) try: update_llm_token_count_from_turn( @@ -631,19 +650,45 @@ async def response_generator( ) except Exception: # pylint: disable=broad-except logger.exception("Failed to update token usage metrics") + + # Process steps from the completed turn + steps = p.turn.steps or [] + logger.info("Turn complete - processing %d steps", len(steps)) + for step in steps: + logger.info("Processing step: %s", step.step_type) + if step.step_type == "tool_execution": + logger.info("Found tool_execution step with %d tool_calls", len(step.tool_calls)) + logger.info("Tool calls: %s", [tc.tool_name for tc in step.tool_calls]) + logger.info("RAG chunks before: %d", len(summary.rag_chunks)) + summary.append_tool_calls_from_llama(step) + logger.info("RAG chunks after: %d", len(summary.rag_chunks)) elif p.event_type == "step_complete": - if p.step_details.step_type == "tool_execution": + if hasattr(p, 'step_details') and p.step_details.step_type == "tool_execution": + logger.info("Step complete - tool_execution with %d tool_calls", len(p.step_details.tool_calls)) + logger.info("Tool calls: %s", [tc.tool_name for tc in p.step_details.tool_calls]) + logger.info("RAG chunks before: %d", len(summary.rag_chunks)) summary.append_tool_calls_from_llama(p.step_details) + logger.info("RAG chunks after: %d", len(summary.rag_chunks)) for event in stream_build_event(chunk, chunk_id, metadata_map): chunk_id += 1 yield event - yield stream_end_event(metadata_map) + yield stream_end_event(metadata_map, summary) if not is_transcripts_enabled(): logger.debug("Transcript collection is disabled in the configuration") else: + # Convert RAG chunks to serializable format for store_transcript + rag_chunks_for_transcript = [ + { + "content": chunk.content, + "source": chunk.source, + "score": chunk.score + } + for chunk in summary.rag_chunks + ] + store_transcript( user_id=user_id, conversation_id=conversation_id, @@ -653,7 +698,7 @@ async def response_generator( query=query_request.query, query_request=query_request, summary=summary, - rag_chunks=[], # TODO(lucasagomes): implement rag_chunks + rag_chunks=rag_chunks_for_transcript, truncated=False, # TODO(lucasagomes): implement truncation as part # of quota work attachments=query_request.attachments or [], @@ -776,18 +821,59 @@ async def retrieve_response( ), } - vector_db_ids = [ - vector_db.identifier for vector_db in await client.vector_dbs.list() - ] - toolgroups = (get_rag_toolgroups(vector_db_ids) or []) + [ + vector_dbs = await client.vector_dbs.list() + vector_db_ids = [vector_db.identifier for vector_db in vector_dbs] + # Try to get RAG toolgroups, but handle the case where they're not available + rag_toolgroups = [] + if vector_db_ids: + try: + # Check if builtin::rag tool group is available + available_toolgroups = await client.toolgroups.list() + # Try different possible attribute names for toolgroup ID + available_toolgroup_ids = [] + for tg in available_toolgroups: + if hasattr(tg, 'toolgroup_id'): + 
+                    available_toolgroup_ids.append(tg.toolgroup_id)
+                elif hasattr(tg, 'identifier'):
+                    available_toolgroup_ids.append(tg.identifier)
+                elif hasattr(tg, 'name'):
+                    available_toolgroup_ids.append(tg.name)
+                elif hasattr(tg, 'id'):
+                    available_toolgroup_ids.append(tg.id)
+
+            logger.info("Available toolgroups: %s", available_toolgroup_ids)
+
+            if "builtin::rag" not in available_toolgroup_ids:
+                logger.warning(
+                    "builtin::rag tool group not listed; attempting to use RAG toolgroups anyway"
+                )
+        except Exception as e:
+            logger.warning(
+                "Failed to check available toolgroups; attempting to use RAG toolgroups anyway: %s", e
+            )
+        # Register RAG toolgroups for the discovered vector databases
+        rag_toolgroups = get_rag_toolgroups(vector_db_ids) or []
+
+    toolgroups = rag_toolgroups + [
         mcp_server.name for mcp_server in configuration.mcp_servers
     ]
     # Convert empty list to None for consistency with existing behavior
     if not toolgroups:
         toolgroups = None
+
+    # Enhance the system prompt and the query to encourage tool usage when RAG is available
+    user_query = query_request.query
+    if toolgroups and any("knowledge_search" in str(tg) for tg in toolgroups):
+        system_prompt += (
+            "\n\nIMPORTANT: When answering questions, you MUST use the knowledge_search tool "
+            "to find the most accurate and up-to-date information from the knowledge base. "
+            "Always search for relevant information before providing your answer."
+        )
+        logger.info("Enhanced system prompt to encourage RAG tool usage")
+        # Force RAG usage by rewriting the user query
+        user_query = f"Please use the knowledge_search tool to find relevant information about: {query_request.query}"
+        logger.info("Modified query to force RAG usage: %s", user_query)
+
     response = await agent.create_turn(
-        messages=[UserMessage(role="user", content=query_request.query)],
+        messages=[UserMessage(role="user", content=user_query)],
         session_id=session_id,
         documents=query_request.get_documents(),
         stream=True,
@@ -795,4 +881,4 @@
     )
 
     response = cast(AsyncIterator[AgentTurnResponseStreamChunk], response)
-    return response, conversation_id
+    return response, conversation_id
\ No newline at end of file
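
The SSE "end" event produced by `stream_end_event()` now carries the retrieved RAG context. The sketch below is an illustrative, abridged payload: sample values are hypothetical, and fields not touched by this change are omitted. `rag_chunks` is built from `TurnSummary.rag_chunks`, while `referenced_documents` is derived from chunk sources plus any `docs_url`/`title` entries in `metadata_map`.

```python
# Hypothetical "end" event data, mirroring the logic in stream_end_event():
# a non-HTTP chunk source yields doc_url=None and doc_title set to the last
# path component of the source.
end_event = {
    "event": "end",
    "data": {
        "rag_chunks": [
            {"content": "Install the operator by ...", "source": "docs/install.md", "score": 0.87},
        ],
        "referenced_documents": [
            {"doc_url": None, "doc_title": "install.md"},
        ],
        "truncated": None,
        "input_tokens": 0,
        "output_tokens": 0,
    },
}
```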