From 3432921049fd2b0e5f6dfc1f53426ca5d8d81cf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 14 Jun 2024 11:30:25 +0200 Subject: [PATCH 1/3] quality: Enhance comments --- helpers/logging.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/helpers/logging.py b/helpers/logging.py index 0398cf8..6453a31 100644 --- a/helpers/logging.py +++ b/helpers/logging.py @@ -21,11 +21,10 @@ ) # Configure Azure Application Insights exporter AioHttpClientInstrumentor().instrument() # Instrument aiohttp HTTPXClientInstrumentor().instrument() # Instrument httpx -# Instrument OpenAI environ["TRACELOOP_TRACE_CONTENT"] = str( True ) # Instrumentation logs prompts, completions, and embeddings to span attributes, set to False to lower monitoring costs or to avoid logging PII -OpenAIInstrumentor().instrument() +OpenAIInstrumentor().instrument() # Instrument OpenAI tracer = trace.get_tracer( instrumenting_module_name=f"com.github.clemlesne.{APP_NAME}", ) # Create a tracer that will be used in the app From 9f6c015fa0ca2319293fa1039b41e4affaebf3bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 14 Jun 2024 16:21:49 +0200 Subject: [PATCH 2/3] breaking: Use multiple destinations refacto: Manage SDK instanced in config models --- README.md | 10 +- function_app.py | 132 ++++-------------- helpers/config_models/ai_search.py | 7 - helpers/config_models/destination.py | 49 +++++++ .../config_models/document_intelligence.py | 22 ++- helpers/config_models/llm.py | 39 +++++- helpers/config_models/root.py | 4 +- 7 files changed, 142 insertions(+), 121 deletions(-) delete mode 100644 helpers/config_models/ai_search.py create mode 100644 helpers/config_models/destination.py diff --git a/README.md b/README.md index 7309358..7490214 100644 --- a/README.md +++ b/README.md @@ -242,10 +242,12 @@ llm: endpoint: https://xxx.openai.azure.com model: gpt-4o -ai_search: - access_key: xxx - endpoint: https://xxx.search.windows.net - 
index: trainings +destination: + mode: ai_search + ai_search: + access_key: xxx + endpoint: https://xxx.search.windows.net + index: trainings document_intelligence: access_key: xxx diff --git a/function_app.py b/function_app.py index cb3050c..edf216f 100644 --- a/function_app.py +++ b/function_app.py @@ -7,21 +7,17 @@ # General imports -from azure.ai.documentintelligence.aio import DocumentIntelligenceClient from azure.ai.documentintelligence.models import ( AnalyzeResult, ContentFormat, DocumentAnalysisFeature, ParagraphRole, ) -from azure.core.credentials import AzureKeyCredential from azure.core.exceptions import ResourceExistsError -from azure.search.documents.aio import SearchClient from azure.storage.blob import BlobProperties from azure.storage.blob.aio import BlobClient, ContainerClient from azurefunctions.extensions.bindings.blob import BlobClient as BlobClientTrigger from helpers.http import azure_transport -from openai import AsyncAzureOpenAI from openai.types.chat import ChatCompletionSystemMessageParam from os import getenv from pydantic import TypeAdapter, ValidationError @@ -43,11 +39,10 @@ PagedDocumentModel, SynthetisedDocumentModel, ) -import re -import pikepdf -from io import BytesIO from base64 import b64encode -from helpers.config_models.llm import ConfigModel as LlmConfigModel +from io import BytesIO +import pikepdf +import re # Azure Functions @@ -66,9 +61,6 @@ # Clients _container_client: Optional[ContainerClient] = None -_doc_client: Optional[DocumentIntelligenceClient] = None -_openai_clients: dict[str, AsyncAzureOpenAI] = {} -_search_client: Optional[SearchClient] = None # Custom types T = TypeVar("T") @@ -87,7 +79,7 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None: For PDF, QPDF (https://github.com/qpdf/qpdf) is used (from pikepdf) to save the document in a safe format. 
""" # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -108,7 +100,7 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None: ) # Store out_path = _replace_root_path(blob_name, SANITIZE_FOLDER) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) await out_client.upload_blob( data=out_stream.getbuffer(), overwrite=True, # For the first upload, overwrite, next steps will validate MD5 for cache @@ -116,7 +108,7 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None: else: # Store as is logger.info(f"Storing raw blob as is ({blob_name})") out_path = _replace_root_path(blob_name, SANITIZE_FOLDER) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) await out_client.upload_blob( data=in_bytes.getbuffer(), overwrite=True, # For the first upload, overwrite, next steps will validate MD5 for cache @@ -134,7 +126,7 @@ async def sanitize_to_extract(input: BlobClientTrigger) -> None: First, document content is extracted from its binary form. 
""" # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -152,7 +144,7 @@ async def sanitize_to_extract(input: BlobClientTrigger) -> None: features.append(DocumentAnalysisFeature.FORMULAS) features.append(DocumentAnalysisFeature.LANGUAGES) logger.info(f"Features enabled: {features}") - doc_client = await _use_doc_client() + doc_client = await CONFIG.document_intelligence.instance() doc_poller = await doc_client.begin_analyze_document( analyze_request=content, # type: ignore content_type="application/octet-stream", @@ -187,7 +179,7 @@ async def sanitize_to_extract(input: BlobClientTrigger) -> None: ) # Store out_path = f"{EXTRACT_FOLDER}/{blob_md5}.json" - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) try: await out_client.upload_blob(data=raw_text_model.model_dump_json()) except ResourceExistsError: @@ -205,7 +197,7 @@ async def extract_to_chunck(input: BlobClientTrigger) -> None: Second, document content is chunked into smaller parts to make it understandable by the configured LLM. """ # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -240,7 +232,7 @@ async def extract_to_chunck(input: BlobClientTrigger) -> None: out_path = _replace_root_path( _replace_extension(blob_name, f"-{i}.json"), CHUNCK_FOLDER ) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) try: await out_client.upload_blob(data=out_model.model_dump_json()) except ResourceExistsError: @@ -258,7 +250,7 @@ async def chunck_to_synthesis(input: BlobClientTrigger) -> None: Third, chunks are synthesised into a coherent text. 
""" # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -331,7 +323,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[str]]: out_path = _replace_root_path( _replace_extension(blob_name, ".json"), SYNTHESIS_FOLDER ) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) try: await out_client.upload_blob(data=synthesis_model.model_dump_json()) except ResourceExistsError: @@ -351,7 +343,7 @@ async def synthesis_to_page(input: BlobClientTrigger) -> None: Pages are cleaned and filtered for repetitions (indicating low-quality content). """ # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -399,7 +391,7 @@ async def synthesis_to_page(input: BlobClientTrigger) -> None: out_path = _replace_root_path( _replace_extension(blob_name, f"-{i}.json"), PAGE_FOLDER ) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) try: await out_client.upload_blob(data=out_model.model_dump_json()) except ResourceExistsError: @@ -414,7 +406,7 @@ async def synthesis_to_page(input: BlobClientTrigger) -> None: ) async def page_to_fact(input: BlobClientTrigger) -> None: # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -502,7 +494,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[FactedL out_path = _replace_root_path( _replace_extension(blob_name, ".json"), FACT_FOLDER ) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) try: await out_client.upload_blob(data=facted_document_model.model_dump_json()) 
except ResourceExistsError: @@ -517,7 +509,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[FactedL ) async def fact_to_critic(input: BlobClientTrigger) -> None: # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -580,25 +572,25 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]] Question: What is the capital of France? Answer: Paris Context: Paris, as the capital of France, is the political, economic, and cultural center of the country. - Score: 1.0 + Assistant: 1.0 ## Example 2 Question: What is the ISIN code for the stock? Answer: US0378331005 Context: The ISIN code for the stock is FR0000120172. - Score: 0.0 + Assistant: 0.0 ## Example 3 Question: In which year was the company founded? Answer: 1939 Context: The company, by its founder, was established during World War II to provide essential services to the population. Its exact founding date is unknown. - Score: 0.6 + Assistant: 0.6 ## Example 4 Question: What is the main product of the company? Answer: A software suite Context: The company is known for its software suite called "Office", which includes applications such as a text editor, a spreadsheet, and a presentation program. 
- Score: 0.8 + Assistant: 0.8 # Fact Question: {fact.question} @@ -622,7 +614,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]] out_path = _replace_root_path( _replace_extension(blob_name, ".json"), CRITIC_FOLDER ) - out_client = await _use_blob_async_client(out_path) + out_client = await _use_blob_client(out_path) try: await out_client.upload_blob(data=facted_model.model_dump_json()) except ResourceExistsError: @@ -637,7 +629,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]] ) async def critic_to_index(input: BlobClientTrigger) -> None: # Read - async with await _use_blob_async_client( + async with await _use_blob_client( name=input.blob_name, # type: ignore snapshot=input.snapshot, # type: ignore ) as blob_client: @@ -666,7 +658,7 @@ async def critic_to_index(input: BlobClientTrigger) -> None: indexed_dicts = TypeAdapter(list[IndexedDocumentModel]).dump_python( indexed_models, mode="json" ) - search_client = await _use_search_client() + search_client = await CONFIG.ai_search.instance() await search_client.merge_or_upload_documents(indexed_dicts) # Will overwrite existing documents @@ -717,7 +709,7 @@ async def _llm_generate( """ logger.info("LLM completion generation") # Generate - openai_client, config = await _use_openai_client(is_fast) + openai_client, config = await (CONFIG.llm.fast if is_fast else CONFIG.llm.slow).instance() messages = [ ChatCompletionSystemMessageParam( content=prompt, @@ -936,7 +928,7 @@ def _count_tokens(content: str, model: str) -> int: encoding_name = tiktoken.encoding_name_for_model(model) except KeyError: encoding_name = tiktoken.encoding_name_for_model("gpt-3.5") - logger.warning(f"Unknown model {model}, using {encoding_name} encoding") + logger.debug(f"Unknown model {model}, using {encoding_name} encoding") return len(tiktoken.get_encoding(encoding_name).encode(content)) @@ -969,7 +961,7 @@ def _detect_extension(file_path: str) -> str: return "." 
+ file_path.lower().split(".")[-1] -async def _use_blob_async_client( +async def _use_blob_client( name: str, snapshot: Optional[str] = None, ) -> BlobClient: @@ -998,71 +990,3 @@ async def _use_container_async_client() -> ContainerClient: container_name=CONTAINER_NAME, ) return _container_client - - -async def _use_doc_client() -> DocumentIntelligenceClient: - """ - Create a DocumentIntelligenceClient client capable of async I/O. - - The client is cached for future use. - """ - global _doc_client - if not isinstance(_doc_client, DocumentIntelligenceClient): - _doc_client = DocumentIntelligenceClient( - # Deployment - endpoint=CONFIG.document_intelligence.endpoint, - # Performance - polling_interval=5, # 5 seconds - transport=await azure_transport(), - # Authentication - credential=AzureKeyCredential( - CONFIG.document_intelligence.access_key.get_secret_value() - ), - ) - return _doc_client - - -async def _use_search_client() -> SearchClient: - """ - Create a SearchClient client capable of async I/O. - - The client is cached for future use. - """ - global _search_client - if not isinstance(_search_client, SearchClient): - _search_client = SearchClient( - # Deployment - endpoint=CONFIG.ai_search.endpoint, - index_name=CONFIG.ai_search.index, - # Performance - transport=await azure_transport(), - # Authentication - credential=AzureKeyCredential( - CONFIG.ai_search.access_key.get_secret_value() - ), - ) - return _search_client - - -async def _use_openai_client(is_fast: bool) -> tuple[AsyncAzureOpenAI, LlmConfigModel]: - """ - Create a OpenAI client capable of async I/O. - - The client is cached for future use. 
- """ - global _openai_clients - config = CONFIG.llm.fast if is_fast else CONFIG.llm.slow - client_name = "fast" if is_fast else "slow" - if not (client_name in _openai_clients and isinstance(_openai_clients[client_name], AsyncAzureOpenAI)): - _openai_clients[client_name] = AsyncAzureOpenAI( - # Deployment - api_version="2023-12-01-preview", - azure_deployment=config.deployment, - azure_endpoint=config.endpoint, - # Reliability - max_retries=30, # We are patient, this is a background job :) - timeout=180, # 3 minutes - # Authentication - api_key=config.api_key.get_secret_value(), - ) - return (_openai_clients[client_name], config) diff --git a/helpers/config_models/ai_search.py b/helpers/config_models/ai_search.py deleted file mode 100644 index 65af0e6..0000000 --- a/helpers/config_models/ai_search.py +++ /dev/null @@ -1,7 +0,0 @@ -from pydantic import SecretStr, BaseModel - - -class AiSearchModel(BaseModel): - access_key: SecretStr - endpoint: str - index: str diff --git a/helpers/config_models/destination.py b/helpers/config_models/destination.py new file mode 100644 index 0000000..a25b0a2 --- /dev/null +++ b/helpers/config_models/destination.py @@ -0,0 +1,49 @@ +from azure.core.credentials import AzureKeyCredential +from azure.search.documents.aio import SearchClient +from enum import Enum +from functools import cache +from helpers.http import azure_transport +from pydantic import SecretStr, BaseModel, ValidationInfo, field_validator +from typing import Optional + + + +class ModeEnum(Enum): + AI_SEARCH = "ai_search" + + +class AiSearchModel(BaseModel, frozen=True): + _client: Optional[SearchClient] = None + access_key: SecretStr + endpoint: str + index: str + + async def instance(self) -> SearchClient: + if not self._client: + self._client = SearchClient( + # Deployment + endpoint=self.endpoint, + index_name=self.index, + # Performance + transport=await azure_transport(), + # Authentication + credential=AzureKeyCredential( + self.access_key.get_secret_value() + 
), ) + return self._client + + +class DestinationModel(BaseModel): + mode: ModeEnum + ai_search: AiSearchModel + + @field_validator("ai_search") + def _validate_ai_search( + cls, + ai_search: Optional[AiSearchModel], + info: ValidationInfo, + ) -> Optional[AiSearchModel]: + if not ai_search and info.data.get("mode", None) == ModeEnum.AI_SEARCH: + raise ValueError("AI Search config required") + return ai_search diff --git a/helpers/config_models/document_intelligence.py b/helpers/config_models/document_intelligence.py index 4982bf3..0b4ab3d 100644 --- a/helpers/config_models/document_intelligence.py +++ b/helpers/config_models/document_intelligence.py @@ -1,6 +1,26 @@ +from azure.ai.documentintelligence.aio import DocumentIntelligenceClient +from azure.core.credentials import AzureKeyCredential +from helpers.http import azure_transport from pydantic import SecretStr, BaseModel +from typing import Optional -class DocumentIntelligenceModel(BaseModel): +class DocumentIntelligenceModel(BaseModel, frozen=True): + _client: Optional[DocumentIntelligenceClient] = None access_key: SecretStr endpoint: str + + async def instance(self) -> DocumentIntelligenceClient: + if not self._client: + self._client = DocumentIntelligenceClient( + # Deployment + endpoint=self.endpoint, + # Performance + polling_interval=5, # 5 seconds + transport=await azure_transport(), + # Authentication + credential=AzureKeyCredential( + self.access_key.get_secret_value() + ), + ) + return self._client diff --git a/helpers/config_models/llm.py b/helpers/config_models/llm.py index 8ee34aa..8ac8fb2 100644 --- a/helpers/config_models/llm.py +++ b/helpers/config_models/llm.py @@ -1,14 +1,47 @@ +from enum import Enum +from openai import AsyncAzureOpenAI from pydantic import SecretStr, BaseModel +from typing import Optional -class ConfigModel(BaseModel): +class TypeEnum(Enum): + FAST = "fast" + SLOW = "slow" + + +class ConfigModel(BaseModel, frozen=True): + _client: Optional[AsyncAzureOpenAI] = None + api_key: 
SecretStr context: int deployment: str endpoint: str model: str + type: TypeEnum + + async def instance(self) -> tuple[AsyncAzureOpenAI, "ConfigModel"]: + if not self._client: + self._client = AsyncAzureOpenAI( + # Deployment + api_version="2023-12-01-preview", + azure_deployment=self.deployment, + azure_endpoint=self.endpoint, + # Reliability + max_retries=30, # We are patient, this is a background job :) + timeout=180, # 3 minutes + # Authentication + api_key=self.api_key.get_secret_value(), + ) + return self._client, self + + +class FastModel(ConfigModel): + type: TypeEnum = TypeEnum.FAST + + +class SlowModel(ConfigModel): + type: TypeEnum = TypeEnum.SLOW class LlmModel(BaseModel): - fast: ConfigModel - slow: ConfigModel + fast: FastModel + slow: SlowModel diff --git a/helpers/config_models/root.py b/helpers/config_models/root.py index 03d2680..d7018c3 100644 --- a/helpers/config_models/root.py +++ b/helpers/config_models/root.py @@ -1,4 +1,4 @@ -from helpers.config_models.ai_search import AiSearchModel +from helpers.config_models.destination import DestinationModel from helpers.config_models.document_intelligence import DocumentIntelligenceModel from helpers.config_models.features import FeaturesModel from helpers.config_models.llm import LlmModel @@ -18,7 +18,7 @@ class RootModel(BaseSettings): # Immutable fields version: str = Field(default="0.0.0-unknown", frozen=True) # Editable fields - ai_search: AiSearchModel + destination: DestinationModel document_intelligence: DocumentIntelligenceModel features: FeaturesModel = ( FeaturesModel() From d0d51fe006b4ad5a09372b0d0d887036450d2938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 14 Jun 2024 16:23:24 +0200 Subject: [PATCH 3/3] quality: Sanitize blob file name --- function_app.py | 24 ++++++++++++++---------- requirements.txt | 1 + 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/function_app.py b/function_app.py index edf216f..ec86974 100644 --- a/function_app.py 
+++ b/function_app.py @@ -41,6 +41,7 @@ ) from base64 import b64encode from io import BytesIO +from unidecode import unidecode import pikepdf import re @@ -78,6 +79,14 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None: For PDF, QPDF (https://github.com/qpdf/qpdf) is used (from pikepdf) to save the document in a safe format. """ + async def _upload(data: memoryview, path: str) -> None: + out_path = unidecode(_replace_root_path(path, SANITIZE_FOLDER), replace_str="") # Decode possible non ASCII characters + out_client = await _use_blob_client(out_path) + await out_client.upload_blob( + data=data, + overwrite=True, # For the first upload, overwrite, next steps will validate MD5 for cache + ) + # Read async with await _use_blob_client( name=input.blob_name, # type: ignore @@ -98,20 +107,15 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None: linearize=True, # Allows compliant readers to begin displaying a PDF file before it is fully downloaded min_version=CONFIG.features.sanitize_pdf_version, # Note, if a second PDF is created with a higher version, hash will be different and cache won't work ) - # Store - out_path = _replace_root_path(blob_name, SANITIZE_FOLDER) - out_client = await _use_blob_client(out_path) - await out_client.upload_blob( + await _upload( data=out_stream.getbuffer(), - overwrite=True, # For the first upload, overwrite, next steps will validate MD5 for cache + path=blob_name, ) else: # Store as is logger.info(f"Storing raw blob as is ({blob_name})") - out_path = _replace_root_path(blob_name, SANITIZE_FOLDER) - out_client = await _use_blob_client(out_path) - await out_client.upload_blob( - data=in_bytes.getbuffer(), - overwrite=True, # For the first upload, overwrite, next steps will validate MD5 for cache + await _upload( + data=in_bytes.getbuffer(), + path=blob_name, ) diff --git a/requirements.txt b/requirements.txt index 7234594..f1af26a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ 
pydantic==2.7.1 python-dotenv==1.0.1 pyyaml==6.0.1 tiktoken==0.7.0 +unidecode==1.3.8