In [None]:
!pip install -q PyMuPDF unstructured

In [None]:
!pip install PyPDF2

In [None]:
import fitz
import os
import re
import json
from PIL import Image
import io

import re

import json


In [None]:
pdf_1_file = open(pdf_1_path, 'rb')
pdf_2_file = open(pdf_2_path, 'rb')

In [6]:
def dynamic_crop_above_caption(doc, pdf_name, page_num, caption_text, caption_rect, output_folder, fig_num, dpi=300):
    page = doc[page_num]
    x0, y0, x1, y1 = caption_rect
    caption_y = y0

    print(f"Figura {fig_num}: Using provided caption rect on page {page_num + 1}: {caption_rect}")

    pix = page.get_pixmap(dpi=dpi)
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    scale = dpi / 72

    drawing_rects = []
    for i, draw in enumerate(page.get_drawings()):
        r = draw["rect"]
        lw = draw.get("linewidth", 1.0)
        stroke = draw.get("color", (0, 0, 0))
        fill = draw.get("fill")

        print(f"   ➤ Rect {i}: {r}, lw={lw}, fill={fill}, stroke={stroke}")

        if lw <= 1.0 and fill is None and stroke in [(0, 0, 0), (0.3, 0.3, 0.3)]:
            print(f"Passed visual filter")
            if abs(r.y1 - caption_y) < 100:
                print(f"Passed caption proximity: y1={r.y1:.2f}, caption_y={caption_y:.2f}")
                drawing_rects.append(r)
            else:
                print(f"Failed caption proximity: y1={r.y1:.2f}, caption_y={caption_y:.2f}")
        else:
            print(f"Failed visual filter")

    if drawing_rects:
        closest_rect = min(drawing_rects, key=lambda r: abs(r.y1 - caption_y))
        crop_px = (
            int(closest_rect.x0 * scale),
            int(closest_rect.y0 * scale),
            int(closest_rect.x1 * scale),
            int(closest_rect.y1 * scale)
        )
        cropped = img.crop(crop_px)

        filename = f"{pdf_name}_figure_{fig_num}.png"
        out_path = os.path.join(output_folder, filename)
        cropped.save(out_path)
        print(f"Cropped from vector box: figure {fig_num} on page {page_num + 1}")

        return {
            "filename": filename,
            "figure_id": f"Figura {fig_num}",
            "caption": caption_text,
            "page": page_num + 1,
            "rendered": True,
            "source": "vector_box_crop"
        }

    else:
        print("No vector border found — trying embedded image fallback...")
        images = page.get_images(full=True)
        valid_images = images[1:] if len(images) > 1 else []

        if valid_images:
            xref = valid_images[0][0]  # assume first non-header image
            image_data = doc.extract_image(xref)
            image_bytes = image_data["image"]

            filename = f"{pdf_name}_figure_{fig_num}.png"
            out_path = os.path.join(output_folder, filename)
            Image.open(io.BytesIO(image_bytes)).save(out_path)
            print(f"Saved embedded fallback image for figure {fig_num} on page {page_num + 1}")

            return {
                "filename": filename,
                "figure_id": f"Figura {fig_num}",
                "caption": caption_text,
                "page": page_num + 1,
                "rendered": False,
                "source": "embedded_image_fallback"
            }

        else:
            print("No embedded image found either, skipping figure.")
            return None

def extract_all_figures_with_captions(pdf_path, pdf_name, output_folder) -> dict:
    os.makedirs(output_folder, exist_ok=True)
    doc = fitz.open(pdf_path)

    figure_pattern = re.compile(r"Figura\s+(\d+)\s*[-–—]\s*(.*)", re.IGNORECASE)
    figure_map = {}

    for page_num in range(len(doc)):
        page = doc[page_num]
        blocks = page.get_text("blocks")

        for block in blocks:
            text = block[4].strip()
            match = figure_pattern.match(text)

            if match:
                fig_num = match.group(1)
                caption = match.group(2).strip()
                print(f"🔎 Found caption match: Figura {fig_num} — '{caption}' on page {page_num + 1}")

                if fig_num in figure_map:
                    print(f"Skipping Figura {fig_num} on page {page_num + 1} — already in map")
                    continue

                caption_rect = block[:4]
                crop_result = dynamic_crop_above_caption(doc, pdf_name, page_num, caption, caption_rect, output_folder, fig_num)
                if crop_result:
                    figure_map[fig_num] = crop_result

    print(f"Extracted {len(figure_map)} figures using vector-based cropping")

    metadata_path = os.path.join(output_folder, "figure_metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(figure_map, f, indent=2, ensure_ascii=False)

    print(f"Saved metadata to {metadata_path}")
    return figure_map


In [58]:
def sanitize_caption(caption):
    return re.sub(r'[^\w\s-]', '', caption).strip().replace(' ', '_')[:60]

def dynamic_crop_above_caption(doc, pdf_name, page_num, caption_text, caption_rect, output_folder, fig_num, dpi=300):
    def process_page(page, caption_y, image_page_num, proximity_strategy="abs", fallback=False):
        pix = page.get_pixmap(dpi=dpi)
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        scale = dpi / 72
        page_height = page.rect.height

        drawing_rects = []
        print(" Scanning vector borders on page...")
        for i, draw in enumerate(page.get_drawings()):
            r = draw["rect"]
            lw = draw.get("linewidth", 1.0)
            stroke = draw.get("color", (0, 0, 0))
            fill = draw.get("fill")

            print(f"   ➤ Rect {i}: {r}, lw={lw}, fill={fill}, stroke={stroke}")

            if lw <= 1.0 and fill is None and stroke in [(0, 0, 0), (0.3, 0.3, 0.3)]:
                print(f"Passed visual filter")
                if proximity_strategy == "abs":
                    if abs(r.y1 - caption_y) < 100:
                        print(f"Passed caption proximity: y1={r.y1:.2f}, caption_y={caption_y:.2f}")
                        drawing_rects.append(r)
                    else:
                        print(f"Failed caption proximity")
                elif proximity_strategy == "bleed":
                    bottom_margin = 150  # pixels from bottom considered for bleed-over
                    if (page_height - r.y1) < bottom_margin:
                        print(f"Passed bleed-over proximity (y1={r.y1:.2f}, page_height={page_height:.2f})")
                        drawing_rects.append(r)
                    else:
                        print(f"Failed bleed-over proximity (y1={r.y1:.2f} >= {bottom_margin:.2f})")
            else:
                print(f"Failed visual filter")

        if drawing_rects:
            closest_rect = min(drawing_rects, key=lambda r: abs(r.y1 - caption_y))
            crop_px = (
                int(closest_rect.x0 * scale),
                int(closest_rect.y0 * scale),
                int(closest_rect.x1 * scale),
                int(closest_rect.y1 * scale)
            )
            cropped = img.crop(crop_px)

            filename = f"{pdf_name}_figure_{fig_num}.png"  # fig_num here is the full unique_id e.g., "69_VMware_NSX"
            out_path = os.path.join(output_folder, filename)
            cropped.save(out_path)
            print(f"Cropped from vector box: figure {fig_num} from page {image_page_num + 1}")

            return {
                "filename": filename,
                "figure_id": f"Figura {fig_num}",
                "caption": caption_text,
                "page": page_num + 1,
                "image_page": image_page_num + 1,
                "rendered": True,
                "source": "bleed_over_crop" if fallback else "vector_box_crop"
            }
        return None

    page = doc[page_num]
    x0, y0, x1, y1 = caption_rect
    caption_y = y0

    print(f"\n[Figura {fig_num}] Using provided caption rect on page {page_num + 1}: {caption_rect}")
    result = process_page(page, caption_y, page_num, proximity_strategy="abs")

    if result:
        return result

    caption_is_top_of_page = caption_y < 200

    if caption_is_top_of_page and page_num > 0:
        print("Caption is high on the page — trying previous page first (bleed-over)...")
        prev_page = doc[page_num - 1]
        result = process_page(prev_page, caption_y, page_num - 1, proximity_strategy="bleed", fallback=True)
        if result:
            return result

    # Embedded image fallback — select the one closest *above* the caption 
    print("Trying embedded image fallback on caption page (using top-aligned matching)...")
    image_infos = page.get_image_info(xrefs=True)

    # Filter only images whose top is above the caption
    image_above_caption = [
        info for info in image_infos
        if info["bbox"][1] < caption_y
    ]

    print(f"Found {len(image_infos)} embedded images with position info on page {page_num + 1}")
    print(f"Caption Y: {caption_y}")
    for info in image_above_caption:
        print(f"   ➤ Image bbox: {info['bbox']}")

    if image_above_caption:
        closest_image = max(image_above_caption, key=lambda info: info["bbox"][1])
        xref = closest_image["xref"]

        image_data = doc.extract_image(xref)
        image_bytes = image_data["image"]

        filename = f"{pdf_name}_figure_{fig_num}.png"
        out_path = os.path.join(output_folder, filename)
        Image.open(io.BytesIO(image_bytes)).save(out_path)
        print(f"Saved embedded image just above caption for figure {fig_num} on page {page_num + 1}")

        return {
            "filename": filename,
            "figure_id": f"Figura {fig_num}",
            "caption": caption_text,
            "page": page_num + 1,
            "image_page": page_num + 1,
            "rendered": False,
            "source": "embedded_image_fallback"
        }

    print("No embedded image found either, skipping figure")
    return None

def extract_all_figures_with_captions(pdf_path, pdf_name, output_folder) -> dict:
    os.makedirs(output_folder, exist_ok=True)
    doc = fitz.open(pdf_path)

    figure_pattern = re.compile(r"Figura\s+(\d+)\s*[-–—]\s*(.*)", re.IGNORECASE)
    figure_map = {}
    seen_figures = set()

    for page_num in range(len(doc)):
        page = doc[page_num]
        blocks = page.get_text("blocks")

        for block in blocks:
            text = block[4].strip()
            match = figure_pattern.match(text)

            if match:
                fig_num = match.group(1)
                caption = match.group(2).strip().lower()
                figure_key = (fig_num, caption)

                print(f"🔎 Found caption match: Figura {fig_num} — '{caption}' on page {page_num + 1}")

                safe_caption = sanitize_caption(caption)
                unique_id = f"{fig_num}_{safe_caption}"

                if unique_id in figure_map:
                    print(f"Skipping Figura {fig_num} — duplicate caption")
                    continue

                caption_rect = block[:4]

                crop_result = dynamic_crop_above_caption(
                    doc, pdf_name, page_num, caption, caption_rect,
                    output_folder, unique_id  # used for both image name + key
                )

                if crop_result:
                    figure_map[unique_id] = crop_result


    print(f"Extracted {len(figure_map)} figures using vector-based cropping")

    metadata_path = os.path.join(output_folder, "figure_metadata.json")
    with open(metadata_path, "w", encoding="utf-8") as f:
        json.dump(figure_map, f, indent=2, ensure_ascii=False)

    print(f"Saved metadata to {metadata_path}")
    return figure_map


In [None]:
pdf_path = '/data/VM_manual/ISTRUZIONE_OPERATIVA_CREAZIONE_VM_CLOUD_INSIEL_REV_00.pdf'
output_folder_path = '/data/VM_manual'
pdf_name = 'VM_manual'

figures_folder = os.path.join(output_folder_path, 'figures')
os.makedirs(figures_folder, exist_ok=True)

wifi_figure_map = extract_all_figures_with_captions(
    pdf_path=pdf_path,
    pdf_name=pdf_name,
    output_folder=figures_folder 
)

metadata_path = os.path.join(output_folder_path, "figure_metadata.json")
with open(metadata_path, "w", encoding="utf-8") as f:
    json.dump(wifi_figure_map, f, indent=2, ensure_ascii=False)

print(f"Saved figure metadata to: {metadata_path}")
print("Sample extracted figures:")
for fig in list(wifi_figure_map.values())[:5]:
    print(fig)


In [None]:
pdf_path = '/data/wifi_manual/ISTRUZIONE_OPERATIVA_CONFIGURAZIONE_WIFI_ARUBA_REV_01.pdf'
output_folder_path = '/data/wifi_manual'
pdf_name = 'wifi_manual'

figures_folder = os.path.join(output_folder_path, 'figures')
os.makedirs(figures_folder, exist_ok=True)

wifi_figure_map = extract_all_figures_with_captions(
    pdf_path=pdf_path,
    pdf_name=pdf_name,
    output_folder=figures_folder  
)

metadata_path = os.path.join(output_folder_path, "figure_metadata.json")
with open(metadata_path, "w", encoding="utf-8") as f:
    json.dump(wifi_figure_map, f, indent=2, ensure_ascii=False)

print(f"Saved figure metadata to: {metadata_path}")
print("Sample extracted figures:")
for fig in list(wifi_figure_map.values())[:5]:
    print(fig)


# Semantic chunking w/ llama parse + gpt 4 and instructor (?)


In [None]:
!pip install llama-parse --quiet

In [96]:
def parse_pdf_llama_no_image_text(filename: str, output_path: str):
    print(f"Parsing '{filename}' (OCR=OFF, image‑text filtered)...")

    parser = LlamaParse(
        api_key=os.environ["LLAMA_CLOUD_API_KEY"],
        result_type="markdown",
        include_metadata=True,
        include_images=False,
        parse_mode="parse_page_with_agent",
        disable_image_extraction=True,
        disable_ocr=True,
        chunking_strategy="hierarchical",
        auto_title_generation=False,
        infer_table_structures=False,
        num_workers=4
    )

    documents = parser.load_data(filename)

    filtered = []
    for doc in documents:
        if doc.metadata.get("block_type") != "image":
            filtered.append(doc)

    md_text = "\n\n".join([d.text for d in filtered])

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(md_text)

    print(f"Saved filtered markdown to: {output_path}")
    return md_text


In [None]:
import nest_asyncio
nest_asyncio.apply()

import os
from llama_parse import LlamaParse
from IPython.display import Markdown, display

os.environ["LLAMA_CLOUD_API_KEY"] = LLAMA_CLOUD_API_KEY

filename    = '/data/wifi_manual/ISTRUZIONE_OPERATIVA_CONFIGURAZIONE_WIFI_ARUBA_REV_01.pdf'
output_path = os.path.join(
    os.path.dirname(filename),
    os.path.basename(filename).replace(".pdf", "_parsed_no_image_llm.md")
)

md_result = parse_pdf_llama_no_image_text(filename, output_path)


display(Markdown(md_result[:2000]))

In [None]:
import nest_asyncio
nest_asyncio.apply()

import os
from llama_parse import LlamaParse
from IPython.display import Markdown, display


os.environ["LLAMA_CLOUD_API_KEY"] = LLAMA_CLOUD_API_KEY
# Usage
filename    = '/data/VM_manual/ISTRUZIONE_OPERATIVA_CREAZIONE_VM_CLOUD_INSIEL_REV_00.pdf'
output_path = os.path.join(
    os.path.dirname(filename),
    os.path.basename(filename).replace(".pdf", "_parsed_no_image_llm.md")
)

md_result = parse_pdf_llama_no_image_text(filename, output_path)

# Preview first bit
display(Markdown(md_result[:2000]))

In [4]:
def strip_artifacts(md_text: str) -> str:
    # 1) Remove mermaid code fences
    md_text = re.sub(r'```mermaid\b.*?```', '', md_text, flags=re.DOTALL|re.IGNORECASE)
    # 2) Drop any fenced code blocks (```...```)
    md_text = re.sub(r'```[\s\S]*?```', '', md_text)
    # 3) Remove ASCII‑tree lines
    md_text = re.sub(r'(?m)^[ \t]*(?:[├└│].*|\+--.*)$', '', md_text)
    # 4) Remove pure markdown table rows
    md_text = re.sub(r'(?m)^\|.*\|\s*$', '', md_text)
    return md_text

# Split pdf into pages 
def split_into_pages(md_text: str) -> list[tuple[int, str]]:
    pattern = re.compile(
        r'(Pagina\s+(\d+)\s+di\s+\d+.*?)(?=Pagina\s+\d+\s+di\s+\d+|\Z)',
        flags=re.DOTALL | re.IGNORECASE
    )
    chunks = pattern.findall(md_text)

    pages = []
    for full_block, page_num in chunks:
        pages.append((int(page_num), full_block))
    return pages


def extract_sections_from_index_pages(md_text: str, page_nums: list[int]) -> set[str]:
    pages = split_into_pages(md_text)
    blocks = [b for num, b in pages if num in page_nums]

    if not blocks:
        raise ValueError(f"No index pages found for {page_nums}")

    nums = set()
    for block in blocks:
        for line in block.splitlines():
            m = re.match(r'^\s*(\d+(?:\.\d+)*)\.', line)
            if m:
                nums.add(m.group(1))
    return nums


def promote_numeric_headings(parsed_md: str, valid_sections: set[str]) -> str:
    pattern = re.compile(
        r'^(?P<hashes>#{0,6}\s*)?(?P<num>\d+(?:\.\d+)*)(?:\.)?\s+(?P<title>.*)$',
        re.MULTILINE
    )

    def _repl(m):
        hashes = m.group("hashes") or ''
        num = m.group("num")
        title = m.group("title").strip()

        if num not in valid_sections:
            return m.group(0)

        if title.endswith(':'):
            return m.group(0)

        letters = re.findall(r'[A-Za-z]', title)
        if letters:
            lower_frac = sum(1 for c in letters if c.islower()) / len(letters)
            if lower_frac > 0.4:
                return m.group(0)

        level = num.count('.') + 1
        return f"{'#' * level} {num} {title}"

    return pattern.sub(_repl, parsed_md)



def process_page(page_md: str, page_num: int, valid_sections: set[str], header_skip_patterns: list[re.Pattern]) -> str:
    lines = page_md.splitlines()
    out = []
    page_line = None
    date_str = None

    for L in lines:
        if any(pat.match(L) for pat in header_skip_patterns):
            m_dt = re.search(r'ISTRUZIONE OPERATIVA.*?([0-9]{2}/[0-9]{2}/[0-9]{4})', L, flags=re.IGNORECASE)
            if m_dt:
                date_str = m_dt.group(1)
            continue
        m_pg = re.match(r'^\s*Pagina\s+(\d+)\s+di\s+(\d+)', L, flags=re.IGNORECASE)
        if m_pg:
            page_num_str = m_pg.group(1)
            total_pages = m_pg.group(2)
            page_line = f"*Pagina {page_num_str} di {total_pages}"
            continue

        out.append(L)

    if page_line:
        meta = page_line
        if date_str:
            meta += f" ({date_str})*"
        out.insert(0, '')        
        out.insert(1, meta)      
        out.insert(2, '')        



    body = "\n".join(out)

    if page_num > 2:
        body = strip_artifacts(body)

    body = promote_numeric_headings(body, valid_sections)
    return body


In [5]:
wifi_skip_patterns = [
    re.compile(r'^\s*Configurazione Wi-Fi Aruba:', re.IGNORECASE),
    re.compile(r'^\s*access point e terminali', re.IGNORECASE),
    re.compile(r'^\s*ISTRUZIONE OPERATIVA\s*\d{2}/\d{2}/\d{4}', re.IGNORECASE),
]

vm_skip_patterns = [
    re.compile(r'^\s*Creazione VM su Cloud INSIEL.*$', re.IGNORECASE),
    re.compile(r'^\s*ISTRUZIONE OPERATIVA\s*\d{2}/\d{2}/\d{4}', re.IGNORECASE),
]


In [None]:
md_path = "/data/wifi_manual/ISTRUZIONE_OPERATIVA_CONFIGURAZIONE_WIFI_ARUBA_REV_01_parsed_no_image_llm.md"
output_path = "/data/wifi_manual/ISTRUZIONE_OPERATIVA_CONFIGURAZIONE_WIFI_ARUBA_REV_01_cleaned.md"


with open(md_path, 'r', encoding='utf-8') as f:
    raw_md = f.read()



toc_sections = extract_sections_from_index_pages(raw_md, page_nums=[3])
print("Valid sections:", sorted(toc_sections)[:10], "…")


pages = split_into_pages(raw_md)
processed = [
    process_page(chunk, num, toc_sections, header_skip_patterns=wifi_skip_patterns)
    for num, chunk in pages
]

final_md = "\n\n".join(processed)
with open(output_path, 'w', encoding='utf-8') as f:
    f.write(final_md)

print("Cleaned Markdown written to", output_path)


In [None]:
md_path = "/data/VM_manual/ISTRUZIONE_OPERATIVA_CREAZIONE_VM_CLOUD_INSIEL_REV_00_parsed_no_image_llm.md"
output_path = "/data/VM_manual/ISTRUZIONE_OPERATIVA_CREAZIONE_VM_CLOUD_INSIEL_REV_00_cleaned.md"

with open(md_path, 'r', encoding='utf-8') as f:
    raw_md = f.read()


# Get true section numbers from the INDICE on page 3!!
toc_sections = extract_sections_from_index_pages(raw_md, page_nums=[3, 4])
print("Valid sections:", sorted(toc_sections)[:10], "…")

pages = split_into_pages(raw_md)
processed = [
    process_page(chunk, num, toc_sections, header_skip_patterns=vm_skip_patterns)
    for num, chunk in pages
]

final_md = "\n\n".join(processed)
with open(output_path, 'w', encoding='utf-8') as f:
    f.write(final_md)

print("Cleaned Markdown written to", output_path)


In [None]:
!pip install openai instructor --quiet

In [9]:
import re
import os
from openai import OpenAI
import instructor
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from tqdm import tqdm


class FigureMetadata(BaseModel):
    figure_id: str = Field(..., description="Unique identifier for the figure")
    filename: str = Field(..., description="Name of the image file")
    caption: str = Field(..., description="Caption text for the figure")
    page: int = Field(..., description="Page number where the figur caption is located")
    image_page: int = Field(..., description="Page number where the image is located")
    rendered: bool = Field(..., description="Whether the image has been rendered")
    source: str = Field(..., description="Source of the image (e.g., 'vector_box_crop')")


class SemanticChunk(BaseModel):
    section_number: str = Field(..., description="Hierarchical numeric section (e.g., 5.1.3.1.3)")
    section_title: str = Field(..., description="Concise and meaningful title for the section")
    page_numbers: List[int] = Field(..., description="Page numbers included in the chunk")
    content: str = Field(..., description="Full markdown content of the chunk")
    parent_topics: List[str] = Field(..., description="Ordered list of parent section titles")
    document: str = Field(..., description="Name or identifier of the document")
    chunk_summary: str = Field(..., description="A summary of the chunk content")
    figures: Optional[List[FigureMetadata]] = Field(default_factory=list, description="Figures referenced explicitly in this chunk")

class ChunkSet(BaseModel):
    chunks: List[SemanticChunk] = Field(..., description="List of semantic chunks")

class SummaryOnlyChunk(BaseModel):
    chunk_summary: str = Field(..., description="A summary of the chunk content")

In [9]:
def summarize_chunks(chunks: List[SemanticChunk]) -> List[SemanticChunk]:
    enriched = []

    for chunk in tqdm(chunks, desc="Summarizing chunks"):
        figure_count = len(chunk.figures)
        figure_desc = ""
        if figure_count:
            figure_titles = ", ".join([f.caption for f in chunk.figures])
            figure_desc = f"\nThis section includes {figure_count} figure(s): {figure_titles}."

        parent_path = " > ".join(chunk.parent_topics + [chunk.section_title])

        prompt = f"""
You are helping document a technical manual. Below is a markdown section titled:

{parent_path}

Please write a clear, concise **summary** of this section for retrieval-augmented systems.

Guidelines:
- Summarize the purpose of the section
- Note any configuration steps, troubleshooting, or systems mentioned
- If figures are included, mention how they support the content
{figure_desc}

Markdown content:
{chunk.content}
"""

        try:
            summary_obj = client.chat.completions.create(
                model="gpt-4-turbo",
                response_model=SummaryOnlyChunk,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
            )

            chunk.chunk_summary = summary_obj.chunk_summary

        except Exception as e:
            print(f"Failed to summarize section {chunk.section_number}: {e}")
            chunk.chunk_summary = ""

        enriched.append(chunk)

    return enriched


In [10]:
def parse_heading_structure(md_text: str) -> List[Dict]:
    headings = []
    lines = md_text.splitlines()

    for idx, line in enumerate(lines):
        match = re.match(r'^(#{1,6})\s+(.*)', line)
        if match:
            level = len(match.group(1))
            raw_text = match.group(2).strip()

            if re.search(r'\.{5,}\s*\d+\s*$', raw_text):
                continue

            sec_match = re.match(r'^([\d\.]+)\.?\s*(.*)', raw_text)
            if not sec_match:
                continue

            section_number = sec_match.group(1)
            title = sec_match.group(2).strip()

            headings.append({
                "line_number": idx,
                "level": level,
                "raw": line.strip(),
                "text": raw_text,
                "section_number": section_number,
                "title": title
            })

    return headings

In [11]:
def resolve_parent_topics(headings: List[Dict]) -> List[Dict]:
    stack = []
    result = []

    for heading in headings:
        level = heading["level"]

        stack = [h for h in stack if h["level"] < level]

        stack.append(heading)

        parent_topics = [h["title"] for h in stack[:-1]]

        heading_with_parents = heading.copy()
        heading_with_parents["parent_topics"] = parent_topics
        result.append(heading_with_parents)

    return result


In [12]:
from difflib import get_close_matches

from typing import List, Dict, Optional

def generate_semantic_chunks_from_headings(
    md_text: str,
    headings: List[Dict],
    document_name: str,
    line_to_page: Dict[int, int],
    original_lines: Optional[List[str]] = None
) -> List[Dict]:

    lines = md_text.splitlines()
    chunks = []

    def get_chunk_page_range(start: int, end: int, mapping: Dict[int, int]) -> List[int]:
        pages = [mapping.get(i) for i in range(start, end)]
        return sorted(set(p for p in pages if p is not None))

    for i, heading in enumerate(headings):
        start_line = heading["line_number"]
        end_line = headings[i + 1]["line_number"] if i + 1 < len(headings) else len(lines)

        chunk_lines = lines[start_line:end_line]
        chunk_text = "\n".join(chunk_lines).strip()

        page_numbers = get_chunk_page_range(start_line, end_line, line_to_page)

        chunks.append({
            "section_number": heading["section_number"],
            "section_title": heading["title"],
            "page_numbers": page_numbers,
            "content": chunk_text,
            "parent_topics": heading["parent_topics"],
            "document": document_name,
            "chunk_summary": "",     
            "figures": []           
        })

    return chunks



def match_caption_to_content(fig, content: str) -> bool:
    lines = re.findall(r"(Figura\s+\d+\s*[-–—]\s*[^\n]+)", content, flags=re.IGNORECASE)
    fig_caption = fig["caption"].lower()
    return any(fig_caption in line.lower() for line in lines)



def attach_figures_to_chunks(chunks: List[SemanticChunk], figure_metadata_dict: dict) -> List[SemanticChunk]:
    figure_metadata = list(figure_metadata_dict.values())
    fig_index = {}
    for fig in figure_metadata:
        match = re.search(r"Figura\s+(\d+)", fig["figure_id"])
        if match:
            fig_index.setdefault(match.group(1), []).append(fig)

    for chunk in chunks:
        matched_figures = []
        seen_ids = set()
        figure_nums = re.findall(r"Figura\s+(\d+)", chunk.content, flags=re.IGNORECASE)

        for num in figure_nums:
            for fig in fig_index.get(num, []):
                if fig["figure_id"] not in seen_ids and match_caption_to_content(fig, chunk.content):
                    matched_figures.append(FigureMetadata(**fig))
                    seen_ids.add(fig["figure_id"])

        chunk.figures = matched_figures

    return chunks

In [13]:
def build_line_to_page_map(lines: List[str]) -> dict:
    line_to_page = {}
    current_page = None

    for i, line in enumerate(lines):
        match = re.search(r"Pagina\s+(\d+)\s+di\s+\d+", line, flags=re.IGNORECASE)
        if match:
            current_page = int(match.group(1))
        if current_page:
            line_to_page[i] = current_page

    return line_to_page

def strip_page_markers(lines: List[str]) -> List[str]:
    return [line for line in lines if not re.search(r"Pagina\s+\d+\s+di\s+\d+", line, re.IGNORECASE)]

def get_chunk_page_range(start_line: int, end_line: int, line_to_page: dict) -> List[int]:
    pages = [line_to_page.get(i) for i in range(start_line, end_line)]
    return sorted(set(p for p in pages if p is not None))

In [14]:
def normalize_section_number(sec: str) -> str:
    return sec.strip().rstrip(".")

def deduplicate_by_section_number(chunks: List[SemanticChunk]) -> List[SemanticChunk]:
    seen = {}
    removed = []
    for chunk in chunks:
        key = normalize_section_number(chunk.section_number)
        if key in seen:
            removed.append(seen[key])
        seen[key] = chunk
    return list(seen.values())

## GENERATE WIFI CHUNKS

In [16]:
from openai import OpenAI
from instructor import from_openai
import json

client = from_openai(OpenAI(api_key=OPENAI_API_KEY))


md_path = "/data/wifi_manual/ISTRUZIONE_OPERATIVA_CONFIGURAZIONE_WIFI_ARUBA_REV_01_cleaned.md"
document_name = "ISTRUZIONE OPERATIVA CONFIGURAZIONE WIFI ARUBA REV 01"
figure_metadata_path = '/content/drive/MyDrive/rag_comparison/data/wifi_manual/figure_metadata.json'
final_json_path = '/data/wifi_manual/final_wifi_chunks.json'

with open(md_path, 'r', encoding='utf-8') as f:
    markdown_text = f.read()

lines = markdown_text.splitlines()
line_to_page = build_line_to_page_map(lines)
stripped_lines = strip_page_markers(lines)
stripped_markdown = "\n".join(stripped_lines)

parsed_headings = parse_heading_structure(markdown_text)

parsed_headings = parse_heading_structure(markdown_text)
headings_with_parents = resolve_parent_topics(parsed_headings)


raw_chunks = generate_semantic_chunks_from_headings(
    stripped_markdown,
    headings_with_parents,
    document_name,
    line_to_page=line_to_page,
    original_lines=lines  
)


semantic_chunks = [SemanticChunk(**chunk) for chunk in raw_chunks]

with open(figure_metadata_path, 'r') as f:
    figure_metadata_dict = json.load(f)

semantic_chunks_with_figures = attach_figures_to_chunks(semantic_chunks, figure_metadata_dict)

summarized_chunks = summarize_chunks(semantic_chunks_with_figures)



with open(final_json_path, 'w', encoding='utf-8') as f:
    json.dump([chunk.model_dump() for chunk in summarized_chunks], f, ensure_ascii=False, indent=2)


Summarizing chunks: 100%|██████████| 46/46 [06:13<00:00,  8.11s/it]


## GENERATE VM CHUNKS


In [18]:
client = from_openai(OpenAI(api_key=OPENAI_API_KEY))

md_path = "/data/VM_manual/ISTRUZIONE_OPERATIVA_CREAZIONE_VM_CLOUD_INSIEL_REV_00_cleaned.md"
document_name = "ISTRUZIONE OPERATIVA CREAZIONE VM CLOUD INSIEL REV 00"
figure_metadata_path = '/data/VM_manual/figure_metadata.json'
final_json_path = '/data/VM_manual/final_VM_chunks.json'

with open(md_path, 'r', encoding='utf-8') as f:
    markdown_text = f.read()

lines = markdown_text.splitlines()
line_to_page = build_line_to_page_map(lines)
stripped_lines = strip_page_markers(lines)
stripped_markdown = "\n".join(stripped_lines)

parsed_headings = parse_heading_structure(markdown_text)

parsed_headings = parse_heading_structure(markdown_text)
headings_with_parents = resolve_parent_topics(parsed_headings)


raw_chunks = generate_semantic_chunks_from_headings(
    stripped_markdown,
    headings_with_parents,
    document_name,
    line_to_page=line_to_page,
    original_lines=lines 
)


semantic_chunks = [SemanticChunk(**chunk) for chunk in raw_chunks]
deduped_chunks = deduplicate_by_section_number(semantic_chunks)

with open(figure_metadata_path, 'r') as f:
    figure_metadata_dict = json.load(f)

semantic_chunks_with_figures = attach_figures_to_chunks(deduped_chunks, figure_metadata_dict)

summarized_chunks = summarize_chunks(semantic_chunks_with_figures)

with open(final_json_path, 'w', encoding='utf-8') as f:
    json.dump([chunk.model_dump() for chunk in summarized_chunks], f, ensure_ascii=False, indent=2)

Summarizing chunks: 100%|██████████| 92/92 [10:34<00:00,  6.90s/it]


## Let's evaluate our new JSONs to make sure they make sense

In [19]:
def check_summary_quality(chunks, min_word_count=8):
    empty = [c for c in chunks if not c.chunk_summary.strip()]
    short = [c for c in chunks if len(c.chunk_summary.split()) < min_word_count]

    return {
        "empty_summaries": len(empty),
        "short_summaries": len(short),
        "examples": [(c.section_number, c.chunk_summary) for c in short[:3]]
    }


In [20]:
def check_figure_alignment(chunks):
    misaligned = [c for c in chunks if "Figura" in c.content and not c.figures]
    return {
        "missing_figure_links": len(misaligned),
        "examples": [(c.section_number, c.content[:200]) for c in misaligned[:3]]
    }


In [21]:
def estimate_token_count(text):
    return int(len(text) / 4)

def check_embedding_compatibility(chunks, max_tokens=8192):
    too_large = [c for c in chunks if estimate_token_count(c.content) > max_tokens]
    return {
        "too_large_chunks": len(too_large),
        "examples": [(c.section_number, estimate_token_count(c.content)) for c in too_large[:3]]
    }


In [22]:
def check_section_numbers(chunks):
    section_nums = [c.section_number for c in chunks]
    seen = set()
    duplicates = []

    for sec in section_nums:
        if sec in seen:
            duplicates.append(sec)
        seen.add(sec)

    def to_tuple(s):
        return tuple(int(x) for x in s.strip('.').split('.'))

    sorted_sections = sorted(section_nums, key=to_tuple)
    out_of_order = [s for s, t in zip(section_nums, sorted_sections) if s != t]

    return {
        "duplicates": duplicates,
        "out_of_order": out_of_order[:5]
    }


In [226]:
from openai import OpenAI
openai = OpenAI(api_key=OPENAI_API_KEY))

def test_retrieval_qa(chunk: SemanticChunk, question: str) -> str:
    system_prompt = "You are an expert assistant helping with technical documentation."
    context = f"Use the following document section:\n\n{chunk.content}"

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{context}\n\nQuestion: {question}"}
    ]

    response = openai.chat.completions.create(
        model="gpt-4-turbo",
        messages=messages,
        temperature=0.0
    )
    return response.choices[0].message.content.strip()


In [10]:
import json
from typing import List

def load_chunks_from_json(path: str) -> List[SemanticChunk]:
    with open(path, "r", encoding="utf-8") as f:
        raw_data = json.load(f)

    return [SemanticChunk(**chunk) for chunk in raw_data]


In [24]:
def evaluate_chunks_from_valid(chunks):
    results = {}

    results.update(check_summary_quality(chunks))
    results.update(check_figure_alignment(chunks))
    results.update(check_section_numbers(chunks))
    results.update(check_embedding_compatibility(chunks))

    return results

In [25]:
chunks = load_chunks_from_json("/content/drive/MyDrive/rag_comparison/data/wifi_manual/final_wifi_chunks.json")
results = evaluate_chunks_from_valid(chunks)
print(results)

{'empty_summaries': 0, 'short_summaries': 0, 'examples': [], 'missing_figure_links': 0, 'duplicates': [], 'out_of_order': [], 'too_large_chunks': 0}


In [26]:
chunks = load_chunks_from_json("/content/drive/MyDrive/rag_comparison/data/VM_manual/final_VM_chunks.json")
results = evaluate_chunks_from_valid(chunks)
print(results)

{'empty_summaries': 0, 'short_summaries': 0, 'examples': [], 'missing_figure_links': 0, 'duplicates': [], 'out_of_order': ['5.4.1.1.6', '5.5.1.', '5.5.1.1.', '5.5.1.2.', '5.5.1.3.'], 'too_large_chunks': 0}


In [27]:
def print_section_order(chunks):
    def to_tuple(sec):
        return tuple(int(x) for x in sec.strip(".").split(".") if x.isdigit())

    sorted_chunks = sorted(chunks, key=lambda c: to_tuple(c.section_number))

    for chunk in sorted_chunks:
        print(chunk.section_number)


In [28]:
wifi_chunks = load_chunks_from_json("/content/drive/MyDrive/rag_comparison/data/wifi_manual/final_wifi_chunks.json")
print_section_order(wifi_chunks)

1
2
3
4
5
5.1
5.1.1
5.1.2
5.1.3
5.1.3.1
5.1.3.1.1
5.1.3.1.2
5.1.3.1.3
5.1.3.1.4
5.1.3.2
5.1.3.2.1
5.1.3.2.2
5.1.3.2.3
5.1.3.2.4
5.1.3.3
5.1.3.3.1
5.1.3.4
5.1.3.4.1
5.1.3.5
5.1.3.5.1
5.1.3.6
5.1.3.7
5.1.3.8
5.1.3.9
5.1.3.10.
5.1.3.11.
5.1.3.12.
5.1.4
5.2
5.2.1
5.2.2
5.2.2.1
5.2.2.2
6
7
7.1
7.2
7.3
8
9
10


In [29]:
vm_chunks = load_chunks_from_json("/content/drive/MyDrive/rag_comparison/data/VM_manual/final_VM_chunks.json")
print_section_order(vm_chunks)

1
2
3
4
5
5.1
5.1.1
5.1.2
5.1.3
5.2
5.2.1
5.2.2
5.2.2.1
5.2.2.2
5.3
5.3.1
5.3.1.1
5.3.1.2
5.3.1.3
5.3.2
5.3.2.1
5.3.2.1.1
5.3.2.1.2
5.3.2.1.3
5.3.2.1.3.1.
5.3.2.1.4
5.3.2.1.4.1.
5.3.2.1.4.2.
5.3.2.1.4.3.
5.3.2.1.4.4.
5.3.2.1.4.5.
5.4
5.4.1.1.1
5.4.1.1.2
5.4.1.1.2.1.
5.4.1.1.2.2.
5.4.1.1.3
5.4.1.1.3.1.
5.4.1.1.4
5.4.1.1.5
5.4.1.1.5.1.
5.4.1.1.5.2.
5.4.1.1.6
5.4.1.1.7
5.4.1.1.7.1.
5.4.1.1.7.2.
5.4.1.1.8
5.4.1.1.8.1.
5.4.1.1.9
5.4.1.1.9.1.
5.4.1.1.9.2.
5.4.1.1.9.3.
5.4.1.1.9.4.
5.5
5.5.1.
5.5.1.1.
5.5.1.2.
5.5.1.3.
5.5.1.4.
5.5.1.5.
5.5.1.6.
5.5.1.7.
5.5.1.8.
5.5.2.
5.5.2.1.
5.5.2.2.
5.5.2.3.
5.5.3.
5.5.3.1.
5.5.3.1.1.
5.5.3.1.2.
5.5.3.2.
5.5.3.3.
5.5.3.4.
5.5.3.4.1.
5.5.3.4.2.
5.5.3.5.
5.5.3.5.1.
6.
6.1.
6.2.
7.
7.1.
7.2.
8.
9.
9.1.
9.2.
9.3.
10.
11.
12.


In [30]:
def normalize_section_number(sec: str) -> str:
    return sec.strip().rstrip(".")
from collections import defaultdict

def group_chunks_by_section(chunks):
    grouped = defaultdict(list)
    for c in chunks:
        key = normalize_section_number(c.section_number)
        grouped[key].append(c)
    return {k: v for k, v in grouped.items() if len(v) > 1}  # only duplicates
def compare_duplicate_chunks(grouped):
    for section, versions in grouped.items():
        print(f"\n🔁 Duplicate Section: {section} (x{len(versions)})")
        for i, c in enumerate(versions):
            print(f"\n— Version {i+1} —")
            print(f"Title: {c.section_title}")
            print(f"Summary: {c.chunk_summary}")
            print(f"Figures: {[f.figure_id for f in c.figures]}")
            print(f"Content Preview:\n{c.content[:300]}...\n")


In [387]:
from collections import defaultdict

def normalize_section_number(sec: str) -> str:
    return sec.strip().rstrip(".")

def group_duplicates(chunks):
    grouped = defaultdict(list)
    for c in chunks:
        norm = normalize_section_number(c.section_number)
        grouped[norm].append(c)
    return {k: v for k, v in grouped.items() if len(v) > 1}

import difflib

def analyze_duplicates(grouped):
    comparison_report = []

    for section, versions in grouped.items():
        if len(versions) != 2:
            continue

        v1, v2 = versions

        v1_words = len(v1.content.split())
        v2_words = len(v2.content.split())

        v1_figs = len(v1.figures)
        v2_figs = len(v2.figures)

        # summary_ratio = difflib.SequenceMatcher(None, v1.chunk_summary, v2.chunk_summary).ratio()
        content_ratio = difflib.SequenceMatcher(None, v1.content, v2.content).ratio()

        comparison_report.append({
            "section": section,
            "v1_words": v1_words,
            "v2_words": v2_words,
            "v1_figures": v1_figs,
            "v2_figures": v2_figs,
            # "summary_similarity": round(summary_ratio, 2),
            "content_similarity": round(content_ratio, 2),
            "v1_preview": v1.content[:150].strip(),
            "v2_preview": v2.content[:150].strip()
        })

    return comparison_report


In [31]:
duplicates = group_chunks_by_section(wifi_chunks)

# Compare side-by-side
compare_duplicate_chunks(duplicates)

In [32]:
duplicates = group_chunks_by_section(vm_chunks)

# Compare side-by-side
compare_duplicate_chunks(duplicates)

In [381]:
import pandas as pd
groups = group_duplicates(vm_chunks)
comparison_report = analyze_duplicates(groups)
df = pd.DataFrame(comparison_report)
df.sort_values("v2_words", ascending=False)



Unnamed: 0,section,v1_words,v2_words,v1_figures,v2_figures,content_similarity,v1_preview,v2_preview
38,12,10,775,0,53,0.01,## 12. INDICE DELLE FIGURE\n\n\n\n\n*Pagina 5 ...,# 12. INDICE DELLE FIGURE\n\nFIGURA 1 - VMWARE...
26,6.1,5,160,0,0,0.05,### 6.1. SEGMENTI DI RETE,### 6.1. SEGMENTI DI RETE\n\nNello IaaS INSIEL...
0,5.4.1.1.6,52,96,0,2,0.03,##### 5.4.1.1.6 NAT47\n##### 5.4.1.1.7 DNS.......,##### 5.4.1.1.6 NAT\n\n\n\n\n\n\nNessuna regol...
30,7.2,4,74,0,0,0.08,### 7.2. NOMI GRUPPI,## 7.2. NOMI GRUPPI\n\nPer i nomi dei gruppi v...
7,5.5.1.6,6,72,0,0,0.15,### 5.5.1.6. ESPANSIONE DISCO DI SISTEMA,### 5.5.1.6. ESPANSIONE DISCO DI SISTEMA\nIn f...
27,6.2,10,58,0,0,0.27,### 6.2. RANGE DI IP DEDICATI PER TIPOLOGIA DI...,### 6.2. RANGE DI IP DEDICATI PER TIPOLOGIA DI...
31,8,5,53,0,0,0.18,## 8. GESTIONE DELLA DOCUMENTAZIONE,## 8. GESTIONE DELLA DOCUMENTAZIONE\n\nNel pre...
6,5.5.1.5,4,52,0,0,0.17,### 5.5.1.5. DISCHI AGGIUNTIVI,### 5.5.1.5. DISCHI AGGIUNTIVI\nÈ possibile ch...
15,5.5.3.1,5,32,0,0,0.28,### 5.5.3.1. MODIFICA NOME HOST,### 5.5.3.1. MODIFICA NOME HOST\nIl nome host ...
8,5.5.1.7,5,30,0,0,0.3,### 5.5.1.7. CAMBIO DENOMINAZIONE VM,### 5.5.1.7. CAMBIO DENOMINAZIONE VM\nDopo la ...


In [None]:
# for c in summarized_chunks:
#     c.section_number = normalize_section_number(c.section_number)

deduped_chunks = deduplicate_by_section_number(vm_chunks)

with open("/content/drive/MyDrive/rag_comparison/data/VM_manual/final_VM_chunks.json", "w", encoding="utf-8") as f:
    json.dump([c.model_dump() for c in deduped_chunks], f, ensure_ascii=False, indent=2)


In [33]:
def extract_used_figures(chunks: List[SemanticChunk]) -> set:
    used = set()
    for chunk in chunks:
        for fig in chunk.figures:
            used.add(fig["figure_id"] if isinstance(fig, dict) else fig.figure_id)
    return used
def compare_figures(fig_metadata, chunks):
    defined = {fig["figure_id"] for fig in fig_metadata}
    used = extract_used_figures(chunks)

    missing_from_chunks = defined - used
    unknown_figures = used - defined

    return {
        "defined_total": len(defined),
        "used_total": len(used),
        "unused_figures": sorted(missing_from_chunks),
        "invalid_figures": sorted(unknown_figures)
    }


In [3]:
def load_figure_metadata_from_dict(path):
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)


    if not isinstance(data, dict):
        raise ValueError("Expected figure metadata to be a dictionary")


    return list(data.values())


In [35]:
vm_chunks = load_chunks_from_json("/data/VM_manual/final_VM_chunks.json")
vm_figures = load_figure_metadata_from_dict("/data/VM_manual/figure_metadata.json")


report = compare_figures(vm_figures, vm_chunks)
print(json.dumps(report, indent=2))

{
  "defined_total": 90,
  "used_total": 90,
  "unused_figures": [],
  "invalid_figures": []
}


In [344]:
def find_chunk_references(fig_number: str, chunks: List[SemanticChunk]):
    results = []
    pattern = re.compile(rf"(Figura\s+{fig_number}\s*-\s*[^\n]+)", re.IGNORECASE)

    for chunk in chunks:
        matches = pattern.findall(chunk.content)
        if matches:
            results.append({
                "section": chunk.section_number,
                "title": chunk.section_title,
                "matched_lines": matches,
                "attached_figures": [fig.figure_id if hasattr(fig, "figure_id") else fig["figure_id"] for fig in chunk.figures]
            })
    return results


In [None]:
for fig_num in ["21", "30", "44"]:
    print(f"\n Figura {fig_num} references:")
    refs = find_chunk_references(fig_num, vm_chunks)
    for r in refs:
        print(f"\n {r['section']} – {r['title']}")
        for line in r["matched_lines"]:
            print(f" {line}")
        print(f" Attached figures: {r['attached_figures']}")


ahhhh we have 3 figure 69 but we match on figure num and then just attribute the first figure match!!!

In [36]:
wifi_chunks = load_chunks_from_json("/data/wifi_manual/final_wifi_chunks.json")
wifi_figures = load_figure_metadata_from_dict("/data/wifi_manual/figure_metadata.json")

# Run figure comparison
report = compare_figures(wifi_figures, wifi_chunks)
print(json.dumps(report, indent=2))

{
  "defined_total": 49,
  "used_total": 49,
  "unused_figures": [],
  "invalid_figures": []
}


In [323]:
test_chunk1 = {
    "section_number": "5.4.1.1.6",
    "section_title": "NAT",
    "content": "##### 5.4.1.1.6 NAT\n\n\n\n\n\n\nNessuna regola NAT definita. È possibile iniziare facendo clic su \"Aggiungi regola NAT\"\n\nFigura 68 - VMWare NSX – Rete – NAT\n\nSono presenti nella griglia i seguenti campi:\n\n- Nome.\n\n- Azione.\n\n- Associa.\n\n- IP di origine.\n\n- IP di destinazione/Porta.\n\n- IP convertito/Porta.\n\n- Applica a.\n\n- Abilitata.\n\n- Stato.\n\n\n\n*Pagina 48 di 66 (22/10/2024)*\n\n\nFigura 69 - VMWare NSX – Rete – NAT – Configurazione NAT\n\nQuesta funzionalità è utile per configurare NAT nel contesto di un gateway di Livello 1 predefinito.\nAl momento non sono presenti NAT configurati.",
    "parent_topics": [
      "MODALITÀ ESECUTIVE",
      "CLOUD FIREWALL",
      "PROFILI"
    ],
    "document": "ISTRUZIONE OPERATIVA CREAZIONE VM CLOUD INSIEL REV 00",
    "chunk_summary": "This section of the technical manual, titled 'MODALITÀ ESECUTIVE > CLOUD FIREWALL > PROFILI > NAT', focuses on the configuration and management of Network Address Translation (NAT) rules within a VMWare NSX environment. It begins by indicating that no NAT rules are currently defined and provides a prompt to add new NAT rules. Key fields listed for rule configuration include Name, Action, Bind, Source IP, Destination IP/Port, Translated IP/Port, Apply to, Enabled, and Status. Two figures are included: one showing the NAT network setup and another detailing the NAT configuration steps, emphasizing the utility of NAT in the context of a predefined Level 1 gateway.",
    "figures": []
}

test_chunk2 = {
    "section_number": "5.4.1.1.7.1.",
    "section_title": "SERVIZI DNS",
    "content": """#### 5.4.1.1.7.1.SERVIZI DNS

Figura 69 - VMWare NSX – DNS – Servizi DNS

Sono presenti nella griglia i seguenti campi:

- Nome

- Gateway
- IP servizio DNS
- Zona DNS predefinita
- Zona FQDN
- Stato

Figura 69 - VMWare NSX – DNS – Servizi DNS – Configurazione DNS

Non vi sono servizi DNS configurati a livello di NSX.

NOTA BENE:
- I servizi DNS vengono forniti nel contesto dell'Active Directory.
- Tipicamente i servizi DNS vengono configurati manualmente sia per le VM collegate al dominio che per le VM non collegate al dominio.""",
    "figures": []
}



In [321]:
def match_caption_to_content(fig, content: str) -> bool:
    lines = re.findall(r"(Figura\s+\d+\s*-\s*[^\n]+)", content, flags=re.IGNORECASE)
    fig_caption = fig["caption"].lower()
    return any(fig_caption in line.lower() for line in lines)

def simulate_attach_figures(chunk, figure_metadata_list):
    fig_index = {}
    for fig in figure_metadata_list:
        match = re.search(r"Figura\s+(\d+)", fig["figure_id"])
        if match:
            fig_index.setdefault(match.group(1), []).append(fig)

    matched_figures = []
    seen_ids = set()

    figure_nums = re.findall(r"Figura\s+(\d+)", chunk["content"], flags=re.IGNORECASE)

    for num in figure_nums:
        for fig in fig_index.get(num, []):
            if fig["figure_id"] not in seen_ids and match_caption_to_content(fig, chunk["content"]):
                matched_figures.append(fig)
                seen_ids.add(fig["figure_id"])

    return matched_figures



In [324]:
matched = simulate_attach_figures(test_chunk1, vm_figures)

for fig in matched:
    print(f"✅ Matched: {fig['figure_id']}")


✅ Matched: Figura 68_vmware_nsx__rete__nat
✅ Matched: Figura 69_vmware_nsx__rete__nat__configurazione_nat


In [325]:
matched = simulate_attach_figures(test_chunk2, vm_figures)

for fig in matched:
    print(f"✅ Matched: {fig['figure_id']}")


✅ Matched: Figura 69_vmware_nsx__dns__servizi_dns
✅ Matched: Figura 69_vmware_nsx__dns__servizi_dns__configurazione_dns


Clean JSON even more to seperate some metadata

In [24]:
import json
import re
from pathlib import Path

def clean_whitespace(text: str) -> str:
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'[ \t]+$', '', text, flags=re.MULTILINE)
    return text.strip()

def determine_section_type(section_title: str) -> str:
    title_lower = section_title.lower()

    if any(word in title_lower for word in ["configurazione", "config", "setup"]):
        return "configuration"
    elif any(word in title_lower for word in ["verifica", "test", "check"]):
        return "verification"
    elif any(word in title_lower for word in ["dettaglio", "detail"]):
        return "detail"
    elif any(word in title_lower for word in ["introduzione", "scopo", "ambito"]):
        return "introduction"
    elif any(word in title_lower for word in ["riferimenti", "allegati", "indice"]):
        return "reference"
    else:
        return "general"

def transform_chunk(chunk):
    section_id = chunk.get("section_number", "")
    section_title = chunk.get("section_title", "")
    page_numbers = chunk.get("page_numbers", [])

    if not page_numbers:
        page_range = ""
    elif len(page_numbers) == 1:
        page_range = str(page_numbers[0])
    else:
        page_range = f"{min(page_numbers)}-{max(page_numbers)}"

    parent_path = " > ".join(chunk.get("parent_topics", []))

    content = clean_whitespace(chunk.get("content", ""))
    content_summary = chunk.get("chunk_summary", "").strip()

    figures = []
    for fig in chunk.get("figures", []):
        figures.append({
            "figure_id": fig.get("figure_id", "").replace(" ", "_"),
            "filename": fig.get("filename", ""),
            "caption": fig.get("caption", "").capitalize(),
            "page": fig.get("page", "")
        })

    return {
        "section_id": section_id,
        "section_title": section_title,
        "page_range": page_range,
        "parent_path": parent_path,
        "document": chunk.get("document", ""),
        "content": content,
        "content_summary": content_summary,
        "figures": figures,
        "metadata": {
            "original_section_number": section_id,
            "original_page_numbers": page_numbers,
            "language": "Italian",
            "section_type": determine_section_type(section_title)
        }
    }

def process_manual(input_file, output_file):
    with open(input_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    transformed = [transform_chunk(chunk) for chunk in data]

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(transformed, f, indent=2, ensure_ascii=False)

    print(f"✓ {len(transformed)} chunks written to {output_file}")



In [25]:
wifi_input = "/data/wifi_manual/final_wifi_chunks.json"
wifi_output = "/data/wifi_manual/cleaned_wifi_chunks.json"
process_manual(wifi_input, wifi_output)

✓ 46 chunks written to /content/drive/MyDrive/rag_comparison/data/wifi_manual/cleaned_wifi_chunks.json


In [26]:
vm_input = "/data/VM_manual/final_VM_chunks.json"
vm_output = "/data/VM_manual/cleaned_VM_chunks.json"
process_manual(vm_input, vm_output)

✓ 92 chunks written to /content/drive/MyDrive/rag_comparison/data/VM_manual/cleaned_VM_chunks.json
