# **Index Data**

In [1]:
import os
import dotenv
dotenv.load_dotenv(".env")

True

In [2]:
from azure.core.credentials import AzureKeyCredential
import datetime

container_name = os.getenv("storage_container")
storage_base_url = os.getenv("storage_base_url")
connection_string = os.getenv("storage_connection_string")

azure_openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION")
embedding_model = os.getenv("embeddingModel")
chat_model = os.getenv("chatModel")

current_date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

index_name = os.getenv("SEARCH_INDEX_NAME")
index_name

'test-index'

In [3]:
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.projects.models import ConnectionType

# Creating a project client
project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(),
    conn_str=os.environ["PROJECT_CONNECTION_STRING"],
)
for connection in project_client.connections.list():
    print(f"connection name: {connection.name}")

# Creating an AI search connection
search_connection = project_client.connections.get(
    connection_name=os.getenv("SEARCH_CONNECTION_NAME"),
    include_credentials=True)

# Creating an OpenAI connection
oai_connection = project_client.connections.get(
    connection_name=os.getenv("OAI_CONNECTION_NAME"),
    include_credentials=True)

ai_connection = project_client.connections.get(
    connection_name=os.getenv("AI_CONNECTION_NAME"),
    include_credentials=True)

connection name: ai-ericssonlearningpathhub131886796645_aoai
connection name: ai-ericssonlearningpathhub131886796645
connection name: searchericssonlearningpath
connection name: bingericssonlearningpath
connection name: aiericssonlearningpathwesteurope
connection name: ericsson-learning-path-project/demo_data
connection name: ericsson-learning-path-project/demodata
connection name: ericsson-learning-path-project/product_data
connection name: ericsson-learning-path-project/customer_data
connection name: ericsson-learning-path-project/workspaceartifactstore
connection name: ericsson-learning-path-project/workspaceblobstore


In [4]:
embedding_client = project_client.inference.get_embeddings_client()

def get_embedding(text):
    # get an embedding for the text using the project's default model inferencing endpoint
    embedding = embedding_client.embed(
        input=text,
        dimensions=1536,
        model=embedding_model,
    )
    return embedding.data[0].embedding

In [5]:
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    VectorSearch,
    HnswAlgorithmConfiguration,
    VectorSearchProfile,
    AzureOpenAIVectorizer,
    AzureOpenAIVectorizerParameters,
    SimpleField,
    SearchField,
    SearchFieldDataType,
    SearchableField,
    SemanticConfiguration,
    SemanticPrioritizedFields,
    SemanticField,
    SemanticSearch
)

index_client = SearchIndexClient(
    endpoint=search_connection.endpoint_url,
    credential=AzureKeyCredential(key=search_connection.key)
)

fields = [
    SimpleField(name="id", type=SearchFieldDataType.String, key=True, filterable=True, sortable=True),
    SearchableField(name="title", type=SearchFieldDataType.String),
    SearchableField(name="content", type=SearchFieldDataType.String),
    SimpleField(name="last_update", type=SearchFieldDataType.DateTimeOffset, filterable=True),
    SimpleField(name="url", type=SearchFieldDataType.String),
    SearchField(
        name="text_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=1536,
        vector_search_profile_name="myHnswProfile",
    )
]

# Adding vector search settings
vector_search = VectorSearch(
    algorithms=[
        HnswAlgorithmConfiguration(
            name="myHnsw"
        )
    ],
    profiles=[
        VectorSearchProfile(
            name="myHnswProfile",
            algorithm_configuration_name="myHnsw",
            vectorizer_name="myVectorizer"
        )
    ],
    vectorizers=[
        AzureOpenAIVectorizer(
            vectorizer_name="myVectorizer",
            parameters=AzureOpenAIVectorizerParameters(
                resource_url=oai_connection.endpoint_url, #azure_openai_endpoint,
                deployment_name=embedding_model,
                model_name=embedding_model,
                api_key=oai_connection.key,
            )
        )
    ]
)

# Create the semantic settings with the configuration
semantic_config = SemanticConfiguration(
    name="my-semantic-config",
    prioritized_fields=SemanticPrioritizedFields(
        title_field=SemanticField(field_name="title"),
        content_fields=[SemanticField(field_name="content")]
    )
)

semantic_search = SemanticSearch(configurations=[semantic_config])

# Create Index

### Creating the index

In [6]:
from azure.search.documents.indexes.models import SearchIndex

# Create the search index
index = SearchIndex(
    name=index_name,
    fields=fields,
    vector_search=vector_search,
    semantic_search=semantic_search
)
result = index_client.create_or_update_index(index)
print(f' {result.name} created')

 test-index created


### **Preprocessing Pipeline**
- **Read data from storage account**
- **Use Document Intelligence to crack PDF**
    - **Extract text**
    - **Extract images and write to file**

In [7]:
from azure.storage.blob import BlobServiceClient
from azure.ai.documentintelligence import DocumentIntelligenceClient

def initialize_blob_service_client(connection_string, container_name):
    # Initialize the BlobServiceClient and returns the container client
    blob_service_client = BlobServiceClient.from_connection_string(conn_str=connection_string)
    container_client = blob_service_client.get_container_client(container_name)
    return container_client

def initialize_document_intelligence_client(ai_connection):
    # Initialize the Document Intelligence client
    document_intelligence_client = DocumentIntelligenceClient(
        endpoint=ai_connection.endpoint_url,
        credential=AzureKeyCredential(ai_connection.key)
    )
    return document_intelligence_client

In [8]:
def download_blob_content(blob_client):
    # Download the blob's content
    try:
        download_stream = blob_client.download_blob()
        blob_content = download_stream.readall()
        return blob_content
    except Exception as e:
        print(f"Error downloading blob: {str(e)}")
        return None


def analyze_document(document_intelligence_client, blob_content):
    # Analyze the document using the Document Intelligence client
    from azure.ai.documentintelligence.models import AnalyzeResult
    poller = document_intelligence_client.begin_analyze_document(
        model_id="prebuilt-layout",
        analyze_request=blob_content,
        content_type="application/octet-stream",  # Adjust based on your document type
    )
    result: AnalyzeResult = poller.result()
    return result

**Run Document Cracking Pipeline**

In [25]:
from tqdm import tqdm

def run_process_data_pipeline():
    documents = []

    # Initialize the BlobServiceClient & Document Intelligence client
    container_client = initialize_blob_service_client(connection_string, container_name)
    document_intelligence_client = initialize_document_intelligence_client(ai_connection)

    # List all blobs in the container
    blob_list = list(container_client.list_blobs())  # Convert generator to list to get total count
    total_blobs = len(blob_list)  # Total number of blobs

    if total_blobs == 0:
        print("No blobs found in the container.")
    else:
        with tqdm(total=total_blobs, desc="Processing Blobs", unit="blob") as pbar:
            for blob in blob_list:
                blob_name = blob.name

                # Update the progress bar's description to show the current blob
                pbar.set_description(f"Processing {blob_name}")

                # Download the blob's content
                blob_content = download_blob_content(blob_client = container_client.get_blob_client(blob_name))

                if blob_content is None:
                    continue # Skip to the next blob if download failed

                # Analyze the document using Document Intelligence
                data = analyze_document(document_intelligence_client, blob_content)

                if data is None:
                    continue # Skip to the next blob if analysis failed

                documents.append({
                    "filename": blob_name,
                    "data": data,
                    "url": f"{storage_base_url}/{container_name}/{blob_name}"
                })

                pbar.update(1)
            pbar.set_postfix({"Status": "Finished"})
    return documents

documents_raw = run_process_data_pipeline()

Processing 1706.03762v7.pdf: 100%|██████████| 1/1 [00:07<00:00,  7.64s/blob, Status=Finished]


### **Create Chunks + Add Metadata**
- **chunk id**
- **last update**
- **number of tokens**

In [26]:
documents_raw[0].keys()

dict_keys(['filename', 'data', 'url'])

In [27]:
import datetime
import pytz

def get_sweden_time():
    # Define the timezone for Sweden
    sweden_tz = pytz.timezone('Europe/Stockholm')
    utc_now = datetime.datetime.now(datetime.timezone.utc)
    sweden_time = utc_now.astimezone(sweden_tz)
    return sweden_time.isoformat()

In [31]:
def create_chunks(raw_doc):
    document_chunks = []
    current_title = None

    data = raw_doc["data"]

    # Loop through paragraphs to structure the main content
    for paragraph in data.paragraphs:
        # Extract page number from boundingRegions
        if paragraph.role == "title":
            # Update the current title but do not add an entry to document_chunks
            current_title = paragraph.content
        elif paragraph.role == "sectionHeading":
            # Start a new entry for the section heading with the current title
            document_chunks.append({
                "title": current_title,
                "content": "",
            })
        else:
            # Add content to the last entry, updating page_end as needed
            if document_chunks:
                document_chunks[-1]["content"] += " " + paragraph.content
    return document_chunks

In [32]:
from langchain_text_splitters import CharacterTextSplitter

def create_splits(content, chunk_size=1025, chunk_overlap=128):
    text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # This list will hold our final dictionaries with updated content.
    splits = []

    # Iterate through original documents and create chunks
    for doc in content:
        text = doc.get("content", "")
        
        # Update the dictionary to use `page_content` instead of `content`
        doc["page_content"] = text
        doc.pop("content", None)  # Remove the old `content` key if it exists

        chunks = text_splitter.create_documents([text])
        if len(chunks) < 1:
            splits.append(doc)
        else:
            for chunk in chunks:
                new_doc = doc.copy()
                new_doc["page_content"] = chunk.page_content  # Assign the splitted text
                splits.append(new_doc)
    return splits

In [33]:
import uuid
from pathlib import Path

# Function to aggregate content for each document type
def format_documents(documents_raw):
    aggregated_documents = []

    for doc in documents_raw:
        url = doc["url"]  # URL for the document
        filename = doc["filename"]  # Filename of the document

        chunks = create_chunks(doc)
        splits = create_splits(chunks, chunk_size=1024, chunk_overlap=128)

        for i, split in enumerate(splits):
            chunk_name = f"{filename}_chunk_{i}"
            namespace = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")

            # Append the splits with metadata
            aggregated_documents.append({
                "id": str(uuid.uuid5(namespace, chunk_name)),
                "title": chunk_name,
                "page_content": split.get("page_content", "") if isinstance(split, dict) else split.page_content,  # Aggregated chunk content
                "last_update": get_sweden_time(),  # Current date & time
                "url": url
            })
    return aggregated_documents

formatted_documents = format_documents(documents_raw)

In [34]:
formatted_documents[0:2]

[{'id': '62112fb9-3e61-52f8-a291-3425273c20ec',
  'title': '1706.03762v7.pdf_chunk_0',
  'page_content': 'The dominant sequence transduction models are based on complex recurrent or convolutional neural networks that include an encoder and a decoder. The best performing models also connect the encoder and decoder through an attention mechanism. We propose a new simple network architecture, the Transformer, based solely on attention mechanisms, dispensing with recurrence and convolutions entirely. Experiments on two machine translation tasks show these models to be superior in quality while being more parallelizable and requiring significantly less time to train. Our model achieves 28.4 BLEU on the WMT 2014 English- to-German translation task, improving over the existing best results, including ensembles, by over 2 BLEU. On the WMT 2014 English-to-French translation task, our model establishes a new single-model state-of-the-art BLEU score of 41.8 after training for 3.5 days on eight GP

**index data**

In [16]:
import time
from tqdm import tqdm
import uuid

def convert_document(doc, get_embedding):
    # Define standard values
    standard_values = {
        'id': f'{uuid.uuid4()}',
        'title': 'unknown',
        'page_content': '',
        'last_update': get_sweden_time(),
        'url': '',
        'text_vector': []
    }
    
    # Extract values from the Document object, using standard values if missing
    id = doc.get('id', standard_values['id'])  # Use standard
    title = doc.get('title', standard_values['title'])
    content = doc.get('page_content', standard_values['page_content'])
    last_update = doc.get('last_update', standard_values['last_update'])
    url = doc.get('url', standard_values['url'])
    text_vector = get_embedding(content) if content else standard_values['text_vector']

    # Construct the document for uploading
    document = {
        "id": id,
        "title": title,
        "content": content,
        "last_update": last_update,
        "url": url,
        "text_vector": text_vector,
    }
    return document

data_final = []

# Start timing
start_time = time.time()

# Process documents with progress bar
for i, doc in enumerate(tqdm(formatted_documents, desc="Converting documents")):
    data_final.append(convert_document(doc, get_embedding))

# End timing
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Conversion completed in {elapsed_time:.2f} seconds.")

Converting documents: 100%|██████████| 25/25 [00:03<00:00,  7.12it/s]

Conversion completed in 3.52 seconds.





**Push to index**

In [17]:
from azure.search.documents import SearchClient

def push_to_index(data, search_connection, index_name=index_name):
    search_client = SearchClient(
        index_name=index_name,
        endpoint=search_connection.endpoint_url,
        credential=AzureKeyCredential(key=search_connection.key)
    )
    search_client.upload_documents(data)

In [None]:
import time
from tqdm import tqdm

# Group the header based chunks into batches
batch_size = 5
total_chunks = len(data_final)
num_batches = (total_chunks + batch_size - 1) // batch_size

overall_start_time = time.time()

# Process each batch
for i, batch_num in enumerate(tqdm(range(num_batches), desc="Processing Batches")):
    batch_start_time = time.time()
    start = batch_num * batch_size
    batch = data_final[start:start + batch_size]

    # Push the documents to the index
    push_to_index(
        data=batch,
        search_connection=search_connection,
        index_name=index_name
    )

overall_end_time = time.time()
elapsed_overall_time = overall_end_time - overall_start_time
print(f"All batches pushed in {elapsed_overall_time:.2f} seconds.")

Processing Batches: 100%|██████████| 5/5 [00:00<00:00,  5.16it/s]

All batches pushed in 0.97 seconds.





: 