Merge branch 'develop'
clemlesne committed Jun 14, 2024
2 parents 2bbe9c8 + d0d51fe commit 26e39e2
Showing 9 changed files with 156 additions and 131 deletions.
10 changes: 6 additions & 4 deletions README.md
@@ -242,10 +242,12 @@ llm:
  endpoint: https://xxx.openai.azure.com
  model: gpt-4o

-ai_search:
-  access_key: xxx
-  endpoint: https://xxx.search.windows.net
-  index: trainings
+destination:
+  mode: ai_search
+  ai_search:
+    access_key: xxx
+    endpoint: https://xxx.search.windows.net
+    index: trainings

document_intelligence:
  access_key: xxx
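
The ai_search settings now nest under a destination block with an explicit mode selector. As a rough illustration of how such a nested block could be validated, here is a hedged pydantic sketch; the class names are made up for the example and are not the ones in helpers/config_models:

# Hypothetical sketch; AiSearchConfig and DestinationConfig are illustrative names.
from pydantic import BaseModel, SecretStr

class AiSearchConfig(BaseModel):
    access_key: SecretStr  # kept secret, mirroring the get_secret_value() calls below
    endpoint: str
    index: str

class DestinationConfig(BaseModel):
    mode: str  # "ai_search" selects the AI Search destination
    ai_search: AiSearchConfig

config = DestinationConfig.model_validate({
    "mode": "ai_search",
    "ai_search": {
        "access_key": "xxx",
        "endpoint": "https://xxx.search.windows.net",
        "index": "trainings",
    },
})
print(config.ai_search.index)  # trainings
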
152 changes: 40 additions & 112 deletions function_app.py
@@ -7,21 +7,17 @@


# General imports
-from azure.ai.documentintelligence.aio import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import (
    AnalyzeResult,
    ContentFormat,
    DocumentAnalysisFeature,
    ParagraphRole,
)
-from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import ResourceExistsError
-from azure.search.documents.aio import SearchClient
from azure.storage.blob import BlobProperties
from azure.storage.blob.aio import BlobClient, ContainerClient
from azurefunctions.extensions.bindings.blob import BlobClient as BlobClientTrigger
from helpers.http import azure_transport
-from openai import AsyncAzureOpenAI
from openai.types.chat import ChatCompletionSystemMessageParam
from os import getenv
from pydantic import TypeAdapter, ValidationError
@@ -43,11 +39,11 @@
    PagedDocumentModel,
    SynthetisedDocumentModel,
)
-import re
-import pikepdf
-from io import BytesIO
from base64 import b64encode
from helpers.config_models.llm import ConfigModel as LlmConfigModel
+from io import BytesIO
from unidecode import unidecode
+import pikepdf
+import re


# Azure Functions
@@ -66,9 +62,6 @@

# Clients
_container_client: Optional[ContainerClient] = None
-_doc_client: Optional[DocumentIntelligenceClient] = None
-_openai_clients: dict[str, AsyncAzureOpenAI] = {}
-_search_client: Optional[SearchClient] = None

# Custom types
T = TypeVar("T")
@@ -86,8 +79,16 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None:
    For PDF, QPDF (https://github.com/qpdf/qpdf) is used (from pikepdf) to save the document in a safe format.
    """
+    async def _upload(data: memoryview, path: str) -> None:
+        out_path = unidecode(_replace_root_path(path, SANITIZE_FOLDER), replace_str="")  # Decode possible non ASCII characters
+        out_client = await _use_blob_client(out_path)
+        await out_client.upload_blob(
+            data=data,
+            overwrite=True,  # For the first upload, overwrite, next steps will validate MD5 for cache
+        )
+
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -106,20 +107,15 @@ async def raw_to_sanitize(input: BlobClientTrigger) -> None:
            linearize=True,  # Allows compliant readers to begin displaying a PDF file before it is fully downloaded
            min_version=CONFIG.features.sanitize_pdf_version,  # Note, if a second PDF is created with a higher version, hash will be different and cache won't work
        )
-        # Store
-        out_path = _replace_root_path(blob_name, SANITIZE_FOLDER)
-        out_client = await _use_blob_async_client(out_path)
-        await out_client.upload_blob(
+        await _upload(
            data=out_stream.getbuffer(),
-            overwrite=True,  # For the first upload, overwrite, next steps will validate MD5 for cache
+            path=blob_name,
        )
    else:  # Store as is
        logger.info(f"Storing raw blob as is ({blob_name})")
-        out_path = _replace_root_path(blob_name, SANITIZE_FOLDER)
-        out_client = await _use_blob_async_client(out_path)
-        await out_client.upload_blob(
-            data=in_bytes.getbuffer(),
-            overwrite=True,  # For the first upload, overwrite, next steps will validate MD5 for cache
+        await _upload(
+            data=out_stream.getbuffer(),
+            path=blob_name,
        )
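
For reference, the sanitize step above reduces to a pikepdf round-trip. A minimal standalone sketch, assuming only the linearize and min_version options visible in this hunk (the file name and version value are made up):

from io import BytesIO
import pikepdf

with open("raw.pdf", "rb") as f:  # hypothetical input document
    in_bytes = BytesIO(f.read())

out_stream = BytesIO()
with pikepdf.open(in_bytes) as pdf:
    pdf.save(
        out_stream,
        linearize=True,     # compliant readers can render before the full download
        min_version="1.4",  # stands in for CONFIG.features.sanitize_pdf_version
    )
print(f"Sanitized PDF is {out_stream.getbuffer().nbytes} bytes")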


Expand All @@ -134,7 +130,7 @@ async def sanitize_to_extract(input: BlobClientTrigger) -> None:
    First, document content is extracted from its binary form.
    """
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -152,7 +148,7 @@ async def sanitize_to_extract(input: BlobClientTrigger) -> None:
        features.append(DocumentAnalysisFeature.FORMULAS)
        features.append(DocumentAnalysisFeature.LANGUAGES)
    logger.info(f"Features enabled: {features}")
-    doc_client = await _use_doc_client()
+    doc_client = await CONFIG.document_intelligence.instance()
    doc_poller = await doc_client.begin_analyze_document(
        analyze_request=content,  # type: ignore
        content_type="application/octet-stream",
@@ -187,7 +183,7 @@ async def sanitize_to_extract(input: BlobClientTrigger) -> None:
    )
    # Store
    out_path = f"{EXTRACT_FOLDER}/{blob_md5}.json"
-    out_client = await _use_blob_async_client(out_path)
+    out_client = await _use_blob_client(out_path)
    try:
        await out_client.upload_blob(data=raw_text_model.model_dump_json())
    except ResourceExistsError:
@@ -205,7 +201,7 @@ async def extract_to_chunck(input: BlobClientTrigger) -> None:
    Second, document content is chunked into smaller parts to make it understandable by the configured LLM.
    """
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -240,7 +236,7 @@ async def extract_to_chunck(input: BlobClientTrigger) -> None:
        out_path = _replace_root_path(
            _replace_extension(blob_name, f"-{i}.json"), CHUNCK_FOLDER
        )
-        out_client = await _use_blob_async_client(out_path)
+        out_client = await _use_blob_client(out_path)
        try:
            await out_client.upload_blob(data=out_model.model_dump_json())
        except ResourceExistsError:
@@ -258,7 +254,7 @@ async def chunck_to_synthesis(input: BlobClientTrigger) -> None:
    Third, chunks are synthesised into a coherent text.
    """
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -331,7 +327,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[str]]:
    out_path = _replace_root_path(
        _replace_extension(blob_name, ".json"), SYNTHESIS_FOLDER
    )
-    out_client = await _use_blob_async_client(out_path)
+    out_client = await _use_blob_client(out_path)
    try:
        await out_client.upload_blob(data=synthesis_model.model_dump_json())
    except ResourceExistsError:
@@ -351,7 +347,7 @@ async def synthesis_to_page(input: BlobClientTrigger) -> None:
    Pages are cleaned and filtered for repetitions (indicating low-quality content).
    """
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -399,7 +395,7 @@ async def synthesis_to_page(input: BlobClientTrigger) -> None:
        out_path = _replace_root_path(
            _replace_extension(blob_name, f"-{i}.json"), PAGE_FOLDER
        )
-        out_client = await _use_blob_async_client(out_path)
+        out_client = await _use_blob_client(out_path)
        try:
            await out_client.upload_blob(data=out_model.model_dump_json())
        except ResourceExistsError:
@@ -414,7 +410,7 @@ async def synthesis_to_page(input: BlobClientTrigger) -> None:
)
async def page_to_fact(input: BlobClientTrigger) -> None:
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -502,7 +498,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[FactedL
    out_path = _replace_root_path(
        _replace_extension(blob_name, ".json"), FACT_FOLDER
    )
-    out_client = await _use_blob_async_client(out_path)
+    out_client = await _use_blob_client(out_path)
    try:
        await out_client.upload_blob(data=facted_document_model.model_dump_json())
    except ResourceExistsError:
@@ -517,7 +513,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[FactedL
)
async def fact_to_critic(input: BlobClientTrigger) -> None:
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -580,25 +576,25 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]]
Question: What is the capital of France?
Answer: Paris
Context: Paris, as the capital of France, is the political, economic, and cultural center of the country.
-Score: 1.0
+Assistant: 1.0
## Example 2
Question: What is the ISIN code for the stock?
Answer: US0378331005
Context: The ISIN code for the stock is FR0000120172.
-Score: 0.0
+Assistant: 0.0
## Example 3
Question: In which year was the company founded?
Answer: 1939
Context: The company, by its founder, was established during World War II to provide essential services to the population. Its exact founding date is unknown.
-Score: 0.6
+Assistant: 0.6
## Example 4
Question: What is the main product of the company?
Answer: A software suite
Context: The company is known for its software suite called "Office", which includes applications such as a text editor, a spreadsheet, and a presentation program.
-Score: 0.8
+Assistant: 0.8
# Fact
Question: {fact.question}
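
With the few-shot examples now ending in "Assistant:" instead of "Score:", the completion itself is expected to be a bare float between 0 and 1. Below is a hedged sketch of a validator matching the _validate signature in the hunk header; the actual function body is not part of this diff:

from typing import Optional

def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]]:
    """Return (is_valid, error_message, parsed_score)."""
    if not req:
        return (False, "Empty response", None)
    try:
        score = float(req.strip())
    except ValueError:
        return (False, f"Not a float: {req!r}", None)
    if not 0.0 <= score <= 1.0:
        return (False, f"Score out of range: {score}", None)
    return (True, None, score)

assert _validate("0.8") == (True, None, 0.8)
assert _validate("high")[0] is False
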
Expand All @@ -622,7 +618,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]]
    out_path = _replace_root_path(
        _replace_extension(blob_name, ".json"), CRITIC_FOLDER
    )
-    out_client = await _use_blob_async_client(out_path)
+    out_client = await _use_blob_client(out_path)
    try:
        await out_client.upload_blob(data=facted_model.model_dump_json())
    except ResourceExistsError:
@@ -637,7 +633,7 @@ def _validate(req: Optional[str]) -> tuple[bool, Optional[str], Optional[float]]
)
async def critic_to_index(input: BlobClientTrigger) -> None:
    # Read
-    async with await _use_blob_async_client(
+    async with await _use_blob_client(
        name=input.blob_name,  # type: ignore
        snapshot=input.snapshot,  # type: ignore
    ) as blob_client:
@@ -666,7 +662,7 @@ async def critic_to_index(input: BlobClientTrigger) -> None:
    indexed_dicts = TypeAdapter(list[IndexedDocumentModel]).dump_python(
        indexed_models, mode="json"
    )
-    search_client = await _use_search_client()
+    search_client = await CONFIG.ai_search.instance()
    await search_client.merge_or_upload_documents(indexed_dicts)  # Will overwrite existing documents


@@ -717,7 +713,7 @@ async def _llm_generate(
"""
logger.info("LLM completion generation")
# Generate
openai_client, config = await _use_openai_client(is_fast)
openai_client, config = await (CONFIG.llm.fast if is_fast else CONFIG.llm.slow).instance()
messages = [
ChatCompletionSystemMessageParam(
content=prompt,
@@ -936,7 +932,7 @@ def _count_tokens(content: str, model: str) -> int:
        encoding_name = tiktoken.encoding_name_for_model(model)
    except KeyError:
        encoding_name = tiktoken.encoding_name_for_model("gpt-3.5")
-        logger.warning(f"Unknown model {model}, using {encoding_name} encoding")
+        logger.debug(f"Unknown model {model}, using {encoding_name} encoding")
    return len(tiktoken.get_encoding(encoding_name).encode(content))
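
As a usage note, the token counter above follows tiktoken's usual lookup-then-fallback pattern. A self-contained sketch, with an assumed fallback encoding rather than the repository's exact choice:

import tiktoken

def count_tokens(content: str, model: str) -> int:
    try:
        encoding_name = tiktoken.encoding_name_for_model(model)
    except KeyError:
        # Unknown model: fall back to a widely used encoding (assumption)
        encoding_name = "cl100k_base"
    return len(tiktoken.get_encoding(encoding_name).encode(content))

print(count_tokens("Hello world", "gpt-4o"))  # token count, e.g. 2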


@@ -969,7 +965,7 @@ def _detect_extension(file_path: str) -> str:
return "." + file_path.lower().split(".")[-1]


async def _use_blob_async_client(
async def _use_blob_client(
name: str,
snapshot: Optional[str] = None,
) -> BlobClient:
@@ -998,71 +994,3 @@ async def _use_container_async_client() -> ContainerClient:
        container_name=CONTAINER_NAME,
    )
    return _container_client


-async def _use_doc_client() -> DocumentIntelligenceClient:
-    """
-    Create a DocumentIntelligenceClient client capable of async I/O.
-    The client is cached for future use.
-    """
-    global _doc_client
-    if not isinstance(_doc_client, DocumentIntelligenceClient):
-        _doc_client = DocumentIntelligenceClient(
-            # Deployment
-            endpoint=CONFIG.document_intelligence.endpoint,
-            # Performance
-            polling_interval=5,  # 5 seconds
-            transport=await azure_transport(),
-            # Authentication
-            credential=AzureKeyCredential(
-                CONFIG.document_intelligence.access_key.get_secret_value()
-            ),
-        )
-    return _doc_client
-
-
-async def _use_search_client() -> SearchClient:
-    """
-    Create a SearchClient client capable of async I/O.
-    The client is cached for future use.
-    """
-    global _search_client
-    if not isinstance(_search_client, SearchClient):
-        _search_client = SearchClient(
-            # Deployment
-            endpoint=CONFIG.ai_search.endpoint,
-            index_name=CONFIG.ai_search.index,
-            # Performance
-            transport=await azure_transport(),
-            # Authentication
-            credential=AzureKeyCredential(
-                CONFIG.ai_search.access_key.get_secret_value()
-            ),
-        )
-    return _search_client
-
-
-async def _use_openai_client(is_fast: bool) -> tuple[AsyncAzureOpenAI, LlmConfigModel]:
-    """
-    Create a OpenAI client capable of async I/O.
-    The client is cached for future use.
-    """
-    global _openai_clients
-    config = CONFIG.llm.fast if is_fast else CONFIG.llm.slow
-    client_name = "fast" if is_fast else "slow"
-    if not (client_name in _openai_clients and isinstance(_openai_clients[client_name], AsyncAzureOpenAI)):
-        _openai_clients[client_name] = AsyncAzureOpenAI(
-            # Deployment
-            api_version="2023-12-01-preview",
-            azure_deployment=config.deployment,
-            azure_endpoint=config.endpoint,
-            # Reliability
-            max_retries=30,  # We are patient, this is a background job :)
-            timeout=180,  # 3 minutes
-            # Authentication
-            api_key=config.api_key.get_secret_value(),
-        )
-    return (_openai_clients[client_name], config)
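
The module-level client factories deleted above move behind the config models, as the new call sites (CONFIG.document_intelligence.instance(), CONFIG.ai_search.instance(), and the CONFIG.llm variants) suggest. The new model code is not shown on this page, so the following is only a plausible sketch of the cached-client pattern those call sites imply, with assumed names throughout:

from typing import Optional
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient
from pydantic import BaseModel, PrivateAttr, SecretStr

class AiSearchModel(BaseModel):  # assumed name, not taken from this commit
    access_key: SecretStr
    endpoint: str
    index: str
    _client: Optional[SearchClient] = PrivateAttr(default=None)

    async def instance(self) -> SearchClient:
        # Build the async client once, then return the cached copy on later calls
        if not self._client:
            self._client = SearchClient(
                endpoint=self.endpoint,
                index_name=self.index,
                credential=AzureKeyCredential(self.access_key.get_secret_value()),
            )
        return self._client

Each call site then awaits instance() on its config model rather than a module-level factory, which is what lets the global _doc_client, _search_client, and _openai_clients singletons disappear.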
7 changes: 0 additions & 7 deletions helpers/config_models/ai_search.py

This file was deleted.

