# **Creating a Vector Search Index with Azure AI Foundry**

## Overview
This notebook demonstrates how to create and populate a vector search index in Azure AI Search using Azure AI Foundry. You'll learn how to process document data, extract content, generate vector embeddings, and build a search index that can power semantic search and retrieval-augmented generation (RAG) applications.

## What is Vector Search?
Vector search is a technique that allows you to find similar items based on their semantic meaning rather than just keyword matches. In this approach:

1. **Embeddings**: Documents or chunks of text are converted into numerical vector representations (embeddings) that capture semantic meaning
2. **Vector Storage**: These vectors are stored in specialized indexes optimized for vector similarity operations
3. **Similarity Search**: At query time, the system finds documents whose vectors are closest to the query vector

This enables more powerful search capabilities than traditional keyword-based search, as it can understand concepts, synonyms, and the contextual meaning behind words.
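
To make the three steps above concrete, here is a minimal sketch of similarity search over toy vectors. It uses cosine similarity, one common metric, and hand-written 3-dimensional vectors in place of real embeddings; the names `cosine_similarity`, `nearest`, and `toy_index` are illustrative and not part of any Azure SDK.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def nearest(query_vector, indexed_vectors, k=2):
    """Return the k document ids whose vectors are most similar to the query."""
    ranked = sorted(
        indexed_vectors,
        key=lambda doc_id: cosine_similarity(query_vector, indexed_vectors[doc_id]),
        reverse=True,
    )
    return ranked[:k]

# Toy "embeddings"; real ones have hundreds or thousands of dimensions.
toy_index = {
    "cats": [0.9, 0.1, 0.0],
    "kittens": [0.8, 0.2, 0.1],
    "tax-law": [0.0, 0.1, 0.9],
}

print(nearest([1.0, 0.0, 0.0], toy_index))
```

A production index avoids comparing the query against every stored vector; the HNSW algorithm configured later in this notebook is an approximate nearest-neighbor structure built for that purpose.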

## 1. Setting Up The Environment

First, we'll load environment variables and required dependencies for our project.

In [1]:
import dotenv
config = dotenv.dotenv_values(".env")

### Setting Up Key Environment Variables
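Here we read the settings the notebook needs from `.env`: the Azure OpenAI credentials and model deployment names, the Azure AI Search endpoint and key, the Document Intelligence endpoint and key, the blob storage configuration, and the name of the search index.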

In [2]:
from azure.core.credentials import AzureKeyCredential
from dotenv import dotenv_values

config = dotenv_values(".env")

azure_openai_api_key = config.get("AZURE_OPENAI_API_KEY")
azure_openai_endpoint = config.get("AZURE_OPENAI_API_BASE")
azure_openai_api_version = config.get("AZURE_OPENAI_API_VERSION")
azure_openai_chat_model = config.get("AZURE_OPENAI_MODEL")
azure_openai_embedding_model = config.get("AZURE_OPENAI_EMBEDDING_MODEL")

search_credential = AzureKeyCredential(config.get("SEARCH_KEY"))
search_endpoint = config.get("SEARCH_ENDPOINT")

document_intelligence_key = config.get("document_intelligence_key")
document_intelligence_endpoint = config.get("document_intelligence_endpoint")

container_name = config.get("storage_container")
storage_base_url = config.get("storage_base_url")
connection_string = config.get("storage_connection_string")

index_name = config.get("SEARCH_INDEX_NAME")
index_name

'demo'
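
Because `config.get` returns `None` for a missing key, a typo in `.env` only surfaces later as a confusing client error. As a hedged sketch, a small check like the following could run right after loading the file. The helper `missing_settings` is hypothetical, and the key list simply mirrors the names read in the cell above.

```python
def missing_settings(config, required_keys):
    """Return the required keys that are absent or empty in a config mapping."""
    return [key for key in required_keys if not config.get(key)]

# Keys read by this notebook; adjust the list if your .env differs.
REQUIRED_KEYS = [
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_API_BASE",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_OPENAI_MODEL",
    "AZURE_OPENAI_EMBEDDING_MODEL",
    "SEARCH_KEY",
    "SEARCH_ENDPOINT",
    "SEARCH_INDEX_NAME",
    "document_intelligence_key",
    "document_intelligence_endpoint",
    "storage_container",
    "storage_base_url",
    "storage_connection_string",
]

# Stand-in for dotenv_values(".env"), so the sketch runs without a file.
example_config = {"SEARCH_KEY": "placeholder", "SEARCH_ENDPOINT": ""}

missing = missing_settings(example_config, ["SEARCH_KEY", "SEARCH_ENDPOINT", "SEARCH_INDEX_NAME"])
print(f"Missing settings: {missing}")
```

In the notebook itself, the call would be `missing_settings(config, REQUIRED_KEYS)`, raising an error if the returned list is not empty.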

## 2. Defining Helper Functions

### Creating an Embedding Function
This function helps us generate vector embeddings for text using Azure OpenAI.

In [3]:
from openai import AzureOpenAI

client = AzureOpenAI(
  api_key = azure_openai_api_key,  
  api_version = azure_openai_api_version,
  azure_endpoint = azure_openai_endpoint
)

def get_embedding(text, model=azure_openai_embedding_model):  # `model` is the name of your Azure OpenAI embedding deployment
    return client.embeddings.create(input = [text], model=model).data[0].embedding

### Setting Up Azure AI Search Index Schema
Here we define the structure of our search index, including fields for content, metadata, and vector representations. We also configure vector search capabilities and semantic search features.

In [4]:
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    VectorSearch,
    HnswAlgorithmConfiguration,
    VectorSearchProfile,
    AzureOpenAIVectorizer,
    AzureOpenAIVectorizerParameters,
    SimpleField,
    SearchField,
    SearchFieldDataType,
    SearchableField,
    SemanticConfiguration,
    SemanticPrioritizedFields,
    SemanticField,
    SemanticSearch
)

index_client = SearchIndexClient(
    endpoint=search_endpoint,
    credential=search_credential,
)

fields = [
    SimpleField(name="id", type=SearchFieldDataType.String, key=True, filterable=True, sortable=True),
    SearchableField(name="title", type=SearchFieldDataType.String),
    SearchableField(name="content", type=SearchFieldDataType.String),
    SimpleField(name="last_update", type=SearchFieldDataType.DateTimeOffset, filterable=True),
    SimpleField(name="url", type=SearchFieldDataType.String),
    SearchField(
        name="text_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=3072,  # Must match the embedding model's output size (3072 for text-embedding-3-large)
        vector_search_profile_name="myHnswProfile",
    )
]

# Adding vector search settings
vector_search = VectorSearch(
    algorithms=[
        HnswAlgorithmConfiguration(
            name="myHnsw"
        )
    ],
    profiles=[
        VectorSearchProfile(
            name="myHnswProfile",
            algorithm_configuration_name="myHnsw",
            vectorizer_name="myVectorizer"
        )
    ],
    vectorizers=[
        AzureOpenAIVectorizer(
            vectorizer_name="myVectorizer",
            parameters=AzureOpenAIVectorizerParameters(
                resource_url=azure_openai_endpoint,
                deployment_name=azure_openai_embedding_model,
                model_name=azure_openai_embedding_model,
                api_key=azure_openai_api_key,
            )
        )
    ]
)

# Create the semantic settings with the configuration
semantic_config = SemanticConfiguration(
    name="my-semantic-config",
    prioritized_fields=SemanticPrioritizedFields(
        title_field=SemanticField(field_name="title"),
        content_fields=[SemanticField(field_name="content")]
    )
)

semantic_search = SemanticSearch(configurations=[semantic_config])
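
Every document uploaded later has to match this schema: the same field names and a `text_vector` whose length equals `vector_search_dimensions`. The following is a minimal sketch of a local pre-upload check. `schema_problems` is a hypothetical helper, not an Azure SDK function, and the field list is copied by hand from `fields` above.

```python
EXPECTED_FIELDS = {"id", "title", "content", "last_update", "url", "text_vector"}
VECTOR_DIMENSIONS = 3072  # Keep in sync with vector_search_dimensions in the schema

def schema_problems(doc, dims=VECTOR_DIMENSIONS):
    """Return a list of human-readable mismatches between a document and the schema."""
    problems = []
    missing = EXPECTED_FIELDS - set(doc)
    if missing:
        problems.append(f"missing fields: {sorted(missing)}")
    unexpected = set(doc) - EXPECTED_FIELDS
    if unexpected:
        problems.append(f"unexpected fields: {sorted(unexpected)}")
    vector = doc.get("text_vector")
    if vector is not None and len(vector) != dims:
        problems.append(f"text_vector has {len(vector)} dimensions, expected {dims}")
    return problems

good_doc = {
    "id": "1",
    "title": "example_chunk_0",
    "content": "Some text.",
    "last_update": "2025-01-15T13:00:00+01:00",
    "url": "https://example.invalid/doc.pdf",  # Placeholder URL
    "text_vector": [0.0] * 3072,
}
bad_doc = {k: v for k, v in good_doc.items() if k != "url"}
bad_doc["text_vector"] = [0.0] * 1536

print(schema_problems(good_doc))
print(schema_problems(bad_doc))
```

A check like this catches the most common mismatch, an embedding model whose output size differs from the index definition, before any request is sent to the service.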

## 3. Creating the Azure AI Search Index

Now that we've defined the schema, we'll create the actual search index in Azure AI Search.

In [5]:
from azure.search.documents.indexes.models import SearchIndex

# Create the search index
index = SearchIndex(
    name=index_name,
    fields=fields,
    vector_search=vector_search,
    semantic_search=semantic_search
)
result = index_client.create_or_update_index(index)
print(f' {result.name} created')

 demo created


## 4. Document Processing Pipeline

### Setting up Document Extraction Services
Next, we'll create functions to read data from our storage account and use Azure AI Document Intelligence to extract content from PDF documents.

In [7]:
from azure.storage.blob import BlobServiceClient
from azure.ai.documentintelligence import DocumentIntelligenceClient

def initialize_blob_service_client(connection_string, container_name):
    # Initialize the BlobServiceClient and return the container client
    blob_service_client = BlobServiceClient.from_connection_string(conn_str=connection_string)
    container_client = blob_service_client.get_container_client(container_name)
    return container_client

def initialize_document_intelligence_client():
    # Initialize the Document Intelligence client
    document_intelligence_client = DocumentIntelligenceClient(
        endpoint=document_intelligence_endpoint,
        credential=AzureKeyCredential(document_intelligence_key)
    )
    return document_intelligence_client

### Document Processing Functions
These functions download documents from blob storage and analyze them with Azure AI Document Intelligence.

In [None]:
def download_blob_content(blob_client):
    # Download the blob's content
    download_stream = blob_client.download_blob()
    blob_content = download_stream.readall()
    return blob_content


def analyze_document(document_intelligence_client, blob_content):
    # Analyze the document with the prebuilt layout model and save any extracted figures
    import os
    from azure.ai.documentintelligence.models import AnalyzeResult, AnalyzeOutputOption
    poller = document_intelligence_client.begin_analyze_document(
        model_id="prebuilt-layout",
        analyze_request=blob_content,  # Named `body` in newer SDK releases; check your installed version
        content_type="application/octet-stream",  # Adjust based on your document type
        output=[AnalyzeOutputOption.FIGURES]  # Ask the service to generate figure images
    )
    result: AnalyzeResult = poller.result()
    operation_id = poller.details["operation_id"]

    if result.figures:
        os.makedirs("data/figures", exist_ok=True)  # Make sure the output directory exists
        for figure in result.figures:
            if figure.id:
                # Download the cropped image for this figure
                response = document_intelligence_client.get_analyze_result_figure(
                    model_id=result.model_id, result_id=operation_id, figure_id=figure.id
                )
                with open(f"data/figures/{figure.id}.png", "wb") as writer:
                    writer.writelines(response)
    return result

### Running the Document Processing Pipeline
This function orchestrates the document processing workflow, retrieving PDF files from storage and extracting their content.

In [11]:
from tqdm import tqdm

def run_process_data_pipeline():
    documents = []

    # Initialize the BlobServiceClient & Document Intelligence client
    container_client = initialize_blob_service_client(connection_string, container_name)
    document_intelligence_client = initialize_document_intelligence_client()
    # List all blobs in the container (the sample papers: Attention Is All You Need, Large Concept Models, Large Language Diffusion Models)
    blob_list = list(container_client.list_blobs())
    # Filter to only include PDF files
    pdf_blob_list = [blob for blob in blob_list if blob.name.lower().endswith('.pdf')]

    if len(pdf_blob_list) == 0:
        print("No PDF blobs found in the container.")
    else:
        with tqdm(total=len(pdf_blob_list), desc="Processing Blobs", unit="blob") as pbar:
            for blob in pdf_blob_list:
                blob_name = blob.name

                # Update the progress bar's description to show the current blob
                pbar.set_description(f"Processing {blob_name}")

                # Download the blob's content
                blob_content = download_blob_content(blob_client = container_client.get_blob_client(blob_name))

                if blob_content is None:
                    continue # Skip to the next blob if download failed

                # Analyze the document using Document Intelligence
                data = analyze_document(document_intelligence_client, blob_content)

                if data is None:
                    continue # Skip to the next blob if analysis failed

                documents.append({
                    "filename": blob_name,
                    "data": data,
                    "url": f"{storage_base_url}/{container_name}/{blob_name}"
                })

                pbar.update(1)
            pbar.set_postfix({"Status": "Finished"})
    return documents

documents_raw = run_process_data_pipeline()

Processing 1706.03762v7.pdf:   0%|          | 0/1 [00:08<?, ?blob/s]


ResourceNotFoundError: (NotFound) Resource not found.
Code: NotFound
Message: Resource not found.
Inner error: {
    "code": "OutputOptionNotFound",
    "message": "The output option was not found: Failed to generate the analyze results in the output option format. Output option should be informed in analyze call."
}

## 5. Document Chunking and Metadata

Now we'll split the extracted documents into smaller chunks suitable for embedding and indexing, and add metadata such as:
- Chunk ID
- Last update timestamp
- Source URL and chunk title, for tracing content back to its document

### Helper Function for Timestamps
This function provides consistent timestamps for our document metadata.

In [None]:
import datetime
import pytz

def get_sweden_time():
    # Define the timezone for Sweden
    sweden_tz = pytz.timezone('Europe/Stockholm')
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    sweden_time = utc_now.astimezone(sweden_tz)
    return sweden_time.isoformat()
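
The `last_update` field is a `DateTimeOffset`, so the timestamp string should carry an explicit UTC offset. The sketch below shows what `isoformat()` produces for a timezone-aware datetime and for a naive one. To stay dependency-free, it uses a fixed `+01:00` offset as a stand-in for Stockholm winter time rather than `pytz`.

```python
from datetime import datetime, timedelta, timezone

# A fixed instant, so the output is reproducible.
utc_moment = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc)

# Fixed +01:00 offset standing in for Europe/Stockholm in winter (assumption for this sketch).
cet = timezone(timedelta(hours=1))
local_moment = utc_moment.astimezone(cet)

aware_text = local_moment.isoformat()
naive_text = datetime(2025, 1, 15, 12, 0, 0).isoformat()

print(aware_text)  # 2025-01-15T13:00:00+01:00
print(naive_text)  # 2025-01-15T12:00:00 (no offset, so the instant is ambiguous)
```

Both aware values describe the same instant; only the rendering differs, which is why `get_sweden_time` can safely start from UTC and convert.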

### Creating Document Splits
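This function groups a document's paragraphs into sections. Each section heading starts a new chunk tagged with the document title, and the paragraphs that follow are appended to that chunk. The heading text itself is not stored, and paragraphs that appear before the first section heading are skipped.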

In [None]:
def create_splits(raw_doc):
    document_chunks = []
    current_title = None

    data = raw_doc["data"]

    # Loop through paragraphs to structure the main content
    for paragraph in data.paragraphs:
        if paragraph.role == "title":
            # Remember the document title; it does not start a chunk by itself
            current_title = paragraph.content
        elif paragraph.role == "sectionHeading":
            # Start a new, empty chunk for this section under the current title
            document_chunks.append({
                "title": current_title,
                "content": "",
            })
        else:
            # Append body text to the most recent section;
            # paragraphs before the first section heading are skipped
            if document_chunks:
                document_chunks[-1]["content"] += " " + paragraph.content
    return document_chunks
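
To see how the splitting behaves without calling Document Intelligence, the sketch below runs the same logic on a hand-built stand-in for an analysis result. The `SimpleNamespace` objects and the `para` helper are assumptions made for illustration; they only mimic the `paragraphs`, `role`, and `content` attributes the function reads.

```python
from types import SimpleNamespace

def create_splits(raw_doc):
    # Same logic as the notebook function, repeated so this example runs on its own.
    document_chunks = []
    current_title = None
    for paragraph in raw_doc["data"].paragraphs:
        if paragraph.role == "title":
            current_title = paragraph.content
        elif paragraph.role == "sectionHeading":
            document_chunks.append({"title": current_title, "content": ""})
        elif document_chunks:
            document_chunks[-1]["content"] += " " + paragraph.content
    return document_chunks

def para(content, role=None):
    """Build a fake paragraph with the two attributes create_splits uses."""
    return SimpleNamespace(content=content, role=role)

fake_doc = {
    "data": SimpleNamespace(paragraphs=[
        para("Attention Is All You Need", role="title"),
        para("Abstract text before any heading."),  # Skipped: no section started yet
        para("1 Introduction", role="sectionHeading"),
        para("First paragraph."),
        para("Second paragraph."),
        para("2 Background", role="sectionHeading"),
        para("Third paragraph."),
    ])
}

splits = create_splits(fake_doc)
for split in splits:
    print(split)
```

The run yields two sections, both titled with the document title. The abstract is dropped, and each section's content starts with a leading space because of the `" " +` concatenation.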

### Splitting Content into Smaller Chunks
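This function breaks each section into smaller chunks suitable for embedding. It uses the LangChain `RecursiveCharacterTextSplitter` with a tiktoken encoder, so `chunk_size` and `chunk_overlap` are measured in tokens rather than characters, and every chunk keeps the metadata of the section it came from.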

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

def create_chunks(splits, chunk_size=1024, chunk_overlap=128):
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        model_name=azure_openai_chat_model  # tiktoken must recognize this as an OpenAI model name, not just a deployment name
    )
    chunk_list = []

    for doc in splits:
        content = doc.get("content", "")
        
        # Create a clean copy of metadata without content
        doc_metadata = doc.copy()
        if "content" in doc_metadata:
            doc_metadata.pop("content")
            
        # Create chunks from the content
        chunks = text_splitter.create_documents([content])
        
        if not chunks:
            new_doc = doc.copy()
            chunk_list.append(new_doc)
        else:
            # Create a new document for each chunk
            for chunk in chunks:
                new_doc = doc_metadata.copy()
                new_doc["content"] = chunk.page_content
                chunk_list.append(new_doc)
                
    return chunk_list
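
The roles of `chunk_size` and `chunk_overlap` are easier to see on a toy sequence. The sketch below is a plain sliding window over a list of tokens. It is not how `RecursiveCharacterTextSplitter` works internally (that splitter recursively splits on separators such as paragraphs and sentences); it only illustrates how size and overlap interact. `sliding_window_chunks` is a hypothetical helper.

```python
def sliding_window_chunks(tokens, chunk_size, chunk_overlap):
    """Split a token list into windows of chunk_size that share chunk_overlap tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # How far each new window advances
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # The last window already reached the end
    return chunks

windows = sliding_window_chunks(list(range(10)), chunk_size=4, chunk_overlap=1)
print(windows)  # [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
```

Each window repeats the last token of the previous one, so text that straddles a boundary still appears intact in at least one chunk. With the notebook's defaults of 1024 and 128, the same idea applies at a larger scale.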

### Processing Documents with Embeddings
This function orchestrates the entire document processing workflow: splitting documents, creating chunks, generating embeddings, and preparing the final data format for indexing.

In [None]:
import uuid
import time
from tqdm import tqdm

# Split, chunk, embed, and format documents for indexing in a single pass
def process_documents(documents_raw, get_embedding_fn):
    processed_documents = []
    # Fixed namespace (the standard DNS namespace UUID) so chunk IDs are reproducible across runs
    namespace = uuid.NAMESPACE_DNS
    
    # Process each document with progress tracking
    for doc in tqdm(documents_raw, desc="Processing documents"):
        url = doc["url"]  # URL for the document
        filename = doc["filename"]  # Filename of the document

        # Split the raw document based on title and section headings
        splits = create_splits(doc)
        # Create chunks from the splits if needed
        chunks = create_chunks(splits, chunk_size=1024, chunk_overlap=128)

        # Process each chunk with embeddings in a single pass
        for i, chunk in enumerate(chunks):
            chunk_name = f"{filename}_chunk_{i}"
            
            # Generate a unique ID for this chunk
            chunk_id = str(uuid.uuid5(namespace, chunk_name))
            
            # Get the content from the chunk
            content = chunk.get("content", "") if isinstance(chunk, dict) else chunk.page_content
            
            # Generate embedding vector for this content
            text_vector = get_embedding_fn(content) if content else []
            
            # Build the complete document with all required fields
            processed_documents.append({
                "id": chunk_id,
                "title": chunk_name,
                "content": content,
                "last_update": get_sweden_time(),
                "url": url,
                "text_vector": text_vector
            })
            
    return processed_documents
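
The chunk IDs come from `uuid.uuid5`, which hashes a namespace and a name, so the same filename and chunk position always produce the same ID. The short sketch below demonstrates that property. `chunk_id` is a hypothetical helper that mirrors the naming scheme used in `process_documents`.

```python
import uuid

def chunk_id(filename, index, namespace=uuid.NAMESPACE_DNS):
    """Deterministic ID for the index-th chunk of a file."""
    return str(uuid.uuid5(namespace, f"{filename}_chunk_{index}"))

first_run = chunk_id("1706.03762v7.pdf", 0)
second_run = chunk_id("1706.03762v7.pdf", 0)
other_chunk = chunk_id("1706.03762v7.pdf", 1)

print(first_run == second_run)   # Same inputs give the same ID
print(first_run == other_chunk)  # A different chunk index gives a different ID
```

Since `id` is the index key, re-running the pipeline on the same file replaces existing documents instead of adding duplicates, provided the chunking parameters stay the same so chunk positions line up.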

## 6. Processing and Indexing Documents

Now we'll execute our document processing pipeline to generate the final embedded documents ready for indexing.

In [None]:
# Start timing
start_time = time.time()

# Process documents and generate embeddings in one step
data_final = process_documents(documents_raw, get_embedding)

# End timing
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Processing completed in {elapsed_time:.2f} seconds.")

## 7. Uploading to Azure AI Search Index

### Defining Upload Function
This function handles the upload of processed documents to our Azure AI Search index.

In [None]:
from azure.search.documents import SearchClient

def push_to_index(data, search_credential, search_endpoint, index_name=index_name):
    search_client = SearchClient(
        index_name=index_name,
        endpoint=search_endpoint,
        credential=search_credential
    )
    search_client.upload_documents(data)

### Batched Upload to Azure AI Search
Finally, we upload our processed documents to Azure AI Search in batches to ensure reliable indexing of large document collections.

In [None]:
import time
from tqdm import tqdm

# Group the chunks into fixed-size batches
batch_size = 5
total_chunks = len(data_final)
num_batches = (total_chunks + batch_size - 1) // batch_size

overall_start_time = time.time()

# Process each batch
for batch_num in tqdm(range(num_batches), desc="Processing Batches"):
    start = batch_num * batch_size
    batch = data_final[start:start + batch_size]

    # Push the documents to the index
    push_to_index(
        data=batch,
        search_credential=search_credential,
        search_endpoint=search_endpoint,
        index_name=index_name
    )

overall_end_time = time.time()
elapsed_overall_time = overall_end_time - overall_start_time
print(f"All batches pushed in {elapsed_overall_time:.2f} seconds to {index_name}.")
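
The batching arithmetic can also be factored into a small generator, which removes the manual index bookkeeping. This is a sketch of one possible refactoring. `batched` is a hypothetical helper, and the integers stand in for the documents in `data_final`.

```python
def batched(items, batch_size):
    """Yield consecutive slices of items, each with at most batch_size elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

sample = list(range(12))  # Stand-in for data_final
batches = list(batched(sample, 5))
print(batches)  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11]]

# The number of batches matches the ceiling-division formula used above.
expected_batches = (len(sample) + 5 - 1) // 5
print(len(batches) == expected_batches)
```

In the upload loop, this would read `for batch in batched(data_final, batch_size): push_to_index(data=batch, ...)`, with the same arguments as before.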