# Multimodal Retrieval Augmented Generation with Content Understanding

# Overview

Azure AI Content Understanding extracts data from diverse content types while preserving semantic integrity and contextual relationships, so that Retrieval Augmented Generation (RAG) applications can retrieve accurate, well-structured context.

This sample demonstrates how to leverage Azure AI's Content Understanding capabilities to extract:

- OCR and layout information from documents
- Audio transcription with speaker diarization from audio files
- Shot detection, keyframe extraction, and audio transcription from videos

This notebook illustrates how to extract content from unstructured multimodal data and apply it to Retrieval Augmented Generation (RAG). The resulting markdown output can be used with LangChain's markdown header splitter for semantic chunking. These chunks are then embedded and indexed in Azure AI Search. When a user submits a query, Azure AI Search retrieves the relevant chunks, which are passed to an Azure OpenAI chat model to generate a context-aware response.


# Scenario

SecureHome Insurance, a leading property insurance company, faces a significant challenge following a recent natural disaster that has led to an influx of insurance claims. The data analyst at SecureHome Insurance is tasked with accurately validating ingested data from claims and invoices being processed through the system. These claims include various multimodal content types, such as policy plans (text documents), photos of property damage (images), footage of the disaster impact (videos), and recorded statements from insurance adjusters (audio files). The goal is to streamline the process and ensure analysts have all necessary information at their fingertips to maintain accuracy and compliance.

To address this challenge, SecureHome Insurance uses Azure AI Content Understanding to build a unified system that extracts and analyzes data from multimodal sources. The system extracts key information such as policy details and invoice data from text documents, analyzes images to assess the extent of property damage, processes videos to understand the impact of the disaster, and transcribes audio files to capture adjuster reports and statements. By preserving semantic integrity and contextual relationships, the system maps the relevant information to defined schemas such as policy plans, invoices, and insurance adjuster reports.

In practice, when a data analyst receives a batch of insurance claims, they search the integrated platform for relevant information. The system follows a Retrieval Augmented Generation (RAG) approach: it first retrieves relevant data extracted from text documents, images, videos, and audio files, then uses that data to generate comprehensive, contextually accurate responses to the analyst's queries.

With the RAG approach, the analyst works from the most relevant and up-to-date information, which helps them validate and process claims accurately and efficiently.


# Prerequisites
1. Follow the [README](../README.md#configure-azure-ai-service-resource) to create the essential resources used in this sample.
2. Install the required packages.

In [None]:
%pip install -r ../requirements.txt
%pip install python-dotenv langchain langchain-community langchain-openai langchainhub openai tiktoken azure-identity azure-search-documents==11.6.0b3

# Load environment variables

In [None]:

import os
from dotenv import load_dotenv
load_dotenv()

# Load and validate Azure AI Services configs
AZURE_AI_SERVICE_ENDPOINT = os.getenv("AZURE_AI_SERVICE_ENDPOINT")
AZURE_AI_SERVICE_API_VERSION = os.getenv("AZURE_AI_SERVICE_API_VERSION") or "2024-12-01-preview"
AZURE_DOCUMENT_INTELLIGENCE_API_VERSION = os.getenv("AZURE_DOCUMENT_INTELLIGENCE_API_VERSION") or "2024-11-30"

# Load and validate Azure OpenAI configs
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_CHAT_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")
AZURE_OPENAI_CHAT_API_VERSION = os.getenv("AZURE_OPENAI_CHAT_API_VERSION") or "2024-08-01-preview"
AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME")
AZURE_OPENAI_EMBEDDING_API_VERSION = os.getenv("AZURE_OPENAI_EMBEDDING_API_VERSION") or "2023-05-15"

# Load and validate Azure Search Services configs
AZURE_SEARCH_ENDPOINT = os.getenv("AZURE_SEARCH_ENDPOINT")
AZURE_SEARCH_INDEX_NAME = os.getenv("AZURE_SEARCH_INDEX_NAME") or "sample-doc-index"
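
The cell above loads the settings but does not check them. As a minimal sketch, assuming the five settings without a default are the required ones, a hypothetical helper such as `find_missing_settings` could report what is missing before any service call fails with a less obvious error:

```python
import os

# Hypothetical list: the settings that have no default value in the cell above
REQUIRED_SETTINGS = [
    "AZURE_AI_SERVICE_ENDPOINT",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME",
    "AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME",
    "AZURE_SEARCH_ENDPOINT",
]


def find_missing_settings(names, environ=None):
    """Return the names that are unset or blank in the given environment mapping."""
    env = os.environ if environ is None else environ
    return [name for name in names if not (env.get(name) or "").strip()]


missing = find_missing_settings(REQUIRED_SETTINGS)
if missing:
    print("Missing settings in .env: " + ", ".join(missing))
```

Passing a plain dictionary as `environ` makes the helper easy to try out without touching the real environment.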

# Create custom analyzer

In [None]:
import json
import sys
import uuid
from pathlib import Path
from dotenv import find_dotenv, load_dotenv
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# Add the parent directory to the path to use shared modules
parent_dir = Path.cwd().parent
sys.path.append(str(parent_dir))


### Create analyzers with predefined schemas

In [None]:
from pathlib import Path
from python.content_understanding_client import AzureContentUnderstandingClient
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")

# Analyzer configurations: one analyzer per modality, each with a unique ID
analyzer_configs = [
    {
        "id": "doc-analyzer" + str(uuid.uuid4()),
        "template_path": "../analyzer_templates/content_document.json",
        "location": Path("../data/sample_layout.pdf"),
    },
    {
        "id": "image-analyzer" + str(uuid.uuid4()),
        "template_path": "../analyzer_templates/content_image.json",
        "location": Path("../data/sample_image.png"),
    },
    {
        "id": "audio-analyzer" + str(uuid.uuid4()),
        "template_path": "../analyzer_templates/content_audio.json",
        "location": Path("../data/audio.wav"),
    },
    {
        "id": "video-analyzer" + str(uuid.uuid4()),
        "template_path": "../analyzer_templates/content_video.json",
        "location": Path("../data/FlightSimulator.mp4"),
    },
]
# Create Content Understanding client
content_understanding_client = AzureContentUnderstandingClient(
    endpoint=AZURE_AI_SERVICE_ENDPOINT,
    api_version=AZURE_AI_SERVICE_API_VERSION,
    token_provider=token_provider,
    x_ms_useragent="azure-ai-content-understanding-python/content_extraction", # This header is used for sample usage telemetry, please comment out this line if you want to opt out.
)

# Iterate through each analyzer and create it using the content understanding client
for analyzer in analyzer_configs:
    analyzer_id = analyzer["id"]
    template_path = analyzer["template_path"]

    try:
        
        # Create the analyzer using the content understanding client
        response = content_understanding_client.begin_create_analyzer(
            analyzer_id=analyzer_id,
            analyzer_template_path=template_path
        )
        result = content_understanding_client.poll_result(response)
        print(f"Successfully created analyzer: {analyzer_id}")
        
    except Exception as e:
        print(f"Failed to create analyzer: {analyzer_id}")
        print(f"Error: {e}")

### Use created analyzers to extract multimodal content

In [None]:
# Iterate through each analyzer and analyze the sample file for its modality
analyzer_results = []
extracted_markdown = []
analyzer_content = []
for analyzer in analyzer_configs:
    analyzer_id = analyzer["id"]
    file_location = analyzer["location"]
    try:
        # Analyze content
        response = content_understanding_client.begin_analyze(analyzer_id, file_location)
        result = content_understanding_client.poll_result(response)

        # The extracted items are returned as a list under result["contents"]
        analysis = result.get("result", {})
        contents = analysis.get("contents", [])
        analyzer_results.append({"id": analyzer_id, "result": analysis})
        analyzer_content.append({"id": analyzer_id, "content": contents})

        # Join the markdown of every content item (a result may hold several items)
        markdown = "\n\n".join(item.get("markdown", "") for item in contents)
        extracted_markdown.append({"id": analyzer_id, "markdown": markdown})
        print("Markdown:", markdown)

    except Exception as e:
        print(f"Failed to analyze content with analyzer: {analyzer_id}")
        print(f"Error: {e}")

            
# Delete the analyzers if they are no longer needed
# for analyzer in analyzer_configs:
#     content_understanding_client.delete_analyzer(analyzer["id"])
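
Later cells pick results out of these lists by position, which breaks if one analysis fails and the list ends up shorter. A minimal sketch of a more defensive lookup, assuming the analyzer ID prefixes from `analyzer_configs` (`find_by_analyzer_prefix` is a hypothetical helper, and the sample data is illustrative only):

```python
def find_by_analyzer_prefix(items, prefix):
    """Return the first entry whose "id" starts with prefix, or None if absent."""
    for item in items:
        if item.get("id", "").startswith(prefix):
            return item
    return None


# Illustrative data shaped like the extracted_markdown entries built above
sample_markdown = [
    {"id": "doc-analyzer1234", "markdown": "# Policy\nCoverage details"},
    {"id": "video-analyzer5678", "markdown": "# Shot 00:00\nTranscript"},
]

doc_entry = find_by_analyzer_prefix(sample_markdown, "doc-analyzer")
print(doc_entry["markdown"] if doc_entry else "No document result available")
```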

## Organize multimodal content extraction markdown data

In [None]:
# Maintain separate indexes per modality, each optimized for its data type
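
One way to act on this note is to derive an index name per modality from a shared base name. The sketch below is an assumption, not part of the sample: `index_name_for` and the modality labels are hypothetical, and the character clean-up is a conservative guess at what an index name allows.

```python
import re

# Hypothetical modality labels, matching the analyzer ID prefixes used earlier
MODALITIES = ("doc", "image", "audio", "video")


def index_name_for(base_name, modality):
    """Build a per-modality index name such as 'sample-doc-index-video'."""
    if modality not in MODALITIES:
        raise ValueError(f"Unknown modality: {modality}")
    name = f"{base_name}-{modality}".lower()
    # Keep only lowercase letters, digits, and dashes, and trim stray dashes
    return re.sub(r"[^a-z0-9-]", "-", name).strip("-")


index_names = {m: index_name_for("sample-doc-index", m) for m in MODALITIES}
print(index_names)
```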

### Split document content into semantic chunks

This is a simple starting point. Feel free to give your own chunking strategies a try!

In [None]:
from langchain import hub
from langchain_openai import AzureChatOpenAI
from langchain_openai import AzureOpenAIEmbeddings
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_community.vectorstores.azuresearch import AzureSearch
from langchain.schema import Document
# Configure langchain text splitting settings
EMBEDDING_CHUNK_SIZE = 512
EMBEDDING_CHUNK_OVERLAP = 20

# Split the document into chunks based on markdown headers.
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]

text_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

docs_string = extracted_markdown[0].get("markdown", "")  # The first item in extracted_markdown is the document analyzer's markdown output
splits = text_splitter.split_text(docs_string)

print("Length of splits: " + str(len(splits)))
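
To make the idea of header-based chunking concrete, here is a simplified pure-Python stand-in. It is not LangChain's implementation: `split_by_markdown_headers` is a hypothetical function that only illustrates how each chunk can carry the headers above it as metadata.

```python
import re


def split_by_markdown_headers(markdown, max_level=3):
    """Split markdown into chunks, each tagged with the headers above it."""
    header_re = re.compile(r"^(#{1,%d})\s+(.*)$" % max_level)
    chunks, current_lines, path = [], [], {}

    def flush():
        # Emit the text collected so far under the current header path
        text = "\n".join(current_lines).strip()
        if text:
            chunks.append({"metadata": dict(path), "text": text})
        current_lines.clear()

    for line in markdown.splitlines():
        match = header_re.match(line)
        if match:
            flush()
            level = len(match.group(1))
            # A new header replaces any header at the same or deeper level
            for key in [k for k in path if int(k.split()[1]) >= level]:
                del path[key]
            path[f"Header {level}"] = match.group(2).strip()
        else:
            current_lines.append(line)
    flush()
    return chunks


sample = "# Claim\nIntro text\n## Damage\nRoof damaged\n## Invoice\nTotal: 1200"
for chunk in split_by_markdown_headers(sample):
    print(chunk)
```

Keeping the header path in the metadata is what lets a retrieved chunk be traced back to its section of the source document.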

### Preprocess output data

In [None]:
def convert_values_to_strings(json_obj):
    return [str(value) for value in json_obj]


# Convert image content to LangChain Documents
def process_cu_image_output(contents):
    image_splits = [
        v for v in convert_values_to_strings(contents.get("content", []))
    ]
    image_content = [Document(page_content=v) for v in image_splits]
    return image_content

# Convert audio content to LangChain Documents
def process_audio_output(contents):
    audio_splits = [
        v for v in convert_values_to_strings(contents.get("content", []))
    ]
    audio_content = [Document(page_content=v) for v in audio_splits]
    return audio_content

# Convert video content to LangChain Documents, one per video segment
def process_cu_video_scene_description(scene_description):
    audio_visual_segments = scene_description.get("content", [])
    audio_visual_splits = [
        "The following is a json string representing a video segment with scene description and transcript ```"
        + v
        + "```"
        for v in convert_values_to_strings(audio_visual_segments)
    ]
    docs = [Document(page_content=v) for v in audio_visual_splits]
    return docs

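# Look up the video analyzer's content (analyzer IDs start with "video-analyzer")
video_cu_result = next(
    item for item in analyzer_content if item["id"].startswith("video-analyzer")
)

# Print the content analysis result
print("Video Content Understanding result:", video_cu_result["content"])

docs = process_cu_video_scene_description(video_cu_result)
print("There are " + str(len(docs)) + " documents.")

for doc in docs:
    print("doc content:", doc.page_content)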
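
Note that `convert_values_to_strings` uses `str()`, which produces a Python representation rather than JSON, even though the video prompt text describes its payload as a JSON string. The sketch below shows the difference on an illustrative segment (the field names are hypothetical); swapping in `json.dumps` is one option if strict JSON is wanted.

```python
import json

# Illustrative segment; the field names here are hypothetical
segment = {"description": "Flooded driveway", "transcript": "Water is still rising"}

as_repr = str(segment)         # Python repr with single quotes, not valid JSON
as_json = json.dumps(segment)  # Valid JSON that round-trips through json.loads

print(as_repr)
print(as_json)
print(json.loads(as_json) == segment)
```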

# Embed and index the chunks

In [None]:
# Embed the split documents and insert them into the Azure AI Search vector store
def embed_and_index_chunks(docs):
    aoai_embeddings = AzureOpenAIEmbeddings(
        azure_deployment=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME,
        openai_api_version=AZURE_OPENAI_EMBEDDING_API_VERSION,  # e.g., "2023-12-01-preview"
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        azure_ad_token_provider=token_provider
    )

    vector_store: AzureSearch = AzureSearch(
        azure_search_endpoint=AZURE_SEARCH_ENDPOINT,
        azure_search_key=None,
        index_name=AZURE_SEARCH_INDEX_NAME,
        embedding_function=aoai_embeddings.embed_query
    )
    vector_store.add_documents(documents=docs)
    return vector_store


# embed and index the docs:
vector_store = embed_and_index_chunks(splits)

# Retrieve relevant chunks based on a question

In [None]:
# Retrieve relevant chunks based on the question

retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})

retrieved_docs = retriever.invoke(
    "<your question>"
)

print(retrieved_docs[0].page_content)

# Use a prompt for RAG that is checked into the LangChain prompt hub (https://smith.langchain.com/hub/rlm/rag-prompt?organizationId=989ad331-949f-4bac-9694-660074a208a7)
prompt = hub.pull("rlm/rag-prompt")
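llm = AzureChatOpenAI(
    openai_api_version=AZURE_OPENAI_CHAT_API_VERSION,  # e.g., "2023-12-01-preview"
    azure_deployment=AZURE_OPENAI_CHAT_DEPLOYMENT_NAME,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    azure_ad_token_provider=token_provider,  # Same token provider used for the embeddings
    temperature=0,
)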


def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
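
The chain above can be read as ordinary function composition: retrieve documents, join their text, fill a prompt, then call the model. The sketch below mirrors the first three steps in plain Python. `Doc` is a stand-in for a LangChain `Document`, and the template in `build_prompt` is a placeholder, not the actual text of `rlm/rag-prompt`.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Stand-in for a LangChain Document, used only in this sketch."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def join_docs(docs):
    # Same joining rule as format_docs in the cell above
    return "\n\n".join(doc.page_content for doc in docs)


def build_prompt(question, docs):
    # Placeholder template; the real text comes from hub.pull("rlm/rag-prompt")
    return f"Context:\n{join_docs(docs)}\n\nQuestion: {question}\nAnswer:"


fake_retrieved = [Doc("Roof damage was reported."), Doc("Invoice total: 1200 USD.")]
print(build_prompt("What was damaged?", fake_retrieved))
```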

# Document Q&A

In [None]:
# Ask a question about the document

rag_chain.invoke("<your question>")

# Document Q&A with references

In [None]:
# Return the retrieved documents or certain source metadata from the documents

from operator import itemgetter

from langchain.schema.runnable import RunnableMap

rag_chain_from_docs = (
    {
        "context": lambda input: format_docs(input["documents"]),
        "question": itemgetter("question"),
    }
    | prompt
    | llm
    | StrOutputParser()
)
rag_chain_with_source = RunnableMap(
    {"documents": retriever, "question": RunnablePassthrough()}
) | {
    "documents": lambda input: [doc.metadata for doc in input["documents"]],
    "answer": rag_chain_from_docs,
}

rag_chain_with_source.invoke("<your question>")
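
The chain returns a dictionary with an `answer` string and a `documents` list holding each retrieved chunk's metadata. As a minimal sketch, a hypothetical helper such as `format_answer_with_sources` could turn that into readable output; the sample result is illustrative, not real model output.

```python
def format_answer_with_sources(result):
    """Render the chain output as an answer followed by numbered source metadata."""
    lines = [result.get("answer", "")]
    for number, metadata in enumerate(result.get("documents", []), start=1):
        details = ", ".join(f"{key}: {value}" for key, value in metadata.items())
        lines.append(f"[{number}] {details or 'no metadata'}")
    return "\n".join(lines)


# Illustrative dictionary shaped like the output of rag_chain_with_source.invoke(...)
sample_result = {
    "answer": "The roof was damaged.",
    "documents": [{"Header 1": "Claim", "Header 2": "Damage"}, {}],
}
print(format_answer_with_sources(sample_result))
```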