In [None]:
import os

!pip install faiss-cpu fitz requests numpy langchain openai azure-storage-blob azure-search-documents tiktoken

In [None]:
!pip install --upgrade pymupdf

In [None]:
!pip install langchain-community


In [None]:
# Import the necessary libraries for creating an AI assistant with the correct vector store.

import os

import json
import zipfile
import fitz  # PyMuPDF
import faiss
import numpy as np
import requests
import logging
from typing import List, Any
from azure.storage.blob import BlobServiceClient
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings
from langchain_community.vectorstores import FAISS
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents import SearchClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    VectorSearchProfile, #I added this module.
    HnswAlgorithmConfiguration,
    VectorSearchAlgorithmKind,
    VectorSearchAlgorithmMetric,
    VectorSearch,
)
from openai import AzureOpenAI

Run each of the cells in order. If a cell fails, click it again and re-run it. Cells 1-4 only define the configuration and the functions: run them to load the functions into memory. Cell 5 contains the code that calls each of those functions and creates the RAG on Azure. Watch your directory names and locations, as the paths are set up for a directory on Colab.


In [None]:
# 1️⃣ Setup & Configuration
# Secrets are read from environment variables when they are set, so real keys do
# not have to be saved inside the notebook. The placeholder literals below are
# only fallbacks; replace them (or, preferably, set the environment variables).

logging.basicConfig(level=logging.INFO)

# Settings + networking > Access keys in your storage account's menu
AZURE_STORAGE_CONNECTION_STRING = os.environ.get(
    "AZURE_STORAGE_CONNECTION_STRING",
    "DefaultEndpointsProtocol=https;AccountName=xxxxxxxx;AccountKey=xxxxxx==;EndpointSuffix=core.windows.net",
)
CONTAINER_NAME = "vector-database"  # use any name you want

config_data = {
    # find in access key or in view code in assistant
    "AZURE_OPENAI_KEY": os.environ.get("AZURE_OPENAI_KEY", "XXXXXXXXX"),
    "AZURE_OPENAI_ENDPOINT": os.environ.get("AZURE_OPENAI_ENDPOINT", "https://xxxxxx.openai.azure.com/"),
    "AZURE_OPENAI_APIVERSION": "2024-05-01-preview",
    "DEPLOYMENT_NAME": "XXXXX-gpt-4o-mini",
    "MODEL_NAME": "gpt-4o-mini"
}

SEARCH_SERVICE_NAME = "your-data-search"   # any name you want
SEARCH_API_KEY = os.environ.get("SEARCH_API_KEY", "xxxxxxxxxxxxxx")  # find in key under storage
INDEX_NAME = "yourblob-index"  # use any name you want
ZIP_FILE_PATH = "/content/file-name.zip"  # put the file name and path here - Use WinRarZip as it works best, not Mac-compress
pdf_dir = "/Data"
FAISS_INDEX_PATH = "/content/"  # directory whose files are uploaded to blob storage

In [None]:
# 2️⃣ File Preparation

def extract_zip(zip_path: str, extract_path: str) -> None:
    """Unpack every member of the archive at ``zip_path`` into ``extract_path``.

    The destination directory is created first if it does not exist yet.

    Raises:
        zipfile.BadZipFile: if ``zip_path`` is not a valid ZIP archive.
        Exception: any other failure; it is logged and then re-raised unchanged.
    """
    try:
        # Create the target folder up front so extraction never hits a missing directory.
        os.makedirs(extract_path, exist_ok=True)

        archive = zipfile.ZipFile(zip_path, 'r')
        with archive:
            archive.extractall(extract_path)
    except zipfile.BadZipFile:
        logging.error(f"❌ File is not a valid ZIP file: {zip_path}")
        raise
    except Exception as e:
        logging.error(f"❌ Failed to extract ZIP file: {str(e)}")
        raise
    else:
        logging.info(f"✅ Successfully extracted ZIP file to {extract_path}")

def extract_text_from_pdfs(pdf_dir: str) -> List[str]:
    """Extract the text of every PDF found directly inside ``pdf_dir``.

    Only files whose name ends in ``.pdf`` (case-insensitive) are read, and
    sub-directories are not searched. Files are processed in sorted name order
    so the result is reproducible between runs (``os.listdir`` alone returns
    entries in arbitrary order).

    Args:
        pdf_dir: Directory containing the PDF files.

    Returns:
        One string per successfully read PDF (all pages concatenated). A PDF
        that cannot be opened or parsed is logged and skipped.

    Raises:
        Exception: if the directory itself cannot be listed (logged, then re-raised).
    """
    try:
        pdf_names = sorted(
            name for name in os.listdir(pdf_dir) if name.lower().endswith('.pdf')
        )
    except Exception as e:
        logging.error(f"❌ Failed to read PDF directory: {str(e)}")
        raise

    if not pdf_names:
        # Common cause: the ZIP unpacked into a nested sub-folder.
        logging.warning(f"⚠️ No PDF files found in {pdf_dir}")

    texts = []
    for filename in pdf_names:
        pdf_path = os.path.join(pdf_dir, filename)
        try:
            with fitz.open(pdf_path) as doc:
                # join() avoids rebuilding the string once per page.
                texts.append("".join(page.get_text() for page in doc))
            logging.info(f"✅ Successfully extracted text from {filename}")
        except Exception as e:
            # Best effort: one bad PDF must not abort the whole batch.
            logging.error(f"❌ Failed to process PDF {filename}: {str(e)}")
    return texts

def chunk_text(texts: List[str], chunk_size: int = 1000) -> List[str]:
    """Split each text into consecutive, fixed-size character chunks.

    Chunks are cut purely by character count (no overlap, no regard for word or
    sentence boundaries). Chunks that contain only whitespace are dropped.

    Args:
        texts: Documents to split.
        chunk_size: Maximum number of characters per chunk; must be positive.

    Returns:
        All non-blank chunks, in document order.

    Raises:
        ValueError: if ``chunk_size`` is not a positive integer. Without this
            check a negative size would silently produce no chunks at all.
    """
    try:
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

        pieces = (
            text[start:start + chunk_size]
            for text in texts
            for start in range(0, len(text), chunk_size)
        )
        chunks = [piece for piece in pieces if piece.strip()]  # skip blank chunks
        logging.info(f"✅ Created {len(chunks)} text chunks")
        return chunks
    except Exception as e:
        logging.error(f"❌ Failed to chunk text: {str(e)}")
        raise

def create_faiss_index(texts: List[str], dimension: int = 1536) -> str:
    """Build a flat L2 FAISS index with one vector per text chunk and save it.

    WARNING: the vectors are random placeholders, not embeddings of ``texts``;
    only the number of chunks influences the index. Replace the placeholder
    with a real embedding call before using the index for retrieval.

    Args:
        texts: Text chunks; only their count is used.
        dimension: Length of each vector stored in the index.

    Returns:
        The path of the written index file (``faiss_index.bin``, relative to the
        current working directory).

    Raises:
        Exception: any failure is logged and re-raised.
    """
    output_file = "faiss_index.bin"
    try:
        flat_index = faiss.IndexFlatL2(dimension)

        # Placeholder vectors: shape (number of chunks, dimension), float32.
        placeholder_shape = (len(texts), dimension)
        placeholder_vectors = np.random.rand(*placeholder_shape).astype('float32')
        flat_index.add(placeholder_vectors)

        # Persist the index to disk.
        faiss.write_index(flat_index, output_file)
    except Exception as e:
        logging.error(f"❌ Failed to create FAISS index: {str(e)}")
        raise
    else:
        logging.info(f"✅ Created FAISS index with {len(texts)} vectors")
        return output_file

def upload_faiss_to_azure(index_path: str) -> None:
    """Upload a FAISS index file, or every file in a directory, to Azure Blob Storage.

    This used to be a stub that logged success without uploading anything. A
    later cell defines a function with the same name that takes a directory;
    this version accepts both a file and a directory, so the pipeline keeps
    working whichever cell was run last.

    Args:
        index_path: Path to a single file, or to a directory whose regular files
            (non-recursive) are all uploaded. Each blob is named after the
            file's base name and overwrites any existing blob of that name.

    Raises:
        FileNotFoundError: if ``index_path`` does not exist.
        Exception: any storage/IO failure is logged and re-raised.
    """
    try:
        if not os.path.exists(index_path):
            raise FileNotFoundError(f"Path does not exist: {index_path}")

        if os.path.isdir(index_path):
            candidates = [
                os.path.join(index_path, name) for name in sorted(os.listdir(index_path))
            ]
        else:
            candidates = [index_path]

        # Build the client here so this function does not rely on globals
        # created by a later cell.
        container = BlobServiceClient.from_connection_string(
            AZURE_STORAGE_CONNECTION_STRING
        ).get_container_client(CONTAINER_NAME)

        uploaded = 0
        for file_path in candidates:
            if not os.path.isfile(file_path):
                continue  # skip sub-directories
            blob_name = os.path.basename(file_path)
            with open(file_path, "rb") as data:
                container.get_blob_client(blob_name).upload_blob(data, overwrite=True)
            logging.info(f"Uploaded: {blob_name}")
            uploaded += 1

        if uploaded:
            logging.info("✅ Uploaded FAISS index to Azure Storage")
        else:
            logging.warning(f"⚠️ No files to upload in {index_path}")
    except Exception as e:
        logging.error(f"❌ Failed to upload FAISS index: {str(e)}")
        raise


In [None]:
# 3️⃣ AI Search Indexing
# Module-level client for the Azure AI Search service; the index helper
# functions defined in later cells read `index_client` as a global.
search_endpoint = f"https://{SEARCH_SERVICE_NAME}.search.windows.net"
credential = AzureKeyCredential(SEARCH_API_KEY)
index_client = SearchIndexClient(search_endpoint, credential)

In [None]:
# 4️⃣ Upload to Azure Blob Storage
# Build the storage clients once at cell level; upload_faiss_to_azure() below
# reads `container_client` as a global.
# NOTE(review): nothing in this cell creates the container - confirm that
# CONTAINER_NAME already exists in the storage account.
blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)
container_client = blob_service_client.get_container_client(CONTAINER_NAME)

def upload_faiss_to_azure(local_directory: str) -> None:
    """Upload local files to the blob container behind ``container_client``.

    Args:
        local_directory: Directory whose regular files (non-recursive) are all
            uploaded. A path to a single file is also accepted, matching the
            ``index_path`` signature used by the earlier definition of this
            function. Each blob is named after the file's base name and
            overwrites any existing blob of that name.

    Raises:
        FileNotFoundError: if ``local_directory`` does not exist.
        Exception: any storage/IO failure is logged and re-raised.
    """
    try:
        if os.path.isdir(local_directory):
            file_paths = [
                os.path.join(local_directory, name)
                for name in sorted(os.listdir(local_directory))
            ]
        elif os.path.exists(local_directory):
            file_paths = [local_directory]
        else:
            raise FileNotFoundError(f"Path does not exist: {local_directory}")

        for file_path in file_paths:
            if not os.path.isfile(file_path):
                continue  # skip sub-directories
            file_name = os.path.basename(file_path)
            blob_client = container_client.get_blob_client(file_name)
            with open(file_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)
            logging.info(f"Uploaded: {file_name}")
    except Exception as e:
        logging.error(f"❌ Failed to upload FAISS index: {str(e)}")
        raise


In [None]:
# 4️⃣.5 sets up indexing of data

def delete_search_index(index_name: str) -> None:
    """Delete the Azure AI Search index ``index_name`` if it exists.

    A missing index is reported at INFO level. Any other failure (for example
    bad credentials, a network error or a rejected delete) is logged as an
    error instead of being misreported as "does not exist". Service errors are
    logged, not raised, so callers can continue.

    Args:
        index_name: Name of the index to remove.
    """
    # Function-scope import: azure.core is already a dependency of this notebook.
    from azure.core.exceptions import ResourceNotFoundError

    try:
        # get_index() raises ResourceNotFoundError when the index is absent.
        index_client.get_index(index_name)
        index_client.delete_index(index_name)
        logging.info(f"✅ Existing index '{index_name}' deleted successfully!")
    except ResourceNotFoundError:
        logging.info(f"Index '{index_name}' does not exist")
    except Exception as e:
        logging.error(f"Failed to delete index '{index_name}': {str(e)}")


def create_search_index(force_recreate: bool = True):
    """Create the Azure AI Search index ``INDEX_NAME`` with a vector field.

    The index has three fields: ``id`` (key), ``content`` (searchable text) and
    ``content_vector`` (1536-dimensional vector using an HNSW profile with the
    cosine metric).

    Args:
        force_recreate: When True, any existing index is deleted first and a
            fresh one is created. When False, an existing index is kept and
            creation is skipped.

    Raises:
        Exception: any failure while checking for or creating the index is
            logged and re-raised.
    """
    # Function-scope imports: both packages are already dependencies of this notebook.
    from azure.core.exceptions import ResourceNotFoundError
    from azure.search.documents.indexes.models import HnswParameters

    if force_recreate:
        delete_search_index(INDEX_NAME)

    try:
        if not force_recreate:
            try:
                index_client.get_index(INDEX_NAME)
            except ResourceNotFoundError:
                pass  # Index doesn't exist, proceed with creation
            else:
                logging.info(f"Index '{INDEX_NAME}' already exists, skipping creation.")
                return

        # HNSW settings must be an HnswParameters model (snake_case attributes),
        # not a raw dict with REST-style camelCase keys. `kind` is fixed to
        # "hnsw" by HnswAlgorithmConfiguration itself.
        algorithm_config = HnswAlgorithmConfiguration(
            name="my-hnsw-config",
            parameters=HnswParameters(
                m=4,
                ef_construction=400,
                ef_search=500,
                metric=VectorSearchAlgorithmMetric.COSINE,
            ),
        )

        # The profile is what the vector field references by name.
        vector_search_config = VectorSearch(
            algorithms=[algorithm_config],
            profiles=[
                VectorSearchProfile(
                    name="my-vector-profile",
                    algorithm_configuration_name="my-hnsw-config"
                )
            ]
        )

        index_schema = SearchIndex(
            name=INDEX_NAME,
            fields=[
                SearchField(name="id", type=SearchFieldDataType.String, key=True),
                SearchField(name="content", type=SearchFieldDataType.String, searchable=True),
                SearchField(
                    name="content_vector",
                    type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                    searchable=True,
                    vector_search_dimensions=1536,  # must match the embedding size used
                    vector_search_profile_name="my-vector-profile"
                ),
            ],
            vector_search=vector_search_config
        )

        index_client.create_index(index_schema)
        logging.info(f"✅ Index '{INDEX_NAME}' created successfully!")
    except Exception as e:
        logging.error(f"❌ Failed to create index: {str(e)}")
        raise


# FileSearchTool Class
class FileSearchTool:
    def __init__(self, vector_store_ids: List[str] | None = None):
        self.vector_store_ids = vector_store_ids or []

    def add_vector_store(self, store_id: str) -> None:
        if store_id not in self.vector_store_ids:
            self.vector_store_ids.append(store_id)

    def remove_vector_store(self, store_id: str) -> None:
        try:
            self.vector_store_ids.remove(store_id)
        except ValueError:
            logging.warning(f"Store ID {store_id} not found in vector store list.")

    def execute(self, tool_call: Any) -> Any:
        logging.info(f"Executing tool call: {tool_call}")
        return None

# Generate Vector Store
def generate_vector_store(store_id: str) -> None:
    """Register ``store_id`` with a fresh FileSearchTool and log the outcome.

    The tool instance is local to this call and discarded afterwards, so the
    only lasting effect is the log message.
    """
    FileSearchTool().add_vector_store(store_id)
    logging.info(f"✅ Vector store '{store_id}' added successfully!")

# Execute Workflow
# Runs as soon as this cell executes: makes sure the search index exists.
create_search_index(force_recreate=False)  # False keeps an existing index; True deletes and rebuilds it
logging.info("✅ Search index creation workflow completed!")

In [None]:
# 4️⃣.75 tests the deployment

def check_deployments():
    """Smoke-test the configured Azure OpenAI deployment.

    Sends a single short chat message to the deployment named in
    ``config_data`` and prints either the reply or troubleshooting tips.
    Exceptions are caught and reported with ``print``; nothing is returned.
    """
    try:
        # Build the client from the shared configuration.
        openai_client = AzureOpenAI(
            azure_endpoint=config_data["AZURE_OPENAI_ENDPOINT"],
            api_key=config_data["AZURE_OPENAI_KEY"],
            api_version=config_data["AZURE_OPENAI_APIVERSION"]
        )

        print("\n=== Testing Deployment Access ===")
        try:
            # A tiny completion is enough to prove the deployment is reachable.
            probe_messages = [
                {"role": "user", "content": "Hello, is this working?"}
            ]
            response = openai_client.chat.completions.create(
                model=config_data["DEPLOYMENT_NAME"],  # deployment name, not the model name
                messages=probe_messages,
                max_tokens=10
            )
            print("✅ Successfully connected to deployment")
            print(f"Response: {response.choices[0].message.content}")

        except Exception as e:
            print(f"❌ Error accessing deployment: {str(e)}")
            print("\nTroubleshooting tips:")
            print("1. Verify these match exactly with your Azure portal:")
            print(f"   - Deployment Name: {config_data['DEPLOYMENT_NAME']}")
            print(f"   - Endpoint: {config_data['AZURE_OPENAI_ENDPOINT']}")
            print("2. Check if the API version is current")
            print("3. Verify the API key has proper permissions")

    except Exception as e:
        # Reached when the client cannot be built or a handler above fails.
        print(f"❌ Connection failed: {str(e)}")

# Run the deployment smoke test as soon as this cell executes.
check_deployments()

The middle of this code block is where you change the assistant's name and prompt. The bottom is where all the functions are called/run, so make sure the file directories match their actual locations.


In [None]:
# 5️⃣ Retrieve and Associate Vector Store ID with Azure OpenAI Assistant


def associate_vector_store_with_ai(pdf_folder: str = "/content/extracted_pdfs"): #Associate vector store with Azure OpenAI Assistant.
    """Upload the PDFs in ``pdf_folder`` to a new vector store and create an assistant that uses it.

    Steps: collect the PDF paths, create a vector store named
    "NASDAQ documents", upload the files as one batch (waiting for the batch to
    finish), then create an assistant with the ``file_search`` tool bound to
    that vector store.

    Args:
        pdf_folder: Directory containing the PDFs to upload. Only files directly
            inside it whose name ends in ``.pdf`` (case-insensitive) are used.

    Returns:
        The new assistant's ID, or None if any step failed (the error is logged).
    """
    try:
        # Initialize Azure OpenAI client
        client = AzureOpenAI(
            azure_endpoint=config_data["AZURE_OPENAI_ENDPOINT"],
            api_key=config_data["AZURE_OPENAI_KEY"],
            api_version=config_data["AZURE_OPENAI_APIVERSION"]
        )
    except Exception as e:
        logging.error(f"❌ Failed to associate vector store: {str(e)}")
        return None

    file_streams = []
    try:
        # List the PDFs first so an empty folder does not leave an orphaned,
        # empty vector store behind.
        file_paths = sorted(
            os.path.join(pdf_folder, name)
            for name in os.listdir(pdf_folder)
            if name.lower().endswith(".pdf")
        )
        if not file_paths:
            raise FileNotFoundError(f"No PDF files found in {pdf_folder}")

        vector_store = client.beta.vector_stores.create(name="NASDAQ documents")

        # Open one at a time so that, if an open() fails midway, the streams
        # already opened are still closed by the finally block.
        for path in file_paths:
            file_streams.append(open(path, "rb"))
        file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store.id, files=file_streams
        )
        logging.info(f"Vector store file batch status: {file_batch.status}")

        print(vector_store)
        # This is where you name and create the prompt for the assistant
        assistant = client.beta.assistants.create(
            name="Financial data Assistant",
            instructions="You are an Financial assistant that answers questions based on financial documents.",
            model=config_data["DEPLOYMENT_NAME"],
            tools=[{"type": "file_search"}],
            tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}}
        )

        logging.info(f"✅ Assistant created successfully! ID: {assistant.id}")
        return assistant.id

    except Exception as e:
        logging.error(f"❌ Failed to create assistant: {str(e)}")
        return None

    finally:
        # Always release the file handles, on success and on failure.
        for stream in file_streams:
            stream.close()

# Execute Workflow MAKE SURE THAT YOU USE THE RIGHT PATHS/FILENAMES
# Folder the ZIP is unpacked into; keep it in sync with the folder that
# associate_vector_store_with_ai() reads its PDFs from.
EXTRACTED_PDF_DIR = "/content/extracted_pdfs"

extract_zip(ZIP_FILE_PATH, EXTRACTED_PDF_DIR)
doc_texts = extract_text_from_pdfs(EXTRACTED_PDF_DIR)
chunked_texts = chunk_text(doc_texts)
create_faiss_index(chunked_texts)
# NOTE(review): FAISS_INDEX_PATH is a directory, and the directory-based
# upload_faiss_to_azure sends every file in it (not only faiss_index.bin) -
# confirm that this is intended.
upload_faiss_to_azure(FAISS_INDEX_PATH)
create_search_index(force_recreate=False)
assistant_id = associate_vector_store_with_ai()
print(assistant_id)

# associate_vector_store_with_ai() returns None on failure instead of raising,
# so only report success when an assistant ID actually came back.
if assistant_id is None:
    logging.error("❌ Pipeline finished, but the assistant was not created - check the errors logged above.")
else:
    logging.info("✅ Full pipeline executed successfully!")
