Automated Methodology for Generating Individuals in Document Ontology

In [None]:
# Miro: https://miro.com/app/board/uXjVI7yFd2Y=/

# Methodology developed by the Portuguese consortium, designed to enable the extraction of ontology individuals from documents in a faster and more efficient way.
# This methodology is based on a free GPT API called DevsDoCode (t.me/devsdocode) and implemented using Python.

# The methodology still needs some targeted improvements, but it already works as an initial proof of concept. The code is provided as a starting point for further development and improvement.

Convert PDF to Word using ILovePDF

In [None]:
# The content is converted from PDF to Word to facilitate automated information extraction, while ensuring the preservation of the original table structure and formatting. For this reason, a web-based converter was used instead of Python libraries, as the latter often distorted the layout.

Fire-Safety Document Analyzer – Country, Language and Standards Extraction Assistant

In [None]:
# This script reads a .docx file containing technical or legal text, sends the content to a GPT API that identifies the country, language, and the possible fire safety standards. The user can view these standards in a graphical interface, select the relevant standard for analysis, manually add new ones, and save the final selection to a JSON file.

In [None]:
import os
import re
import json
import time
import textwrap
import tkinter as tk
import customtkinter as ctk
from tkinter import messagebox
import docx
from openai import OpenAI

# ─────────── OpenAI Configuration ───────────
# Initialize OpenAI client
# The API key is read from the DEVSDOCODE_API_KEY environment variable so that a
# personal key never has to be committed with the notebook. When the variable is
# not set, the temporary free key originally shipped with this notebook is used.
CLIENT = OpenAI(
    api_key=os.environ.get(
        "DEVSDOCODE_API_KEY",
        "ddc-temp-free-e3b73cd814cc4f3ea79b5d4437912663",
    ),
    base_url="https://api.devsdocode.com/v1",
)

# ─────────── File Paths ───────────
# Input Word document and output JSON file paths
DOCX_INPUT = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento_Nativo.docx"
# Derive the results file from the input's base name. splitext() only touches the
# final extension, so the output can never collapse onto the input path when the
# extension is spelled differently (e.g. ".DOCX").
OUTPUT_JSON = os.path.splitext(DOCX_INPUT)[0] + "_results_gui.json"

# ─────────── Helper Functions ───────────
def extract_docx_text(path: str) -> str:
    """Return the non-empty paragraphs of a Word document joined by newlines.

    Each paragraph is stripped of surrounding whitespace; paragraphs that are
    empty after stripping are dropped. Any error while opening or reading the
    document is printed and an empty string is returned instead.
    """
    try:
        document = docx.Document(path)
        kept_lines = []
        for para in document.paragraphs:
            stripped = para.text.strip()
            if stripped:
                kept_lines.append(stripped)
        return "\n".join(kept_lines)
    except Exception as e:
        print(f"Error reading Word document: {e}")
        return ""

def query_gpt(prompt: str, retries=3, temperature=0):
    """Send ``prompt`` to the chat-completions endpoint and return the reply text.

    Args:
        prompt: Text sent as a single user message.
        retries: Maximum number of attempts.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The stripped content of the first choice, or ``""`` when every attempt
        failed (each failure is printed).
    """
    wait = 6
    for attempt in range(retries):
        try:
            response = CLIENT.chat.completions.create(
                model="provider-4/gpt-4.1",
                messages=[{"role": "user", "content": prompt}],
                temperature=temperature
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            print(f"GPT error (attempt {attempt+1}/{retries}): {e}")
            # Back off (6 s, 12 s, ...) only when another attempt will follow;
            # sleeping after the last failure would just delay the empty result.
            if attempt < retries - 1:
                time.sleep(wait)
                wait *= 2
    return ""

def clean_json(text: str):
    """Parse a model reply into a JSON object, tolerating common formatting noise.

    Markdown code fences and a leading ``json`` tag are removed, the span from
    the first ``{`` to the last ``}`` is kept, and trailing commas before ``}``
    or ``]`` are dropped before parsing. If anything fails, the error is printed
    and a placeholder result with empty document lists is returned.
    """
    # Strip ```...``` fences (optionally tagged "json") and a leading "json" word.
    unfenced = re.sub(r"```(?:json)?(.*?)```", r"\1", text, flags=re.DOTALL).strip()
    unfenced = re.sub(r"^json\s*", "", unfenced, flags=re.IGNORECASE)
    try:
        first_brace = unfenced.index("{")
        last_brace = unfenced.rindex("}")
        candidate = unfenced[first_brace:last_brace + 1]
        # Remove trailing commas, first before "}" and then before "]".
        for pattern, closer in ((r",\s*}", "}"), (r",\s*]", "]")):
            candidate = re.sub(pattern, closer, candidate)
        return json.loads(candidate)
    except Exception as e:
        print(f"JSON parsing error: {e}")
        return {
            "country": "Unknown",
            "language": "??",
            "legal_regulations": [],
            "technical_standards": [],
            "other_documents": []
        }

def shorten_text(text: str, max_words=30, max_chars=200):
    """Shorten ``text`` for display.

    The text is stripped, its first letter is capitalised, and it is truncated
    first to ``max_words`` words and then to ``max_chars`` characters (cut back
    to the last space). A truncated text ends with "…"; any other text that
    does not already end with ".", "!" or "?" gets a final ".".

    Returns:
        The shortened text, or ``""`` when the input is empty or whitespace.
    """
    text = text.strip()
    if not text:
        return ""
    # Capitalize first letter
    text = text[0].upper() + text[1:]
    # Truncate by words
    words = text.split()
    if len(words) > max_words:
        text = " ".join(words[:max_words]) + "…"
    # Truncate by characters
    if len(text) > max_chars:
        text = text[:max_chars].rsplit(" ", 1)[0] + "…"
    # Ensure proper punctuation; the ellipsis already terminates a truncated
    # text, so no period is appended after it (avoids "….").
    if text[-1] not in ".!?…":
        text += "."
    return text

# ─────────── Prompt Template ───────────
# Prompt for extracting fire-safety standards
# This template is rendered with str.format(chunk=...). Every literal brace of the
# JSON example is therefore doubled ("{{" / "}}"); only {chunk} is a placeholder.
# With single braces, format() treats the JSON example as replacement fields and
# raises KeyError before any request is sent.
PROMPT_TEMPLATE = """You are an international expert in fire-safety legislation.

Analyze the excerpt below and return:
1. "country" – detected country
2. "language" – ISO-2 code
3. "standards" – only the official fire-safety documents explicitly cited.
Group them in:
 • "legal_regulations"
 • "technical_standards"
 • "other_documents"

Each entry: {{ "name": "...", "country": "<same>", "justification": "<≤120 words>" }}

Return ONLY this JSON (no markdown):

{{
 "country": "...",
 "language": "...",
 "legal_regulations": [...],
 "technical_standards": [...],
 "other_documents": [...]
}}

Excerpt:
\"\"\"{chunk}\"\"\""""

# ─────────── GPT Processing ───────────
def detect_and_list_standards(text: str):
    """Query the model chunk by chunk and merge the fire-safety documents it reports.

    Texts longer than 15000 characters are split with ``textwrap.wrap``; each
    chunk is inserted into ``PROMPT_TEMPLATE``, sent through ``query_gpt`` and
    parsed with ``clean_json``.

    Returns:
        A dict with ``"country"``, ``"language"`` and ``"standards"``; the latter
        maps ``"legal_regulations"``, ``"technical_standards"`` and
        ``"other_documents"`` to lists of entries, de-duplicated by ``"name"``
        (first occurrence wins) and with shortened justifications.
    """
    MAX_CHUNK_SIZE = 15000
    # Split text into chunks if too long
    chunks = textwrap.wrap(text, MAX_CHUNK_SIZE) if len(text) > MAX_CHUNK_SIZE else [text]
    aggregated = {
        "legal_regulations": [],
        "technical_standards": [],
        "other_documents": []
    }
    country = language = "Unknown"

    for chunk in chunks:
        # Query GPT and clean response
        data = clean_json(query_gpt(PROMPT_TEMPLATE.format(chunk=chunk)))

        # A later chunk that failed to parse yields the placeholders "Unknown"/"??";
        # those must not overwrite a value detected from an earlier chunk.
        detected_country = data.get("country", country)
        if detected_country != "Unknown" or country == "Unknown":
            country = detected_country
        detected_language = data.get("language", language)
        if detected_language != "??" or language == "Unknown":
            language = detected_language

        # Aggregate standards, ignoring malformed sections and entries
        for key in aggregated:
            entries = data.get(key)
            if not isinstance(entries, list):
                continue
            for item in entries:
                # The de-duplication below indexes item["name"], so entries without
                # a usable name are dropped here instead of raising KeyError later.
                if not isinstance(item, dict):
                    continue
                name = item.get("name")
                if not isinstance(name, str) or not name.strip():
                    continue
                item["justification"] = shorten_text(str(item.get("justification") or ""))
                aggregated[key].append(item)

    # Remove duplicates (keep the first entry seen for each name)
    for key in aggregated:
        seen = set()
        unique_list = []
        for item in aggregated[key]:
            if item["name"] not in seen:
                unique_list.append(item)
                seen.add(item["name"])
        aggregated[key] = unique_list

    return {
        "country": country,
        "language": language,
        "standards": aggregated
    }

# ─────────── GUI ───────────
# Module-level customtkinter settings: select the "blue" colour theme and the
# "light" appearance mode before any window is created.
ctk.set_default_color_theme("blue")
ctk.set_appearance_mode("light")

class FireSafetyGUI(ctk.CTk):
    """Window that lists the detected fire-safety documents and saves the user's selection.

    ``data`` must provide ``"country"``, ``"language"`` and ``"standards"``; the
    latter is a dict with the keys ``"legal_regulations"``,
    ``"technical_standards"`` and ``"other_documents"``, each holding a list of
    entry dicts.
    """
    FONT_LARGE = ("Segoe UI", 17)
    FONT_TEXT = ("Segoe UI", 13)

    def __init__(self, data):
        """Build the header, one tab per document category, the manual-entry row and the confirm button."""
        super().__init__(fg_color="#F5F6FA")
        self.data = data
        self.title("Document Analyzer")
        self.geometry("1000x680")

        # Header
        header = ctk.CTkFrame(self, fg_color="#e9ecef")
        header.pack(fill="x", padx=22, pady=12)
        ctk.CTkLabel(header, text="Detected Country:", font=self.FONT_LARGE).grid(row=0, column=0, padx=12, sticky="w")
        ctk.CTkLabel(header, text=data["country"], font=self.FONT_LARGE, text_color="#0d6efd").grid(row=0, column=1, sticky="w")
        ctk.CTkLabel(header, text="Language:", font=self.FONT_LARGE).grid(row=0, column=2, padx=(40, 4))
        ctk.CTkLabel(header, text=data["language"], font=self.FONT_LARGE, text_color="#0d6efd").grid(row=0, column=3, sticky="w")

        # Tab view for standards
        self.tab_view = ctk.CTkTabview(self, corner_radius=8)
        self.tab_view.pack(fill="both", expand=True, padx=22, pady=8)
        # (BooleanVar, entry dict) pairs for every checkbox shown in any tab.
        self.checkboxes = []
        for section, title in [
            ("legal_regulations", "Legal Regulations"),
            ("technical_standards", "Technical Standards"),
            ("other_documents", "Other Documents")
        ]:
            self._populate_section(self.tab_view.add(title), data["standards"][section])

        # Manual addition frame
        add_frame = ctk.CTkFrame(self)
        add_frame.pack(fill="x", padx=22)
        self.manual_entry = tk.StringVar()
        ctk.CTkEntry(
            add_frame,
            textvariable=self.manual_entry,
            placeholder_text="➕ Add regulation manually",
            width=600,
            font=self.FONT_TEXT
        ).pack(side="left", padx=6, pady=10)
        ctk.CTkButton(add_frame, text="Add", command=self.add_manual, width=120).pack(side="left", padx=6)

        # Confirm button
        ctk.CTkButton(
            self,
            text="Confirm Selection",
            command=self.save,
            font=("Segoe UI", 15, "bold"),
            fg_color="#198754",
            hover_color="#157347"
        ).pack(pady=14)

    def _populate_section(self, parent, items):
        """Add one card (checkbox plus optional justification) per entry to ``parent``.

        Entries that lack a ``"name"`` or ``"country"`` key are skipped. Every
        displayed entry is registered in ``self.checkboxes``.
        """
        for item in items:
            if not {"name", "country"}.issubset(item):
                continue
            var = ctk.BooleanVar(value=False)  # Default: unselected
            card = ctk.CTkFrame(parent, fg_color="#ffffff", corner_radius=6, border_width=1, border_color="#dfe3e6")
            card.pack(fill="x", padx=8, pady=5)
            ctk.CTkCheckBox(
                card,
                text=f"{item['name']} ({item['country']})",
                variable=var,
                font=self.FONT_LARGE
            ).pack(anchor="w", padx=10, pady=(6, 2))
            if item.get("justification"):
                wrap_length = max(self.winfo_screenwidth() - 250, 940)
                ctk.CTkLabel(
                    card,
                    text=item['justification'],
                    font=self.FONT_TEXT,
                    text_color="#6c757d",
                    wraplength=wrap_length
                ).pack(anchor="w", padx=36, pady=(0, 8))
            self.checkboxes.append((var, item))

    def add_manual(self):
        """Add the name typed in the entry field as a new card in the "Other Documents" tab.

        Empty input is ignored; a name that is already listed (compared
        case-insensitively) only triggers an information dialog.
        """
        name = self.manual_entry.get().strip()
        if not name:
            return
        if any(name.lower() == item["name"].lower() for _, item in self.checkboxes):
            messagebox.showinfo("Duplicate", "This regulation is already listed.")
            return
        item = {"name": name, "country": self.data["country"], "justification": ""}
        self._populate_section(self.tab_view.tab("Other Documents"), [item])
        self.manual_entry.set("")

    def save(self):
        """Write the ticked entries to ``OUTPUT_JSON`` and close the window.

        The per-entry ``"country"`` key is dropped because the country is stored
        once at the top level of the output. Nothing is written when no entry is
        selected.
        """
        selected = [item for var, item in self.checkboxes if var.get()]
        if not selected:
            messagebox.showwarning("Warning", "Please select at least one document.")
            return
        output_data = {
            "country": self.data["country"],
            "language": self.data["language"],
            "selected_norms": [{k: v for k, v in d.items() if k != "country"} for d in selected]
        }
        try:
            with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2, ensure_ascii=False)
            # Report the path actually written. The previous reference to the
            # undefined name OUT_JSON raised NameError here, so a successful save
            # was reported as a failure and the window never closed.
            messagebox.showinfo("Saved", f"Saved to:\n{OUTPUT_JSON}")
            self.destroy()
        except Exception as e:
            messagebox.showerror("Error", f"Failed to save JSON: {e}")

# ─────────── Main Execution ───────────
def main():
    """Run the analyzer: read the Word file, ask the model, then open the GUI.

    Stops early, with a printed message, when the input file is missing, when
    no text could be extracted, or when the model reported no documents.
    """
    if os.path.exists(DOCX_INPUT):
        document_text = extract_docx_text(DOCX_INPUT)
    else:
        print(f"Error: Input file {DOCX_INPUT} not found.")
        return

    if not document_text:
        print("Error: No text extracted from document.")
        return

    print("▶️ Querying model…")
    detection = detect_and_list_standards(document_text)

    # At least one of the three category lists must be non-empty.
    has_documents = any(detection["standards"].values())
    if not has_documents:
        print("⚠️ Model returned no standards.")
        return

    window = FireSafetyGUI(detection)
    window.mainloop()

if __name__ == "__main__":
    main()

Automatic Table Extraction and Normalization from DOCX Documents

In [None]:
# Analyzes the Word file and automatically extracts all table data, identifying titles and content, including handling of continued tables when applicable.
# The extracted information is structured and saved as a JSON file, ready for further processing or validation.

In [None]:
import os
import re
import json
from docx import Document
from docx.table import Table
from docx.text.paragraph import Paragraph
from itertools import zip_longest

# ─────────── File Paths ───────────
# Input Word document and output JSON file paths
# Word document whose tables are extracted.
SOURCE_DOCX = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento_Nativo.docx"
# JSON file that receives the extracted tables.
DESTINATION_JSON = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\extracted_tables\Tables_clean.json"

# Ensure output directory exists (runs as soon as this cell is executed)
os.makedirs(os.path.dirname(DESTINATION_JSON), exist_ok=True)

# ─────────── Regex Patterns ───────────
# Match table captions such as "Quadro XIV" or "Table 12" at the start of a paragraph.
# A Roman numeral must be a whole word (\b); otherwise ordinary prose such as
# "Table layout ..." would be taken for a caption because it starts with "l".
# Case-insensitivity is set once through the flag.
TABLE_TITLE_REGEX = re.compile(r"^(table|quadro)\s+(\d+|[ivxlcdm]+\b)", re.IGNORECASE)
# Match dots separator (e.g., " . . . ")
DOTS_SEPARATOR = re.compile(r"\s*\.\s*\.\s*\.\s*")
# Match line breaks
LINE_BREAKS = re.compile(r"\s*\n\s*")

# ─────────── Helper Functions ───────────
def get_cell_text(cell):
    """Return the cell's non-empty paragraph texts, stripped and joined by newlines."""
    kept = []
    for para in cell.paragraphs:
        content = para.text.strip()
        if content:
            kept.append(content)
    return "\n".join(kept)

def split_into_chunks(text):
    """Split cell text into pieces.

    A dots separator (matched by ``DOTS_SEPARATOR``) takes priority; otherwise
    the text is split on line breaks; otherwise the stripped text is returned
    as the single piece. Pieces that are empty after trimming are dropped in
    the first two cases.
    """
    if DOTS_SEPARATOR.search(text) is not None:
        # Trim spaces and stray dots left around each piece.
        trimmed = (piece.strip(" .") for piece in DOTS_SEPARATOR.split(text))
        return [piece for piece in trimmed if piece]

    if LINE_BREAKS.search(text) is not None:
        trimmed = (piece.strip() for piece in LINE_BREAKS.split(text))
        return [piece for piece in trimmed if piece]

    return [text.strip()]

def expand_row(cells):
    """Expand a row into multiple rows if cells contain multiple chunks."""
    parts = [split_into_chunks(cell) for cell in cells]
    # If all cells have one chunk, return original row
    if all(len(part) == 1 for part in parts):
        return [cells]
    # Create new rows from combinations of chunks
    rows = []
    for combo in zip_longest(*parts, fillvalue=""):
        rows.append(list(combo))
    return rows

def is_table_continuation(prev_table, current_table, paragraphs_between):
    """Tell whether the current table continues the previous one.

    Returns ``True`` only when there is a previous table, at least one
    paragraph was collected between the two tables, and none of those
    paragraphs is longer than 10 characters after stripping.
    ``current_table`` is accepted but not inspected.
    """
    if prev_table and paragraphs_between:
        # Any paragraph longer than 10 characters counts as real content
        # separating two distinct tables.
        return not any(len(text.strip()) > 10 for text in paragraphs_between)
    return False

# ─────────── Table Extraction ───────────
def extract_tables(docx_path):
    """Extract every table of a Word document as a list of dicts.

    The document body is walked in order. Non-empty paragraphs are remembered
    so that a caption matching ``TABLE_TITLE_REGEX`` can be attached to the
    next table. A table judged by ``is_table_continuation`` to continue the
    previous one is merged into it (its first row is dropped as a repeated
    header).

    Returns:
        A list of dicts with the keys ``"table_number"``, ``"table_title"``,
        ``"table_content"`` (list of rows, padded to equal length) and
        ``"needs_review"`` (``True`` when any cell is empty or contains a dots
        separator). On any error the message is printed and ``[]`` is returned.
    """
    try:
        # Load document
        doc = Document(docx_path)
        body = doc._element.body
        tables = []
        recent_paragraphs = []    # rolling window used to look up captions
        paragraphs_between = []   # text seen since the previous table
        table_index = 1
        previous_table = None

        for element in body:
            # Handle paragraphs
            if element.tag.endswith('}p'):
                text = Paragraph(element, doc).text.strip()
                if text:
                    recent_paragraphs.append(text)
                    paragraphs_between.append(text)
                continue

            # Handle tables
            if element.tag.endswith('}tbl'):
                table = Table(element, doc)

                # Default number/title, used when no caption paragraph is found
                number = f"TABLE {table_index}"
                title = f"TABLE {table_index}"

                # Search recent paragraphs, newest first, for a table caption
                for i in range(len(recent_paragraphs) - 1, -1, -1):
                    paragraph = recent_paragraphs[i]
                    if TABLE_TITLE_REGEX.match(paragraph):
                        number = paragraph
                        # The paragraph after the caption, if any, is used as the title
                        title = recent_paragraphs[i + 1] if i + 1 < len(recent_paragraphs) else paragraph
                        break

                # Extract table rows
                rows = []
                for row in table.rows:
                    raw_cells = [get_cell_text(cell) for cell in row.cells]
                    rows += expand_row(raw_cells)

                # Normalize column count. default=0 keeps a table without rows
                # from raising ValueError, which the outer handler would turn
                # into losing every table of the document.
                max_columns = max((len(row) for row in rows), default=0)
                for row in rows:
                    row.extend([""] * (max_columns - len(row)))

                flagged = any(
                    DOTS_SEPARATOR.search(cell) or cell == "" for row in rows for cell in row
                )

                # Check if table is a continuation
                current_table = {"table_content": rows}
                if previous_table and is_table_continuation(previous_table, current_table, paragraphs_between):
                    previous_table["table_content"].extend(rows[1:])  # Skip header row
                    previous_table["needs_review"] |= flagged
                else:
                    # Create new table entry
                    table_entry = {
                        "table_number": number,
                        "table_title": title,
                        "table_content": rows,
                        "needs_review": flagged
                    }
                    tables.append(table_entry)
                    previous_table = table_entry
                    table_index += 1

                # Reset paragraphs between tables
                paragraphs_between.clear()
                # Keep only the last 5 paragraphs
                recent_paragraphs = recent_paragraphs[-5:]

        return tables

    except Exception as e:
        print(f"Error processing document: {e}")
        return []

# ─────────── Main Execution ───────────
def main():
    """Extract the tables of ``SOURCE_DOCX`` and write them to ``DESTINATION_JSON``.

    Prints a message and stops when the input file is missing or no table was
    extracted; otherwise reports how many tables are flagged for manual review.
    """
    source_found = os.path.exists(SOURCE_DOCX)
    if not source_found:
        print(f"Error: Input file {SOURCE_DOCX} not found.")
        return

    extracted = extract_tables(SOURCE_DOCX)
    if not extracted:
        print("No tables extracted.")
        return

    try:
        with open(DESTINATION_JSON, "w", encoding="utf-8") as out_file:
            json.dump(extracted, out_file, ensure_ascii=False, indent=4)
        # needs_review is a boolean per table, so summing counts the flagged ones.
        review_count = sum(entry["needs_review"] for entry in extracted)
        print(f"🚩 {review_count} out of {len(extracted)} tables need manual review → {DESTINATION_JSON}")
    except Exception as e:
        print(f"Error saving JSON: {e}")

if __name__ == "__main__":
    main()

Automatic Detection and Extraction of Tables in PDF Documents via Visual Structure and OCR

In [None]:
# This script visually detects and extracts table images from a PDF document to be incorporated into the graphical interface, enabling easier formatting and handling of complex table structures.
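# Prerequisites: Poppler (used by pdf2image to render the PDF pages) and Tesseract OCR (used by pytesseract) must be installed.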

In [None]:
import os
import re
import json
import cv2
import numpy as np
from collections import defaultdict
from pdf2image import convert_from_path
from PIL import Image
import pytesseract

# ─────────── Configuration ───────────
# Tesseract and Poppler paths
# Tesseract executable, its language-data folder, and the Poppler "bin" folder
# handed to pdf2image.
PYTESSERACT_PATH = r"C:\Users\DEC_User\Desktop\tesserate\tesseract.exe"
TESSDATA_PREFIX = r"C:\Users\DEC_User\Desktop\tesserate\tessdata"
POPPLER_PATH = r"C:\Users\DEC_User\Desktop\poppler-23.11.0\Library\bin"

# Input PDF and output directory
PDF_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento Nativo.pdf"
# Relative path: the images land in "visual_tables" under the current working directory.
# NOTE(review): the table-editor cell reads images from an absolute ...\data\visual_tables
# folder — confirm the notebook runs from that "data" directory.
OUTPUT_DIR = "visual_tables"
# Resolution, in DPI, at which the PDF pages are rendered to images.
PDF_DPI = 600

# Set Tesseract configuration
pytesseract.pytesseract.tesseract_cmd = PYTESSERACT_PATH
os.environ["TESSDATA_PREFIX"] = TESSDATA_PREFIX

# ─────────── Helper Functions ───────────
def preprocess_image_for_ocr(image):
    """Return a black-and-white copy of ``image`` prepared for OCR.

    The image is converted to 8-bit grayscale and binarised with an Otsu
    threshold; the result is returned as a new PIL image.
    """
    grayscale_pixels = np.array(image.convert("L"))
    otsu_flags = cv2.THRESH_BINARY + cv2.THRESH_OTSU
    # cv2.threshold returns (threshold_used, image); only the image is needed.
    _, binarized = cv2.threshold(grayscale_pixels, 0, 255, otsu_flags)
    return Image.fromarray(binarized)

def roman_to_int(roman: str) -> int:
    """Convert a Roman numeral to an integer.

    All seven symbols (I, V, X, L, C, D, M) are supported, in upper or lower
    case, and surrounding whitespace is ignored. The numeral is read right to
    left: a symbol smaller than the one to its right is subtracted, otherwise
    it is added.

    Returns:
        The integer value; ``0`` for an empty string or, after printing a
        message, for a string containing any other character.
    """
    roman_values = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}
    result = 0
    prev_value = 0
    try:
        for char in reversed(roman.strip().upper()):
            value = roman_values[char]
            if value >= prev_value:
                result += value
            else:
                result -= value
            prev_value = value
        return result
    except KeyError:
        print(f"Invalid Roman numeral: {roman}")
        return 0

# ─────────── Table Extraction ───────────
def extract_tables_visually(pdf_path: str, poppler_path: str, output_folder: str = OUTPUT_DIR, dpi: int = PDF_DPI):
    """Find ruled tables on every PDF page and save each one as a PNG crop.

    Pages are rendered with pdf2image, binarised, and reduced to their long
    horizontal and vertical lines; every outer contour of that line image that
    is at least 100 px wide and 40 px high is treated as a table. The crop is
    widened by 10 px on each side and extended 250 px upwards so that a title
    above the table is included. Files are named
    ``PAGE_<page>_table_<n>.png`` inside ``output_folder``.

    Errors are printed: a missing PDF or a failed conversion ends the function,
    a failure on one page skips the rest of that page only.
    """
    if not os.path.exists(pdf_path):
        print(f"Error: PDF file {pdf_path} not found.")
        return

    os.makedirs(output_folder, exist_ok=True)

    try:
        pages = convert_from_path(pdf_path, dpi=dpi, poppler_path=poppler_path)
    except Exception as e:
        print(f"Error converting PDF to images: {e}")
        return

    table_counter = defaultdict(int)

    for page_number, page_image in enumerate(pages, start=1):
        try:
            # Dark pixels (value <= 180) become white foreground in the inverted mask.
            grayscale = np.array(page_image.convert("L"))
            _, inverted = cv2.threshold(grayscale, 180, 255, cv2.THRESH_BINARY_INV)

            # Morphological opening with long thin kernels keeps only ruling lines.
            horizontal_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (50, 1))
            vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 50))
            horizontal_lines = cv2.morphologyEx(inverted, cv2.MORPH_OPEN, horizontal_kernel)
            vertical_lines = cv2.morphologyEx(inverted, cv2.MORPH_OPEN, vertical_kernel)
            grid_lines = cv2.add(horizontal_lines, vertical_lines)

            # Outer contours of the line image are the table candidates.
            found, _hierarchy = cv2.findContours(grid_lines, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            boxes = sorted((cv2.boundingRect(outline) for outline in found), key=lambda box: box[1])  # top-down

            for left, top, box_width, box_height in boxes:
                too_small = box_width < 100 or box_height < 40
                if too_small:
                    continue

                crop_box = (
                    max(0, left - 10),
                    max(0, top - 250),  # include area above for title
                    min(page_image.width, left + box_width + 10),
                    top + box_height,
                )
                region = page_image.crop(crop_box)

                table_counter[page_number] += 1
                filename = f"PAGE_{page_number:03d}_table_{table_counter[page_number]}.png"
                region.save(os.path.join(output_folder, filename))
                print(f"🖼️ Saved: {filename}")

        except Exception as e:
            print(f"Error processing page {page_number}: {e}")

# ─────────── Table Identification and Renaming ───────────
def identify_and_rename_tables(folder: str):
    """Rename the table images in ``folder`` after the "QUADRO" title found by OCR.

    The top half of every ``.png`` file is binarised and read with Tesseract.
    The first line starting with "QUADRO" gives the table id
    (``QUADRO_<roman numeral>`` when a Roman numeral follows, otherwise the
    whole line with spaces replaced by underscores). An image without a
    recognisable title inherits the id of the previous image. Files are renamed
    to ``<id>_<n>.png``; an existing target is never overwritten.

    Problems are printed and do not stop the processing of the other files.
    """
    # Check if folder exists
    if not os.path.exists(folder):
        print(f"Error: Output folder {folder} not found.")
        return

    # Get PNG files
    files = sorted(f for f in os.listdir(folder) if f.endswith(".png"))
    if not files:
        print(f"No PNG files found in {folder}.")
        return

    used_names = defaultdict(int)
    last_table_id = None

    for file in files:
        path = os.path.join(folder, file)
        try:
            # Read everything needed from the image inside the context manager so
            # the file handle is closed before os.rename(); an open handle blocks
            # the rename on Windows.
            with Image.open(path) as image:
                width, height = image.size
                # Crop top half for title detection
                top_half = image.crop((0, 0, width, int(height * 0.5)))
                top_half = preprocess_image_for_ocr(top_half)

            # Perform OCR
            text = pytesseract.image_to_string(top_half, lang="eng", config="--psm 6")
            table_id = None

            # Search for table title
            for line in text.split("\n"):
                line = line.strip().upper()
                if line.startswith("QUADRO"):
                    match = re.search(r"QUADRO\s+([IVXLCDM]+)", line)
                    if match:
                        table_id = f"QUADRO_{match.group(1)}"
                    else:
                        # The raw OCR line becomes part of a file name, so
                        # characters that are illegal in file names are replaced.
                        table_id = re.sub(r'[\\/:*?"<>|]', "_", line.replace(" ", "_"))
                    break

            # Use last table ID if none found
            if table_id:
                last_table_id = table_id
            elif last_table_id:
                table_id = last_table_id
            else:
                print(f"❌ Table not identified in: {file} — no previous table to fallback")
                continue

            # Generate new filename
            used_names[table_id] += 1
            new_name = f"{table_id}_{used_names[table_id]}.png"
            new_path = os.path.join(folder, new_name)

            # Rename file if new path doesn't exist
            if not os.path.exists(new_path):
                os.rename(path, new_path)
                print(f"✅ Renamed: {file} → {new_name}")
            else:
                print(f"⚠️ Already exists: {new_name} — skipped to avoid overwrite")

        except Exception as e:
            print(f"Error processing {file}: {e}")

# ─────────── Main Execution ───────────
def main():
    """Check the external tools, then extract the table images and rename them.

    Nothing is processed when the Tesseract executable or the Poppler folder
    is missing; the first missing one is reported.
    """
    prerequisites = (
        (PYTESSERACT_PATH, f"Error: Tesseract executable not found at {PYTESSERACT_PATH}."),
        (POPPLER_PATH, f"Error: Poppler not found at {POPPLER_PATH}."),
    )
    for location, message in prerequisites:
        if not os.path.exists(location):
            print(message)
            return

    # Step 1: crop every detected table out of the PDF pages.
    extract_tables_visually(PDF_PATH, POPPLER_PATH)
    # Step 2: rename the crops after the table titles read by OCR.
    identify_and_rename_tables(OUTPUT_DIR)

if __name__ == "__main__":
    main()

Interactive Table Editor for Visual Review and JSON Export

In [None]:
# This script provides a graphical interface to review, edit, and organize tables extracted from documents. It allows users to view table content and related images, edit titles and data, add or remove rows, merge or delete tables, and export the updated data to a new JSON file.

In [None]:
import os
import json
import tkinter as tk
from tkinter import ttk, messagebox
from PIL import Image, ImageTk
import glob

# ─────────── Configuration ───────────
# Input and output JSON files and image directory
# JSON_INPUT: table data to review (same path the table-extraction cell writes to).
JSON_INPUT = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\extracted_tables\Tables_clean.json"
# JSON_OUTPUT: separate file for the reviewed tables, so the input is not overwritten.
JSON_OUTPUT = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\extracted_tables\Tables_clean_final.json"
# IMAGE_DIR: folder holding the table images.
IMAGE_DIR = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\visual_tables"

# ─────────── Utility Functions ───────────
def load_tables(path: str):
    """Read and parse the UTF-8 JSON file at ``path``.

    Returns the parsed content, or ``[]`` (after printing the error) when the
    file cannot be opened or does not contain valid JSON.
    """
    try:
        with open(path, encoding="utf-8") as source:
            raw = source.read()
        return json.loads(raw)
    except Exception as e:
        print(f"Error loading JSON {path}: {e}")
        return []

def save_tables(path: str, tables):
    """Write ``tables`` to ``path`` as UTF-8 JSON (4-space indent, non-ASCII kept as is).

    A failure is printed rather than raised.
    """
    try:
        with open(path, "w", encoding="utf-8") as sink:
            json.dump(tables, sink, ensure_ascii=False, indent=4)
    except Exception as e:
        print(f"Error saving JSON {path}: {e}")

def build_table_summary(table_number, table_title, content):
    """Generate a natural-language summary of a table.

    Args:
        table_number: Table identifier used at the start of the summary.
        table_title: Table title quoted in the first sentence.
        content: List of rows; the first row is the header.

    Returns:
        One introductory sentence followed by one sentence per data row, each
        pairing the row's values with their column headers (the first three
        columns are used). ``""`` is returned when there is no data row or the
        header row is empty.
    """
    if not content or len(content) < 2:
        return ""
    header = content[0]
    if not header:
        # Without a header there is nothing to categorise the rows by.
        return ""
    rows = content[1:]
    summary = f"{table_number} presents '{table_title}' categorized by '{header[0]}'. "
    for row in rows:
        if not row:
            continue
        parts = []
        for i, val in enumerate(row):
            # A row may be longer than the header; pair the extra values with
            # an empty label instead of raising IndexError.
            label = header[i] if i < len(header) else ""
            parts.append(f"'{val}' '{label}'")
        if len(parts) == 1:
            # Single-column table: there is no second value to relate to.
            phrase = f"It lists {parts[0]}. "
        else:
            phrase = f"For {parts[0]}, there should be {parts[1]}" + (f" with {parts[2]}" if len(parts) > 2 else "") + ". "
        summary += phrase
    return summary.strip()

def attach_summaries(tables):
    """Set ``"table_summary"`` on every table dict, in place.

    The summary comes from ``build_table_summary``; missing keys fall back to
    ``"N/A"`` (number), ``"Untitled Table"`` (title) and an empty content list.
    """
    for entry in tables:
        number = entry.get("table_number", "N/A")
        title = entry.get("table_title", "Untitled Table")
        content = entry.get("table_content", [])
        entry["table_summary"] = build_table_summary(number, title, content)

# ─────────── GUI Class ───────────
class TableReviewer(tk.Tk):
    """GUI for reviewing, editing, merging, and deleting extracted tables."""
    IMG_MAX_WIDTH, IMG_MAX_HEIGHT = 500, 380

    def __init__(self, tables):
        """Create the main window, build all widgets and list the tables.

        Args:
            tables: List of table dicts. ``table_number`` and ``table_title``
                are read here to label the list entries; ``needs_review``
                (optional, defaults to False) selects the entry icon.
        """
        super().__init__()
        self.title("Table Reviewer")
        self.state("zoomed")

        # Configure ttk style
        style = ttk.Style(self)
        style.theme_use("clam")
        style.configure("Treeview.Heading", font=("Segoe UI", 10, "bold"))
        style.configure("Treeview", rowheight=26, font=("Segoe UI", 10))

        # Initialize data
        self.tables = tables
        self.current = None               # index of the table shown on the right, if any
        self.img_labels = []              # image widgets currently shown in img_frame
        self.img_caches = []              # PhotoImage references kept alive for those widgets
        self.original_listbox_items = []  # full (untruncated) listbox texts

        # Layout: Split window into left and right panes
        paned = ttk.Panedwindow(self, orient=tk.HORIZONTAL)
        paned.pack(fill=tk.BOTH, expand=True)

        # Left panel: Table list
        left_frame = ttk.Frame(paned, padding=5)
        ttk.Label(left_frame, text="Tables", font=("Segoe UI", 11, "bold")).pack()
        self.listbox = tk.Listbox(left_frame, width=38, activestyle="none", selectmode=tk.BROWSE)
        scrollbar = ttk.Scrollbar(left_frame, command=self.listbox.yview)
        # The scrollbar must be packed before the expanding listbox; packed
        # afterwards it only receives the leftover strip below the list.
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y, pady=5)
        self.listbox.pack(fill=tk.BOTH, expand=True, pady=5)
        self.listbox.configure(yscrollcommand=scrollbar.set)

        # Populate listbox
        for table in tables:
            icon = "⚠️" if table.get("needs_review", False) else "✔️"
            item_text = f"{icon}  {table['table_number']} — {table['table_title']}"
            self.original_listbox_items.append(item_text)
            self.listbox.insert(tk.END, item_text)

        self.listbox.bind("<<ListboxSelect>>", self.show_table)
        self.listbox.bind("<Configure>", self.update_listbox_items)

        # Right panel: Table details and editor
        right_frame = ttk.Frame(paned, padding=5)

        # Table info (number and title)
        info_frame = ttk.Frame(right_frame)
        info_frame.pack(fill=tk.X, pady=(0, 6))
        info_frame.columnconfigure(1, weight=1)
        info_frame.columnconfigure(3, weight=3)
        ttk.Label(info_frame, text="Number:").grid(row=0, column=0, sticky="e", padx=(0, 4))
        self.num_var = tk.StringVar()
        ttk.Entry(info_frame, textvariable=self.num_var).grid(row=0, column=1, sticky="ew", padx=(0, 8))
        ttk.Label(info_frame, text="Title:").grid(row=0, column=2, sticky="e", padx=(0, 8))
        self.title_var = tk.StringVar()
        self.title_entry = ttk.Entry(info_frame, textvariable=self.title_var)
        self.title_entry.grid(row=0, column=3, sticky="ew", padx=(0, 8))
        ttk.Button(info_frame, text="💾 Update", command=self.update_info).grid(row=0, column=4, padx=2)
        self.title_entry.bind("<Configure>", self.update_title_display)

        # Image display
        self.img_frame = ttk.Frame(right_frame)
        self.img_frame.pack(fill=tk.X, pady=(0, 6))

        # Table content grid (Treeview); scrollbars are packed first so the
        # expanding tree fills whatever space remains.
        self.tree = ttk.Treeview(right_frame, show="headings")
        y_scrollbar = ttk.Scrollbar(right_frame, command=self.tree.yview)
        y_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        x_scrollbar = ttk.Scrollbar(right_frame, command=self.tree.xview, orient=tk.HORIZONTAL)
        x_scrollbar.pack(side=tk.BOTTOM, fill=tk.X)
        self.tree.configure(yscroll=y_scrollbar.set, xscroll=x_scrollbar.set)
        self.tree.pack(fill=tk.BOTH, expand=True)
        self.tree.bind("<Double-1>", self.start_edit)

        # Action buttons
        button_frame = ttk.Frame(right_frame)
        button_frame.pack(fill=tk.X, pady=4)
        ttk.Button(button_frame, text="➕ Add Row", command=self.add_row).pack(side=tk.LEFT, padx=4)
        ttk.Button(button_frame, text="➖ Remove Row", command=self.remove_row).pack(side=tk.LEFT, padx=4)
        ttk.Button(button_frame, text="📄 Duplicate Row", command=self.duplicate_row).pack(side=tk.LEFT, padx=4)
        ttk.Button(button_frame, text="✔️ Mark Reviewed", command=self.mark_reviewed).pack(side=tk.LEFT, padx=8)
        ttk.Button(button_frame, text="🗑️ Delete", command=self.delete_tables_window).pack(side=tk.RIGHT, padx=4)
        ttk.Button(button_frame, text="💾 Save (Ctrl+S)", command=self.save_json).pack(side=tk.RIGHT, padx=4)
        ttk.Button(button_frame, text="🔀 Merge", command=self.merge_tables_window).pack(side=tk.RIGHT, padx=4)

        # Add panes to paned window
        paned.add(left_frame, weight=1)
        paned.add(right_frame, weight=4)

        # Global key binding
        self.bind("<Control-s>", lambda *_: self.save_json())

    # ─────────── Helper Methods ───────────
    def update_listbox_items(self, *_):
        """Update listbox items based on available width."""
        width = self.listbox.winfo_width()
        visible_chars = max(1, width // 7)
        selected = self.listbox.curselection()
        self.listbox.delete(0, tk.END)
        for text in self.original_listbox_items:
            self.listbox.insert(
                tk.END,
                text if len(text) <= visible_chars else text[:visible_chars - 3] + "..."
            )
        if selected:
            self.listbox.selection_set(selected)
            self.listbox.see(selected)

    def update_title_display(self, *_):
        """Update title entry display based on available width."""
        width = self.title_entry.winfo_width()
        visible_chars = max(1, width // 7)
        full_text = self.title_var.get()
        self.title_entry.delete(0, tk.END)
        self.title_entry.insert(
            0,
            full_text if len(full_text) <= visible_chars else full_text[:visible_chars - 3] + "..."
        )

    def show_table(self, *_):
        """Display the selected table: number/title, images and cell grid.

        Images are looked up in ``IMAGE_DIR`` as
        ``<table_number with spaces replaced by '_'>_*.png``.

        Note: the grid is cleared and refilled from ``self.tables``; cell
        edits that were not written back by ``save_json`` while their table
        was displayed are discarded when another table is selected.
        """
        selection = self.listbox.curselection()
        if not selection:
            return
        self.current = selection[0]
        table = self.tables[self.current]
        data = table["table_content"]

        # Update info
        self.num_var.set(table["table_number"])
        self.title_var.set(table["table_title"])
        self.update_title_display()

        # Clear previous image widgets (including the "no image" placeholder)
        for label in self.img_labels:
            label.destroy()
        self.img_labels.clear()
        self.img_caches.clear()

        table_number = table["table_number"].replace(" ", "_")
        # Escape glob metacharacters that may occur in a table number.
        img_pattern = os.path.join(IMAGE_DIR, f"{glob.escape(table_number)}_*.png")
        img_paths = sorted(glob.glob(img_pattern))

        if img_paths:
            for img_path in img_paths:
                try:
                    # Context manager closes the file once the pixels are copied.
                    with Image.open(img_path) as img:
                        img.thumbnail((self.IMG_MAX_WIDTH, self.IMG_MAX_HEIGHT))
                        photo = ImageTk.PhotoImage(img)
                    label = ttk.Label(self.img_frame, image=photo)
                    label.pack(pady=2)
                    self.img_labels.append(label)
                    self.img_caches.append(photo)  # Prevent garbage collection
                except Exception as e:
                    print(f"Error loading image {img_path}: {e}")
        else:
            # Track the placeholder so it is removed on the next selection
            # instead of piling up in the frame.
            placeholder = ttk.Label(self.img_frame, text="(No image available)")
            placeholder.pack(pady=2)
            self.img_labels.append(placeholder)

        # Update Treeview
        self.tree.delete(*self.tree.get_children())
        header = data[0] if data else []  # tolerate a table without content
        self.tree["columns"] = list(range(len(header)))
        for i, col in enumerate(self.tree["columns"]):
            self.tree.heading(col, text=header[i] or f"Column {i+1}")
            self.tree.column(col, width=160, stretch=True)
        for row in data[1:]:
            self.tree.insert("", tk.END, values=row)

    # ─────────── Table Editing ───────────
    def start_edit(self, event):
        """Open an in-place editor over the double-clicked Treeview cell.

        The edit is committed on <Return> or when the editor loses focus.
        """
        if self.tree.identify("region", event.x, event.y) != "cell":
            return
        row_id = self.tree.identify_row(event.y)
        col_idx = int(self.tree.identify_column(event.x)[1:]) - 1
        bbox = self.tree.bbox(row_id, f"#{col_idx + 1}")
        if not bbox:
            # Nothing to place the editor over.
            return
        x, y, w, h = bbox

        values = list(self.tree.item(row_id)["values"])
        initial = values[col_idx] if col_idx < len(values) else ""

        entry = tk.Entry(self.tree)
        entry.place(x=x, y=y, width=w, height=h)
        entry.insert(0, initial)
        entry.focus()

        committed = False

        def commit(_=None):
            # Both bindings below can fire for one edit (destroying the entry
            # on <Return> may trigger <FocusOut>); only the first call counts.
            nonlocal committed
            if committed:
                return
            committed = True
            values = list(self.tree.item(row_id)["values"])
            # Pad rows that hold fewer cells than the edited column index.
            values.extend([""] * (col_idx + 1 - len(values)))
            values[col_idx] = entry.get()
            self.tree.item(row_id, values=values)
            entry.destroy()

        entry.bind("<Return>", commit)
        entry.bind("<FocusOut>", commit)

    def add_row(self):
        """Add a new empty row to the Treeview."""
        self.tree.insert("", tk.END, values=[""] * len(self.tree["columns"]))

    def duplicate_row(self):
        """Duplicate selected rows in the Treeview."""
        for row in self.tree.selection():
            self.tree.insert("", tk.END, values=self.tree.item(row)["values"])

    def remove_row(self):
        """Remove selected rows from the Treeview."""
        for row in self.tree.selection():
            self.tree.delete(row)

    # ─────────── Table Metadata ───────────
    def update_info(self):
        """Update table number and title."""
        if self.current is None:
            return
        table = self.tables[self.current]
        table["table_number"] = self.num_var.get()
        table["table_title"] = self.title_var.get()

        # Update listbox
        icon = "⚠️" if table.get("needs_review", False) else "✔️"
        item_text = f"{icon}  {table['table_number']} — {table['table_title']}"
        self.original_listbox_items[self.current] = item_text
        self.listbox.delete(self.current)
        self.listbox.insert(self.current, item_text)
        self.update_listbox_items()
        self.update_title_display()

    def mark_reviewed(self):
        """Mark the current table as reviewed."""
        if self.current is None:
            return
        self.tables[self.current]["needs_review"] = False
        item_text = self.listbox.get(self.current).replace("⚠️", "✔️")
        self.original_listbox_items[self.current] = item_text
        self.listbox.delete(self.current)
        self.listbox.insert(self.current, item_text)
        self.listbox.itemconfig(self.current, foreground="green")
        self.update_listbox_items()

    # ─────────── Save and Export ───────────
    def save_json(self, *_):
        """Write the grid back to the current table and save all tables.

        The header row is rebuilt from the Treeview headings and the data
        rows from its items; number and title come from the entry fields.
        Summaries are regenerated for every table before writing to
        ``JSON_OUTPUT``.
        """
        if self.current is not None and self.current < len(self.tables):
            # list(...) normalises rows; an item without values yields ''.
            rows = [list(self.tree.item(i)["values"]) for i in self.tree.get_children()]
            headers = [self.tree.heading(c)["text"] for c in self.tree["columns"]]
            table = self.tables[self.current]
            table["table_content"] = [headers] + rows
            table["table_number"] = self.num_var.get()
            table["table_title"] = self.title_var.get()

        attach_summaries(self.tables)  # Update summaries before saving
        # Only an explicit False from save_tables is treated as a failure,
        # so a helper that returns nothing keeps the previous behaviour.
        if save_tables(JSON_OUTPUT, self.tables) is False:
            messagebox.showerror("Error", f"Could not save to:\n{JSON_OUTPUT}")
            return
        messagebox.showinfo("Saved", f"Saved to:\n{JSON_OUTPUT}")

    # ─────────── Merge Tables ───────────
    def merge_tables_window(self):
        """Open a dialog with one checkbox per table plus number/title fields.

        The (variable, table index) pairs are kept in ``self.merge_vars``;
        the Merge button hands the dialog and both field values to
        ``merge_selected_tables``.
        """
        dialog = tk.Toplevel(self)
        dialog.title("Merge Tables")
        heading = ttk.Label(dialog, text="Select tables to merge:", font=("Segoe UI", 11, "bold"))
        heading.pack(pady=5)

        self.merge_vars = []
        for position, entry in enumerate(self.tables):
            flag = tk.BooleanVar(value=False)
            caption = f"{entry['table_number']} — {entry['table_title'][:50]}"
            ttk.Checkbutton(dialog, text=caption, variable=flag).pack(anchor="w", padx=10)
            self.merge_vars.append((flag, position))

        ttk.Label(dialog, text="Number of the new table:").pack(pady=(8, 0))
        number_field = ttk.Entry(dialog, width=20)
        number_field.pack(pady=4)
        ttk.Label(dialog, text="Title of the new table:").pack()
        title_field = ttk.Entry(dialog, width=60)
        title_field.pack(pady=4)

        def confirm():
            # Field contents are read at click time, not at dialog creation.
            self.merge_selected_tables(dialog, number_field.get(), title_field.get())

        ttk.Button(dialog, text="✅ Merge", command=confirm).pack(pady=10)

    def merge_selected_tables(self, window, new_number, new_title):
        """Merge selected tables into a new table."""
        selected_idxs = [idx for var, idx in self.merge_vars if var.get()]
        if len(selected_idxs) < 2:
            messagebox.showwarning("Invalid selection", "Select at least two tables.")
            return

        # Check column compatibility
        num_cols = len(self.tables[selected_idxs[0]]["table_content"][0])
        for idx in selected_idxs:
            if len(self.tables[idx]["table_content"][0]) != num_cols:
                messagebox.showerror("Error", "Selected tables must have the same number of columns!")
                return

        # Merge content
        merged_content = [self.tables[selected_idxs[0]]["table_content"][0]]  # Header
        for idx in selected_idxs:
            merged_content.extend(self.tables[idx]["table_content"][1:])  # Rows

        # Create new table
        merged_table = {
            "table_number": new_number or "N/A",
            "table_title": new_title or "Merged Table",
            "table_content": merged_content,
            "needs_review": True,
            "image_path": None
        }

        # Add to tables and listbox
        self.tables.append(merged_table)
        icon = "⚠️"
        item_text = f"{icon}  {merged_table['table_number']} — {merged_table['table_title']}"
        self.original_listbox_items.append(item_text)
        self.listbox.insert(tk.END, item_text)
        messagebox.showinfo("Tables merged", "The new merged table was added successfully!")
        self.update_listbox_items()
        window.destroy()

    # ─────────── Delete Tables ───────────
    def delete_tables_window(self):
        """Open a dialog with one checkbox per table and a Delete button.

        The (variable, table index) pairs are kept in ``self.delete_vars``;
        the button forwards the dialog to ``delete_selected_tables``.
        """
        dialog = tk.Toplevel(self)
        dialog.title("Delete Tables")
        heading = ttk.Label(dialog, text="Select tables to delete:", font=("Segoe UI", 11, "bold"))
        heading.pack(pady=5)

        self.delete_vars = []
        for position, entry in enumerate(self.tables):
            flag = tk.BooleanVar(value=False)
            caption = f"{entry['table_number']} — {entry['table_title'][:50]}"
            ttk.Checkbutton(dialog, text=caption, variable=flag).pack(anchor="w", padx=10)
            self.delete_vars.append((flag, position))

        def confirm():
            self.delete_selected_tables(dialog)

        ttk.Button(dialog, text="🗑️ Delete", command=confirm).pack(pady=10)

    def delete_selected_tables(self, window):
        """Delete selected tables."""
        selected_idxs = [idx for var, idx in self.delete_vars if var.get()]
        if not selected_idxs:
            messagebox.showwarning("Invalid selection", "Select at least one table to delete.")
            return

        # Delete in reverse order to preserve indices
        selected_idxs.sort(reverse=True)
        for idx in selected_idxs:
            self.tables.pop(idx)
            self.original_listbox_items.pop(idx)
            self.listbox.delete(idx)

        # Clear right panel if current table was deleted
        if self.current is not None and self.current in selected_idxs:
            self.current = None
            self.num_var.set("")
            self.title_var.set("")
            for label in self.img_labels:
                label.destroy()
            self.img_labels.clear()
            self.img_caches.clear()
            self.tree.delete(*self.tree.get_children())
            self.tree["columns"] = []

        self.update_listbox_items()
        messagebox.showinfo("Tables deleted", "Selected tables were removed successfully!")
        window.destroy()

# ─────────── Main Execution ───────────
def main():
    """Main function: check inputs, load tables and launch the GUI.

    Start-up errors are shown in a message box and end the function without
    opening the reviewer window.
    """
    def report_error(message):
        # messagebox needs a Tk root; without one Tkinter creates a default
        # root that shows up as an extra blank window. Use a hidden
        # temporary root and remove it again afterwards.
        root = tk.Tk()
        root.withdraw()
        try:
            messagebox.showerror("Error", message)
        finally:
            root.destroy()

    # Check if input JSON exists
    if not os.path.exists(JSON_INPUT):
        report_error(f"Input JSON file not found: {JSON_INPUT}")
        return

    # Check if image directory exists
    if not os.path.exists(IMAGE_DIR):
        report_error(f"Image directory not found: {IMAGE_DIR}")
        return

    # Load tables and start GUI
    tables = load_tables(JSON_INPUT)
    if not tables:
        report_error("No tables loaded from JSON.")
        return

    TableReviewer(tables).mainloop()

if __name__ == "__main__":
    main()

Automated Table Summarization and Translation with OpenAI

In [None]:
import os
import json
import time
from openai import OpenAI

# ─────────── OpenAI Configuration ───────────
# Initialize OpenAI client.
# The API key is taken from the DEVSDOCODE_API_KEY environment variable when
# it is set; the literal below is only the fallback that keeps the notebook
# running unchanged.
# NOTE(review): a key stored in the notebook is readable by everyone who
# receives the file — prefer the environment variable.
CLIENT = OpenAI(
    api_key=os.environ.get("DEVSDOCODE_API_KEY", "ddc-temp-free-e3b73cd814cc4f3ea79b5d4437912663"),
    base_url="https://api.devsdocode.com/v1",
)

# ─────────── File Paths ───────────
# Base directory and JSON file paths.
# FIREBIM_DATA_DIR overrides the machine-specific default location.
BASE_DIR = os.environ.get("FIREBIM_DATA_DIR", r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data")
INPUT_JSON = os.path.join(BASE_DIR, "extracted_tables", "Tables_clean_final.json")
OUTPUT_JSON = os.path.join(BASE_DIR, "extracted_tables", "Tables_clean_final_with_summary.json")
METADATA_JSON = os.path.join(BASE_DIR, "Documento_Nativo_results_gui.json")

# ─────────── Utility Functions ───────────
def load_document_language(path: str) -> str:
    """Return the lower-cased ``language`` value from the metadata JSON.

    "en" is returned when the file does not exist, when the key is absent,
    or when reading/parsing fails (the error is printed in that case).
    """
    default_language = "en"
    try:
        if not os.path.exists(path):
            return default_language
        with open(path, encoding="utf-8") as handle:
            metadata = json.load(handle)
        return metadata.get("language", default_language).lower()
    except Exception as err:
        print(f"Error loading metadata {path}: {err}")
        return default_language

# Load document language
DOCUMENT_LANGUAGE = load_document_language(METADATA_JSON)

def query_openai(messages, temperature=0.2, max_tokens=800):
    """Send ``messages`` to the chat-completions endpoint of ``CLIENT``.

    Returns the stripped text of the first choice. Any exception raised
    while calling the API or reading the response is printed and "" is
    returned instead.
    """
    request = {
        "model": "provider-4/gpt-4.1",
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    try:
        completion = CLIENT.chat.completions.create(**request)
        first_choice = completion.choices[0]
        return first_choice.message.content.strip()
    except Exception as err:
        print(f"⚠️ OpenAI API error: {err}")
        return ""

def fallback_summary_english(table):
    """Build a plain English summary of a table without calling the API.

    Returns "" unless ``table_content`` has at least two rows and its first
    row (the header) is a list. Each data row becomes one sentence pairing
    its values with the header names, up to the shorter of the two lengths.
    """
    content = table.get("table_content", [])
    if len(content) < 2:
        return ""
    header = content[0]
    if not isinstance(header, list):
        return ""
    intro = f"Table {table.get('table_number', 'N/A')}, titled '{table.get('table_title', 'Untitled')}', lists:"
    sentences = [
        ", ".join(f"'{value}' for '{column}'" for value, column in zip(row, header)) + "."
        for row in content[1:]
    ]
    return " ".join([intro] + sentences)

# ─────────── Summary Generation ───────────
def generate_native_summary(table, language):
    """Generate a detailed summary of a table in the document language.

    Args:
        table: Table dict; ``table_content`` is serialized into the prompt.
        language: Language name or code; upper-cased inside the prompt.

    Returns:
        The text returned by ``query_openai``, or "" when the table has no
        content. In that case no request is sent, so a missing or empty
        ``table_content`` neither raises nor produces an invented summary.
    """
    content = table.get("table_content") or []
    if not content:
        return ""
    table_content = json.dumps(content, ensure_ascii=False)
    prompt = (
        f"Write a detailed, single-paragraph summary of this table in {language.upper()}, "
        f"listing all column headers and values using full sentences and single quotes. "
        f"No bullet points.\n\nTable:\n{table_content}"
    )
    messages = [
        {"role": "system", "content": f"You summarize tables into technical prose in {language.upper()}."},
        {"role": "user", "content": prompt}
    ]
    return query_openai(messages)

def translate_to_english(text):
    """Ask the model for an English translation of ``text``.

    The prompt requests that numbers and quoted values stay unchanged and
    the request uses temperature 0.0. Empty input returns "" without
    sending a request.
    """
    if text:
        instruction = (
            f"Translate the following technical paragraph into English. "
            f"Keep all numbers and quoted values exactly the same:\n\n{text}"
        )
        conversation = [
            {"role": "system", "content": "You are a technical translator to English."},
            {"role": "user", "content": instruction}
        ]
        return query_openai(conversation, temperature=0.0)
    return ""

# ─────────── Main Execution ───────────
def generate_all_summaries():
    """Generate native and English summaries for all tables.

    Reads ``INPUT_JSON``, stores ``table_summary`` (document language) and
    ``table_summary_en`` (English) on every table and writes the result to
    ``OUTPUT_JSON``. Errors while reading or writing are printed.

    Translation is skipped when the text is already English (English
    document, or the locally built fallback summary was used). If the
    translation request yields nothing, the local English fallback is
    stored instead of an empty string.
    """
    # Check if input JSON exists
    if not os.path.exists(INPUT_JSON):
        print(f"Error: Input JSON file not found: {INPUT_JSON}")
        return

    # Load tables
    try:
        with open(INPUT_JSON, encoding="utf-8") as f:
            tables = json.load(f)
    except Exception as e:
        print(f"Error loading JSON {INPUT_JSON}: {e}")
        return

    document_is_english = (
        DOCUMENT_LANGUAGE in ("en", "eng", "english")
        or DOCUMENT_LANGUAGE.startswith(("en-", "en_"))
    )

    # Process each table
    for idx, table in enumerate(tables, 1):
        print(f"[{idx}/{len(tables)}] Processing {table.get('table_number', 'N/A')}…")

        # Generate native summary
        native_summary = generate_native_summary(table, DOCUMENT_LANGUAGE)
        used_fallback = not native_summary
        if used_fallback:
            print("   ↪ Using fallback summary.")
            native_summary = fallback_summary_english(table)

        # Translate to English only when the text is not English already
        if document_is_english or used_fallback:
            english_summary = native_summary
        else:
            english_summary = translate_to_english(native_summary)
            if not english_summary:
                english_summary = fallback_summary_english(table)

        # Update table with summaries
        table["table_summary"] = native_summary
        table["table_summary_en"] = english_summary

        # Rate limiting to avoid API overload
        time.sleep(2.5)

    # Save updated tables
    try:
        with open(OUTPUT_JSON, "w", encoding="utf-8") as f:
            json.dump(tables, f, indent=4, ensure_ascii=False)
        print(f"\n✅ Summaries written to: {OUTPUT_JSON}")
    except Exception as e:
        print(f"Error saving JSON {OUTPUT_JSON}: {e}")

if __name__ == "__main__":
    generate_all_summaries()

BERT-Based Multi-Class Text Classifier for Legal Document Annotation

In [None]:
# Following the annotation process in Ango Hub, a BERT-based neural network is trained to perform hierarchical classification of the text.

In [None]:
import json
import pandas as pd
import torch
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, TrainerCallback
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
import numpy as np

# ─────────── Configuration ───────────
# Path to annotated JSON data
INPUT_JSON_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\FIREBIM_TEXT-task-export-2025-05-05-11_25_03_GMT.json"
MODEL_NAME = "roberta-base"

# ─────────── Data Loading ───────────
def load_data(path: str):
    """Read the annotation export and return ``{"text", "label"}`` records.

    For every tool under ``item["task"]["tools"]`` the stripped
    ``ner.selection`` becomes the text and ``title`` the label ("Trash"
    when absent); tools with empty text are skipped.

    Raises:
        FileNotFoundError: if ``path`` does not exist.

    A file that exists but cannot be read or parsed is reported on stdout
    and yields an empty list.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"JSON file not found at {path}")
    try:
        with open(path, "r", encoding="utf-8") as handle:
            export = json.load(handle)
    except Exception as err:
        print(f"Error loading JSON {path}: {err}")
        return []

    records = []
    for item in export:
        for tool in item.get("task", {}).get("tools", []):
            selection = tool.get("ner", {}).get("selection", "").strip()
            if selection:
                records.append({"text": selection, "label": tool.get("title", "Trash")})
    return records

# ─────────── Data Preparation ───────────
def prepare_dataframe(records):
    """Create the training DataFrame and map labels to integer class ids.

    Args:
        records: List of ``{"text": ..., "label": ...}`` dicts.

    Returns:
        (df, label_to_id): ``df`` has an integer ``label_id`` column and
        ``label_to_id`` maps label name to id. Records whose label is not
        in the known list are counted in a warning and get the id of
        "Trash".
    """
    df = pd.DataFrame(records)
    # The position in this list defines the class id; keep the order stable,
    # because trained checkpoints depend on it.
    label_names = [
        "Trash",
        "Chapter Number",
        "Chapter Title",
        "Article Number",
        "Article Title",
        "Item",
        "Sub Item",
        "Sub Sub Item",
        "Annex Number",
        "Annex Title",
        "Title Title",
        "Ref Tables",
        "Title Number",
        "Section Number",
        "Section Title",
    ]
    label_to_id = {label: i for i, label in enumerate(label_names)}
    label_ids = df["label"].map(label_to_id)
    missing_labels = int(label_ids.isna().sum())
    if missing_labels > 0:
        print(f"Warning: {missing_labels} records have invalid labels. Assigning 'Trash'.")
        label_ids = label_ids.fillna(label_to_id["Trash"])
    # A column that contained NaN is float even after fillna; class ids must be integers.
    df["label_id"] = label_ids.astype(int)
    print("\nClass Distribution:")
    print(df["label"].value_counts())
    return df, label_to_id

def compute_class_weights(df, label_to_id):
    """Calculate one loss weight per class id for imbalanced data.

    Args:
        df: DataFrame with a ``label_id`` column.
        label_to_id: Mapping label name -> class id; its size fixes the
            length of the weight vector.

    Returns:
        Float tensor where position ``i`` is the weight of class id ``i``.
        Classes that occur in ``df`` get
        ``total / (number_of_present_classes * count)``. Classes without
        samples get the neutral weight 1.0, so the vector always lines up
        with the class ids even when some class is absent.
    """
    counts_by_id = {int(class_id): int(count) for class_id, count in df["label_id"].value_counts().items()}
    total_samples = len(df)
    present_classes = len(counts_by_id)
    num_classes = max(len(label_to_id), max(counts_by_id, default=-1) + 1)
    weights = [
        total_samples / (present_classes * counts_by_id[class_id]) if class_id in counts_by_id else 1.0
        for class_id in range(num_classes)
    ]
    return torch.tensor(weights, dtype=torch.float)

# ─────────── Dataset Definition ───────────
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per requested index."""

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for ``idx``.

        The tokenizer is called with truncation and ``max_length`` padding;
        the leading dimension of its two tensors is squeezed away.
        """
        sample_text, sample_label = self.texts[idx], self.labels[idx]
        encoded = self.tokenizer(
            sample_text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        sample = {key: encoded[key].squeeze(0) for key in ("input_ids", "attention_mask")}
        sample["labels"] = torch.tensor(sample_label, dtype=torch.long)
        return sample

# ─────────── Model and Trainer ───────────
class WeightedTrainer(Trainer):
    """Custom Trainer with class weights for imbalanced classes.

    ``Trainer.__init__`` has no ``class_weights`` parameter, so the keyword
    is consumed here and stored on the instance before the remaining
    arguments are forwarded.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        """Accept an optional per-class weight tensor next to the Trainer arguments."""
        self.class_weights = class_weights
        super().__init__(*args, **kwargs)

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None, **kwargs):
        """Cross-entropy loss weighted by ``self.class_weights``.

        ``num_items_in_batch`` and extra keyword arguments are accepted so
        the method can be called by Trainer versions that pass them; they
        are not used.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        weight = self.class_weights
        if weight is not None:
            weight = weight.to(logits.device)
        loss_fn = torch.nn.CrossEntropyLoss(weight=weight)
        # The class count is read from the logits, so this also works when
        # `model` is a wrapper that does not expose `.config`.
        loss = loss_fn(logits.view(-1, logits.size(-1)), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

def compute_metrics(pred):
    """Return weighted and macro precision, recall and F1 for a prediction.

    ``pred.predictions`` is reduced with ``argmax(-1)`` and compared with
    ``pred.label_ids``.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)
    metrics = {}
    for averaging in ("weighted", "macro"):
        precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average=averaging)
        metrics[f"precision_{averaging}"] = precision
        metrics[f"recall_{averaging}"] = recall
        metrics[f"f1_{averaging}"] = f1
    return metrics

class EarlyStoppingCallback(TrainerCallback):
    """Stop training when ``eval_f1_macro`` stops improving.

    ``patience`` is the number of consecutive evaluations without a new
    best score after which ``control.should_training_stop`` is set.
    """

    def __init__(self, patience=3):
        self.patience = patience
        self.best_f1 = None
        self.patience_counter = 0

    def on_evaluate(self, args, state, control, metrics, **kwargs):
        current_f1 = metrics.get("eval_f1_macro")
        if current_f1 is None:
            # Metric not reported for this evaluation; leave the state untouched.
            return
        improved = self.best_f1 is None or current_f1 > self.best_f1
        if improved:
            self.best_f1, self.patience_counter = current_f1, 0
            return
        self.patience_counter += 1
        if self.patience_counter >= self.patience:
            print("\n⛔ Early stopping triggered!")
            control.should_training_stop = True

# ─────────── Main Execution ───────────
def main():
    """Main function: load data, train the classifier, and save the result.

    Steps: load annotated records, build label ids and class weights, make
    a stratified 80/20 split, fine-tune ``MODEL_NAME`` with the weighted
    loss, then save model and tokenizer to ``./roberta_text_classifier``.
    """
    # Disable Weights & Biases logging
    os.environ["WANDB_DISABLED"] = "true"

    # Load and prepare data
    records = load_data(INPUT_JSON_PATH)
    if not records:
        print("No valid records found.")
        return
    df, label_to_id = prepare_dataframe(records)
    class_weights = compute_class_weights(df, label_to_id)

    # Split data (class ids are forced to plain ints; the column can be
    # float when unknown labels were replaced)
    label_ids = [int(label_id) for label_id in df["label_id"].tolist()]
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        df["text"].tolist(),
        label_ids,
        test_size=0.2,
        random_state=42,
        stratify=label_ids
    )

    # Initialize tokenizer and datasets
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    train_dataset = TextDataset(train_texts, train_labels, tokenizer)
    val_dataset = TextDataset(val_texts, val_labels, tokenizer)

    # Initialize model.
    # Dropout rates are passed to from_pretrained so they are part of the
    # config the layers are built from; changing model.config after the
    # model exists does not alter the dropout layers already created.
    # The label mapping is stored in the config so it is saved with the
    # checkpoint. All parameters are trainable by default.
    id_to_label = {label_id: label for label, label_id in label_to_id.items()}
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=len(label_to_id),
        id2label=id_to_label,
        label2id=label_to_id,
        hidden_dropout_prob=0.2,
        attention_probs_dropout_prob=0.2,
    )

    # Configure training
    # NOTE(review): newer transformers releases name this argument
    # `eval_strategy`; confirm against the installed version.
    training_args = TrainingArguments(
        output_dir="./roberta_text_classifier",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=2,
        learning_rate=2e-5,
        warmup_steps=300,
        weight_decay=0.2,
        logging_steps=20,
        report_to="none",
        seed=42,
        load_best_model_at_end=True,
        metric_for_best_model="f1_macro",
        greater_is_better=True,
        save_total_limit=2,
        fp16=False,
        gradient_accumulation_steps=1
    )

    # Initialize trainer.
    # NOTE(review): with one evaluation per epoch and 2 epochs, a patience
    # of 10 cannot be reached.
    trainer = WeightedTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(patience=10)]
    )
    # Set as an attribute: the base Trainer constructor has no
    # `class_weights` argument, and compute_loss reads self.class_weights.
    trainer.class_weights = class_weights

    # Train model
    print("\n🔴 Starting Training...\n")
    trainer.train()

    # Save model and tokenizer
    try:
        model.save_pretrained("./roberta_text_classifier")
        tokenizer.save_pretrained("./roberta_text_classifier")
        print("\n✅ Training completed and model saved!")
    except Exception as e:
        print(f"Error saving model: {e}")

if __name__ == "__main__":
    main()

BERT Model Testing on Word Document

In [None]:
# During the finalization of the fine-tuning process for the BERT model used for legal text classification, it was necessary to reorganize the generated files. Specifically, some critical files that were outside the checkpoint folder were moved into the corresponding checkpoint-* subfolder. These files are:

# pytorch_model.bin: Contains the trained model weights.
# config.json: Defines the model architecture and parameters.
# vocab.txt: Vocabulary used by the tokenizer.
# tokenizer_config.json: Tokenizer configuration.
# special_tokens_map.json: Mapping of special tokens (e.g., [CLS], [SEP]).

GUI-Based Legal Text Classification and JSON Structuring with Fine-Tuned BERT

In [None]:
import json
import tkinter as tk
from tkinter import ttk, messagebox
from datetime import datetime
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from docx import Document
import os

# ─────────── Configuration ───────────
# Paths for input document, output JSON, and trained model
INPUT_DOCX_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento_Nativo.docx"
OUTPUT_JSON_BASE = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output"
MODEL_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\bert_text_classifier_roberta\checkpoint-646"

# Mapping of label IDs to human-readable labels.
# The ids follow the label order that `prepare_dataframe` in the training
# cell assigns with enumerate(): "Title Number" is 12 and "Section Number"
# is 13 there, so the two must not be swapped here.
# NOTE(review): if the checkpoint in MODEL_PATH was trained with a different
# label order, adjust this table to that checkpoint instead.
ID_TO_LABEL = {
    0: "Trash",
    1: "Chapter Number",
    2: "Chapter Title",
    3: "Article Number",
    4: "Article Title",
    5: "Item",
    6: "Sub Item",
    7: "Sub Sub Item",
    8: "Annex Number",
    9: "Annex Title",
    10: "Title Title",
    11: "Ref Tables",
    12: "Title Number",
    13: "Section Number",
    14: "Section Title",
}

# ─────────── Helper Functions ───────────
def classify_text(text: str, tokenizer, model):
    """Predict the structural label of a single text segment.

    The segment is tokenized (truncated and padded to 512 tokens), passed
    through the model with gradient tracking disabled, and the class with the
    highest softmax probability is translated through ID_TO_LABEL.

    Args:
        text: The text segment to classify.
        tokenizer: Callable that turns the text into keyword inputs for the model.
        model: Callable whose result exposes a ``logits`` tensor.

    Returns:
        A ``(label, confidence)`` tuple. The label is "Unknown" when the
        predicted id has no entry in ID_TO_LABEL; ``("Unknown", 0.0)`` is
        returned (after printing the error) when any step raises.
    """
    try:
        model_inputs = tokenizer(
            text,
            return_tensors="pt",
            max_length=512,
            padding="max_length",
            truncation=True,
        )
        # Inference only: no gradients are needed.
        with torch.no_grad():
            output = model(**model_inputs)
        class_probs = F.softmax(output.logits, dim=1)
        best_prob, best_class = class_probs.max(dim=1)
        predicted_label = ID_TO_LABEL.get(best_class.item(), "Unknown")
        return predicted_label, best_prob.item()
    except Exception as e:
        print(f"Error classifying text: {e}")
        return "Unknown", 0.0

def build_json_structure(lines):
    """Build a hierarchical JSON structure from classified lines.

    The lines are consumed in order. Each "... Number" label opens a new node
    under the closest open parent (annex → title → chapter → section →
    article → item → sub item → sub sub item) and closes every deeper level;
    each "... Title" label fills the title of the node currently open at that
    level. A structural label whose required parent is not open is not
    dropped: it falls through to the free-text branch at the end.

    Args:
        lines: Iterable of ``(idx, text, label, conf)`` tuples. Only ``text``
            and ``label`` are used.

    Returns:
        A list of annex dictionaries; titles, chapters, sections, articles and
        items are nested inside them.
    """
    data = []
    # The node currently open at each hierarchy level (None = nothing open).
    current = {
        "annex": None, "title": None, "chapter": None, "section": None,
        "article": None, "item": None, "sub_item": None, "sub_sub_item": None
    }
    for _, text, label, _ in lines:
        if label == "Trash":
            continue
        if label == "Annex Number":
            current["annex"] = {"type": "annex", "number": text, "title": "", "titles": []}
            data.append(current["annex"])
            current.update(title=None, chapter=None, section=None, article=None, item=None, sub_item=None, sub_sub_item=None)
        elif label == "Annex Title" and current["annex"]:
            current["annex"]["title"] = text
        elif label == "Title Number" and current["annex"]:
            current["title"] = {"type": "title", "number": text, "title": "", "chapters": [], "articles": []}
            current["annex"]["titles"].append(current["title"])
            current.update(chapter=None, section=None, article=None, item=None, sub_item=None, sub_sub_item=None)
        elif label == "Title Title" and current["title"]:
            current["title"]["title"] = text
        elif label == "Chapter Number" and current["title"]:
            current["chapter"] = {"type": "chapter", "number": text, "title": "", "sections": [], "articles": []}
            current["title"]["chapters"].append(current["chapter"])
            current.update(section=None, article=None, item=None, sub_item=None, sub_sub_item=None)
        elif label == "Chapter Title" and current["chapter"]:
            current["chapter"]["title"] = text
        elif label == "Section Number" and current["chapter"]:
            current["section"] = {"type": "section", "number": text, "title": "", "articles": []}
            current["chapter"]["sections"].append(current["section"])
            current.update(article=None, item=None, sub_item=None, sub_sub_item=None)
        elif label == "Section Title" and current["section"]:
            current["section"]["title"] = text
        elif label == "Article Number" and (current["section"] or current["chapter"] or current["title"]):
            current["article"] = {"type": "article", "number": text, "title": "", "items": []}
            # Attach the article to the deepest open container.
            if current["section"]:
                current["section"]["articles"].append(current["article"])
            elif current["chapter"]:
                current["chapter"]["articles"].append(current["article"])
            else:
                current["title"]["articles"].append(current["article"])
            current.update(item=None, sub_item=None, sub_sub_item=None)
        elif label == "Article Title" and current["article"]:
            current["article"]["title"] = text
        elif label == "Item" and current["article"]:
            current["item"] = {"type": "item", "text": text, "sub_items": []}
            current["article"]["items"].append(current["item"])
            current.update(sub_item=None, sub_sub_item=None)
        elif label == "Sub Item" and current["item"]:
            current["sub_item"] = {"type": "sub_item", "text": text, "sub_sub_items": []}
            current["item"]["sub_items"].append(current["sub_item"])
            current["sub_sub_item"] = None
        elif label == "Sub Sub Item" and current["sub_item"]:
            current["sub_sub_item"] = {"type": "sub_sub_item", "text": text}
            current["sub_item"]["sub_sub_items"].append(current["sub_sub_item"])
        elif label == "Ref Tables":
            # Attach the reference to the deepest open node. The section level is
            # part of the chain so that a reference appearing directly under a
            # section heading is not attributed to the enclosing chapter.
            target = (
                current["sub_sub_item"] or current["sub_item"] or current["item"] or
                current["article"] or current["section"] or current["chapter"] or
                current["title"] or current["annex"]
            )
            if target:
                target.setdefault("table_refs", []).append(text)
        else:
            # Free text (or a structural label without a valid parent): append it
            # to the deepest open item, otherwise to the deepest heading node whose
            # title is still empty.
            target = (
                current["sub_sub_item"] or current["sub_item"] or current["item"] or
                (current["article"] if current["article"] and not current["article"]["title"] else None) or
                (current["section"] if current["section"] and not current["section"]["title"] else None) or
                (current["chapter"] if current["chapter"] and not current["chapter"]["title"] else None) or
                (current["title"] if current["title"] and not current["title"]["title"] else None) or
                (current["annex"] if current["annex"] and not current["annex"]["title"] else None)
            )
            if target:
                # Item-like nodes carry "text"; heading nodes carry "title".
                field = "text" if "text" in target else "title"
                target[field] += " " + text
    return data

# ─────────── GUI Class ───────────
class ClassificationApp(tk.Tk):
    """Tk window for reviewing, editing, merging and exporting classified lines."""

    def __init__(self, classified_lines):
        """Create the toolbar, the scrollable grid and one row per classified line.

        Args:
            classified_lines: Iterable of ``(idx, text, label, conf)`` tuples.
        """
        super().__init__()
        self.title("Text Classification Review")
        self.geometry("1300x720")
        # Parallel per-row lists; position i in each list belongs to the same row.
        self.combobox_vars = []
        self.checkbox_vars = []
        self.text_vars = []

        # Toolbar
        button_bar = tk.Frame(self, bd=1, relief="raised")
        button_bar.pack(side="top", fill="x")
        merge_button = tk.Button(button_bar, text="Merge Selected Lines", command=self.merge_selected)
        merge_button.pack(side="left", padx=6, pady=4)
        save_button = tk.Button(button_bar, text="Confirm and Save JSON", bg="#c3f5c3", command=self.save_json)
        save_button.pack(side="right", padx=6, pady=4)

        # Scrollable area: a canvas hosting a frame, driven by a vertical scrollbar
        body = tk.Frame(self)
        body.pack(fill="both", expand=True)
        self.canvas = tk.Canvas(body, highlightthickness=0)
        self.canvas.pack(side="left", fill="both", expand=True)
        v_scroll = tk.Scrollbar(body, orient="vertical", command=self.canvas.yview)
        v_scroll.pack(side="right", fill="y")
        self.scrollable = tk.Frame(self.canvas)
        self.canvas.create_window((0, 0), window=self.scrollable, anchor="nw")
        self.canvas.configure(yscrollcommand=v_scroll.set)

        def refresh_scroll_region(event):
            # Keep the scrollable region in sync with the size of the inner frame.
            self.canvas.configure(scrollregion=self.canvas.bbox("all"))

        def scroll_with_wheel(event):
            # Converts the wheel delta into scroll units (delta / 120).
            self.canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")

        self.scrollable.bind("<Configure>", refresh_scroll_region)
        self.canvas.bind_all("<MouseWheel>", scroll_with_wheel)

        # Header row (grid row 0)
        header_cells = (
            ("✓", {"width": 2}),
            ("#", {"width": 4}),
            ("Text", {"width": 80, "anchor": "w"}),
            ("Label", {"width": 20}),
            ("Confidence", {"width": 10}),
        )
        for column, (caption, options) in enumerate(header_cells):
            tk.Label(self.scrollable, text=caption, **options).grid(row=0, column=column)

        # Data rows start at grid row 1
        for grid_row, (idx, text, label, conf) in enumerate(classified_lines, start=1):
            self._add_line(grid_row, idx, text, label, conf)

    def _add_line(self, row, idx, text, label, conf):
        """Create the widgets for one classified line and register them in the row lists."""
        selected_flag = tk.BooleanVar()
        selector = tk.Checkbutton(self.scrollable, variable=selected_flag)
        selector.grid(row=row, column=0, padx=2)
        self.checkbox_vars.append(selected_flag)

        number_label = tk.Label(self.scrollable, text=f"{idx:03d}", width=4)
        number_label.grid(row=row, column=1)

        editor = tk.Text(self.scrollable, height=2, width=80, wrap="word")
        editor.insert("1.0", text)
        editor.grid(row=row, column=2, padx=5)
        self.text_vars.append(editor)

        label_choice = tk.StringVar(value=label)
        label_box = ttk.Combobox(
            self.scrollable,
            textvariable=label_choice,
            values=list(ID_TO_LABEL.values()),
            state="readonly",
            width=20
        )
        label_box.grid(row=row, column=3, padx=5)
        self.combobox_vars.append((idx, label_choice, conf))

        confidence_label = tk.Label(self.scrollable, text=f"{conf:.2f}")
        confidence_label.grid(row=row, column=4)

    def _get_current_lines(self):
        """Return the rows as ``(idx, text, label, conf)`` tuples, reflecting any edits made in the GUI."""
        return [
            (idx, editor.get("1.0", "end-1c").strip(), label_choice.get(), conf)
            for (idx, label_choice, conf), editor in zip(self.combobox_vars, self.text_vars)
        ]

    def _rebuild_grid(self, lines):
        """Destroy every non-header widget and recreate the rows from ``lines``."""
        for child in self.scrollable.grid_slaves():
            on_header_row = int(child.grid_info()["row"]) == 0
            if not on_header_row:
                child.destroy()
        for row_list in (self.checkbox_vars, self.text_vars, self.combobox_vars):
            row_list.clear()
        for grid_row, (idx, text, label, conf) in enumerate(lines, start=1):
            self._add_line(grid_row, idx, text, label, conf)

    def merge_selected(self):
        """Join the text of all ticked rows into the first ticked row and drop the others."""
        picked = [pos for pos, flag in enumerate(self.checkbox_vars) if flag.get()]
        if len(picked) < 2:
            messagebox.showinfo("Merge Lines", "Select at least two lines to merge.")
            return
        fragments = [self.text_vars[pos].get("1.0", "end-1c").strip() for pos in picked]
        keeper = self.text_vars[picked[0]]
        keeper.delete("1.0", "end")
        keeper.insert("1.0", " ".join(fragments))
        # Delete from the highest position down so the remaining positions stay valid.
        for pos in reversed(picked[1:]):
            del self.text_vars[pos]
            del self.checkbox_vars[pos]
            del self.combobox_vars[pos]
        for flag in self.checkbox_vars:
            flag.set(False)
        remaining_lines = self._get_current_lines()
        self._rebuild_grid(remaining_lines)

    def save_json(self):
        """Build the structured JSON from the current rows, write it to a timestamped file and close the window."""
        try:
            reviewed_lines = self._get_current_lines()
            structured_data = build_json_structure(reviewed_lines)
            timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
            output_path = f"{OUTPUT_JSON_BASE}_{timestamp}.json"
            with open(output_path, "w", encoding="utf-8") as out_file:
                json.dump(structured_data, out_file, indent=4, ensure_ascii=False)
            messagebox.showinfo("Success", f"✅ JSON saved to:\n{output_path}")
            self.destroy()
        except Exception as e:
            messagebox.showerror("Error", f"Failed to save JSON: {e}")

# ─────────── Main Execution ───────────
def main():
    """Classify every non-empty paragraph of the input document and open the review GUI.

    Each failure (missing path, model loading, document reading, GUI start-up)
    is reported with a printed message and ends the function early.
    """
    # Validate paths
    if not os.path.exists(INPUT_DOCX_PATH):
        print(f"Error: Input document not found at {INPUT_DOCX_PATH}")
        return
    if not os.path.exists(MODEL_PATH):
        print(f"Error: Model checkpoint not found at {MODEL_PATH}")
        return

    # Load tokenizer and model (model switched to evaluation mode)
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
        model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)
        model.eval()
    except Exception as e:
        print(f"Error loading model or tokenizer: {e}")
        return

    # Read Word document, keeping only paragraphs that are non-empty after stripping
    try:
        doc = Document(INPUT_DOCX_PATH)
        stripped_paragraphs = (paragraph.text.strip() for paragraph in doc.paragraphs)
        lines = [paragraph_text for paragraph_text in stripped_paragraphs if paragraph_text]
    except Exception as e:
        print(f"Error reading document: {e}")
        return

    # Classify lines; rows are numbered from 1
    classified_lines = [
        (idx, line, *classify_text(line, tokenizer, model))
        for idx, line in enumerate(lines, start=1)
    ]

    # Launch GUI
    try:
        app = ClassificationApp(classified_lines)
        app.mainloop()
    except Exception as e:
        print(f"Error launching GUI: {e}")

# Entry point: classify the document and open the review GUI only when executed directly.
if __name__ == "__main__":
    main()

Enriching Structured JSON with Table Summaries

In [None]:
# This script cross-references the extracted tables (Tables_clean_final_with_summary.json) with the table references found in structured_output.json, enriches the structure with the corresponding table summaries, and writes the result to structured_output_updated.json.

In [None]:
import os
import json
import time
import re
from openai import OpenAI

# ─────────── Configuration ───────────
# OpenAI API settings
# The API key is taken from the DEVSDOCODE_API_KEY environment variable when it is set,
# so that credentials do not have to live in the notebook. The original hard-coded key
# is kept only as a fallback to preserve the previous behaviour.
# NOTE(review): remove the fallback literal once the environment variable is in place.
CLIENT = OpenAI(
    api_key=os.environ.get("DEVSDOCODE_API_KEY", "ddc-temp-free-e3b73cd814cc4f3ea79b5d4437912663"),
    base_url="https://api.devsdocode.com/v1",
)
MODEL_NAME = "provider-4/gpt-4.1"

# File paths
# STRUCTURED_OUTPUT_PATH: structured JSON whose "table_refs" entries are matched.
STRUCTURED_OUTPUT_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output.json"
# TABLES_DATA_PATH: extracted tables (table_number, table_title and summaries).
TABLES_DATA_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\extracted_tables\Tables_clean_final_with_summary.json"
# OUTPUT_PATH: destination of the structure enriched with table summaries.
OUTPUT_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_updated.json"

# ─────────── Utility Functions ───────────
def load_json(file_path):
    """Read and parse a UTF-8 encoded JSON file.

    Returns:
        The parsed content, or an empty list (after printing the error) when
        the file cannot be opened or parsed.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except Exception as e:
        print(f"Error loading JSON {file_path}: {e}")
        return []
    return parsed

def save_json(data, file_path):
    """Write ``data`` to ``file_path`` as UTF-8 JSON (4-space indent, non-ASCII kept as-is).

    Errors are printed rather than raised; nothing is returned.
    """
    dump_options = {"indent": 4, "ensure_ascii": False}
    try:
        with open(file_path, "w", encoding="utf-8") as out_file:
            json.dump(data, out_file, **dump_options)
    except Exception as e:
        print(f"Error saving JSON {file_path}: {e}")

def extract_table_refs(structured_output):
    """Extract the unique table references found anywhere in the structured data.

    Every dictionary (at any nesting depth) that has a "table_refs" key
    contributes its entries.

    Args:
        structured_output: Nested lists/dictionaries to search.

    Returns:
        A list of the distinct references in order of first appearance. The
        order is deterministic, so the prompt built from this list is the same
        on every run.
    """
    refs = []

    def recursive_search(struct):
        if isinstance(struct, list):
            for item in struct:
                recursive_search(item)
        elif isinstance(struct, dict):
            if "table_refs" in struct:
                refs.extend(struct["table_refs"])
            for key in struct:
                recursive_search(struct[key])

    recursive_search(structured_output)
    # dict.fromkeys removes duplicates while keeping first-seen order
    # (a plain set() would yield an arbitrary, run-dependent order).
    return list(dict.fromkeys(refs))

# ─────────── Table Matching ───────────
def match_tables_with_openai(table_refs, table_data):
    """Match table references to table numbers using OpenAI.

    Args:
        table_refs: List of reference strings found in the structured document.
        table_data: List of table dictionaries; each must provide
            "table_number" and "table_title".

    Returns:
        The "matches" object of the model's answer (reference → matched table
        numbers), or an empty dict when there are no references, when the
        answer contains no "matches" block, or when every attempt fails.
    """
    if not table_refs:
        return {}
    prompt = f"""
You are an expert in document analysis. Your task is to correctly associate table references
(`table_refs`) from a structured document with actual table numbers (`table_number`)
extracted from the document.

Instructions:
- Match table numbers directly when possible (e.g., "TABLE I" ↔ "TABLE I").
- If the reference is descriptive (e.g., "Fire resistance classification"),
  match it with the most relevant table title (`table_title`).
- If multiple tables match a reference, return all of them.
- If no match is found, return "Unknown".

Table References to Match:
{json.dumps(table_refs, indent=4)}

Extracted Tables:
{json.dumps([{t['table_number']: t['table_title']} for t in table_data], indent=4)}

Expected JSON Output Format:
{{
    "matches": {{
        "TABLE REF 1": ["TABLE NUMBER 1"],
        "TABLE REF 2": ["TABLE NUMBER 2", "TABLE NUMBER 3"]
    }}
}}
"""
    max_retries = 5
    for attempt in range(max_retries):
        try:
            response = CLIENT.chat.completions.create(
                model=MODEL_NAME,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.0,
                max_tokens=2048,
            )
            response_text = response.choices[0].message.content.strip()
            print(f"\n🔍 OpenAI Response:\n{response_text}")
            cleaned_text = response_text.replace("```json", "").replace("```", "").strip()

            # Preferred path: the answer is already valid JSON. This also covers
            # compact single-line answers, on which the regex fallback below
            # captures one closing brace too many and fails to parse.
            try:
                parsed = json.loads(cleaned_text)
            except ValueError:
                parsed = None
            if isinstance(parsed, dict) and isinstance(parsed.get("matches"), dict):
                return parsed["matches"]

            # Fallback: locate the "matches" object in free-form text and repair
            # trailing commas and unquoted keys before parsing it.
            match = re.search(r'"matches"\s*:\s*({.*?})\s*(\n|$)', cleaned_text, re.DOTALL)
            if not match:
                print("⚠️ 'matches' block not found.")
                return {}
            matches_str = match.group(1)
            matches_str = re.sub(r',\s*}', '}', matches_str)
            matches_str = re.sub(r',\s*]', ']', matches_str)
            matches_str = re.sub(r'(?<=\n)([^\s"{][^:]+?):', r'"\1":', matches_str)
            full_json = '{"matches": ' + matches_str + '}'
            return json.loads(full_json).get("matches", {})
        except Exception as e:
            print(f"⚠️ Attempt {attempt + 1} failed: {e}")
            # Pause before the next attempt; waiting after the last one is pointless.
            if attempt < max_retries - 1:
                time.sleep(10)
    return {}

# ─────────── Update Summaries ───────────
def update_table_summaries(structured_output, table_matches, table_dict):
    """Update structured data with table summaries (in place).

    For every dictionary that has "table_refs", the summaries of all matched
    tables are collected and stored, joined with " | ", under
    "table_summary" and "table_summary_en".

    Args:
        structured_output: Nested lists/dictionaries to update in place.
        table_matches: Mapping of table reference → matched table number(s).
            A value may be a list of table numbers or a single string (the
            matching prompt allows the bare string "Unknown").
        table_dict: Mapping of upper-cased table number → table dictionary.
    """
    def as_table_numbers(value):
        # A bare string must be treated as ONE table number; iterating over it
        # would test each character separately. Non-string entries are ignored.
        if isinstance(value, str):
            return [value]
        if isinstance(value, (list, tuple)):
            return [number for number in value if isinstance(number, str)]
        return []

    def recursive_update(struct):
        if isinstance(struct, list):
            for item in struct:
                recursive_update(item)
        elif isinstance(struct, dict):
            if "table_refs" in struct:
                summaries_native = []
                summaries_english = []
                for table_ref in struct["table_refs"]:
                    for table_number in as_table_numbers(table_matches.get(table_ref, [])):
                        table_key = table_number.upper()
                        if table_key in table_dict:
                            table_info = table_dict[table_key]
                            summaries_native.append(table_info.get("table_summary", ""))
                            summaries_english.append(table_info.get("table_summary_en", ""))
                            print(f"✅ Linked: {table_ref} ↔ {table_number}")
                if summaries_native:
                    struct["table_summary"] = " | ".join(summaries_native)
                if summaries_english:
                    struct["table_summary_en"] = " | ".join(summaries_english)
            for key in struct:
                recursive_update(struct[key])

    recursive_update(structured_output)

# ─────────── Main Execution ───────────
def main():
    """Match table references to extracted tables and store the table summaries.

    Loads the structured document and the extracted tables, asks the model to
    match every table reference to a table number, writes the matched
    summaries into the structure and saves it to OUTPUT_PATH.
    """
    # Validate input files
    if not os.path.exists(STRUCTURED_OUTPUT_PATH):
        print(f"Error: Structured output JSON not found at {STRUCTURED_OUTPUT_PATH}")
        return
    if not os.path.exists(TABLES_DATA_PATH):
        print(f"Error: Tables data JSON not found at {TABLES_DATA_PATH}")
        return

    # Load data
    structured_output = load_json(STRUCTURED_OUTPUT_PATH)
    tables_data = load_json(TABLES_DATA_PATH)
    # Keys are upper-cased so that the lookup is case-insensitive.
    table_dict = {table["table_number"].upper(): table for table in tables_data}

    # Extract and match table references
    table_refs = extract_table_refs(structured_output)
    matches = match_tables_with_openai(table_refs, tables_data)

    # Update summaries
    update_table_summaries(structured_output, matches, table_dict)

    # Save updated JSON
    save_json(structured_output, OUTPUT_PATH)
    print(f"Finished updating table summaries. Output file: {OUTPUT_PATH}")


# Entry point: without this call the cell only defines main() and never writes
# OUTPUT_PATH, which the following steps read as their input.
if __name__ == "__main__":
    main()

Keyword Extractor for Ontology: Batch Processing

In [None]:
# This script automatically extracts technical keywords from clauses within a structured JSON file. Using the API, sentences are processed in batches, and relevant terms are identified and directly associated with their original structures in the JSON. The result is semantic enrichment of the content, supporting ontology development or specialized document analysis.

In [None]:
import json
import os
import re
import time
from datetime import datetime
from typing import List, Dict, Any, Optional
from openai import OpenAI

# ============== CONFIGURATION ==============
# The API key is taken from the DEVSDOCODE_API_KEY environment variable when it is set,
# so that credentials do not have to live in the notebook. The original hard-coded key
# is kept only as a fallback to preserve the previous behaviour.
# NOTE(review): remove the fallback literal once the environment variable is in place.
API_KEY = os.environ.get("DEVSDOCODE_API_KEY", "ddc-temp-free-e3b73cd814cc4f3ea79b5d4437912663")
BASE_URL = "https://api.devsdocode.com/v1"
MODEL_NAME = "provider-4/gpt-4.1"
# Rate limiting: main() waits at least MIN_INTERVAL_SECONDS between two LLM calls.
REQUESTS_PER_MINUTE = 3
MIN_INTERVAL_SECONDS = 60.0 / REQUESTS_PER_MINUTE
# Maximum number of attempts per LLM call.
MAX_RETRIES = 5
# Number of sentences sent to the LLM in one request.
BATCH_SIZE = 1

# Structured JSON that is read and then overwritten in place with the extracted keywords.
INPUT_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_updated.json"
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)

# ============== JSON UTILITIES ==============

def extract_json_array(content: str) -> Optional[str]:
    """Return the substring from the first '[' to the last ']' (inclusive).

    Returns None when either bracket is missing or when the last ']' comes
    before the first '['.
    """
    first_open = content.find("[")
    if first_open == -1:
        return None
    last_close = content.rfind("]")
    # Also covers a missing ']' (rfind returns -1, which is below first_open).
    if last_close < first_open:
        return None
    return content[first_open:last_close + 1]

def strip_code_fences(raw: str) -> str:
    """Remove Markdown code-fence markers (``` with an optional language tag) and trim whitespace."""
    # Opening fence with optional language tag and newline, or a bare fence.
    fence_pattern = r"```[a-zA-Z0-9_]*\n?|```"
    without_fences = re.sub(fence_pattern, "", raw)
    return without_fences.strip()

def fix_common_json_errors(text: str) -> str:
    """Apply a fixed sequence of textual repairs to JSON-like text.

    In order: curly double quotes become straight quotes, trailing commas
    before '}' and ']' are removed, and a comma is inserted between two
    adjacent quoted strings.
    """
    repaired = text.translate({ord("“"): "\"", ord("”"): "\""})
    repairs = (
        (r',\s*}', '}'),                                   # trailing comma before }
        (r',\s*]', ']'),                                   # trailing comma before ]
        (r'"([^"]*?)"\s*"([^"]*?)"', r'"\1", "\2"'),       # merged strings
    )
    for pattern, replacement in repairs:
        repaired = re.sub(pattern, replacement, repaired)
    return repaired

# ============== TRAVERSING STRUCTURE ==============

def _collect_items_recursive(obj: Dict[str, Any], parent_text: str, sink: List[Dict]):
    cur = obj.get("text", "").strip()
    combined = (parent_text + " " + cur).strip()

    for key in ("sub_item", "sub_items", "sub_sub_item", "sub_sub_items"):
        subs = obj.get(key, [])
        if subs:
            for sub in subs:
                _collect_items_recursive(sub, combined, sink)
            return

    if not obj.get("palavras_chave"):
        sink.append({"text": combined, "target": obj})

def collect_all_item_phrases(article_dict: Dict[str, Any], sink: List[Dict]):
    """Collect the leaf clauses below ``article_dict`` into ``sink``, starting with no parent text."""
    no_parent_text = ""
    _collect_items_recursive(article_dict, no_parent_text, sink)

def walk_articles(container: Dict[str, Any], sink: List[Dict]):
    """Collect clauses from every article of ``container`` and of its nested chapters and sections.

    The container's own articles are processed first, then the function
    recurses into "chapters" and afterwards into "sections".
    """
    own_items = [
        item
        for article in container.get("articles", [])
        for item in article.get("items", [])
    ]
    for item in own_items:
        collect_all_item_phrases(item, sink)

    for child_key in ("chapters", "sections"):
        for child in container.get(child_key, []):
            walk_articles(child, sink)

# ============== PROMPT AND LLM CALL ==============

def build_messages(sentences: List[str]):
    """Build the chat messages (system instructions + numbered sentences) for keyword extraction.

    Returns:
        A two-element list: the system message with the extraction rules and
        the user message listing the sentences numbered from 1.
    """
    instruction_lines = [
        "You are an assistant specialized in extracting only technical terms "
        "related to fire safety in buildings and accessibility for an ontology.",
        "- For each sentence (can be in different languages), return relevant *technical keywords* "
        "in the same language.",
        "- DO NOT include generic words (e.g., 'and', 'the') or legal references.",
        "- DO NOT output text before or after the JSON.",
        "- Return strictly a JSON array of objects in this format:",
        "[{\"text\": \"original text\", \"keywords\": [\"term1\", \"term2\"]}]",
        "- Strings must use double quotes (\") and commas must be correctly placed.",
    ]
    system_message = "\n".join(instruction_lines)

    numbered = []
    for position, sentence in enumerate(sentences, start=1):
        numbered.append(f"{position}. {sentence}")
    user_content = "Sentences:\n" + "\n".join(numbered)

    system_entry = {"role": "system", "content": system_message}
    user_entry = {"role": "user", "content": user_content}
    return [system_entry, user_entry]

def call_llm(sentences: List[str]) -> Optional[List[Dict[str, Any]]]:
    """Send one batch of sentences to the LLM and return the parsed JSON array.

    The answer is stripped of code fences, repaired with
    fix_common_json_errors() and reduced to its JSON array before parsing.
    Up to MAX_RETRIES attempts are made.

    Returns:
        The parsed JSON array, or None when every attempt failed (request
        error, unparsable JSON, or no JSON array in the answer).
    """
    messages = build_messages(sentences)
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            start = time.time()
            resp = client.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                temperature=0.0,
            )
            print(f"LLM responded in {time.time() - start:.1f}s (t={datetime.now().time()})")
            raw = strip_code_fences(resp.choices[0].message.content)
            fixed = fix_common_json_errors(raw)
            array_str = extract_json_array(fixed)
            if array_str:
                return json.loads(array_str)
            print("⚠️ No JSON array extracted.")
        except Exception as e:
            print(f"⚠️ Error on attempt {attempt}/{MAX_RETRIES}: {e}")
        # Back off before EVERY retry, including the "no JSON array" case, so that
        # retries are not fired back-to-back; no wait is needed after the last attempt.
        if attempt < MAX_RETRIES:
            time.sleep(5 * attempt)
    return None

# ============== EXECUTION ==============

def main():
    """Extract keywords for every pending clause and write them back into the input file.

    All leaf clauses without keywords are collected from the structured JSON
    and sent to the LLM in batches of BATCH_SIZE, with at least
    MIN_INTERVAL_SECONDS between calls. The returned keywords are stored
    under "palavras_chave" on the corresponding clause, and the file at
    INPUT_PATH is overwritten. When a batch fails permanently the loop stops
    and the results obtained so far are still saved.
    """
    with open(INPUT_PATH, "r", encoding="utf-8") as f:
        data: List[Dict[str, Any]] = json.load(f)

    all_phrases: List[Dict[str, Any]] = []
    for annex in data:
        for title in annex.get("titles", []):
            walk_articles(title, all_phrases)

    print(f"Total sentences to process: {len(all_phrases)}")
    index = 0
    total = len(all_phrases)
    last_call = 0.0

    while index < total:
        # Rate limiting: keep at least MIN_INTERVAL_SECONDS between two calls.
        elapsed = time.time() - last_call
        wait = max(0, MIN_INTERVAL_SECONDS - elapsed)
        if wait > 0:
            print(f"Waiting {wait:.1f}s to respect rate limit...")
            time.sleep(wait)

        batch = all_phrases[index : index + BATCH_SIZE]
        texts = [b["text"] for b in batch]
        print(f"→ Batch {index//BATCH_SIZE + 1}: {len(texts)} sentences (#{index+1}–{index+len(texts)})")

        result = call_llm(texts)
        last_call = time.time()

        if result is None:
            print("❌ Permanent failure while processing batch.")
            break

        # Results are paired with sentences by position.
        valid = min(len(result), len(batch))
        if valid < len(batch):
            print(f"⚠️ Only {valid}/{len(batch)} objects were returned.")

        for i in range(valid):
            entry = result[i]
            # A malformed element must not abort the run: everything extracted so
            # far lives only in memory until the file is written below.
            if not isinstance(entry, dict):
                print(f"⚠️ Skipping malformed result for sentence #{index + i + 1}: expected an object, got {type(entry).__name__}.")
                continue
            batch[i]["target"]["palavras_chave"] = entry.get("keywords", [])

        # Clamp so the progress report never exceeds the total on a partial last batch.
        index = min(index + BATCH_SIZE, total)
        print(f"✓ Progress: {index}/{total} ({index/total:.0%})\n")

    with open(INPUT_PATH, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

    print("✅ Extraction complete. File updated at:", INPUT_PATH)

# Entry point: run the keyword extraction only when executed directly.
if __name__ == "__main__":
    main()


Automated Definition Enrichment of Technical Keywords

In [None]:
# The script iterates through all the clauses in the structured JSON file, collects the previously extracted keywords, and, based on the legislation identified at the beginning of the methodology (from the results_gui.json file), uses the API to generate specific definitions for each term. These definitions are then directly associated with each keyword in the JSON.

In [None]:
import json
import time
import re
from openai import OpenAI
from requests.exceptions import RequestException

# ─────────── GPT Configuration ───────────
# OpenAI-compatible client pointed at the api.devsdocode.com endpoint.
# NOTE(review): the API key is hard-coded here; consider loading it from an
# environment variable or a secrets store instead.
client = OpenAI(
    api_key="ddc-temp-free-e3b73cd814cc4f3ea79b5d4437912663",
    base_url="https://api.devsdocode.com/v1",
)

# ─────────── File Paths ───────────
# JSON_METADATA_PATH: file read by extract_metadata_from_json() for "country",
# "language" and "selected_norms".
JSON_METADATA_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento_Nativo_results_gui.json"
# JSON_INPUT_PATH: structured JSON that main() reads and then overwrites with
# the definitions and the extracted metadata.
JSON_INPUT_PATH = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_updated.json"

# ─────────── Metadata Extraction ───────────
def extract_metadata_from_json(filepath):
    """Read country, language name and legislation name from the metadata JSON.

    The "language" value is lower-cased and translated through a fixed code
    table; an unlisted value is returned capitalized. The legislation is the
    "name" of the first entry of "selected_norms".

    Returns:
        A ``(country, language, legislation)`` tuple. Missing keys yield
        "Unknown Country", "Portuguese" (code "pt") and "Unknown Legislation".
        If reading or parsing fails, the error is printed and
        ``("Unknown", "Portuguese", "Unknown Legislation")`` is returned.
    """
    language_names = {
        "pt": "Portuguese", "en": "English", "es": "Spanish", "fr": "French",
        "de": "German", "be": "Belgian", "it": "Italian", "nl": "Dutch",
        "da": "Danish", "lt": "Lithuanian"
    }
    try:
        with open(filepath, "r", encoding="utf-8") as metadata_file:
            metadata = json.load(metadata_file)

        country = metadata.get("country", "Unknown Country")

        code = metadata.get("language", "pt").lower()
        lang = language_names.get(code, code.capitalize())

        norms = metadata.get("selected_norms", [])
        if norms and "name" in norms[0]:
            legislation = norms[0]["name"]
        else:
            legislation = "Unknown Legislation"

        return country, lang, legislation

    except Exception as e:
        print(f"❌ Failed to load metadata: {e}")
        return "Unknown", "Portuguese", "Unknown Legislation"

# ─────────── Keyword Collection ───────────
def collect_keywords_from_items(items, seen, keywords):
    """Append the not-yet-seen string keywords of ``items`` to ``keywords``.

    For each item the keywords are read from the item itself, then from all
    of its sub-items, then from all of their sub-sub-items.

    Args:
        items: List of item dictionaries.
        seen: Set of keywords already collected (updated in place).
        keywords: Ordered list of unique keywords (updated in place).
    """
    for item in items:
        sub_items = item.get("sub_items", [])
        levels = [item, *sub_items]
        for sub in sub_items:
            levels.extend(sub.get("sub_sub_items", []))
        for level in levels:
            for kw in level.get("palavras_chave", []):
                if isinstance(kw, str) and kw not in seen:
                    seen.add(kw)
                    keywords.append(kw)


def collect_all_keywords(data):
    """Return the unique keywords of the whole document, in order of first appearance.

    Articles are visited wherever the structure can hold them: directly under
    a title, under chapters, under sections (of a title or of a chapter), and
    directly under an annex. Sections are included because articles that
    follow a section heading are stored in that section's "articles" list.
    """
    seen, keywords = set(), []

    def walk(container):
        for article in container.get("articles", []):
            collect_keywords_from_items(article.get("items", []), seen, keywords)
        for child_key in ("chapters", "sections"):
            for child in container.get(child_key, []):
                walk(child)

    for annex in data:
        for title in annex.get("titles", []):
            walk(title)
        for article in annex.get("articles", []):
            collect_keywords_from_items(article.get("items", []), seen, keywords)
    return keywords

# ─────────── GPT Definitions ───────────
def get_definitions_gpt(keywords, legislation, lang, retries=3, delay=25):
    """Ask the GPT API for a definition of each keyword in ``lang`` and in English.

    Args:
        keywords: List of keyword strings to define.
        legislation: Legislation name passed to the model as context.
        lang: Name of the native language; used as the key of the native definition.
        retries: Maximum number of attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        A dict mapping each keyword to ``{lang: ..., "English": ...}`` as
        returned by the model. An empty keyword list yields ``{}`` without any
        API call. When every attempt fails, each keyword is mapped to a
        placeholder "undefined" definition.
    """
    if not keywords:
        return {}

    prompt = f"""
You are an expert in fire safety legislation. For each technical keyword listed below, return a JSON object where:
- The key is the keyword.
- The value is an object with two keys:
  • "{lang}": a definition (max 100 words) starting with the capitalized keyword and 'é'.
  • "English": a definition starting with the translated keyword and 'is'.
Do not include code fences, extra text or comments. Only valid JSON.
Context: {legislation}
Keywords: {', '.join(keywords)}
""".strip()

    for attempt in range(retries):
        try:
            response = client.chat.completions.create(
                model="provider-4/gpt-4.1",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.0,
                max_tokens=1024,
            )
            text = response.choices[0].message.content.strip()
            text = re.sub(r"```json|```", "", text).strip()
            parsed = json.loads(text)
            # The caller merges the result with dict.update(), so anything other
            # than a JSON object counts as a failed attempt.
            if not isinstance(parsed, dict):
                raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")
            return parsed
        except Exception as e:
            print(f"⚠️ Attempt {attempt+1} failed: {e}")
            # Wait only when another attempt follows.
            if attempt < retries - 1:
                time.sleep(delay)

    return {kw: {lang: f"{kw} é indefinido", "English": f"{kw} is undefined"} for kw in keywords}

# ─────────── Insert Definitions ───────────
def insert_definitions(data, defs, lang):
    """Replace each keyword list in the document by a keyword → definition mapping (in place).

    Every item, sub-item and sub-sub-item gets "palavras_chave" rewritten as
    ``{keyword: definition}``; a keyword missing from ``defs`` receives a
    placeholder "undefined" definition. Articles are visited directly under a
    title, under chapters, under sections (of a title or of a chapter) and
    directly under an annex. Sections are included because articles that
    follow a section heading are stored in that section's "articles" list.

    Args:
        data: The structured document (list of annex dictionaries).
        defs: Mapping of keyword → ``{lang: ..., "English": ...}``.
        lang: Key used for the native-language placeholder definition.
    """
    def with_definitions(entry):
        entry["palavras_chave"] = {
            kw: defs.get(kw, {lang: f"{kw} é indefinido", "English": f"{kw} is undefined"})
            for kw in entry.get("palavras_chave", [])
        }

    def ins(items):
        for item in items:
            with_definitions(item)
            for sub in item.get("sub_items", []):
                with_definitions(sub)
                for subsub in sub.get("sub_sub_items", []):
                    with_definitions(subsub)

    def walk(container):
        for article in container.get("articles", []):
            ins(article.get("items", []))
        for child_key in ("chapters", "sections"):
            for child in container.get(child_key, []):
                walk(child)

    for annex in data:
        for title in annex.get("titles", []):
            walk(title)
        for article in annex.get("articles", []):
            ins(article.get("items", []))

# ─────────── Main Execution ───────────
# Main execution: reads the metadata and the structured JSON, queries GPT for
# definitions in batches, and writes the enriched JSON back to disk.
def main():
    """Enrich every keyword of the structured JSON with definitions and append the metadata record.

    The file at JSON_INPUT_PATH is read, updated and overwritten. Running the
    function again replaces the previously appended "extracted_metadata"
    record instead of adding another one.
    """
    country, lang, legislation = extract_metadata_from_json(JSON_METADATA_PATH)
    print(f"🌍 Country: {country} | Language: {lang} | Legislation: {legislation}")

    with open(JSON_INPUT_PATH, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Drop metadata records left by a previous run so that re-running this step
    # does not accumulate duplicate "extracted_metadata" entries.
    data = [
        entry for entry in data
        if not (isinstance(entry, dict) and "extracted_metadata" in entry)
    ]

    keywords = collect_all_keywords(data)
    print(f"🔍 Total unique keywords: {len(keywords)}")

    defs = {}
    batch_size = 6
    for i in range(0, len(keywords), batch_size):
        batch = keywords[i:i+batch_size]
        print(f"➡️  Processing batch {i//batch_size+1} with {len(batch)} keywords")
        defs.update(get_definitions_gpt(batch, legislation, lang))
        # Pause between batches only (not after the last one).
        if i + batch_size < len(keywords):
            time.sleep(60)  # respect rate limit

    insert_definitions(data, defs, lang)
    # The metadata record is stored as the LAST element of the document list.
    data.append({"extracted_metadata": {"country": country, "language": lang, "legislation": legislation}})

    with open(JSON_INPUT_PATH, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
    print("✅ File updated with definitions and metadata.")

# Entry point: run the definition enrichment (read file, query GPT, write file)
# only when executed directly.
if __name__ == "__main__":
    main()

Assignment of Identifiers to Each Clause

In [None]:
# This script processes a structured JSON file and assigns unique hierarchical identifiers (id, id_T) to each element, including annexes, titles, chapters, articles, and paragraphs.

In [None]:
import json
import re

###############################
#     FILES AND CONFIG        #
###############################
# Input: structured JSON to annotate. Output: the same structure with "id" / "id_T" fields added.
JSON_INPUT_FILE = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_updated.json"
JSON_OUTPUT_FILE = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_prepared.json"

# Country name (as found in the document's "extracted_metadata") -> two-letter
# code used as the prefix of every annex identifier.
COUNTRY_MAP = {
    "Portugal": "PT",
    "Belgium": "BE",
    "Netherlands": "NL",
    "Denmark": "DK",
    "Lithuania": "LT",
}

###############################
# Helper Functions
###############################
def short_title_id(full_str: str) -> str:
    """Return a compact identifier fragment for a title or chapter heading.

    Headings of the exact form ``Título <ROMAN>`` or ``Capítulo <ROMAN>``
    (case-insensitive) become ``Tit<ROMAN>`` / ``Cap<ROMAN>`` with the numeral
    upper-cased. Any other heading is returned with all whitespace removed.
    """
    cleaned = full_str.strip()
    heading = re.match(r'^(Título|Capítulo)\s+([IVXLCDM]+)$', cleaned, re.IGNORECASE)
    if heading is None:
        # Not a recognised "Título/Capítulo + numeral" heading: just squeeze out whitespace.
        return re.sub(r"\s+", "", cleaned)

    kind, numeral = heading.groups()
    prefix = "Tit" if kind.lower().startswith("título") else "Cap"
    return prefix + numeral.upper()

def detect_country_code(json_data):
    """Return the two-letter country code for a structured document.

    The code is looked up in ``COUNTRY_MAP`` using the ``country`` value of
    the ``extracted_metadata`` entry, which is expected to be the last element
    of the top-level list.

    Falls back to ``"PT"`` whenever the metadata is missing or unusable:
    ``json_data`` is not a non-empty list, the last element carries no
    ``extracted_metadata`` mapping, ``country`` is absent / null / not a
    string, or the country is not present in ``COUNTRY_MAP``.
    """
    default_code = "PT"
    if not (isinstance(json_data, list) and json_data):
        return default_code

    last_item = json_data[-1]
    if not isinstance(last_item, dict):
        return default_code

    metadata = last_item.get("extracted_metadata")
    if not isinstance(metadata, dict):
        return default_code

    # JSON null or a non-string country must not crash the identifier step.
    country = metadata.get("country")
    if not isinstance(country, str):
        return default_code

    return COUNTRY_MAP.get(country.strip(), default_code)

def create_annex_id(index, cc):
    """Build an annex identifier: ``<country code>-Ane<index>``."""
    return "{}-Ane{}".format(cc, index)

def create_article_id(parent_id, article_str):
    """Build an article identifier ``<parent_id>_Art<n>``.

    ``<n>`` is the first run of digits found in ``article_str``; when the
    string has no digits, the whole string with spaces removed is used instead.
    """
    digits = re.search(r'(\d+)', article_str)
    if digits:
        suffix = digits.group(1)
    else:
        suffix = article_str.replace(" ", "")
    return f"{parent_id}_Art{suffix}"

def extract_letter(text):
    """Return the lower-cased letter of a leading ``x)`` marker, or ``None`` if absent."""
    marker = re.match(r'^\s*([a-zA-Z])\)', text)
    if marker is None:
        return None
    return marker.group(1).lower()

def extract_roman_lowercase(text):
    """Return the lower-cased roman numeral of a leading ``iv)``-style marker.

    Recognises any run of the letters i, v and x (case-insensitive) directly
    followed by ``)``, so composite numerals such as ``iv``, ``vi``, ``viii``
    or ``ix`` are matched as well as ``i``/``ii``/``iii``/``v``/``x``.
    Returns ``None`` when the text does not start with such a marker.
    """
    # The previous pattern (i+|v|x+) rejected iv, vi, vii, viii, ix, xi, ...
    m = re.match(r'^\s*([ivx]+)\)', text.strip(), re.IGNORECASE)
    return m.group(1).lower() if m else None

###############################
# parse_items (with id and id_T)
###############################
def parse_items(items, parent_id, level=0):
    """Assign hierarchical identifiers to clause items, in place.

    Args:
        items: list of item dicts (mutated in place).
        parent_id: identifier of the enclosing article/item; every generated
            id is ``<parent_id>_<local id>``.
        level: nesting depth. 0 = item (``Ite<n>``, leading number of the text
            or position), 1 = sub-item (``Sub<letter>``), 2 = sub-sub-item
            (``SSub<roman>``); deeper levels fall back to ``Ite<position>``.
            When no marker can be extracted the 1-based position is used.

    Each item receives ``id``, a default ``type`` and a default empty
    ``id_T``. For every entry of ``table_refs`` that contains
    ``QUADRO <roman>`` a table id ``<item id>_Tab<ROMAN>`` is stored in
    ``id_T``.

    Returns:
        The same ``items`` list.
    """
    for i, item in enumerate(items, start=1):
        text_str = item.get("text", "").strip()

        # Choose the identifier style for this nesting level.
        if level == 0:
            m = re.match(r'^(\d+)', text_str)
            item_num = m.group(1) if m else str(i)
            local_id = f"Ite{item_num}"
        elif level == 1:
            letter = extract_letter(text_str)
            local_id = f"Sub{letter or i}"
        elif level == 2:
            roman = extract_roman_lowercase(text_str)
            local_id = f"SSub{roman or i}"
        else:
            local_id = f"Ite{i}"

        item_id = f"{parent_id}_{local_id}"
        item["id"] = item_id

        # Mandatory default fields.
        item.setdefault("type", "item" if level == 0 else "sub_item" if level == 1 else "sub_sub_item")
        item.setdefault("id_T", [])

        # Derive table identifiers from the table references.
        table_refs = item.get("table_refs")
        if table_refs:
            id_t_list = []
            for table_ref in table_refs:
                match = re.search(r'QUADRO\s+([IVXLCDM]+)', table_ref, re.IGNORECASE)
                if match:
                    table_id = f"Tab{match.group(1).upper()}"
                    id_t_list.append(f"{item_id}_{table_id}")
            item["id_T"] = id_t_list

        # Recurse into nested levels; a null list is treated as empty.
        if "sub_items" in item:
            parse_items(item["sub_items"] or [], item_id, level + 1)

        if "sub_sub_items" in item:
            # Sub-sub-items always use the roman-numeral scheme (level 2),
            # whether they hang off an item or, as usual, off a sub-item.
            # The former `level + 2` pushed them to level 3 under a sub-item,
            # which produced "Ite<n>" ids instead of "SSub<roman>".
            parse_items(item["sub_sub_items"] or [], item_id, max(level + 1, 2))

    return items

###############################
# parse_articles
###############################
def parse_articles(articles, parent_id):
    """Assign identifiers to each article (in place) and to its items.

    Every article dict gets ``id`` (derived from ``parent_id`` and its
    ``number`` field) and a default ``type`` of ``"article"``; when the
    article has an ``items`` key, those items are numbered at level 0.
    """
    for entry in articles:
        entry["id"] = create_article_id(parent_id, entry["number"])
        if "type" not in entry:
            entry["type"] = "article"
        if "items" not in entry:
            continue
        parse_items(entry["items"], entry["id"], 0)

###############################
# parse_titles
###############################
def parse_titles(titles, annex_id):
    """Assign identifiers to titles, their chapters and the articles below them.

    Titles get ``<annex_id>_<short title id>``; chapters get
    ``<title id>_<short chapter id>`` plus a default empty ``sections`` list.
    Articles are delegated to ``parse_articles`` with the id of their direct
    parent (chapter first, then the title's own articles).
    """
    for title_node in titles:
        title_id = f"{annex_id}_{short_title_id(title_node['number'])}"
        title_node["id"] = title_id
        title_node.setdefault("type", "title")

        if "chapters" in title_node:
            for chapter_node in title_node["chapters"]:
                chapter_id = f"{title_id}_{short_title_id(chapter_node['number'])}"
                chapter_node["id"] = chapter_id
                chapter_node.setdefault("type", "chapter")
                chapter_node.setdefault("sections", [])
                if "articles" not in chapter_node:
                    continue
                parse_articles(chapter_node["articles"], chapter_id)

        if "articles" in title_node:
            parse_articles(title_node["articles"], title_id)

###################
# process_json
###################
def process_json(json_data):
    """Assign identifiers to every annex of the document and everything below it.

    Entries containing ``extracted_metadata`` are skipped and do not consume
    an annex index. Annexes are numbered from 0 in document order, receive a
    default ``type`` of ``"annex"``, and their titles / direct articles are
    delegated to ``parse_titles`` / ``parse_articles``.

    Returns:
        The same (mutated) ``json_data`` object.
    """
    country_code = detect_country_code(json_data)

    # Lazily filter out the metadata entry so that enumerate() counts annexes only.
    annexes = (entry for entry in json_data if "extracted_metadata" not in entry)
    for position, annex in enumerate(annexes):
        annex_id = create_annex_id(position, country_code)
        annex["id"] = annex_id
        annex.setdefault("type", "annex")

        if "titles" in annex:
            parse_titles(annex["titles"], annex_id)

        if "articles" in annex:
            parse_articles(annex["articles"], annex_id)

    return json_data

###################
# MAIN
###################
if __name__ == "__main__":
    # Load the structured JSON document that is to be annotated.
    with open(JSON_INPUT_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Assign hierarchical identifiers; process_json mutates and returns the same object.
    processed = process_json(data)

    # Write the annotated structure to a separate file, keeping non-ASCII text unescaped.
    with open(JSON_OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(processed, f, ensure_ascii=False, indent=4)

    print(f"✅ JSON processed and saved to '{JSON_OUTPUT_FILE}'.")


Extract EN keywords

In [None]:
import json
import re
import os

# Country -> language tag mapping. run() uses it to pick which native-language
# definition to extract; countries not listed here fall back to "pt".
COUNTRY_LANG_MAP = {
    "Portugal": "pt",
    "Belgium": "nl",
    "Netherlands": "nl",
    "Denmark": "da",
    "Lithuania": "lt"
}

def extract_keyword_from_definition(english_def: str) -> str:
    """Extract the defined term from an English definition sentence.

    The term is the text before the first standalone ``is`` / ``are``, with
    any parenthesised parts removed, lower-cased. When the sentence has no
    such verb, the first whitespace-separated word (lower-cased) is returned.
    An empty or whitespace-only definition yields ``""`` instead of raising.
    """
    match = re.match(r'^([\w\s\-/&()]+?)\s+(is|are)\b', english_def)
    if match:
        raw_kw = match.group(1).strip()
        # Drop parenthetical qualifiers such as "(EI 30)".
        cleaned_kw = re.sub(r'\s*\([^)]*\)', '', raw_kw)
        return cleaned_kw.strip().lower()

    # Fallback: first word, guarding against an empty definition.
    words = english_def.split()
    return words[0].lower() if words else ""

def process_node(node, lang_tag="pt", remove_palavras_chave=False):
    """Flatten a node's ``palavras_chave`` mapping into parallel keyword/definition lists.

    For every keyword entry, the native-language definition (field chosen by
    ``lang_tag``: pt -> Portuguese, nl -> Dutch, da -> Danish, lt -> Lithuanian)
    and the ``English`` definition are read. Non-empty native definitions
    populate ``keywords_<lang_tag>`` / ``definitions_<lang_tag>``; non-empty
    English definitions populate ``keywords_en`` / ``definitions_en``, with
    the English keyword derived from the definition text. The original
    ``palavras_chave`` key is deleted when ``remove_palavras_chave`` is true.
    The same processing is then applied recursively to all child collections.
    """
    native_field = {
        "pt": "Portuguese",
        "nl": "Dutch",
        "da": "Danish",
        "lt": "Lithuanian",
    }.get(lang_tag)

    if "palavras_chave" in node:
        native_pairs = []   # (keyword, definition) in the document language
        english_pairs = []  # (keyword, definition) in English

        for main_kw, entry in node["palavras_chave"].items():
            # Unsupported language tags yield no native definition.
            def_main = entry.get(native_field, "") if native_field is not None else ""
            def_en = entry.get("English", "").strip()

            if def_main:
                native_pairs.append((main_kw.strip(), def_main.strip()))
            if def_en:
                english_pairs.append((extract_keyword_from_definition(def_en), def_en.strip()))

        if native_pairs:
            node[f"keywords_{lang_tag}"] = [kw for kw, _ in native_pairs]
            node[f"definitions_{lang_tag}"] = [text for _, text in native_pairs]
        if english_pairs:
            node["keywords_en"] = [kw for kw, _ in english_pairs]
            node["definitions_en"] = [text for _, text in english_pairs]

        if remove_palavras_chave:
            del node["palavras_chave"]

    # Recursively process the child nodes.
    child_collections = ("titles", "chapters", "articles", "items",
                         "sub_items", "sub_sub_items", "sections", "paragraphs")
    for key in child_collections:
        if key not in node:
            continue
        for child in node[key]:
            process_node(child, lang_tag, remove_palavras_chave)

def run(input_path: str, output_path: str, metadata_path: str, remove_original=False):
    """Convert ``palavras_chave`` mappings of a structured JSON file into keyword lists.

    Args:
        input_path: structured JSON document to read.
        output_path: where the transformed document is written.
        metadata_path: JSON file whose ``country`` field selects the language
            tag through ``COUNTRY_LANG_MAP`` (default country "Portugal",
            default tag "pt").
        remove_original: when true, the ``palavras_chave`` keys are removed.

    Prints an error and returns early if the input or metadata file is missing.
    """
    # Make sure both input files exist; report the first one that is missing.
    missing = next((p for p in (input_path, metadata_path) if not os.path.exists(p)), None)
    if missing is not None:
        print(f"❌ Ficheiro não encontrado: {missing}")
        return

    # Derive the language tag from the country stored in the metadata file.
    with open(metadata_path, "r", encoding="utf-8") as meta_handle:
        metadata = json.load(meta_handle)
        lang_tag = COUNTRY_LANG_MAP.get(metadata.get("country", "Portugal"), "pt")  # default: pt

    # Load the main document.
    with open(input_path, "r", encoding="utf-8") as in_handle:
        document = json.load(in_handle)

    for annex in document:
        process_node(annex, lang_tag=lang_tag, remove_palavras_chave=remove_original)

    with open(output_path, "w", encoding="utf-8") as out_handle:
        json.dump(document, out_handle, ensure_ascii=False, indent=4)

    print(f"✅ JSON atualizado salvo em: {output_path} (idioma: {lang_tag})")

# File paths: the identifier-annotated document (input), the keyword-flattened
# document that the OWL generation step reads (output), and the metadata file
# from which the country is read.
input_file = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_prepared.json"
output_file = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_Protege.json"
metadata_file = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento_Nativo_results_gui.json"

if __name__ == "__main__":
    # Drop the original "palavras_chave" mappings once they have been flattened.
    run(input_file, output_file, metadata_file, remove_original=True)


Generation of OWL Individuals from Structured JSON for Ontology Integration

In [None]:
# This script processes a structured JSON file containing legal clauses and converts each element—such as annexes, titles, chapters, articles, and paragraphs—into OWL individuals in Turtle (.ttl) format. It assigns unique identifiers to each node (:hasID), extracts either the original text or description (:hasOriginalText or :hasDescription), and establishes hierarchical relationships among elements (e.g., :ArticleOf, :hasChapter). It also includes annotations with keywords and their definitions (:hasKeyword, :hasDefinition). Finally, all generated individuals are appended to an existing base .ttl file.

In [None]:
import json
import os

# ─────────── Global Variables ───────────
# Maps an ancestor node ID to the list of table IDs found anywhere below it
# (filled by generate_table_ttl, consumed by finalize_individuals).
TABLE_RELATIONS = {}

# Maps a node/table ID to its pending Turtle output:
# {"lines": [...fixed header lines...], "lines_to_add": [...extra predicates...]}
INDIVIDUALS = {}

# ─────────── Language Mapping ───────────
# Country name (from the metadata JSON) -> language tag attached to
# native-language literals. Keys must match the country names used by the
# earlier pipeline steps.
COUNTRY_LANG_MAP= {
    "Portugal": "pt",
    "Belgium": "nl",      # switch to "fr" or "de" if preferred
    "Denmark": "da",      # was misspelled "Dennmark", so Danish documents were tagged "und"
    "Netherlands": "nl",
    "Lithuania": "lt"
}

lang_tag = "und"  # Default value ("undetermined") until main() reads the metadata


# ─────────── Helper Functions ───────────
def extract_ancestors_from_id(node_id):
    """Return every proper ``_``-separated prefix of ``node_id``, shortest first.

    Example: ``"A_B_C"`` -> ``["A", "A_B"]``. The node's own ID is excluded.
    """
    segments = node_id.split('_')
    return ["_".join(segments[:depth]) for depth in range(1, len(segments))]

def sanitize_text(text):
    """Make a value safe to embed in a double-quoted Turtle string literal.

    Non-string values are converted with ``str()``. Backslashes are escaped
    (a bare backslash starts an escape sequence in Turtle and would make the
    literal invalid), line breaks become spaces, double quotes become single
    quotes (a triple double-quote collapses to one single quote), and
    surrounding whitespace is stripped.
    """
    if not isinstance(text, str):
        text = str(text)
    # Escape backslashes first so that no later replacement can interfere.
    text = text.replace("\\", "\\\\")
    text = text.replace("\n", " ").replace("\r", " ").replace('"""', '"').replace('"', "'")
    return text.strip()

def collect_all_ids_in_hierarchy(node):
    """Recursively collect the IDs of all elements below ``node``, grouped by kind.

    Returns a dict with one list per child collection (``titles``,
    ``chapters``, ``articles``, ``items``, ``sub_items``, ``sub_sub_items``,
    ``sections``, ``paragraphs``) holding the IDs of every descendant of that
    kind, plus ``annexes``, which contains the ID of ``node`` itself (or of
    any descendant) when its ``type`` is ``"annex"``.
    """
    child_keys = ("titles", "chapters", "articles", "items",
                  "sub_items", "sub_sub_items", "sections", "paragraphs")

    result = {key: [] for key in child_keys}
    result["annexes"] = []

    for key in child_keys:
        if key not in node:
            continue
        for child in node[key]:
            # Record the child, then merge in everything found beneath it.
            result[key].append(child["id"])
            nested = collect_all_ids_in_hierarchy(child)
            for bucket in result:
                result[bucket].extend(nested[bucket])

    # An annex reports itself.
    if "type" in node and node["type"] == "annex":
        result["annexes"].append(node["id"])

    return result

def generate_individual_ttl(node, node_class, country, ancestors=None):
    """Build the Turtle description of one hierarchy node and register it.

    Args:
        node: node dict; must contain ``id``.
        node_class: ontology class name (``Annex``, ``Title``, ``Chapter``,
            ``Article``, ``Item``, ``SubItem`` or ``SubSubItem``).
        country: value written to ``:hasCountry``.
        ancestors: accepted for signature compatibility; not used here (the
            parent is derived from the node ID).

    The fixed header lines (type, ``:hasID``, text or description, country,
    designator) are returned. The remaining predicates (parent link,
    ``:has*`` links to all descendants, tables, keywords and definitions) are
    stored with the header under ``INDIVIDUALS[node id]`` so that
    ``finalize_individuals`` can terminate the statement correctly.

    Native-language keywords/definitions are read from
    ``keywords_<lang_tag>`` / ``definitions_<lang_tag>`` (the fields written
    by the keyword-extraction step), falling back to the legacy ``*_pt``
    fields, and are tagged with the global ``lang_tag``. English ones are
    tagged ``@en``.

    Returns:
        The list of header lines (the same list object stored in INDIVIDUALS).
    """
    lines = []
    node_id = node["id"]
    print(f"Processing node: {node_id} ({node_class})")

    # Define the individual as owl:NamedIndividual
    lines.append(f":{node_id} a owl:NamedIndividual , :{node_class} ;")
    lines.append(f'    :hasID "{node_id}" ;')

    # Data properties: clause-level nodes carry text, structural nodes a description.
    if node_class in ["Item", "SubItem", "SubSubItem"]:
        original_text = sanitize_text(node.get("text", ""))
        lines.append(f'    :hasOriginalText "{original_text}" ;')
    else:
        description = sanitize_text(node.get("title", ""))
        lines.append(f'    :hasDescription "{description}" ;')

    lines.append(f'    :hasCountry "{country}" ;')
    designator = node.get("type", "")
    # No terminator here: finalize_individuals appends ";" or "." as needed.
    lines.append(f'    :hasDesignator "{designator}"')

    lines_to_add = []

    # Link to the direct parent, derived from the ID prefix.
    ancestor_ids = extract_ancestors_from_id(node_id)
    parent_id = ancestor_ids[-1] if ancestor_ids else None
    parent_property = {
        "Title": "TitleOf",
        "Chapter": "ChapterOf",
        "Article": "ArticleOf",
        "Item": "ItemOf",
        "SubItem": "SubItemOf",
        "SubSubItem": "SubSubItemOf",
    }.get(node_class)
    if parent_id and parent_property:
        lines_to_add.append(f":{parent_property} :{parent_id}")

    # Links to every descendant, grouped by kind.
    hierarchy_info = collect_all_ids_in_hierarchy(node)
    descendant_properties = (
        ("titles", "hasTitle"),
        ("chapters", "hasChapter"),
        ("articles", "hasArticle"),
        ("items", "hasItem"),
        ("sub_items", "hasSubItem"),
        ("sub_sub_items", "hasSubSubItem"),
    )
    for key, prop in descendant_properties:
        if hierarchy_info[key]:
            targets = ", ".join(f":{target_id}" for target_id in hierarchy_info[key])
            lines_to_add.append(f":{prop} {targets}")

    # Sections / paragraphs are taken from explicit ID lists on the node.
    for key, prop in (("id_S", "hasSection"), ("id_P", "hasParagraph")):
        if node.get(key):
            targets = ", ".join(f":{target_id}" for target_id in node[key])
            lines_to_add.append(f":{prop} {targets}")

    # Maintain hasTable relations from id_T
    if node.get("id_T"):
        tables_list = ", ".join(f":{tid}" for tid in node["id_T"])
        lines_to_add.append(f":hasTable {tables_list}")
        print(f"Adding original hasTable for {node_id}: {tables_list}")

    def native_values(prefix):
        """Values of ``<prefix>_<lang_tag>``, else the legacy ``<prefix>_pt``."""
        for suffix in (lang_tag, "pt"):
            if suffix == "en":
                # English is emitted separately below; avoid duplicating it.
                continue
            values = node.get(f"{prefix}_{suffix}")
            if values:
                return values
        return []

    # Keywords and definitions, native language first, then English.
    for kw in native_values("keywords"):
        lines_to_add.append(f':hasKeyword "{sanitize_text(kw)}"@{lang_tag}')
    for kw in node.get("keywords_en") or []:
        lines_to_add.append(f':hasKeyword "{sanitize_text(kw)}"@en')
    for definition in native_values("definitions"):
        lines_to_add.append(f':hasDefinition "{sanitize_text(definition)}"@{lang_tag}')
    for definition in node.get("definitions_en") or []:
        lines_to_add.append(f':hasDefinition "{sanitize_text(definition)}"@en')

    # Store lines in individuals dictionary
    INDIVIDUALS[node_id] = {"lines": lines, "lines_to_add": lines_to_add}
    return lines


def generate_table_ttl(table_id, table_ref, table_summary, country, ancestors=None):
    """Build the Turtle description of a table individual and register it.

    The header lines (type, ``:hasID``, ``:hasCountry``, designator and the
    sanitized summary as ``:hasOriginalText``) are returned. The table is also
    recorded in ``TABLE_RELATIONS`` under every ancestor ID derived from
    ``table_id``, and one ``:<Level>Of`` predicate is queued for each level
    present in ``ancestors``. ``table_ref`` is accepted but not used.
    """
    ancestors = {} if ancestors is None else ancestors

    print(f"Generating table: {table_id}")
    summary_text = sanitize_text(table_summary)
    lines = [
        f":{table_id} a owl:NamedIndividual , :Table ;",
        f'    :hasID "{table_id}" ;',
        f'    :hasCountry "{country}" ;',
        '    :hasDesignator "Table" ;',
        f'    :hasOriginalText "{summary_text}"',
    ]

    # Extract ancestors from table ID
    ancestor_ids = extract_ancestors_from_id(table_id)
    print(f"Ancestors extracted for {table_id}: {ancestor_ids}")

    # Register the table with every ancestor, once.
    for ancestor_id in ancestor_ids:
        registered = TABLE_RELATIONS.setdefault(ancestor_id, [])
        if table_id in registered:
            continue
        registered.append(table_id)
        print(f"Adding hasTable for {ancestor_id}: {table_id}")

    # One back-reference per ancestor level that is known, outermost first.
    level_properties = (
        ("annex", "AnnexOf"),
        ("title", "TitleOf"),
        ("chapter", "ChapterOf"),
        ("article", "ArticleOf"),
        ("item", "ItemOf"),
        ("sub_item", "SubItemOf"),
        ("sub_sub_item", "SubSubItemOf"),
    )
    lines_to_add = [
        f":{prop} :{ancestors[level]}"
        for level, prop in level_properties
        if level in ancestors
    ]

    INDIVIDUALS[table_id] = {"lines": lines, "lines_to_add": lines_to_add}
    return lines

# ─────────── Process Hierarchy ───────────
def process_annex(node, country):
    """Process an annex and its children.

    Generates the annex individual, then descends into its titles and into
    any articles attached directly to the annex. Annex-level articles receive
    IDs in the identifier step and are referenced by the annex's
    ``:hasArticle`` links, so they must be emitted as individuals too.

    Returns:
        The concatenated header lines of all generated individuals.
    """
    lines = []
    annex_id = node["id"]
    ancestors = {"annex": annex_id}
    lines.extend(generate_individual_ttl(node, "Annex", country, ancestors))

    for title in node.get("titles", []):
        lines.extend(process_title(title, country, ancestors))
    # Articles that are not grouped under a title.
    for article in node.get("articles", []):
        lines.extend(process_article(article, country, ancestors))
    return lines

def process_title(node, country, ancestors=None):
    """Emit a title individual, then its chapters followed by its direct articles.

    ``ancestors`` is not modified; children receive a copy extended with this
    title's ID under the ``"title"`` key.
    """
    scope = {**(ancestors or {}), "title": node["id"]}
    output = list(generate_individual_ttl(node, "Title", country, scope))

    for chapter in node.get("chapters", []):
        output += process_chapter(chapter, country, scope)
    for article in node.get("articles", []):
        output += process_article(article, country, scope)
    return output

def process_chapter(node, country, ancestors=None):
    """Emit a chapter individual followed by the individuals of its articles.

    ``ancestors`` is not modified; children receive a copy extended with this
    chapter's ID under the ``"chapter"`` key.
    """
    scope = {**(ancestors or {}), "chapter": node["id"]}
    output = list(generate_individual_ttl(node, "Chapter", country, scope))

    for article in node.get("articles", []):
        output += process_article(article, country, scope)
    return output

def process_article(node, country, ancestors=None):
    """Emit an article individual followed by the individuals of its items.

    ``ancestors`` is not modified; children receive a copy extended with this
    article's ID under the ``"article"`` key.
    """
    scope = {**(ancestors or {}), "article": node["id"]}
    output = list(generate_individual_ttl(node, "Article", country, scope))

    for item in node.get("items", []):
        output += process_item(item, country, scope)
    return output

def process_item(node, country, ancestors=None):
    """Emit an item individual, its table individuals, then its sub-items.

    One table individual is generated per entry of ``id_T``. The table
    reference is the entry of ``table_refs`` at the same position when that
    key exists, otherwise ``"Table for <table id>"``; the summary comes from
    ``table_summary`` (default ``"No summary available"``). ``ancestors`` is
    not modified.
    """
    scope = {**(ancestors or {}), "item": node["id"]}
    output = list(generate_individual_ttl(node, "Item", country, scope))

    for position, table_id in enumerate(node.get("id_T") or []):
        if "table_refs" in node:
            table_ref = node["table_refs"][position]
        else:
            table_ref = f"Table for {table_id}"
        table_summary = node.get("table_summary", "No summary available")
        output += generate_table_ttl(table_id, table_ref, table_summary, country, scope)

    for sub_item in node.get("sub_items", []):
        output += process_subitem(sub_item, country, scope)
    return output

def process_subitem(node, country, ancestors=None):
    """Emit a sub-item individual, its table individuals, then its sub-sub-items.

    Tables are handled as for items: one per ``id_T`` entry, with the
    reference taken from ``table_refs`` at the same position when that key
    exists (otherwise ``"Table for <table id>"``) and the summary from
    ``table_summary``. ``ancestors`` is not modified.
    """
    scope = {**(ancestors or {}), "sub_item": node["id"]}
    output = list(generate_individual_ttl(node, "SubItem", country, scope))

    for position, table_id in enumerate(node.get("id_T") or []):
        if "table_refs" in node:
            table_ref = node["table_refs"][position]
        else:
            table_ref = f"Table for {table_id}"
        table_summary = node.get("table_summary", "No summary available")
        output += generate_table_ttl(table_id, table_ref, table_summary, country, scope)

    for sub_sub_item in node.get("sub_sub_items", []):
        output += process_subsubitem(sub_sub_item, country, scope)
    return output

def process_subsubitem(node, country, ancestors=None):
    """Emit a sub-sub-item individual followed by its table individuals.

    Tables are handled as for items: one per ``id_T`` entry, with the
    reference taken from ``table_refs`` at the same position when that key
    exists (otherwise ``"Table for <table id>"``) and the summary from
    ``table_summary``. ``ancestors`` is not modified.
    """
    scope = {**(ancestors or {}), "sub_sub_item": node["id"]}
    output = list(generate_individual_ttl(node, "SubSubItem", country, scope))

    for position, table_id in enumerate(node.get("id_T") or []):
        if "table_refs" in node:
            table_ref = node["table_refs"][position]
        else:
            table_ref = f"Table for {table_id}"
        table_summary = node.get("table_summary", "No summary available")
        output += generate_table_ttl(table_id, table_ref, table_summary, country, scope)
    return output

# ─────────── Finalize Individuals ───────────
def finalize_individuals():
    """Assemble the final Turtle lines for every registered individual.

    For each entry of ``INDIVIDUALS`` the header lines are combined with the
    queued predicates. Tables recorded for the node in ``TABLE_RELATIONS``
    that are not already listed in a queued ``:hasTable`` predicate are added
    as one extra ``:hasTable`` predicate. Predicates are separated with ``;``
    and the statement is closed with ``.`` followed by a newline.

    The stored lists are copied before being modified, so ``INDIVIDUALS`` is
    left untouched and the function can be called repeatedly with the same
    result.

    Returns:
        A flat list of Turtle lines for all individuals, in registration order.
    """
    all_ttl_lines = []
    for node_id, data in INDIVIDUALS.items():
        # Work on copies: terminators must not accumulate in INDIVIDUALS.
        lines = list(data["lines"])
        lines_to_add = list(data["lines_to_add"])

        # Add hasTable relations from TABLE_RELATIONS
        if node_id in TABLE_RELATIONS:
            existing_tables = set()
            for line in lines_to_add:
                if line.startswith(":hasTable"):
                    existing_tables.update(line.split(":hasTable ")[1].split(", "))
            new_tables = [f":{tid}" for tid in TABLE_RELATIONS[node_id] if f":{tid}" not in existing_tables]
            if new_tables:
                tables_list = ", ".join(new_tables)
                lines_to_add.append(f":hasTable {tables_list}")
                print(f"Finalizing {node_id} with additional hasTable: {tables_list}")

        # Format lines with proper TTL syntax
        if lines_to_add:
            # The last header line has no terminator yet.
            lines[-1] += ";"
            for i, triple_part in enumerate(lines_to_add):
                if i < len(lines_to_add) - 1:
                    lines.append(f"    {triple_part} ;")
                else:
                    lines.append(f"    {triple_part} .\n")
        else:
            lines[-1] += ".\n"

        all_ttl_lines.extend(lines)
    return all_ttl_lines

# ─────────── Main Execution ───────────
def main():
    """Main function: read JSON, generate TTL individuals, and append to file.

    Reads the country from the metadata JSON (which also selects the global
    ``lang_tag`` through ``COUNTRY_LANG_MAP``, default ``"und"``), resets the
    global registries, processes every annex of the structured JSON
    (metadata entries are skipped) and appends the generated individuals to
    the base ontology file. Prints an error and returns early when any of the
    three files is missing.

    Every generated line is written on its own physical line, so the appended
    Turtle stays readable and diff-friendly.
    """
    global lang_tag, TABLE_RELATIONS, INDIVIDUALS

    # File paths
    metadata_json_path = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Documento_Nativo_results_gui.json"
    json_path = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\structured_output_Protege.json"
    ttl_base_path = r"C:\Users\DEC_User\Desktop\FIREBIM\rase_llm_project\data\Ontologia\FRO-25022025_PT.ttl"

    # Check if files exist
    required_files = (
        ("Metadata JSON file", metadata_json_path),
        ("JSON file", json_path),
        ("Base TTL file", ttl_base_path),
    )
    for label, path in required_files:
        if not os.path.exists(path):
            print(f"Error: {label} {path} not found.")
            return

    # Load country from metadata JSON
    with open(metadata_json_path, "r", encoding="utf-8") as f:
        metadata = json.load(f)
    country = metadata.get("country", "Unknown")
    lang_tag = COUNTRY_LANG_MAP.get(country, "und")

    # Load JSON data
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Reset global variables so repeated runs do not accumulate individuals
    TABLE_RELATIONS = {}
    INDIVIDUALS = {}

    # Process annexes
    for annex in data:
        if "extracted_metadata" in annex:
            continue
        process_annex(annex, country)

    # Finalize and write TTL
    all_ttl_lines = finalize_individuals()

    with open(ttl_base_path, "a", encoding="utf-8") as out_file:
        out_file.write("\n\n########################################################\n")
        out_file.write("# Individuals generated automatically from JSON\n")
        out_file.write("########################################################\n\n")
        for line in all_ttl_lines:
            # Only the closing line of each individual carries its own "\n";
            # without this, a whole individual ends up on one physical line.
            out_file.write(line if line.endswith("\n") else line + "\n")

    print(f"✅ Individuals successfully appended to {ttl_base_path}!")

if __name__ == "__main__":
    # Entry point: generate the individuals and append them to the base ontology file.
    main()
