### Find the last page on which the Table of Contents (TOC) appears

In [None]:

import pdfplumber
from typing import Optional
from langchain.chat_models import AzureChatOpenAI

def find_table_of_contents_end_page(pdf_path: str, chat_model: AzureChatOpenAI, batch_size: int = 10, max_pages: int = 50, fallback_page: int = 3) -> int:
    """
    Detects the last page number where the Table of Contents (TOC) appears in a PDF.

    The first ``max_pages`` pages are sent to the LLM in batches of
    ``batch_size`` pages. Each reply is expected to be a page number,
    "NONE" (no TOC in this batch) or "CONTINUES" (TOC runs past this batch).

    Args:
        pdf_path (str): Path to the PDF file.
        chat_model (AzureChatOpenAI): LangChain AzureChatOpenAI instance.
        batch_size (int): Number of pages to process in each LLM call.
        max_pages (int): Maximum number of pages to scan (default 50).
        fallback_page (int): Value returned when no batch produced a usable
            answer (default 3, the previous hard-coded fallback).

    Returns:
        int: Last page number where the Table of Contents is found.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    import re  # local import keeps this notebook cell self-contained

    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    with pdfplumber.open(pdf_path) as pdf:
        total_pages = len(pdf.pages)
        pages_to_check = min(total_pages, max_pages)
        # True only while the most recent batch reported that the TOC continues.
        toc_still_open = False

        for start in range(1, pages_to_check + 1, batch_size):
            end = min(start + batch_size - 1, pages_to_check)
            batch_pages = []
            for i in range(start - 1, end):
                text = pdf.pages[i].extract_text() or ""
                batch_pages.append(f"\n--- PAGE {i + 1} ---\n{text}")

            combined_text = "\n".join(batch_pages)

            prompt = f"""
SYSTEM:
You are an expert in document structure analysis.

INSTRUCTION:
You are given a batch of pages from the start of a PDF document. Analyze and determine up to which page the Table of Contents (TOC) continues.

RULES:
- Identify if any of the pages contain Table of Contents using clues like: 
  * headings such as "Contents", "Table of Contents"
  * dot leaders (e.g., "1.1 Overview .......... 5")
  * chapter or section listings with page numbers

RETURN FORMAT:
- Return only the page number **after which** the Table of Contents ends.
- If TOC is not present in this batch, return "NONE".
- If TOC continues beyond this batch, return "CONTINUES".

PDF PAGES CONTENT:
{combined_text}
"""

            result = chat_model.invoke(prompt).content.strip()
            # Drop decoration the model tends to add (quotes, markdown, a
            # trailing period) so near-miss replies are still understood.
            verdict = result.strip(" \t\r\n\"'`*.").upper()

            if verdict == "NONE":
                # No TOC in this batch: it ended with the previous batch.
                return max(1, start - 1)
            if verdict == "CONTINUES":
                toc_still_open = True
                continue

            page_match = re.fullmatch(r"(?:PAGE\s*)?(\d+)", verdict)
            if page_match:
                return int(page_match.group(1))

            # Unrecognised reply: move on to the next batch (as before), but
            # do not treat it as evidence that the TOC is still running.
            toc_still_open = False

        if toc_still_open:
            # The TOC was still running when the scan limit was reached, so
            # the last scanned page is the best lower bound available.
            return pages_to_check

        # Fallback if TOC end isn't confidently found
        return fallback_page


import os

# SECURITY: the API key is read from the environment instead of being
# committed in source. The key that used to be hard-coded here must be treated
# as leaked and rotated. A missing variable raises KeyError, so a
# misconfiguration fails here rather than on the first request.
chat_model = AzureChatOpenAI(
    openai_api_key=os.environ["AZURE_OPENAI_API_KEY"],
    openai_api_base="https://param-mc26b6rc-eastus2.cognitiveservices.azure.com/",
    openai_api_version="2025-01-01-preview",
    deployment_name="o4-mini",
    temperature=1
)

# Detect where the Table of Contents ends in the sample regulation PDF.
pdf_path = "ia.pdf"
toc_end_page = find_table_of_contents_end_page(pdf_path, chat_model)
print(f"📘 Table of Contents ends at page: {toc_end_page}")

### Extract the sections listed in the TOC that contain controls

In [None]:

import pdfplumber
from typing import List, Dict
from langchain.chat_models import AzureChatOpenAI

def extract_control_sections_from_toc(
    pdf_path: str,
    toc_end_page: int,
    chat_model: AzureChatOpenAI
) -> List[Dict[str, str]]:
    """
    Extracts sections from the Table of Contents that contain actual controls,
    with section name, start page, and end page.

    The text of pages 1..toc_end_page is sent to the LLM, which is asked to
    reply with a JSON list. A reply wrapped in a markdown code fence is
    unwrapped before parsing.

    Args:
        pdf_path (str): Path to the regulatory PDF file.
        toc_end_page (int): Page number where the Table of Contents ends.
            Values beyond the last page of the PDF are clamped to it.
        chat_model (AzureChatOpenAI): Azure OpenAI LLM instance.

    Returns:
        List[Dict]: Each dictionary has 'section_name', 'start_page', 'end_page'.
        An empty list is returned when the reply is not valid JSON.

    Raises:
        ValueError: If the reply is valid JSON but not a list.
    """
    import json
    import re

    with pdfplumber.open(pdf_path) as pdf:
        # Clamp so an over-estimated TOC end cannot index past the last page.
        last_toc_page = min(toc_end_page, len(pdf.pages))
        toc_text = []
        for i in range(last_toc_page):
            text = pdf.pages[i].extract_text() or ""
            toc_text.append(f"\n--- PAGE {i + 1} ---\n{text}")

        combined_toc = "\n".join(toc_text)

    prompt = f"""
SYSTEM:
You are a cybersecurity compliance analyst and expert in regulatory frameworks like ISO 27001, NIST 800-53, and UAE IA Regulation.
You are analyzing the Table of Contents (TOC) from a regulatory document to identify which sections are likely to contain security or compliance **controls**.

INSTRUCTION:
Based on the TOC content provided, extract the list of sections or subsections that are likely to contain actual regulatory controls.

RULES:
1. Look for keywords like "Controls", "Security Requirements", "Information Assurance", "Risk Treatment", 
"Security Measures", "Technical Controls", "Management Controls".
2. A section may contain controls directly or indirectly via child sections — include both if applicable.
3. For each identified section, extract:
   - `section_name`: Full name of the section or sub-section
   - `start_page`: The page number the section starts from (as per TOC)
   - `end_page`: The page number it ends on (i.e., just one page before the very next listed section).
4. end_page of all the section_names is always the start_page of very next section_name. Do never deviate from this.
5. If multiple subsections contain controls, include each as a separate entry.
6. If a single large section contains all controls, return just one dictionary entry. 
7. If no sections contain controls, return an empty list.
8. Keep end page od any sections as start page of the very next sections or subsection.
9. Always give section and subsections in the output. 
10. Ensure that the output dictionary includes both sections and all of their corresponding subsections. No control-related section or subsection should be omitted.
11. Between the identified start and end pages, do not skip or exclude any sections or subsections. All intermediate sections and subsections must be included in the output.
12. Only include sections or subsections that actually define controls. Exclude any content that is only introductory, explanatory, summarizing, or meta in nature — such as summaries, mappings, methodologies, or overviews. 
13. Do not include sections or subsections which will not have actual controls like structure or how controles are made.

FORMAT:
Return only a **JSON list** of dictionaries with this structure:
[
  {{
    "section_name": "Security Controls",
    "start_page": 33,
    "end_page": 89
  }},
  {{
    "section_name": "Annex A: ISO 27001 Controls",
    "start_page": 90,
    "end_page": 112
  }}
]

EXAMPLES:
If the TOC says:
- "Chapter 4: Information Security Controls ............25"
- "Chapter 5: Physical Security Controls .............. 40"
- "Annex A: Control Catalogue ......................... 60"
You might output:
[
  {{
    "section_name": "Chapter 4: Information Security Controls",
    "start_page": 25,
    "end_page": 39
  }},
  {{
    "section_name": "Chapter 5: Physical Security Controls",
    "start_page": 40,
    "end_page": 59
  }},
  {{
    "section_name": "Annex A: Control Catalogue",
    "start_page": 60,
    "end_page": 80
  }}
]

Most important rule: 
1. end_page of all the section_names is always the start_page of very next section_name. Do never deviate from this.
2. Do not include sections or subsections which will not have actual controls like structure or how controles are made.
TOC CONTENT:
{combined_toc}
"""

    result = chat_model.invoke(prompt).content.strip()

    # Models often wrap JSON in a ```json ... ``` fence; unwrap it so an
    # otherwise valid reply is not rejected.
    payload = re.sub(r"^```(?:json)?\s*|\s*```$", "", result, flags=re.IGNORECASE)

    # Try parsing it as list of dictionaries
    try:
        control_sections = json.loads(payload)
    except json.JSONDecodeError:
        print("⚠️ Failed to parse LLM output as JSON.")
        print("Raw Output:", result)
        return []

    if not isinstance(control_sections, list):
        raise ValueError("Expected list output from LLM")
    return control_sections


# Run the TOC-based section extraction against the sample regulation PDF.
pdf_path = "ia.pdf"
toc_end_page = 3  # Replace with actual detected value

sections = extract_control_sections_from_toc(
    pdf_path=pdf_path,
    toc_end_page=toc_end_page,
    chat_model=chat_model,
)

# Print one summary line per section returned by the extraction step.
for section in sections:
    print(f"📘 {section['section_name']} | Pages {section['start_page']} to {section['end_page']}")


### For short documents (fewer than 50 pages), extract the controls directly to CSV

In [None]:
import pdfplumber
from langchain.chat_models import AzureChatOpenAI
from langchain.prompts import PromptTemplate

# -----------------------------
# Step 1: Extract Text from PDF in Plain Format
# -----------------------------
def extract_pdf_content_plaintext(pdf_path: str, start_page: int, end_page: int) -> str:
    """Render a 1-based, inclusive page range of a PDF as plain text.

    Each page becomes a "Page: N / Content:" section followed by any tables
    found on it, one row per line with cells joined by " | ". Pages are
    separated by a "---" divider line.

    Raises:
        ValueError: If the range does not satisfy 1 <= start <= end <= page count.
    """
    rendered_pages = []
    with pdfplumber.open(pdf_path) as pdf:
        total = len(pdf.pages)
        if not (1 <= start_page <= end_page <= total):
            raise ValueError(f"Invalid page range: 1 <= start <= end <= {total}")

        for page_number in range(start_page, end_page + 1):
            page = pdf.pages[page_number - 1]
            body = (page.extract_text() or "").strip()
            header = f"Page: {page_number}\nContent:\n{body}"

            # Collect table fragments and join once instead of growing a string.
            table_parts = []
            for table in page.extract_tables():
                if not table:
                    continue
                table_parts.append("\nTable:\n")
                for row in table:
                    cells = (cell.strip() if cell else "" for cell in row)
                    table_parts.append(" | ".join(cells) + "\n")
            tables_block = "".join(table_parts).strip()

            rendered_pages.append(f"{header}\n{tables_block}".strip())

    return "\n\n---\n\n".join(rendered_pages)

# -----------------------------
# Step 2: Build PromptTemplate
# -----------------------------
# Prompt for whole-document control extraction. `{content}` is the only
# placeholder. The key names are written without the markdown-escape
# backslashes ("control\_number") the template used to carry: `\_` is an
# invalid Python escape sequence, and it told the model to emit keys that do
# not match the CSV columns written later (control_number, sub_domain, ...).
PROMPT_TEMPLATE = """SYSTEM:
You are a PDF parsing and compliance‑extraction assistant specialized in cybersecurity standards. 
Your task is to read the full text of one or more PDF standards documents (e.g. UAE IA Regulation, ISO 27001, 
NIST CSF, NISA Guidelines) and output a structured list of all controls, preserving their hierarchical numbering 
and labels.

RULES:

1. Identify each **Domain** (sometimes labeled “Section”, “Clause”, or “Category”) by its numbering and 
title (e.g. “1.0 Information Security Management” or “Annex A – Security Controls”).
2. Within each Domain, identify each **Sub‑Domain** (sometimes called “Sub‑Clause”, “Control Family”, or “Area”) 
by its numbering and title (e.g. “1.1 Risk Assessment” or “A.5 Access Control”).
3. For every control under a Sub‑Domain, extract:

   * **control_number**: the exact hierarchical number (e.g. “1.1.2”, “A.5.1”).
   * **control_title**: the short control name or label.
   * **description**: the full descriptive text of the control.
4. Domains and Sub‑Domains may span pages. Continue assigning controls to the most recent Domain/Sub‑Domain until a 
new one appears.
5. Different documents may use different labels (e.g. “Domain” vs. “Section”, “Sub‑Domain” vs. “Clause”), but **the 
numbering hierarchy is the single source of truth.** Always use the numeric/order prefixes to determine hierarchy.
6. Output a **single JSON array** of objects, each with these keys:
   • domain
   • sub_domain
   • control_number
   • control_title
   • description
7. Do not output any additional text—only the JSON array. If a Domain or Sub‑Domain has no explicit title but only 
a number, use the number as the name (e.g. `"sub_domain": "A.6"`).
8. Remember you need to consider entire use content and create the json array by considering the entire document.
USER CONTENT:
{content}
"""

prompt = PromptTemplate(
    input_variables=["content"],
    template=PROMPT_TEMPLATE
)

# -----------------------------
# Step 3: Set Up Azure OpenAI Chat Model
# -----------------------------
import os

# SECURITY: the API key is read from the environment instead of being
# committed in source. The key that used to be hard-coded here must be treated
# as leaked and rotated. A missing variable raises KeyError immediately.
chat_model = AzureChatOpenAI(
    openai_api_key=os.environ["AZURE_OPENAI_API_KEY"],
    openai_api_base="https://param-mc26b6rc-eastus2.cognitiveservices.azure.com/",
    openai_api_version="2025-01-01-preview",
    deployment_name="o4-mini",
    temperature=1
)

# -----------------------------
# Step 4: Run the Extraction
# -----------------------------
if __name__ == "__main__":
    import csv
    import json
    import re

    pdf_path = "iso.pdf"
    start_page = 1
    end_page = 26 # Adjust as needed

    # Render the page range as text and send it to the model in one call.
    content = extract_pdf_content_plaintext(pdf_path, start_page, end_page)
    formatted_prompt = prompt.format(content=content)
    response = chat_model.invoke(formatted_prompt)

    print(response.content)

    # Step 1: Extract the JSON string from the AIMessage. Models often wrap
    # JSON in a ```json ... ``` fence; unwrap it so a valid payload still parses.
    json_str = re.sub(
        r"^```(?:json)?\s*|\s*```$", "", response.content.strip(), flags=re.IGNORECASE
    )

    # Step 2: Convert string to list of dictionaries
    try:
        control_data = json.loads(json_str)
    except json.JSONDecodeError as e:
        raise ValueError("The model response is not valid JSON. Check formatting.") from e

    # writerows() needs a list of row dictionaries; reject any other JSON
    # value with a clear message instead of failing inside the csv module.
    if not isinstance(control_data, list):
        raise ValueError("The model response is valid JSON but not a JSON array of controls.")

    # Step 3: Write to CSV
    output_file = "ec.csv"
    with open(output_file, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["domain", "sub_domain", "control_number", "control_title", "description"])
        writer.writeheader()
        writer.writerows(control_data)

    print(f"✅ CSV saved to {output_file}")



### If the document has more than 50 pages, use the same prompt but pass it chunks of the complete content

In [None]:
import pdfplumber
import json
import csv
from typing import List, Dict
from langchain.chat_models import AzureChatOpenAI
from langchain.prompts import PromptTemplate

# ----------------------------- CONFIG
# CSV file that extract_controls_from_sections() passes to
# process_and_append_section() as the append target.
OUTPUT_FILE = "extracted_controls_v5.csv"


# ----------------------------- CONTROL EXTRACTION PROMPT
# Per-chunk extraction prompt. `{content}` is the only placeholder; the doubled
# braces in the example are escaped literal braces for the template formatter.
# The keys in the example match the CSV columns written by
# process_and_append_section().
PROMPT_TEMPLATE = """SYSTEM:
You are a cybersecurity compliance parser extracting controls from regulatory documents (like UAE IA, ISO 27001, etc).

OBJECTIVE:
Parse the provided document chunk and extract any **security controls** with their full hierarchy.

RULES:
1. Identify each Domain and Sub‑Domain using numbering (e.g. “1.0”, “A.5”).
2. Under each Sub‑Domain, extract:
   - control_number: e.g. “A.5.1.2”
   - control_title: short heading
   - description: full text

3. If no controls are present in this content, return an **empty JSON array**: []

4. Output format (strict):
[
  {{
    "domain": "A.5: Planning & Testing",
    "sub_domain": "A.5.1: Context and the leadership",
    "control_number": "A.5.1.1",
    "control_title": "Access Control Policy",
    "description": "Establish and review access control policies. this should contain controles and sub controles clubbed together"
  }}
  ...
]

REMEMBER:
- Use section numbering hierarchy to infer structure.
- Do not include any explanation, headers, or notes—return JSON array only.
- If you dont find domain, subdomain, controles number, control title and description then do not create the output for that content.

USER CONTENT:
{content}
"""

# Template object used by process_and_append_section() via prompt.format(content=...).
prompt = PromptTemplate(
    input_variables=["content"],
    template=PROMPT_TEMPLATE
)

# ----------------------------- EXTRACT TEXT + TABLES
def extract_pdf_chunk_content(pdf_path: str, start_page: int, end_page: int) -> str:
    """Return the text and tables of a page range as one string.

    The range starts one page *before* ``start_page`` (1-based) and runs
    through ``end_page`` inclusive. The one-page look-back is kept from the
    original implementation; presumably it catches a section that begins
    part-way down the preceding page (TODO confirm).

    The range is clamped to the document. Previously ``start_page=1`` produced
    index -1, which silently returned the *last* page of the PDF labelled
    "Page 0", and an ``end_page`` past the end raised IndexError.

    Args:
        pdf_path: Path to the PDF file.
        start_page: 1-based first page of the section.
        end_page: 1-based last page of the section (inclusive).

    Returns:
        Page blocks ("Page N:" + text + tables) joined by a "---" divider;
        an empty string when the clamped range contains no pages.
    """
    pages = []
    with pdfplumber.open(pdf_path) as pdf:
        # Clamp both ends so no index is negative or past the last page.
        first_idx = max(start_page - 2, 0)
        stop_idx = min(end_page, len(pdf.pages))

        for idx in range(first_idx, stop_idx):
            page = pdf.pages[idx]
            text = page.extract_text() or ""
            page_text = f"Page {idx + 1}:\n{text.strip()}"

            # Render each table as a "Table:" header plus one line per row.
            table_parts = []
            for table in page.extract_tables():
                if table:
                    table_parts.append("\nTable:\n")
                    for row in table:
                        table_parts.append(" | ".join(cell.strip() if cell else "" for cell in row) + "\n")
            tables_text = "".join(table_parts)

            pages.append(page_text + "\n" + tables_text.strip())
    return "\n\n---\n\n".join(pages)

# ----------------------------- PROCESS EACH SECTION
def process_and_append_section(
    pdf_path: str,
    section: Dict[str, int],
    chat_model: AzureChatOpenAI,
    output_csv_path: str,
    throttle_seconds: float = 100
):
    """Extract the controls of one TOC section and append them to a CSV file.

    Args:
        pdf_path: Path to the PDF file.
        section: Dictionary with 'section_name', 'start_page' and 'end_page'.
        chat_model: Azure OpenAI LLM instance used for the extraction.
        output_csv_path: CSV file to append to; the header row is written only
            when the file is still empty.
        throttle_seconds: Pause after each LLM call (default 100, the value
            previously hard-coded); presumably a rate-limit guard.

    Returns:
        None. Sections with no controls or an unparsable reply are reported
        on stdout and skipped.
    """
    # `time` was used without ever being imported, so every call raised
    # NameError right after the LLM request; import it locally with `re`.
    import re
    import time

    section_text = extract_pdf_chunk_content(pdf_path, section['start_page'], section['end_page'])
    formatted_prompt = prompt.format(content=section_text)
    response = chat_model.invoke(formatted_prompt).content.strip()
    time.sleep(throttle_seconds)

    # Models often wrap JSON in a ```json ... ``` fence; unwrap it so a valid
    # payload is not rejected.
    payload = re.sub(r"^```(?:json)?\s*|\s*```$", "", response, flags=re.IGNORECASE)

    try:
        parsed_json = json.loads(payload)
    except json.JSONDecodeError:
        print(f"⚠️ JSON parsing failed for section: {section['section_name']}")
        print(response)
        return

    if not parsed_json:
        print(f"⛔ No controls found in section: {section['section_name']}")
        return

    if not isinstance(parsed_json, list):
        # writerows() needs a list of row dictionaries; anything else is
        # handled like an unparsable reply.
        print(f"⚠️ JSON parsing failed for section: {section['section_name']}")
        print(response)
        return

    print(f"✅ Found {len(parsed_json)} controls in section: {section['section_name']}")

    # Append to CSV
    with open(output_csv_path, "a", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["domain", "sub_domain", "control_number", "control_title", "description"])
        if f.tell() == 0:
            writer.writeheader()
        writer.writerows(parsed_json)

# ----------------------------- MAIN DRIVER
def extract_controls_from_sections(pdf_path: str, sections: List[Dict], chat_model: AzureChatOpenAI):
    """Run control extraction for every section, appending results to OUTPUT_FILE.

    Sections are processed one at a time, in the order given.
    """
    for entry in sections:
        process_and_append_section(
            pdf_path=pdf_path,
            section=entry,
            chat_model=chat_model,
            output_csv_path=OUTPUT_FILE,
        )

# ----------------------------- USAGE EXAMPLE
# Script entry point: extract controls for every section of "ia.pdf" and
# append them to OUTPUT_FILE.
if __name__ == "__main__":
    pdf_path = "ia.pdf"

    # Example TOC output from previous step
    # NOTE(review): `ps_sections` is not defined anywhere in the visible code,
    # so this line raises NameError unless another cell defines it. Presumably
    # it should be the list returned by extract_control_sections_from_toc()
    # (dicts with 'section_name', 'start_page', 'end_page') — confirm.
    control_sections = ps_sections

    extract_controls_from_sections(pdf_path, control_sections, chat_model)
    print(f"\n📄 Final output written to {OUTPUT_FILE}")


### Final code for control similarity matching, using a vector DB because the number of controls is large

import pandas as pd
import numpy as np
import faiss
import json
from tqdm import tqdm
from langchain.chat_models import AzureChatOpenAI
from langchain.prompts import PromptTemplate
from openai import AzureOpenAI

# -----------------------------
# CONFIGURATION
# -----------------------------
import os

# Input catalogues and output file for the mapping run.
PRIMARY_CSV = "extracted_controls_v2.csv"
SECONDARY_CSV = "extracted_controls.csv"
OUTPUT_CSV = "uae_vs_iso_mapped_rag.csv"

# Column-name prefixes for the two frameworks in the output CSV.
PRIMARY_PREFIX = "UAE_IA"
SECONDARY_PREFIX = "ISO"
# Number of nearest secondary controls offered to the LLM per primary control.
TOP_K = 5

# -----------------------------
# LangChain Chat Model (Azure)
# -----------------------------
# SECURITY: the API key is read from the environment instead of being
# committed in source. The key that used to be hard-coded here must be treated
# as leaked and rotated. A missing variable raises KeyError immediately.
chat_model = AzureChatOpenAI(
    openai_api_key=os.environ["AZURE_OPENAI_API_KEY"],
    openai_api_base="https://param-mc26b6rc-eastus2.cognitiveservices.azure.com/",
    openai_api_version="2025-01-01-preview",
    deployment_name="o4-mini",
    temperature=1
)

# -----------------------------
# Embedding Setup
# -----------------------------
import os

# SECURITY: the API key is read from the environment instead of being
# committed in source; the previously hard-coded key must be rotated.
#
# `azure_endpoint` must be the resource base URL only. The client appends the
# deployment path and api-version itself, so the full
# ".../openai/deployments/<name>/embeddings?api-version=..." URL that used to
# be passed here produced a malformed request URL.
embedding_client = AzureOpenAI(
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version="2024-05-01-preview",
    azure_endpoint="https://param-mc26b6rc-eastus2.cognitiveservices.azure.com/"
)

# Azure deployment name passed as `model` on every embeddings request.
EMBEDDING_DEPLOYMENT = "text-embedding-ada-002"

def get_embedding(text: str) -> list:
    """Return the embedding of *text* from the configured Azure deployment.

    One request is made per call, with a single-element input list; the
    embedding of that element is returned.
    """
    result = embedding_client.embeddings.create(model=EMBEDDING_DEPLOYMENT, input=[text])
    first_item = result.data[0]
    return first_item.embedding

# -----------------------------
# Prompt Template
# -----------------------------
# Yes/No alignment prompt for one (primary, secondary) control pair.
# NOTE(review): the details of both controls appear twice in this template —
# once before the question and again under the upper-case headings after it.
# That roughly doubles the prompt size per comparison; it looks like
# copy-paste residue, but confirm before trimming because it changes what the
# model sees.
MATCHING_PROMPT_TEMPLATE = """You are an expert in cybersecurity compliance mappings.

Your task is to determine whether the following two regulatory controls are semantically aligned — meaning they have the same intent, coverage, or enforcement scope.

Please read both controls carefully and answer only with:
Yes — if they are aligned
No — if they are not

Primary Control:
Domain: {primary_domain}
Sub-domain: {primary_sub_domain}
Number: {primary_control_number}
Title: {primary_control_title}
Description: {primary_description}

Secondary Control:
Domain: {secondary_domain}
Sub-domain: {secondary_sub_domain}
Number: {secondary_control_number}
Title: {secondary_control_title}
Description: {secondary_description}

Are these controls aligned? Respond with Yes or No. Do not give any explaination or any placeholders.


PRIMARY CONTROL:
Domain: {primary_domain}
Sub-domain: {primary_sub_domain}
Control Number: {primary_control_number}
Title: {primary_control_title}
Description: {primary_description}

SECONDARY CONTROL:
Domain: {secondary_domain}
Sub-domain: {secondary_sub_domain}
Control Number: {secondary_control_number}
Title: {secondary_control_title}
Description: {secondary_description}
"""

# Template object; map_controls() fills all ten variables via match_prompt.format(...).
match_prompt = PromptTemplate(
    input_variables=[
        "primary_domain", "primary_sub_domain", "primary_control_number",
        "primary_control_title", "primary_description",
        "secondary_domain", "secondary_sub_domain", "secondary_control_number",
        "secondary_control_title", "secondary_description"
    ],
    template=MATCHING_PROMPT_TEMPLATE
)

# -----------------------------
# Helper Functions
# -----------------------------
def load_controls(path: str) -> pd.DataFrame:
    """Read a controls CSV and replace every missing value with an empty string."""
    frame = pd.read_csv(path)
    return frame.fillna("")

def control_to_text(row) -> str:
    """Flatten one control record into a single space-separated string.

    The fields are emitted in the order domain, sub_domain, control_number,
    control_title, description.
    """
    field_order = ("domain", "sub_domain", "control_number", "control_title", "description")
    return " ".join(f"{row[name]}" for name in field_order)

def build_faiss_index(controls: pd.DataFrame):
    """Embed every control row and load the vectors into a flat L2 FAISS index.

    Returns:
        A ``(index, vectors)`` tuple: the populated index and the list of raw
        embeddings in row order.
    """
    vectors = []
    for _, row in tqdm(controls.iterrows(), total=len(controls), desc="🔍 Embedding Secondary"):
        vectors.append(get_embedding(control_to_text(row)))

    # The index dimensionality is taken from the first embedding.
    dim = len(vectors[0])
    matrix = np.array(vectors).astype("float32")

    index = faiss.IndexFlatL2(dim)
    index.add(matrix)
    return index, vectors

# -----------------------------
# Matching Logic
# -----------------------------
def _mapping_row(primary=None, secondary=None) -> dict:
    """Build one output row; a side passed as None is filled with empty strings."""
    fields = ("domain", "sub_domain", "control_number", "control_title", "description")
    row = {}
    for prefix, control in ((PRIMARY_PREFIX, primary), (SECONDARY_PREFIX, secondary)):
        for field in fields:
            row[f"{prefix}_{field}"] = "" if control is None else control[field]
    return row


def map_controls(primary_df, secondary_df, top_k=TOP_K):
    """Map each primary control to at most one semantically aligned secondary control.

    For every primary control the ``top_k`` nearest secondary controls (by
    embedding distance) are offered to the LLM in order; the first one judged
    "Yes" is taken. A secondary control is used for at most one primary.

    Args:
        primary_df: DataFrame of primary controls with columns domain,
            sub_domain, control_number, control_title, description.
        secondary_df: DataFrame of secondary controls with the same columns.
        top_k: Number of nearest candidates to test per primary control.

    Returns:
        list[dict]: One row per primary control (matched or not), followed by
        one row for every secondary control that was never matched.
    """
    import re

    output_rows = []
    # Positional indices (iloc positions) of secondary rows already matched.
    used_secondary = set()

    # An empty secondary set cannot be indexed; every primary stays unmatched.
    secondary_index = None
    if len(secondary_df) > 0:
        secondary_index, _ = build_faiss_index(secondary_df)
    # Never ask for more neighbours than exist: FAISS pads the result with -1,
    # which iloc would read as "last row".
    k = min(top_k, len(secondary_df))

    for _, primary in tqdm(primary_df.iterrows(), total=len(primary_df), desc="🧠 Matching"):
        candidate_positions = []
        if secondary_index is not None and k > 0:
            primary_vec = np.array([get_embedding(control_to_text(primary))], dtype="float32")
            _, indices = secondary_index.search(primary_vec, k)
            candidate_positions = [int(pos) for pos in indices[0] if pos >= 0]

        matched_candidate = None
        for pos in candidate_positions:
            if pos in used_secondary:
                continue

            candidate = secondary_df.iloc[pos]
            formatted_prompt = match_prompt.format(
                primary_domain=primary["domain"],
                primary_sub_domain=primary["sub_domain"],
                primary_control_number=primary["control_number"],
                primary_control_title=primary["control_title"],
                primary_description=primary["description"],
                secondary_domain=candidate["domain"],
                secondary_sub_domain=candidate["sub_domain"],
                secondary_control_number=candidate["control_number"],
                secondary_control_title=candidate["control_title"],
                secondary_description=candidate["description"]
            )

            response = chat_model.invoke(formatted_prompt).content.strip().lower()

            # Decide on the first whole-word yes/no. A substring test would
            # accept "No ... yes" and any word that merely contains "yes".
            verdict = re.search(r"\b(yes|no)\b", response)
            if verdict and verdict.group(1) == "yes":
                used_secondary.add(pos)
                matched_candidate = candidate
                break

        output_rows.append(_mapping_row(primary, matched_candidate))

    # Emit the secondary controls that no primary control claimed. Iterate by
    # position so the check agrees with the positions stored above whatever
    # the DataFrame's index labels are.
    for pos in range(len(secondary_df)):
        if pos not in used_secondary:
            output_rows.append(_mapping_row(secondary=secondary_df.iloc[pos]))

    return output_rows

# -----------------------------
# Main Execution
# -----------------------------
if __name__ == "__main__":
    # Load both control catalogues, primary first.
    primary_df, secondary_df = (
        load_controls(csv_path) for csv_path in (PRIMARY_CSV, SECONDARY_CSV)
    )

    # Map the controls and write the combined table without the index column.
    mapped_rows = map_controls(primary_df, secondary_df)
    output_df = pd.DataFrame(mapped_rows)
    output_df.to_csv(OUTPUT_CSV, index=False)
    print(f"\n✅ Mapping completed and saved to: {OUTPUT_CSV}")
