diff --git a/infrastructure/rag/values.yaml b/infrastructure/rag/values.yaml
index fe9b81f9..507e06d2 100644
--- a/infrastructure/rag/values.yaml
+++ b/infrastructure/rag/values.yaml
@@ -314,6 +314,13 @@ adminBackend:
     summarizer:
       SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000"
       SUMMARIZER_MAXIMUM_CONCURRENCY: "10"
+      SUMMARIZER_MAX_RETRIES: "5"
+      SUMMARIZER_RETRY_BASE_DELAY: "0.5"
+      SUMMARIZER_RETRY_MAX_DELAY: "600"
+      SUMMARIZER_BACKOFF_FACTOR: "2"
+      SUMMARIZER_ATTEMPT_CAP: "6"
+      SUMMARIZER_JITTER_MIN: "0.05"
+      SUMMARIZER_JITTER_MAX: "0.25"
     ragapi:
       RAG_API_HOST: "http://backend:8080"
     chunker:
diff --git a/libs/README.md b/libs/README.md
index e06214c1..f2b82e13 100644
--- a/libs/README.md
+++ b/libs/README.md
@@ -12,6 +12,7 @@ It consists of the following python packages:
   - [2.1 Requirements](#21-requirements)
   - [2.2 Endpoints](#22-endpoints)
   - [2.3 Replaceable parts](#23-replaceable-parts)
+  - [2.4 Summarizer retry behavior](#24-summarizer-retry-behavior)
 - [`3. Extractor API lib`](#3-extractor-api-lib)
   - [3.1 Requirements](#31-requirements)
   - [3.2 Endpoints](#32-endpoints)
@@ -71,6 +72,7 @@ By default `OpenAI` is used by the evaluation. If you want to use the same LLM-c
 Endpoint to remove documents from the vector database.
 
 #### `/information_pieces/upload`
+
 Endpoint to upload documents into the vector database. These documents need to have been parsed. For simplicity, a LangChain Documents like format is used.
 Uploaded documents are required to contain the following metadata:
 
@@ -94,7 +96,7 @@ Uploaded documents are required to contain the following metadata:
 | chat_graph | [`rag_core_api.graph.graph_base.GraphBase`](./rag-core-api/src/rag_core_api/graph/graph_base.py) | [`rag_core_api.impl.graph.chat_graph.DefaultChatGraph`](./rag-core-api/src/rag_core_api/impl/graph/chat_graph.py) | Langgraph graph that contains the entire logic for question answering. |
 | traced_chat_graph | [`rag_core_lib.chains.async_chain.AsyncChain[Any, Any]`](./rag-core-lib/src/rag_core_lib/chains/async_chain.py)| [`rag_core_lib.impl.tracers.langfuse_traced_chain.LangfuseTracedGraph`](./rag-core-lib/src/rag_core_lib/impl/tracers/langfuse_traced_chain.py) | Wraps around the *chat_graph* and add langfuse tracing. |
 | evaluator | [`rag_core_api.impl.evaluator.langfuse_ragas_evaluator.LangfuseRagasEvaluator`](./rag-core-api/src/rag_core_api/impl/evaluator/langfuse_ragas_evaluator.py) | [`rag_core_api.impl.evaluator.langfuse_ragas_evaluator.LangfuseRagasEvaluator`](./rag-core-api/src/rag_core_api/impl/evaluator/langfuse_ragas_evaluator.py) | The evaulator used in the evaluate endpoint. |
-| chat_endpoint | [ `rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* |
+| chat_endpoint | [`rag_core_api.api_endpoints.chat.Chat`](./rag-core-api/src/rag_core_api/api_endpoints/chat.py) | [`rag_core_api.impl.api_endpoints.default_chat.DefaultChat`](./rag-core-api/src/rag_core_api/impl/api_endpoints/default_chat.py) | Implementation of the chat endpoint. Default implementation just calls the *traced_chat_graph* |
 | ragas_llm | `langchain_core.language_models.chat_models.BaseChatModel` | `langchain_openai.ChatOpenAI` or `langchain_ollama.ChatOllama` | The LLM used for the ragas evaluation. |
 
 ## 2. Admin API Lib
@@ -115,7 +117,7 @@ The following endpoints are provided by the *admin-api-lib*:
 All required python libraries can be found in the [pyproject.toml](./admin-api-lib/pyproject.toml) file.
 In addition to python libraries, the following system packages are required:
 
-```
+```shell
 build-essential
 make
 ```
@@ -157,10 +159,10 @@ The extracted information will be summarized using LLM. The summary, as well as
 | key_value_store | [`admin_api_lib.impl.key_db.file_status_key_value_store.FileStatusKeyValueStore`](./admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py) | [`admin_api_lib.impl.key_db.file_status_key_value_store.FileStatusKeyValueStore`](./admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py) | Is used for storing the available sources and their current state. |
 | chunker | [`admin_api_lib.chunker.chunker.Chunker`](./admin-api-lib/src/admin_api_lib/chunker/chunker.py) | [`admin_api_lib.impl.chunker.text_chunker.TextChunker`](./admin-api-lib/src/admin_api_lib/impl/chunker/text_chunker.py) | Used for splitting the documents in managable chunks. |
 | document_extractor | [`admin_api_lib.extractor_api_client.openapi_client.api.extractor_api.ExtractorApi`](./admin-api-lib/src/admin_api_lib/extractor_api_client/openapi_client/api/extractor_api.py) | [`admin_api_lib.extractor_api_client.openapi_client.api.extractor_api.ExtractorApi`](./admin-api-lib/src/admin_api_lib/extractor_api_client/openapi_client/api/extractor_api.py) | Needs to be replaced if adjustments to the `extractor-api` is made. |
-| rag_api | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | Needs to be replaced if changes to the `/information_pieces/remove` or `/information_pieces/upload` of the [`rag-core-api`](#rag-core-api) are made. |
+| rag_api | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | [`admin_api_lib.rag_backend_client.openapi_client.api.rag_api.RagApi`](./admin-api-lib/src/admin_api_lib/rag_backend_client/openapi_client/api/rag_api.py) | Needs to be replaced if changes to the `/information_pieces/remove` or `/information_pieces/upload` of the [`rag-core-api`](#1-rag-core-api) are made. |
 | summarizer_prompt | `str` | [`admin_api_lib.prompt_templates.summarize_prompt.SUMMARIZE_PROMPT`](./admin-api-lib/src/admin_api_lib/prompt_templates/summarize_prompt.py) | The prompt used of the summarization. |
 | langfuse_manager | [`rag_core_lib.impl.langfuse_manager.langfuse_manager.LangfuseManager`](./rag-core-lib/src/rag_core_lib/impl/langfuse_manager/langfuse_manager.py) | [`rag_core_lib.impl.langfuse_manager.langfuse_manager.LangfuseManager`](./rag-core-lib/src/rag_core_lib/impl/langfuse_manager/langfuse_manager.py) | Retrieves additional settings, as well as the prompt from langfuse if available. |
-| summarizer | [`admin_api_lib.summarizer.summarizer.Summarizer`](./admin-api-lib/src/admin_api_lib/summarizer/summarizer.py) | [`admin_api_lib.impl.summarizer.langchain_summarizer.LangchainSummarizer`](./admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py) | Creates the summaries. |
+| summarizer | [`admin_api_lib.summarizer.summarizer.Summarizer`](./admin-api-lib/src/admin_api_lib/summarizer/summarizer.py) | [`admin_api_lib.impl.summarizer.langchain_summarizer.LangchainSummarizer`](./admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py) | Creates the summaries. Uses the shared retry decorator with optional per-summarizer overrides (see 2.4). |
 | untraced_information_enhancer |[`admin_api_lib.information_enhancer.information_enhancer.InformationEnhancer`](./admin-api-lib/src/admin_api_lib/information_enhancer/information_enhancer.py) | [`admin_api_lib.impl.information_enhancer.general_enhancer.GeneralEnhancer`](./admin-api-lib/src/admin_api_lib/impl/information_enhancer/general_enhancer.py) | Uses the *summarizer* to enhance the extracted documents. |
 | information_enhancer | [`rag_core_lib.chains.async_chain.AsyncChain[Any, Any]`](./rag-core-lib/src/rag_core_lib/chains/async_chain.py)| [`rag_core_lib.impl.tracers.langfuse_traced_chain.LangfuseTracedGraph`](./rag-core-lib/src/rag_core_lib/impl/tracers/langfuse_traced_chain.py) |Wraps around the *untraced_information_enhancer* and adds langfuse tracing. |
 | document_deleter |[`admin_api_lib.api_endpoints.document_deleter.DocumentDeleter`](./admin-api-lib/src/admin_api_lib/api_endpoints/document_deleter.py) | [`admin_api_lib.impl.api_endpoints.default_document_deleter.DefaultDocumentDeleter`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_document_deleter.py) | Handles deletion of sources. |
@@ -169,6 +171,32 @@ The extracted information will be summarized using LLM. The summary, as well as
 | document_reference_retriever | [`admin_api_lib.api_endpoints.document_reference_retriever.DocumentReferenceRetriever`](./admin-api-lib/src/admin_api_lib/api_endpoints/document_reference_retriever.py) | [`admin_api_lib.impl.api_endpoints.default_document_reference_retriever.DefaultDocumentReferenceRetriever`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_document_reference_retriever.py) | Handles return of files from connected storage. |
 | file_uploader | [`admin_api_lib.api_endpoints.file_uploader.FileUploader`](./admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py) | [`admin_api_lib.impl.api_endpoints.default_file_uploader.DefaultFileUploader`](./admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py) | Handles upload and extraction of files. |
 
+### 2.4 Summarizer retry behavior
+
+The default summarizer implementation (`LangchainSummarizer`) now uses the shared retry decorator with exponential backoff from `rag-core-lib`.
+
+- Decorator: `rag_core_lib.impl.utils.retry_decorator.retry_with_backoff`
+- Base settings (fallback): [`RetryDecoratorSettings`](./rag-core-lib/src/rag_core_lib/impl/settings/retry_decorator_settings.py)
+- Per-summarizer overrides: [`SummarizerSettings`](./admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py)
+
+How the settings are resolved
+
+- Each field in `SummarizerSettings` is optional. When a field is provided (not None), it overrides the corresponding value from `RetryDecoratorSettings`.
+- When a field is not provided (None), the summarizer falls back to the value from `RetryDecoratorSettings`.
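+
+For illustration, the sketch below mirrors what the private `LangchainSummarizer._create_retry_decorator_settings` helper does with these two settings objects (the standalone function name is only illustrative and is not part of the library API):
+
+```python
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
+
+
+def resolve_retry_settings(summarizer_settings, retry_decorator_settings) -> RetryDecoratorSettings:
+    """A per-summarizer value wins when it is set; otherwise the global fallback applies."""
+    fields = ["max_retries", "retry_base_delay", "retry_max_delay",
+              "backoff_factor", "attempt_cap", "jitter_min", "jitter_max"]
+    resolved = {
+        field: getattr(summarizer_settings, field)
+        if getattr(summarizer_settings, field) is not None
+        else getattr(retry_decorator_settings, field)
+        for field in fields
+    }
+    return RetryDecoratorSettings(**resolved)
+```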
+
+Configuring via environment variables
+
+- Summarizer-specific (prefix `SUMMARIZER_`):
+  - `SUMMARIZER_MAX_RETRIES`
+  - `SUMMARIZER_RETRY_BASE_DELAY`
+  - `SUMMARIZER_RETRY_MAX_DELAY`
+  - `SUMMARIZER_BACKOFF_FACTOR`
+  - `SUMMARIZER_ATTEMPT_CAP`
+  - `SUMMARIZER_JITTER_MIN`
+  - `SUMMARIZER_JITTER_MAX`
+- Global fallback (prefix `RETRY_DECORATOR_`): see section [4.2](#42-retry-decorator-exponential-backoff) for all keys and defaults.
+- Helm chart: set the same keys under `adminBackend.envs.summarizer` in [infrastructure/rag/values.yaml](../infrastructure/rag/values.yaml).
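+
+To get a feel for what these values mean, the sketch below prints the approximate wait times implied by the defaults shipped in `infrastructure/rag/values.yaml` (5 retries, 0.5 s base delay, factor 2, attempt cap 6, 600 s max delay, 0.05-0.25 s jitter). It assumes the delay formula suggested by the field descriptions, `base_delay * backoff_factor ** min(attempt, attempt_cap)` capped at the max delay plus uniform jitter; the authoritative implementation lives in `rag_core_lib.impl.utils.retry_decorator` and may differ in detail:
+
+```python
+import random
+
+MAX_RETRIES = 5                        # SUMMARIZER_MAX_RETRIES
+BASE_DELAY = 0.5                       # SUMMARIZER_RETRY_BASE_DELAY
+MAX_DELAY = 600.0                      # SUMMARIZER_RETRY_MAX_DELAY
+BACKOFF_FACTOR = 2.0                   # SUMMARIZER_BACKOFF_FACTOR
+ATTEMPT_CAP = 6                        # SUMMARIZER_ATTEMPT_CAP
+JITTER_MIN, JITTER_MAX = 0.05, 0.25    # SUMMARIZER_JITTER_MIN / SUMMARIZER_JITTER_MAX
+
+for attempt in range(MAX_RETRIES):
+    delay = min(MAX_DELAY, BASE_DELAY * BACKOFF_FACTOR ** min(attempt, ATTEMPT_CAP))
+    delay += random.uniform(JITTER_MIN, JITTER_MAX)  # small random offset to avoid retry bursts
+    print(f"retry {attempt + 1}: wait ~{delay:.2f}s")
+# With these values: roughly 0.5 s, 1 s, 2 s, 4 s and 8 s (plus jitter) before the five retries.
+```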
+
 ## 3. Extractor API Lib
 
 The Extractor Library contains components that provide document parsing capabilities for various file formats and web sources. It supports extracting content from PDF, DOCX, XML files, as well as web pages via sitemaps and Confluence pages. It also includes a default `dependency_container`, that is pre-configured and is a good starting point for most use-cases. This API should not be exposed by ingress and only used for internally.
@@ -197,6 +225,7 @@ tesseract-ocr-eng
 
 ### 3.2 Endpoints
 
 #### `/extract_from_file`
+
 This endpoint will extract the information from PDF,PTTX,WORD,XML files. It will load the files from the connected storage.
 The following types of information will be extracted:
@@ -215,6 +244,7 @@ The following types of information can be extracted:
 - `IMAGE`: image found in the document
 
 For sitemap sources, additional parameters can be provided, e.g.:
+
 - `web_path`: The URL of the XML sitemap to crawl
 - `filter_urls`: JSON array of URL patterns to filter pages (optional)
 - `header_template`: JSON object for custom HTTP headers (optional)
diff --git a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py
index 798cda18..ee86cb5f 100644
--- a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py
+++ b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py
@@ -64,6 +64,7 @@
 from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings
 from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings
 from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
 from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings
 from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable
 from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
@@ -86,6 +87,7 @@ class DependencyContainer(DeclarativeContainer):
     key_value_store_settings = KeyValueSettings()
     summarizer_settings = SummarizerSettings()
     source_uploader_settings = SourceUploaderSettings()
+    retry_decorator_settings = RetryDecoratorSettings()
 
     key_value_store = Singleton(FileStatusKeyValueStore, key_value_store_settings)
     file_service = Singleton(S3Service, s3_settings=s3_settings)
@@ -136,7 +138,9 @@ class DependencyContainer(DeclarativeContainer):
         LangchainSummarizer,
         langfuse_manager=langfuse_manager,
         chunker=summary_text_splitter,
-        semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrreny),
+        semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrency),
+        summarizer_settings=summarizer_settings,
+        retry_decorator_settings=retry_decorator_settings,
     )
 
     summary_enhancer = List(
diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py
index 3617adb8..b3138ce0 100644
--- a/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py
+++ b/libs/admin-api-lib/src/admin_api_lib/impl/settings/summarizer_settings.py
@@ -1,6 +1,7 @@
 """Contains settings for summarizer."""
 
-from pydantic import Field
+from typing import Optional
+from pydantic import Field, PositiveInt
 from pydantic_settings import BaseSettings
 
 
@@ -12,8 +13,22 @@ class SummarizerSettings(BaseSettings):
     ----------
     maximum_input_size : int
         The maximum size of the input that the summarizer can handle. Default is 8000.
-    maximum_concurrreny : int
+    maximum_concurrency : int
         The maximum number of concurrent summarization processes. Default is 10.
+    max_retries: Optional[PositiveInt]
+        Total retries, not counting the initial attempt.
+    retry_base_delay: Optional[float]
+        Base delay in seconds for the first retry.
+    retry_max_delay: Optional[float]
+        Maximum delay cap in seconds for any single wait.
+    backoff_factor: Optional[float]
+        Exponential backoff factor (>= 1).
+    attempt_cap: Optional[int]
+        Cap for exponent growth (backoff_factor ** attempt_cap).
+    jitter_min: Optional[float]
+        Minimum jitter in seconds.
+    jitter_max: Optional[float]
+        Maximum jitter in seconds.
     """
 
     class Config:
@@ -23,4 +38,45 @@ class Config:
         case_sensitive = False
 
     maximum_input_size: int = Field(default=8000)
-    maximum_concurrreny: int = Field(default=10)
+    maximum_concurrency: int = Field(default=10)
+    max_retries: Optional[PositiveInt] = Field(
+        default=None,
+        title="Max Retries",
+        description="Total retries, not counting the initial attempt.",
+    )
+    retry_base_delay: Optional[float] = Field(
+        default=None,
+        ge=0,
+        title="Retry Base Delay",
+        description="Base delay in seconds for the first retry.",
+    )
+    retry_max_delay: Optional[float] = Field(
+        default=None,
+        gt=0,
+        title="Retry Max Delay",
+        description="Maximum delay cap in seconds for any single wait.",
+    )
+    backoff_factor: Optional[float] = Field(
+        default=None,
+        ge=1.0,
+        title="Backoff Factor",
+        description="Exponential backoff factor (>= 1).",
+    )
+    attempt_cap: Optional[int] = Field(
+        default=None,
+        ge=0,
+        title="Attempt Cap",
+        description="Cap for exponent growth (backoff_factor ** attempt_cap).",
+    )
+    jitter_min: Optional[float] = Field(
+        default=None,
+        ge=0.0,
+        title="Jitter Min (s)",
+        description="Minimum jitter in seconds.",
+    )
+    jitter_max: Optional[float] = Field(
+        default=None,
+        ge=0.0,
+        title="Jitter Max (s)",
+        description="Maximum jitter in seconds.",
+    )
diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py
index 1d5b5d09..0872ddaa 100644
--- a/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py
+++ b/libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py
@@ -1,20 +1,24 @@
 """Module for the LangchainSummarizer class."""
 
+import asyncio
 import logging
-import traceback
 from typing import Optional
 
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain_core.documents import Document
 from langchain_core.runnables import Runnable, RunnableConfig, ensure_config
+from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError
 
+from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings
 from admin_api_lib.summarizer.summarizer import (
     Summarizer,
     SummarizerInput,
     SummarizerOutput,
 )
 from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
 from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
+from rag_core_lib.impl.utils.retry_decorator import retry_with_backoff
 
 logger = logging.getLogger(__name__)
 
@@ -32,10 +36,15 @@ def __init__(
         langfuse_manager: LangfuseManager,
         chunker: RecursiveCharacterTextSplitter,
         semaphore: AsyncThreadsafeSemaphore,
+        summarizer_settings: SummarizerSettings,
+        retry_decorator_settings: RetryDecoratorSettings,
     ):
         self._chunker = chunker
         self._langfuse_manager = langfuse_manager
         self._semaphore = semaphore
+        self._retry_decorator_settings = self._create_retry_decorator_settings(
+            summarizer_settings, retry_decorator_settings
+        )
 
     async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput:
         """
@@ -65,40 +74,65 @@
         """
         assert query, "Query is empty: %s" % query  # noqa S101
         config = ensure_config(config)
-        tries_remaining = config.get("configurable", {}).get("tries_remaining", 3)
-        logger.debug("Tries remaining %d" % tries_remaining)
-        if tries_remaining < 0:
-            raise Exception("Summary creation failed.")
         document = Document(page_content=query)
         langchain_documents = self._chunker.split_documents([document])
+        logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))
 
-        outputs = []
-        for langchain_document in langchain_documents:
-            async with self._semaphore:
-                try:
-                    result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
-                    # Extract content from AIMessage if it's not already a string
-                    content = result.content if hasattr(result, "content") else str(result)
-                    outputs.append(content)
-                except Exception as e:
-                    logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc())
-                    config["tries_remaining"] = tries_remaining - 1
-                    result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
-                    # Extract content from AIMessage if it's not already a string
-                    content = result.content if hasattr(result, "content") else str(result)
-                    outputs.append(content)
+        # Fan out with concurrency, bounded by the semaphore inside _summarize_chunk
+        tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents]
+        outputs = await asyncio.gather(*tasks)
 
         if len(outputs) == 1:
             return outputs[0]
-        summary = " ".join(outputs)
+
+        merged = " ".join(outputs)
         logger.debug(
-            "Reduced number of chars from %d to %d"
-            % (len("".join([x.page_content for x in langchain_documents])), len(summary))
+            "Reduced number of chars from %d to %d",
+            len("".join([x.page_content for x in langchain_documents])),
+            len(merged),
         )
-        return await self.ainvoke(summary, config)
+        return await self._summarize_chunk(merged, config)
+
+    def _create_retry_decorator_settings(
+        self, summarizer_settings: SummarizerSettings, retry_decorator_settings: RetryDecoratorSettings
+    ):
+        fields = [
+            "max_retries",
+            "retry_base_delay",
+            "retry_max_delay",
+            "backoff_factor",
+            "attempt_cap",
+            "jitter_min",
+            "jitter_max",
+        ]
+        settings_kwargs = {
+            field: getattr(summarizer_settings, field)
+            if getattr(summarizer_settings, field) is not None
+            else getattr(retry_decorator_settings, field)
+            for field in fields
+        }
+        return RetryDecoratorSettings(**settings_kwargs)
 
     def _create_chain(self) -> Runnable:
         return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
             self.__class__.__name__
         )
+
+    def _retry_with_backoff_wrapper(self):
+        return retry_with_backoff(
+            settings=self._retry_decorator_settings,
+            exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError),
+            rate_limit_exceptions=(RateLimitError,),
+            logger=logger,
+        )
+
+    async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
+        @self._retry_with_backoff_wrapper()
+        async def _call(text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
+            response = await self._create_chain().ainvoke({"text": text}, config)
+            return response.content if hasattr(response, "content") else str(response)
+
+        # Hold the semaphore for the entire retry lifecycle
+        async with self._semaphore:
+            return await _call(text, config)
 