# Custom Asynchronous Indexing Pipeline with Text and Image Embeddings  
   
This notebook demonstrates how to create a **custom asynchronous indexing pipeline** that:  
   
- Reads PDF documents from Azure Blob Storage.  
- Extracts text and images using Azure Document Intelligence.  
- Generates embeddings for text and images using **Cohere models** in **Azure AI Foundry**.  
- Indexes the data into **Azure AI Search** with separate `text_vector` and `image_vector` fields.  
- Allows searching over text and image vectors.  
   
We will go through the following steps:  
   
1. **Install Required Libraries**  
2. **Set Up Environment Variables**  
3. **Create the Azure AI Search Index**  
4. **Define the Custom Indexing Pipeline Components**  
5. **Initialize and Run the Indexing Pipeline**  
6. **Perform Test Searches**  

## 1. Install Required Libraries

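Run the following cell to install the Python packages the notebook depends on. The `pdf2image` package, used in the figure-extraction cell at the end, also needs the Poppler binaries installed on the machine.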

In [None]:
# Install required packages  
%pip install azure-search-documents==11.6.0b4 
%pip install azure-core azure-identity azure-storage-blob azure-ai-documentintelligence  
%pip install azure-ai-ml  
%pip install azure-ai-inference  
%pip install openai  
%pip install aiohttp python-dotenv nest_asyncio
%pip install pdf2image pillow

In [1]:
# Import necessary libraries and configure logging  
   
import logging  
   
# Configure the root logger  
logging.basicConfig(  
    level=logging.INFO,  # Set root logger level  
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'  
)  
   
# Suppress logs from azure and uamqp libraries  
logging.getLogger('azure').setLevel(logging.WARNING)  
logging.getLogger('uamqp').setLevel(logging.WARNING)  

## 2. Set Up Environment Variables  
   
Set the necessary environment variables, either in a `.env` file or in your environment before starting Jupyter. Avoid hardcoding keys and other secrets in the notebook.  
   
Required environment variables:  
   
- `AZURE_SEARCH_SERVICE_ENDPOINT`  
- `AZURE_SEARCH_API_KEY`  
- `AZURE_STORAGE_ACCOUNT_NAME`  
- `AZURE_STORAGE_ACCOUNT_CONTAINER_NAME`  
- `AZURE_AI_FOUNDRY_ENDPOINT`  
- `AZURE_AI_FOUNDRY_KEY`  
- `TEXT_EMBEDDING_MODEL`  
- `IMAGE_EMBEDDING_MODEL`  
- `DOCUMENTINTELLIGENCE_ENDPOINT`  
- `DOCUMENTINTELLIGENCE_API_KEY`  
   
Optional (both default to `1024`): `TEXT_EMBEDDING_DIMENSIONS` and `IMAGE_EMBEDDING_DIMENSIONS`.  
   
Ensure that the identities used have the necessary permissions (e.g., **Storage Blob Data Reader** role).  
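To fail fast when a setting is missing, you can check all of them before the configuration cell runs (after `load_dotenv()` if the values live in a `.env` file). This is a minimal sketch: `REQUIRED_SETTINGS` mirrors the names the configuration cell reads with `os.environ[...]`, and `missing_settings` is a hypothetical helper, not part of any Azure SDK.

```python
import os

# Names the configuration cell reads with os.environ[...]; the *_DIMENSIONS
# settings fall back to defaults there, so they are not listed here.
REQUIRED_SETTINGS = [
    "AZURE_SEARCH_SERVICE_ENDPOINT",
    "AZURE_SEARCH_API_KEY",
    "AZURE_STORAGE_ACCOUNT_NAME",
    "AZURE_STORAGE_ACCOUNT_CONTAINER_NAME",
    "AZURE_AI_FOUNDRY_ENDPOINT",
    "AZURE_AI_FOUNDRY_KEY",
    "TEXT_EMBEDDING_MODEL",
    "IMAGE_EMBEDDING_MODEL",
    "DOCUMENTINTELLIGENCE_ENDPOINT",
    "DOCUMENTINTELLIGENCE_API_KEY",
]


def missing_settings(required, environ=os.environ):
    """Return the names in `required` that are unset or empty in `environ`."""
    return [name for name in required if not environ.get(name)]


missing = missing_settings(REQUIRED_SETTINGS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```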

In [2]:
import os  
from dotenv import load_dotenv  
   
# Load environment variables from .env file  
load_dotenv(override=True)  
   
# Azure AI Search settings  
search_service_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]  
search_api_key = os.environ["AZURE_SEARCH_API_KEY"]  
index_name = "asynch-custom-push-demo"  
   
# Azure Storage settings  
storage_account_name = os.environ["AZURE_STORAGE_ACCOUNT_NAME"]  
storage_container_name = os.environ["AZURE_STORAGE_ACCOUNT_CONTAINER_NAME"]  
   
# Optional Azure OpenAI settings (unused here: the Cohere models below embed the text)
# text_embedding_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]  
# text_embedding_key = os.environ["AZURE_OPENAI_KEY"]  
# text_embedding_model_name = os.environ["AZURE_OPENAI_EMBEDDING_MODEL_NAME"]  
# text_embedding_dimensions = int(os.getenv("AZURE_OPENAI_EMBEDDING_DIMENSIONS", 1536))  
   
# Azure AI Foundry settings (Cohere text and image embedding models)  
ai_foundry_endpoint = os.environ["AZURE_AI_FOUNDRY_ENDPOINT"]  
ai_foundry_key = os.environ["AZURE_AI_FOUNDRY_KEY"]  
text_embedding_model = os.environ["TEXT_EMBEDDING_MODEL"]  
text_embedding_dimensions = int(os.getenv("TEXT_EMBEDDING_DIMENSIONS", 1024))  
image_embedding_model = os.environ["IMAGE_EMBEDDING_MODEL"]  
image_embedding_dimensions = int(os.getenv("IMAGE_EMBEDDING_DIMENSIONS", 1024))  # Must match the output size of your image embedding model  
   
# Azure Document Intelligence settings  
document_intelligence_endpoint = os.environ["DOCUMENTINTELLIGENCE_ENDPOINT"]  
document_intelligence_key = os.environ["DOCUMENTINTELLIGENCE_API_KEY"]  

## 3. Create the Azure AI Search Index  
   
We'll create the search index with the appropriate schema, including separate fields for `text_vector` and `image_vector`, and a `page_number` field to track the pages.  
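`chunk_id` is the document key, and Azure AI Search only accepts letters, digits, underscores, dashes, and equal signs in key values, so a blob name such as `1_London_Brochure.pdf` cannot be used as-is. One option, sketched here as an assumption (not necessarily what the `asynch_indexer` package does), is to URL-safe base64-encode a composite of blob name, page, and chunk index. The hypothetical `make_chunk_document` helper also shows a text-chunk document shaped like the schema below; figure documents would carry `image_vector` instead of `text_vector`.

```python
import base64
import re

# Characters Azure AI Search accepts in a document key
VALID_KEY = re.compile(r"^[A-Za-z0-9_\-=]+$")


def make_chunk_id(blob_name: str, page_number: int, chunk_index: int) -> str:
    """Hypothetical helper: derive a key-safe, reversible chunk_id."""
    raw = f"{blob_name}|{page_number}|{chunk_index}"
    # URL-safe base64 only emits A-Z, a-z, 0-9, '-', '_' and '=' padding
    return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")


def make_chunk_document(blob_name, page_number, chunk_index, chunk, text_vector):
    """Hypothetical helper: build one text-chunk document matching the index fields."""
    return {
        "chunk_id": make_chunk_id(blob_name, page_number, chunk_index),
        "parent_id": blob_name,
        "title": blob_name,
        "chunk": chunk,
        "text_vector": text_vector,
        "page_number": page_number,
    }


doc = make_chunk_document(
    "1_slide_1.pdf", 1, 0, "Infrastructure is forecasted to be...", [0.0] * 1024
)
print(doc["chunk_id"])
```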

In [3]:
from azure.search.documents.indexes import SearchIndexClient  
from azure.search.documents.indexes.models import (  
    SearchField,  
    SearchFieldDataType,  
    VectorSearch,  
    HnswAlgorithmConfiguration,  
    VectorSearchProfile,  
    SemanticConfiguration,  
    SemanticSearch,  
    SemanticPrioritizedFields,  
    SemanticField,  
    SearchIndex  
)  

from azure.core.credentials import AzureKeyCredential  
   
# Create a SearchIndexClient  
search_index_client = SearchIndexClient(  
    endpoint=search_service_endpoint,  
    credential=AzureKeyCredential(search_api_key)  
)  
   
# Define the index schema  
fields = [  
    SearchField(  
        name="parent_id",  
        type=SearchFieldDataType.String,  
        filterable=True,  
        facetable=True,  
        sortable=True  
    ),  
    SearchField(  
        name="chunk_id",  
        type=SearchFieldDataType.String,  
        key=True,  
        filterable=True,  
        facetable=True,  
        sortable=True  
    ),  
    SearchField(  
        name="title",  
        type=SearchFieldDataType.String,  
        filterable=True,  
        facetable=True,  
        sortable=True  
    ),  
    SearchField(  
        name="chunk",  
        type=SearchFieldDataType.String,  
        searchable=True  
    ),  
    SearchField(  
        name="text_vector",  
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),  
        vector_search_dimensions=text_embedding_dimensions,  
        vector_search_profile_name="textHnswProfile",  
    ),  
    SearchField(  
        name="image_vector",  
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),  
        vector_search_dimensions=image_embedding_dimensions,  
        vector_search_profile_name="imageHnswProfile",  
    ),  
    SearchField(  
        name="page_number",  
        type=SearchFieldDataType.Int32,  
        filterable=True,  
        facetable=True,  
        sortable=True  
    ),  
]  
   
# Configure the vector search settings  
vector_search = VectorSearch(  
    algorithms=[  
        HnswAlgorithmConfiguration(name="myHnswAlgorithm")  
    ],  
    profiles=[  
        VectorSearchProfile(  
            name="textHnswProfile",  
            algorithm_configuration_name="myHnswAlgorithm",  
        ),  
        VectorSearchProfile(  
            name="imageHnswProfile",  
            algorithm_configuration_name="myHnswAlgorithm",  
        ),  
    ],  
)  
   
# Configure semantic search settings (optional)  
semantic_config = SemanticConfiguration(  
    name="my-semantic-config",  
    prioritized_fields=SemanticPrioritizedFields(  
        title_field=SemanticField(field_name="title"),  
        content_fields=[SemanticField(field_name="chunk")],  
    ),  
)  
   
semantic_search = SemanticSearch(configurations=[semantic_config])  
   
# Create the search index  
index = SearchIndex(  
    name=index_name,  
    fields=fields,  
    vector_search=vector_search,  
    semantic_search=semantic_search,  
)  
   
# Create or update the index in Azure AI Search  
search_index_client.create_or_update_index(index)  
   
print(f"Index '{index.name}' created or updated.")  

Index 'asynch-custom-push-demo' created or updated.


## 4. Define the Custom Indexing Pipeline Components  
   
The custom indexing pipeline lives in the local `asynch_indexer` package (its source is not shown in this notebook) and consists of the following components:  
   
- **FileReader**: Reads PDFs from Azure Blob Storage and extracts text and figures with Azure Document Intelligence.  
- **Chunker**: Splits the extracted text into chunks for embedding.  
- **TextEmbedder**: Generates text embeddings with the Cohere text embedding model in Azure AI Foundry.  
- **ImageEmbedder**: Generates image embeddings with the Cohere image embedding model, called through the Azure AI Inference SDK.  
- **FileUploader**: Uploads the processed documents to Azure AI Search.  
- **AsynchronousIndexer**: Orchestrates the entire pipeline asynchronously.  
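As a rough illustration of how stages like these can be wired together, here is a minimal producer/consumer sketch built on `asyncio.Queue`, assuming one worker per stage and a `None` sentinel to signal the end of the stream. Every function in it is hypothetical, and the Document Intelligence, embedding, and upload calls are replaced by in-memory stand-ins so it runs without Azure; the chunker uses simple fixed-size character windows with overlap.

```python
import asyncio

SENTINEL = None  # marks the end of a stage's output stream


def chunk_text(text: str, size: int = 200, overlap: int = 50) -> list:
    """Split text into fixed-size character windows that overlap by `overlap` chars."""
    if not text:
        return []
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


async def reader(paths, out_q):
    for path in paths:
        # Stand-in for downloading the blob and running Document Intelligence
        await out_q.put((path, f"text of {path} " * 30))
    await out_q.put(SENTINEL)


async def chunker(in_q, out_q):
    while (item := await in_q.get()) is not SENTINEL:
        name, text = item
        for index, chunk in enumerate(chunk_text(text)):
            await out_q.put((name, index, chunk))
    await out_q.put(SENTINEL)


async def embedder(in_q, out_q):
    while (item := await in_q.get()) is not SENTINEL:
        name, index, chunk = item
        await asyncio.sleep(0)  # stand-in for an awaited embeddings call
        vector = [float(len(chunk))]  # fake one-dimensional "embedding"
        await out_q.put({"title": name, "chunk_index": index, "chunk": chunk, "text_vector": vector})
    await out_q.put(SENTINEL)


async def uploader(in_q, batch_size, uploaded):
    batch = []
    while (doc := await in_q.get()) is not SENTINEL:
        batch.append(doc)
        if len(batch) == batch_size:
            uploaded.append(batch)  # stand-in for uploading the batch to the index
            batch = []
    if batch:
        uploaded.append(batch)  # flush the last partial batch


async def run_pipeline(paths, batch_size=4):
    # Bounded queues apply backpressure when a downstream stage is slower
    q_read, q_chunk, q_embed = (asyncio.Queue(maxsize=10) for _ in range(3))
    uploaded = []
    await asyncio.gather(
        reader(paths, q_read),
        chunker(q_read, q_chunk),
        embedder(q_chunk, q_embed),
        uploader(q_embed, batch_size, uploaded),
    )
    return uploaded


batches = asyncio.run(run_pipeline(["a.pdf", "b.pdf"]))
print([len(batch) for batch in batches])
```

In a notebook, use `await run_pipeline(...)` instead of `asyncio.run(...)` unless `nest_asyncio` has been applied. With several workers in one stage (like the `read_worker_0` and `read_worker_1` readers in the log output), the next stage has to wait for one sentinel per upstream worker before it can finish.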

## 5. Initialize and Run the Indexing Pipeline  
   
Now we'll initialize the `AsynchronousIndexer` with the appropriate settings and run the indexing pipeline. 

In [4]:
import nest_asyncio  
import asyncio  
from asynch_indexer.AsynchronousIndexer import AsynchronousIndexer  
   
# Necessary when running asyncio in Jupyter notebooks  
nest_asyncio.apply()  
   
# Initialize the AsynchronousIndexer  
indexer = AsynchronousIndexer(  
    index_name=index_name,  
    search_endpoint=search_service_endpoint,  
    search_api_key=search_api_key,  
    storage_account_name=storage_account_name,  
    storage_container_name=storage_container_name,  
    ai_foundry_endpoint=ai_foundry_endpoint,
    ai_foundry_key=ai_foundry_key,
    text_embedding_model=text_embedding_model,
    image_embedding_model=image_embedding_model,
    document_intelligence_endpoint=document_intelligence_endpoint,  
    document_intelligence_key=document_intelligence_key,  
)  
   
# Run the indexing pipeline  
asyncio.run(indexer.run_indexing())  

2024-12-28 12:40:25,476 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_0: Reading document 1_London_Brochure.pdf
2024-12-28 12:40:25,480 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_1: Reading document 1_slide_1.pdf
2024-12-28 12:40:30,225 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_1: Completed analyze_document for 1_slide_1.pdf
2024-12-28 12:40:30,323 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_0: Completed analyze_document for 1_London_Brochure.pdf
2024-12-28 12:40:30,838 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_1: Found 4 figures in 1_slide_1.pdf
2024-12-28 12:40:30,992 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_1: Finished reading 1_slide_1.pdf in 5.51 seconds
2024-12-28 12:40:30,993 - asynch_indexer.AsynchronousIndexer - INFO - Reader read_worker_1: Done processing 1_slide_1.pdf
2024-12-28 12:40:30,994 - asynch_indexer.AsynchronousIndexer - INFO - Chunker 

## 6. Perform Test Searches  
   
Finally, we'll perform test searches against the index to verify that the text and image vectors have been indexed correctly.  
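When a request combines a keyword query (`search_text`) with vector queries, or sends several vector queries at once (for example one against `text_vector` and one against `image_vector`), Azure AI Search merges the ranked result lists with Reciprocal Rank Fusion (RRF): each document scores the sum of 1 / (k + rank) over the lists it appears in. The sketch below illustrates the idea with k = 60, the constant the Azure AI Search documentation describes; it is not the service's exact scoring code, and the chunk IDs are made up.

```python
from collections import defaultdict


def reciprocal_rank_fusion(ranked_lists, k=60):
    """Fuse ranked lists of document IDs; returns (doc_id, score) pairs, best first."""
    scores = defaultdict(float)
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


keyword_hits = ["chunk-a", "chunk-b", "chunk-c"]  # e.g. from search_text
vector_hits = ["chunk-b", "chunk-d", "chunk-a"]   # e.g. from a VectorizedQuery
fused = reciprocal_rank_fusion([keyword_hits, vector_hits])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.5f}")
```

Documents that rank reasonably well in both lists (`chunk-b`, `chunk-a`) rise above documents found by only one retriever, which is why RRF needs no normalization between keyword and vector scores.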

In [12]:
from azure.search.documents import SearchClient
from azure.ai.inference import EmbeddingsClient   
from azure.search.documents.models import VectorizedQuery   
   
# Initialize the SearchClient  
search_client = SearchClient(  
    endpoint=search_service_endpoint,  
    index_name=index_name,  
    credential=AzureKeyCredential(search_api_key),  
)

embeddings_client = EmbeddingsClient(  
    endpoint=ai_foundry_endpoint,  
    credential=AzureKeyCredential(ai_foundry_key),  
    model=text_embedding_model,
)  
   
# Define the search query  
query_text = "Enter your search query here"  
   
# Embed the query with the same Cohere text embedding model used for indexing  
def get_query_embedding(query):  
    response = embeddings_client.embed(input=[query])  
    print("Model:", response.model)
    print("Usage:", response.usage)
    return response.data[0].embedding
   
# Get the query embedding  
query_embedding = get_query_embedding(query_text) 
   
# Vector query against the text_vector field  
vector_query = VectorizedQuery(  
    vector=query_embedding,  
    k_nearest_neighbors=3,  
    fields="text_vector", 
)  
   
# Pure vector search: search_text must be a string or None (pass query_text for hybrid search)  
results = search_client.search(  
    search_text=None,  
    vector_queries=[vector_query],  
    select=["title", "chunk", "page_number"],  
    top=3  
)  
   
# Print the results  
for result in results:  
    print(f"Title: {result['title']}")  
    print(f"Page Number: {result['page_number']}")  
    print(f"Chunk: {result['chunk']}")  
    print("---")  

Model: embed-multilingual-v3.0
Usage: {'prompt_tokens': 6, 'completion_tokens': 0, 'total_tokens': 6}
Title: 1_slide_1.pdf
Page Number: 1
Chunk: Infrastructure is forecasted to be one of the fastest-growing
segments of private markets
[Remaining chunk text: chart labels and values extracted from the slide ("Industry infrastructure AUM", "Clients allocating more to infra in new market regime", "$75T global infrastructure funding need", "Infrastructure fares well in inflationary environments"); output truncated]

## Conclusion  
   
In this notebook, we built a custom asynchronous indexing pipeline that processes both text and images from PDF documents, generates embeddings with Cohere models in Azure AI Foundry, and indexes them into Azure AI Search with separate `text_vector` and `image_vector` fields. This enables vector search over both text and images.  
   
You can extend this pipeline to handle more complex scenarios, larger datasets, or integrate additional processing steps as needed.  

In [None]:
# Exploratory cell: extract figures from each PDF in the container and embed the first
# figure of each document, to sanity-check the image side of the pipeline.
import base64
import os
from io import BytesIO

from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.inference import ImageEmbeddingsClient
from azure.ai.inference.models import ImageEmbeddingInput
from azure.core.credentials import AzureKeyCredential
from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient
from pdf2image import convert_from_bytes

storage_account_url = f"https://{storage_account_name}.blob.core.windows.net"
dpi = 300  # Resolution used to render PDF pages; adjust as needed
# Folder with the Poppler binaries, or None if Poppler is already on PATH.
# POPPLER_PATH is a name chosen for this cell, not one of the settings listed earlier.
poppler_path = os.getenv("POPPLER_PATH")

# Create the clients once rather than once per document or figure
document_client = DocumentIntelligenceClient(
    endpoint=document_intelligence_endpoint,
    credential=AzureKeyCredential(document_intelligence_key),
)
image_embeddings_client = ImageEmbeddingsClient(
    endpoint=ai_foundry_endpoint,
    credential=AzureKeyCredential(ai_foundry_key),
    model=image_embedding_model,
)

text_pages = []
images = []

async with DefaultAzureCredential() as credential:
    async with BlobServiceClient(account_url=storage_account_url, credential=credential) as blob_service_client:
        container_client = blob_service_client.get_container_client(storage_container_name)
        async for blob in container_client.list_blobs():
            if not blob.name.endswith(".pdf"):
                continue
            print(blob.name)

            # Download the blob data asynchronously
            stream = await container_client.get_blob_client(blob).download_blob()
            data = await stream.readall()

            # Analyze the layout (text lines and figures) with Document Intelligence
            poller = document_client.begin_analyze_document(model_id="prebuilt-layout", body=data)
            result = poller.result()

            for page in result.pages:
                page_text = "\n".join(line.content for line in page.lines)
                text_pages.append((blob.name, page_text, "1", page.page_number))

            if not result.figures:
                continue

            # Render the PDF pages so figures can be cropped out of them
            pages_images = convert_from_bytes(data, dpi=dpi, poppler_path=poppler_path)
            pages_by_number = {page.page_number: page for page in result.pages}

            # While testing, only the first region of the first figure is processed
            region = result.figures[0].bounding_regions[0]
            page_number = region.page_number
            page_image = pages_images[page_number - 1]
            unit = pages_by_number[page_number].unit  # 'pixel' or 'inch'

            # region.polygon is a flat list of coordinates: [x1, y1, x2, y2, ...]
            x_coords = region.polygon[0::2]
            y_coords = region.polygon[1::2]
            if unit == "inch":
                # Convert inches to pixels of the rendered page (dpi = dots per inch)
                x_coords = [x * dpi for x in x_coords]
                y_coords = [y * dpi for y in y_coords]
            elif unit != "pixel":
                print(f"Unknown unit '{unit}' on page {page_number}")
                continue

            # Clamp the bounding box to the page image and crop the figure
            left = int(max(min(x_coords), 0))
            upper = int(max(min(y_coords), 0))
            right = int(min(max(x_coords), page_image.width))
            lower = int(min(max(y_coords), page_image.height))
            figure_image = page_image.crop((left, upper, right, lower))

            # Encode the crop as PNG bytes and as a base64 data URL for the embeddings API
            buffer = BytesIO()
            figure_image.save(buffer, format="PNG")
            image_data = buffer.getvalue()
            images.append((blob.name, image_data, "1", page_number))
            data_url = "data:image/png;base64," + base64.b64encode(image_data).decode("utf-8")

            # Generate the image embedding with the Cohere model in Azure AI Foundry
            response = image_embeddings_client.embed(input=[ImageEmbeddingInput(image=data_url)])
            embedding = response.data[0].embedding
            print("Model:", response.model)
            print("Usage:", response.usage)
            print(f"Embedding length: {len(embedding)}, first values: {embedding[:5]}")
            # from IPython.display import display; display(figure_image)  # preview the crop
   
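The figure-cropping arithmetic in the cell above can be factored into a pure function that is easy to test without Azure or Poppler. This is a sketch, assuming the flat `[x1, y1, x2, y2, ...]` polygon format that Document Intelligence returns and a page rendered at `dpi`; `polygon_to_box` is a hypothetical name, and it raises on unknown units instead of printing and skipping.

```python
def polygon_to_box(polygon, unit, dpi, width, height):
    """Convert a flat polygon to a (left, upper, right, lower) pixel box for PIL's crop()."""
    xs, ys = polygon[0::2], polygon[1::2]
    if unit == "inch":
        # Inches to pixels of a page rendered at `dpi` dots per inch
        xs = [x * dpi for x in xs]
        ys = [y * dpi for y in ys]
    elif unit != "pixel":
        raise ValueError(f"Unknown unit: {unit!r}")
    # Take the bounding box of the polygon and clamp it to the page image
    left = int(max(min(xs), 0))
    upper = int(max(min(ys), 0))
    right = int(min(max(xs), width))
    lower = int(min(max(ys), height))
    return left, upper, right, lower


# A 1 x 0.5 inch figure whose top-left corner sits 1 inch from the page corner,
# on a US Letter page rendered at 300 dpi (2550 x 3300 pixels)
box = polygon_to_box([1.0, 1.0, 2.0, 1.0, 2.0, 1.5, 1.0, 1.5], "inch", 300, 2550, 3300)
print(box)  # (300, 300, 600, 450)
```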
