<a href="https://colab.research.google.com/github/melrahmtz/purple-box/blob/main/hands-on-practice/0703_chunking_to_retrieval_bgem3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Chunking**

In [None]:
import json
import re
import os

# Read the whole Markdown manual into memory; the bare file name is kept
# separately because it is recorded as the "source" of every chunk.
file_path = "Manuale-IRIS_SLIM_IN_TEC_IT.md"
file_name = os.path.split(file_path)[1]
with open(file_path, mode="r", encoding="utf-8") as markdown_file:
    markdown_text = markdown_file.read()

# Function to check if a chunk contains a Markdown table
def is_table(chunk):
    """Return True when *chunk* contains a Markdown table.

    A table is recognised as a line starting and ending with ``|`` that is
    immediately followed by a separator line made only of ``-``, ``|`` and
    spaces. Both lines must be newline-terminated; data rows are optional.
    """
    table_pattern = r'^\|.*\|\n\|[-| ]+\|\n(\|.*\|\n)*'
    found = re.search(table_pattern, chunk, re.MULTILINE)
    return found is not None

# Function to extract and split long tables
def _split_table_row(line):
    """Split one Markdown table line into a list of stripped cell strings."""
    # Remove exactly one leading and one trailing pipe so that empty edge
    # cells (e.g. "||a|") are preserved instead of being collapsed.
    if line.startswith("|"):
        line = line[1:]
    if line.endswith("|"):
        line = line[:-1]
    return [cell.strip() for cell in line.split("|")]


def extract_and_split_table(chunk, max_rows=10):
    """Parse a Markdown table and split its rows into groups of *max_rows*.

    Args:
        chunk: Text of a Markdown table (header line, separator line, rows).
        max_rows: Maximum number of data rows per returned piece.

    Returns:
        A list of ``{"headers": [...], "rows": [[...], ...]}`` dicts, or
        ``None`` when no header or no data row was found.

    Raises:
        ValueError: If *max_rows* is smaller than 1.
    """
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")

    # A separator row holds only pipes, dashes, alignment colons and spaces
    # and must contain at least one dash; without the dash requirement an
    # empty data row such as "| | |" would be mistaken for a separator.
    separator = re.compile(r'^\|(?=.*-)[-:| ]+\|$')

    # Strip each line so trailing whitespace cannot hide a separator or add
    # a phantom empty cell.
    lines = [line.strip() for line in chunk.strip().split("\n")]
    header, table_rows = None, []
    for i, line in enumerate(lines):
        if not line:
            continue
        if separator.match(line):
            # The header is the line right above the separator; a separator
            # on the very first line has no header line to read.
            if i > 0:
                header = _split_table_row(lines[i - 1])
            continue
        if header:
            table_rows.append(_split_table_row(line))

    # Split table into chunks if too many rows
    table_chunks = [
        {"headers": header, "rows": table_rows[i:i + max_rows]}
        for i in range(0, len(table_rows), max_rows)
    ]

    return table_chunks if header and table_rows else None

# Function to extract section headers
def extract_section_title(header):
    """Return the text of a Markdown heading line, or None if it is not one.

    The line is stripped first; it must then start with one or more ``#``
    characters followed by whitespace.
    """
    heading = re.match(r'^(#+)\s+(.*)', header.strip())
    if heading is None:
        return None
    return heading.group(2)

# Function to detect table title
def detect_table_title(pre_table_text):
    """Guess a table title from the text that precedes the table.

    The last line of *pre_table_text* is treated as the title when it is
    non-empty and shorter than ten words.

    Args:
        pre_table_text: Text found between the previous table (or section
            start) and the table being processed; may be empty.

    Returns:
        The candidate title line, or ``None`` when there is no preceding
        text or the last line is too long to be a title.
    """
    # str.split always yields at least one element, so [-1] is safe even
    # for empty input.
    candidate = pre_table_text.strip().split("\n")[-1]
    word_count = len(candidate.split())
    # An empty candidate means there was no text before the table; report
    # "no title" rather than an empty-string title.
    if 0 < word_count < 10:
        return candidate
    return None

# Function to split text into chunks of max 400 words with 40-word overlap
def split_text(text, section_title, max_words=400, overlap=40):
    """Split *text* into overlapping word windows.

    Args:
        text: Text to split; words are separated on any whitespace.
        section_title: Title prepended (as ``## title``) to the first chunk.
        max_words: Maximum number of words per chunk.
        overlap: Number of words shared between consecutive chunks.

    Returns:
        A list of chunk strings; empty when *text* contains no words.

    Raises:
        ValueError: If *max_words* is smaller than 1 or *overlap* is not in
            ``[0, max_words)`` (the window would never advance).
    """
    if max_words < 1:
        raise ValueError("max_words must be at least 1")
    if not 0 <= overlap < max_words:
        raise ValueError("overlap must satisfy 0 <= overlap < max_words")

    words = text.split()
    step = max_words - overlap
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + max_words, len(words))
        chunk = " ".join(words[start:end])
        # Prepend section title to first chunk
        if start == 0:
            chunk = f"## {section_title}\n{chunk}"
        chunks.append(chunk)
        # Stop once the window reached the last word; advancing further
        # would only emit a chunk made of words already in this one.
        if end == len(words):
            break
        start += step
    return chunks

# Process Markdown
# re.split with a single capturing group returns
# [text_before_first_heading, heading_1, body_1, heading_2, body_2, ...].
sections = re.split(r'^(#+\s+.*)', markdown_text, flags=re.MULTILINE)
final_chunks = []
current_section = "Unknown"

# Header row, separator row, then one or more data rows. The last data row
# may end at end-of-string instead of "\n": section content is stripped
# before matching, so a table that closes a section has no trailing newline
# and its last row would otherwise leak out as a plain-text chunk.
table_pattern = re.compile(r'(\|.*\|\n\|[-| ]+\|\n(?:\|.*\|(?:\n|\Z))+)')


def _append_chunk(payload_key, payload, section_title, **extra_metadata):
    """Append one chunk to final_chunks, numbered by its 1-based position."""
    position = len(final_chunks) + 1
    metadata = {"source": file_name, "section": section_title}
    metadata.update(extra_metadata)
    metadata["position"] = position
    final_chunks.append({
        "chunk_id": position,
        payload_key: payload,
        "metadata": metadata,
    })


def _append_text_chunks(text, section_title):
    """Split free text into word windows and append each as a text chunk."""
    for text_chunk in split_text(text, section_title):
        _append_chunk("content", text_chunk, section_title)


def _process_section(section_title, content):
    """Emit text and table chunks for one section body, in document order."""
    last_index = 0
    for match in table_pattern.finditer(content):
        start, end = match.span()
        pre_table_text = content[last_index:start].strip()
        last_index = end

        # Extract table title if present (last short line before the table)
        table_title = detect_table_title(pre_table_text)
        if pre_table_text:
            _append_text_chunks(pre_table_text, section_title)

        for table_chunk in extract_and_split_table(match.group(0)) or []:
            _append_chunk("table", table_chunk, section_title, table_title=table_title)

    remaining_text = content[last_index:].strip()
    if remaining_text:
        _append_text_chunks(remaining_text, section_title)


# Text that appears before the first heading has no title of its own; file it
# under the default section instead of dropping it.
preamble = sections[0].strip()
if preamble:
    _process_section(current_section, preamble)

for i in range(1, len(sections), 2):
    section_title = extract_section_title(sections[i]) or current_section
    current_section = section_title  # Update current section to maintain hierarchy
    _process_section(section_title, sections[i + 1].strip())

# Next free chunk number, kept for any later cell that continues numbering.
chunk_id = len(final_chunks) + 1

# Write the collected chunks to disk as pretty-printed UTF-8 JSON
# (ensure_ascii=False keeps accented characters readable in the file).
output_file = "Manuale-IRIS_SLIM_IN_TEC_IT_chunks.json"
with open(output_file, mode="w", encoding="utf-8") as json_file:
    json_file.write(json.dumps(final_chunks, indent=4, ensure_ascii=False))

print(f"Chunking completed. JSON saved to: {output_file}")


Chunking completed. JSON saved to: Manuale-IRIS_SLIM_IN_TEC_IT_chunks.json


# **Embedding**
A **monolingual** approach: documents and queries are embedded in the same language with `bge-m3`, either by translating the documents to English ***or*** by translating the query to Italian.

In [None]:
# Install the Supabase client, NumPy and the psycopg2 PostgreSQL adapter into the notebook runtime.
# NOTE(review): psycopg2 is not imported by any cell visible here — confirm it is still needed.
!pip install supabase numpy psycopg2

In [None]:
import os
import json
import torch
import uuid
import numpy as np
from supabase import create_client, Client
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel

# Initialize Supabase
# Credentials are read from the environment when available so they do not
# have to live in the notebook.
# NOTE(security): the literal fallbacks below are kept only so existing runs
# keep working; a key committed to a public notebook should be rotated and the
# fallback removed once SUPABASE_URL / SUPABASE_KEY are provided externally.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "https://uzbhoimvqhnubfiexbkz.supabase.co")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InV6YmhvaW12cWhudWJmaWV4Ymt6Iiwicm9sZSI6ImFub24iLCJpYXQiOjE3Mzk5NDUwMzYsImV4cCI6MjA1NTUyMTAzNn0.Jn6izUth26LWgFL8pHj3K6E3sMLndX5SurPMUSfCb28")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Pick the compute device once so both models are guaranteed to share it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load BGE-M3 Embedding Model
tokenizer = AutoTokenizer.from_pretrained("BAAI/bge-m3")
model = AutoModel.from_pretrained("BAAI/bge-m3").to(device)

# Load Translation Model (English -> Italian)
translation_tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-it")
translation_model = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-en-it").to(device)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/444 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/964 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/687 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/42.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.38k [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/789k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/814k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.35M [00:00<?, ?B/s]



pytorch_model.bin:   0%|          | 0.00/343M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/343M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/293 [00:00<?, ?B/s]

In [None]:
def translate_to_italian(text):
    """Translate *text* with the module-level translation model.

    The input is tokenized with truncation at 512 tokens, generation is
    capped at 512 tokens, and the first generated sequence is decoded with
    special tokens removed.
    """
    encoded = translation_tokenizer(
        text,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
    )
    encoded = encoded.to(translation_model.device)
    with torch.no_grad():
        generated = translation_model.generate(**encoded, max_length=512)
    first_sequence = generated[0]
    return translation_tokenizer.decode(first_sequence, skip_special_tokens=True)

def get_embedding(text):
    """Generate an embedding vector from input text using BGE-M3.

    The token hidden states are averaged over real tokens only, using the
    tokenizer's attention mask. For a single string this equals the plain
    mean; when a batch is passed, padding positions no longer dilute the
    shorter inputs.

    Args:
        text: Text to embed. Input is truncated to 512 tokens.

    Returns:
        The pooled embedding as a (nested) list of floats.
    """
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(model.device)
    with torch.no_grad():
        outputs = model(**inputs)

    hidden = outputs.last_hidden_state
    # Broadcast the mask over the feature axis; assumes hidden is
    # (batch, tokens, features) as implied by pooling over dim=1.
    mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
    token_counts = mask.sum(dim=1).clamp(min=1)  # guard against an all-padding row
    pooled = (hidden * mask).sum(dim=1) / token_counts

    # NOTE(review): the BGE-M3 model card describes dense embeddings as the
    # normalized [CLS] hidden state; switching pooling would require
    # re-embedding the stored chunks, so mean pooling is kept here.
    return pooled.squeeze().cpu().tolist()

def store_chunks_in_supabase(chunks, batch_size=None):
    """Store text and table chunks in Supabase together with their embeddings.

    Text chunks (non-empty ``"content"``) go to the ``documents`` table and
    table chunks (non-empty ``"table"``) go to the ``tables`` table. Each
    input chunk gets a fresh UUID as its ``chunk_id``.

    Args:
        chunks: Iterable of chunk dicts as produced by the chunking step.
        batch_size: Optional maximum number of rows per insert request.
            ``None`` (default) sends each table's rows in a single request.

    Raises:
        ValueError: If *batch_size* is given and smaller than 1.
    """
    if batch_size is not None and batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    document_entries = []
    table_entries = []

    for chunk in chunks:
        chunk_id = str(uuid.uuid4())
        # Treat metadata as optional for both chunk kinds so a chunk without
        # it is stored with empty metadata instead of raising KeyError.
        metadata = chunk.get("metadata", {})

        # Process text content
        content = chunk.get("content")
        if content:
            document_entries.append({
                "chunk_id": chunk_id,
                "content": content,  # Keep content in Italian
                "embedding": get_embedding(content),
                "metadata": metadata,
                "type": "text",
            })

        # Process table data
        table = chunk.get("table")
        if table:
            # Tables are embedded and stored as their JSON text form.
            table_data = json.dumps(table, ensure_ascii=False)
            table_entries.append({
                "chunk_id": chunk_id,
                "table_data": table_data,
                "embedding": get_embedding(table_data),
                "metadata": metadata,
            })

    def _insert_rows(table_name, rows):
        """Insert rows into one Supabase table, optionally in batches."""
        if not rows:
            return
        size = batch_size or len(rows)
        for offset in range(0, len(rows), size):
            supabase.table(table_name).insert(rows[offset:offset + size]).execute()

    # Batch insert into Supabase
    _insert_rows("documents", document_entries)
    _insert_rows("tables", table_entries)


In [None]:
# Read back the chunk list written by the chunking step
json_file_path = "Manuale-IRIS_SLIM_IN_TEC_IT_chunks.json"
with open(json_file_path, mode="r", encoding="utf-8") as json_file:
    json_chunks = json.loads(json_file.read())

# Embed every chunk and upload it to Supabase
store_chunks_in_supabase(json_chunks)
print("Text and table embeddings stored successfully in Supabase!")


Text and table embeddings stored successfully in Supabase!


# **Retrieval**

In [None]:
import ast
import json
import numpy as np
import re
from scipy.spatial.distance import cosine

# Querying Supabase
def query_supabase(user_query, top_k=5):
    """Retrieve the chunks most similar to *user_query* from Supabase.

    The query is translated to Italian, embedded, and compared by cosine
    similarity against every row fetched from the ``documents`` and
    ``tables`` tables.

    Args:
        user_query: Query text (translated with ``translate_to_italian``).
        top_k: Number of best-scoring results to return.

    Returns:
        A list of ``(chunk_id, kind, content, similarity)`` tuples sorted by
        descending similarity, where ``kind`` is ``"text"`` or ``"table"``.
    """
    translated_query = translate_to_italian(user_query)
    query_embedding = np.array(get_embedding(translated_query), dtype=np.float32).flatten()

    def _score_rows(table_name, columns, content_field, kind):
        """Fetch all rows of one table and score them against the query."""
        response = supabase.table(table_name).select(columns).execute()
        scored = []
        for record in response.data:
            raw_embedding = record["embedding"]
            # Vector columns may come back serialized as text ("[0.1,0.2]");
            # decode them so np.array receives numbers, not a string.
            if isinstance(raw_embedding, str):
                raw_embedding = json.loads(raw_embedding)
            chunk_embedding = np.array(raw_embedding, dtype=np.float32).flatten()
            similarity = 1 - cosine(query_embedding, chunk_embedding)
            scored.append((record["chunk_id"], kind, record[content_field], similarity))
        return scored

    text_results = _score_rows(
        "documents", "chunk_id, content, embedding, type, metadata", "content", "text"
    )
    table_results = _score_rows(
        "tables", "chunk_id, table_data, embedding, metadata", "table_data", "table"
    )

    all_results = text_results + table_results
    all_results.sort(key=lambda x: x[3], reverse=True)
    return all_results[:top_k]


In [None]:
# Sample questions with the manual sections that contain the answer;
# uncomment one to try it instead of the active query.
#user_query = "What are the key considerations for using and maintaining the Iris Slim units?"  # Answer in Section 2 and Section 6.1
#user_query = "What is the intended use of the IRIS Slim units?"  # Answer in Section 2.1
#user_query = "What are the installation requirements for the IRIS Slim unit?"  # Answer in Section 4.2
#user_query = "What are the operating limits of the IRIS Slim unit?"  # Answer in Section 2.5 (Table 1)
user_query = "What steps should be taken in case of water leakage from the IRIS Slim unit?"  # Answer in Section 4.3.1, Section 6.3, Section 6.3.1

retrieved_chunks = query_supabase(user_query)

# Each result is (chunk id, kind, content, similarity); show a 300-character preview.
for result_id, result_kind, result_body, relevance in retrieved_chunks:
    print(f"Chunk ID: {result_id}\nType: {result_kind}\nContent: {result_body[:300]}...\nRelevance: {relevance:.4f}\n")


Chunk ID: 28717a46-989b-4232-818c-484bd07cc271
Type: text
Content: ## 2.1. Uso Previsto
Le unità Iris Slim sono progettate per la funzione di riscaldamento, raffrescamento, deumidificazione e filtrazione di ambienti residenziali e terziario (uffici, locali pubblici, o simili)....
Relevance: 0.7937

Chunk ID: 93c11ada-11a1-4d21-8bd7-ad29c1f03c5b
Type: text
Content: ## 6.3.1. Troubleshooting IRIS Slim
| | Vedere manuale del controllo | |...
Relevance: 0.7919

Chunk ID: 1e2a55fa-ccae-4a8f-9b0a-0e359fb77228
Type: text
Content: ## 4.1. Predisposizioni All'installazione Di IRIS Slim
Fissare l'unità al muro con le quattro viti (in base alle dimensioni delle teste delle viti possono essere necessarie delle rondelle). Al termine dell'installazione l'unità deve risultare perfettamente in orizzontale o con lieve pendenza nella d...
Relevance: 0.7881

Chunk ID: 16cc7e27-2efa-4f70-b8a3-59736c824200
Type: text
Content: ## IRIS SLIM / IN
Manuale d'installazione ed uso...
Relevance: 0.7592

Chunk ID: 