In [40]:
!pip install -q pdfplumber

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [6]:
# Path of the PDF to process. The commented-out assignments are alternative
# input files; exactly one `pdf_path` assignment should be active.
# pdf_path = "/content/drive/MyDrive/pdf_processing/ISO_27001.pdf"
pdf_path = "uploads/test_file_1.pdf"
# pdf_path = "/content/drive/MyDrive/pdf_processing/test_file_2_short.pdf"

In [7]:
import pypdf
import pdfplumber

In [8]:
# Open the PDF with pypdf; `reader.pages` is iterated in a later cell to
# extract and normalize the text of every page.
reader = pypdf.PdfReader(pdf_path)

In [9]:
# Extract layout-preserving text from the last 20 pages with pdfplumber and
# store the normalized text of each page.
# NOTE: `normalize_text` is defined in a later cell of this notebook; that cell
# must be run first, otherwise this cell raises NameError.
text_page = []
with pdfplumber.open(pdf_path) as pdf:
    for page in pdf.pages[-20:]:
        raw_text = page.extract_text(layout=True, x_tolerance=3)
        # Normalize first, then append. list.append() returns None, so wrapping
        # the append call in normalize_text() stored the raw text and threw the
        # cleaned text away.
        text_page.append(normalize_text(raw_text))

NameError: name 'normalize_text' is not defined

In [56]:
# Spot check: show the normalized text of the 20th-from-last entry of `text_page`.
print(normalize_text(text_page[-20]))

NIST SP 800-53, REV. 5     SECURITY AND PRIVACY CONTROLS FOR INFORMATION SYSTEMS AND ORGANIZATIONS
_________________________________________________________________________________________________
This
publication
is
available
free
of
charge
from:
TABLE C-10: MEDIA PROTECTION FAMILY
CONTROL         CONTROL NAME           IMPLEMENTED ASSURANCE
NUMBER
CONTROL ENHANCEMENT NAME   BY
MP-1   Policy and Procedures               O         âˆš
MP-2   Media Access                        O
MP-2(1) AUTOMATED RESTRICTED ACCESS    W: Incorporated into MP-4(2).
MP-2(2) CRYPTOGRAPHIC PROTECTION       W: Incorporated into SC-28(1).
MP-3   Media Marking                       O
MP-4   Media Storage                       O
MP-4(1) CRYPTOGRAPHIC PROTECTION       W: Incorporated into SC-28(1).
MP-4(2) AUTOMATED RESTRICTED ACCESS        O
MP-5   Media Transport                     O
MP-5(1) PROTECTION OUTSIDE OF CONTROLLED AREAS W: Incorporated into MP-5.
MP-5(2) DOCUMENTATION OF ACTIVITIES    W: Incorporate

In [21]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig,pipeline

# Quantization settings handed to `from_pretrained(..., quantization_config=bnb_config)`
# when the model is loaded further down.
# NOTE(review): load_model() later in this notebook builds its own config with
# torch.float16 as compute dtype instead of torch.bfloat16 — confirm which one is intended.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

In [8]:
# Hugging Face model id used for both the tokenizer and the model loaded below.
# model_id = "meta-llama/Meta-Llama-3-8B-Instruct" # not using this as it requires login in hugging_face
model_id = "Qwen/Qwen2.5-7B-Instruct"

In [9]:
# Tokenizer for `model_id`; extract_controls_from_page() reads this global to
# build the chat prompt and to decode the generated tokens.
tokenizer = AutoTokenizer.from_pretrained(model_id)

In [10]:
# Load the causal LM quantized according to `bnb_config`;
# extract_controls_from_page() reads this global for generation.
# NOTE(review): trust_remote_code=True allows code shipped with the model
# repository to run locally — confirm it is actually required for this model.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True
)

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [11]:
import re
def normalize_text(raw_text, max_noise_len=None):
    """Strip blank lines and header/footer noise from one page of extracted text.

    The text is split on newlines and every line is stripped. A line is dropped
    when it is empty or matches one of the noise patterns (page counters, URLs,
    copyright markers, "Appendix X Page N" footers); matching is
    case-insensitive. The surviving lines are re-joined with newlines so the
    page layout stays visible to the LLM.

    Args:
        raw_text: Text of a single page. None or "" (pages without extractable
            text) yields "".
        max_noise_len: Optional length guard. When set, only lines of at most
            this many characters may be classified as noise, so a long content
            line that merely starts with e.g. "Copyright" is kept. The default
            (None) applies the patterns to lines of any length.

    Returns:
        The cleaned page text as a single newline-joined string.
    """
    # Pages without extractable text come back as None / "".
    if not raw_text:
        return ""

    # Generic noise patterns seen in the NIST and ISO PDFs. Kept as a list so
    # new patterns can be added when another document needs them.
    noise_patterns = [
        r"^Page\s+\d+$",               # Matches "Page 1"
        r"^Page\s+\d+\s+of\s+\d+$",    # Matches "Page 1 of 10"
        r"^\d+\s+of\s+\d+$",           # Matches "1 of 10"
        r"^https?://",                 # URL artifacts often in footers
        r"^www\.",                     # Web links
        r"^\(c\)\s+\d{4}",             # Copyright markers like "(c) 2023"
        r"^Copyright",                 # Copyright word
        r'\bAppendix\s+[A-Z]+\s+Page\s+\d+\b'  # "Appendix C Page 12" anywhere in the line
    ]

    cleaned_lines = []
    for line in raw_text.split('\n'):
        line = line.strip()

        # Skip empty lines
        if not line:
            continue

        # Lines longer than the guard are treated as content even if a
        # pattern would match them.
        short_enough = max_noise_len is None or len(line) <= max_noise_len
        if short_enough and any(
            re.search(pattern, line, re.IGNORECASE) for pattern in noise_patterns
        ):
            continue

        cleaned_lines.append(line)

    # Join with '\n' to preserve the structure; the LLM needs the newlines to
    # understand the layout.
    return '\n'.join(cleaned_lines)

In [12]:

# Build the per-page corpus with pypdf: one normalized string per PDF page,
# in page order.
text_page = [normalize_text(pdf_page.extract_text()) for pdf_page in reader.pages]

In [25]:
def clean_and_validate_json(json):
    """Drop malformed and duplicate controls from the extracted list.

    A control is kept only when all of the following hold:
      * it is a dict with string values for "control_id", "control_title"
        and "control_desc";
      * none of those values is blank or the placeholder "none"
        (case-insensitive, surrounding whitespace ignored);
      * the description has at least 3 characters;
      * no earlier *kept* control has the same id (case-insensitive).

    An invalid entry does not reserve its id, so a later well-formed control
    with the same id is still kept.

    Args:
        json: List of control dicts as parsed from the model output. (The name
            shadows the `json` module inside this function only; it is kept
            for backward compatibility with existing callers.)

    Returns:
        A new list with the surviving control dicts in their original order.
        The input list is not modified.
    """
    if not json:
        return []

    required_fields = ("control_id", "control_title", "control_desc")
    placeholder_values = {"", "none"}
    seen_ids = set()
    cleaned = []

    for control in json:
        # The model may emit strings/numbers/null inside the list.
        if not isinstance(control, dict):
            continue

        values = [control.get(field) for field in required_fields]
        # Missing keys (None) and non-string values are invalid.
        if not all(isinstance(value, str) for value in values):
            continue

        control_id, control_title, control_desc = (value.strip() for value in values)
        if any(
            value.lower() in placeholder_values
            for value in (control_id, control_title, control_desc)
        ):
            continue

        if len(control_desc) < 3:
            continue

        # Register the id only once the control is known to be valid.
        id_key = control_id.lower()
        if id_key in seen_ids:
            continue
        seen_ids.add(id_key)
        cleaned.append(control)

    return cleaned

In [74]:
# Cell 5: Define the Extraction Logic

def extract_controls_from_page(page_text, max_new_tokens=4096):
    """Ask the LLM to extract the controls found in one page of text.

    Builds a system + user chat prompt, runs greedy generation with the
    notebook-level `tokenizer` and `model`, and returns only the newly
    generated text (the prompt tokens are sliced off before decoding).

    Args:
        page_text: Cleaned text of a single page.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The stripped model response. The prompt asks for a raw JSON list of
        {"control_id", "control_title", "control_desc"} objects (or "[]"), but
        the string is returned unparsed and may not be valid JSON.
    """
    # 1. The System Prompt (The Brains)
    system_prompt = """
    You are an expert Compliance Auditor specialized in ISO, NIST, and Regulatory frameworks.

    Task: Extract individual controls or regulatory requirements from the provided text. Ignore all boilerplate, headers, and irrelevant legal prose.

    Control ID Identification Logic:
    Identify control IDs using a pattern-matching approach for:
    1. Annex A style (e.g., A.5.1, A.12.1.1)
    2. NIST style (e.g., AC-1, PM-10)
    3. Legislative/Section style (e.g., Sec. 302, Sec. 404)

    Rules:
    - If controls are found, return ONLY a raw JSON list of objects.
    - Fields:
        - "control_id": The exact identifier (e.g., "Sec. 404", "A.5.1", or "AC-1").
        - "control_title": A concise title (5-10 words) as stated in the text.
        - "control_desc": The full description or specific requirement associated with that ID.
    - If NO controls are found, output strictly: []
    - Strict Formatting: No markdown, no "```json" blocks, no preamble. Just the raw JSON.
    """

    # 2. Structure the Chat
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Analyze this text:\n\n{page_text}"}
    ]

    # 3. Prepare Inputs
    input_ids = tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_tensors="pt"
    ).to(model.device)

    # 4. Generate
    # Greedy decoding (do_sample=False) is deterministic and ignores sampling
    # knobs. Passing temperature here had no effect and only produced the
    # "generation flags are not valid" warning, so the sampling parameters are
    # explicitly set to None to override the model's default generation config.
    outputs = model.generate(
        input_ids,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        temperature=None,
        top_p=None,
        top_k=None,
    )

    # 5. Decode only the tokens generated after the prompt
    prompt_length = input_ids.shape[-1]
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
    return response.strip()

# Main Extraction Loop
- The extraction loop passes the full text of each page to the LLM, which returns the controls found on that page as a JSON snippet.

In [75]:
import json
import time

# Run the extractor over every page, collect the parsed controls and time the run.
all_extracted_data = []  # control dicts gathered across all pages
responses = []           # fence-stripped model output per page (kept for debugging)

total_start_time = time.time()  # Start global timer
print("Starting extraction...")

for i, page_text in enumerate(text_page):
    print(f"--- Processing Page {i+1} ---")

    page_start_time = time.time()
    raw_response = extract_controls_from_page(page_text)

    # The prompt forbids markdown fences, but strip them in case the model
    # adds them anyway.
    clean_json_string = raw_response.replace("```json", "").replace("```", "").strip()

    elapsed_time = time.time() - page_start_time
    print(f"   > Time taken: {elapsed_time:.2f} seconds")
    responses.append(clean_json_string)

    if not clean_json_string or clean_json_string == "[]":
        print("   > No controls found.")
        continue

    try:
        data = json.loads(clean_json_string)
    except json.JSONDecodeError:
        print(f"   > Error: Model output invalid JSON.\n   > Raw Output: {raw_response[:50]}...")
        continue

    # Verify it's a list
    if not isinstance(data, list):
        print("   > Warning: Model returned valid JSON but not a list.")
        continue

    # Keep only JSON objects: downstream cleaning indexes each entry by key,
    # so stray strings/numbers/nulls in the list would crash it.
    controls = [item for item in data if isinstance(item, dict)]
    skipped = len(data) - len(controls)
    if skipped:
        print(f"   > Warning: Skipped {skipped} non-object entries.")

    print(f"   > Success! Found {len(controls)} controls.")
    all_extracted_data.extend(controls)

# `minutes` and `seconds` are printed by a later cell.
total_end_time = time.time()
total_duration = total_end_time - total_start_time
minutes = int(total_duration // 60)
seconds = int(total_duration % 60)


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Starting extraction...
--- Processing Page 1 ---
   > Time taken: 20.01 seconds
   > Success! Found 30 controls.
--- Processing Page 2 ---
   > Time taken: 33.09 seconds
   > Success! Found 42 controls.
--- Processing Page 3 ---
   > Time taken: 12.08 seconds
   > Success! Found 17 controls.
--- Processing Page 4 ---
   > Time taken: 12.00 seconds
   > Success! Found 17 controls.
--- Processing Page 5 ---
   > Time taken: 25.76 seconds
   > Success! Found 37 controls.
--- Processing Page 6 ---
   > Time taken: 12.39 seconds
   > Success! Found 18 controls.
--- Processing Page 7 ---
   > Time taken: 13.45 seconds
   > Success! Found 21 controls.
--- Processing Page 8 ---
   > Time taken: 18.41 seconds
   > Success! Found 26 controls.
--- Processing Page 9 ---
   > Time taken: 28.77 seconds
   > Success! Found 42 controls.
--- Processing Page 10 ---
   > Time taken: 33.31 seconds
   > Success! Found 43 controls.
--- Processing Page 11 ---
   > Time taken: 39.49 seconds
   > Success! Foun

In [35]:
# Validate and de-duplicate the extracted controls.
# Note: despite its name, `clean_json_string` holds the list returned by
# clean_and_validate_json(), not a string.
clean_json_string = clean_and_validate_json(all_extracted_data)

# Report how many controls were extracted and how many survived cleaning.
print("\nDONE!")
print(f"Total controls found: {len(all_extracted_data)}")
print(f"Controls After cleaning and removing duplicates: {len(clean_json_string)}")


DONE!
Total controls found: 633
Controls After cleaning and removing duplicates: 583


In [36]:
# Pretty-print the cleaned controls (prints the whole list).
print(json.dumps(clean_json_string, indent=2))

[
  {
    "control_id": "MP-1",
    "control_title": "Policy and Procedures",
    "control_desc": "Policy and Procedures"
  },
  {
    "control_id": "MP-2",
    "control_title": "Media Access",
    "control_desc": "Media Access"
  },
  {
    "control_id": "MP-2(1)",
    "control_title": "Automated Restricted Access",
    "control_desc": "Automated Restricted Access"
  },
  {
    "control_id": "MP-2(2)",
    "control_title": "Cryptographic Protection",
    "control_desc": "Cryptographic Protection"
  },
  {
    "control_id": "MP-3",
    "control_title": "Media Marking",
    "control_desc": "Media Marking"
  },
  {
    "control_id": "MP-4",
    "control_title": "Media Storage",
    "control_desc": "Media Storage"
  },
  {
    "control_id": "MP-4(1)",
    "control_title": "Cryptographic Protection",
    "control_desc": "Cryptographic Protection"
  },
  {
    "control_id": "MP-4(2)",
    "control_title": "Automated Restricted Access",
    "control_desc": "Automated Restricted Access"
  },


In [38]:
# Persist the cleaned controls to output.json in the current working directory.
with open("output.json", "w") as f:
    json.dump(clean_json_string, f, indent=2)

In [14]:
import json
# Reload the saved controls. This rebinds `all_extracted_data` to the contents
# of output.json, replacing the list built by the extraction loop.
with open('output.json',"r") as f:
    all_extracted_data = json.load(f)

In [37]:
# `minutes` and `seconds` are computed at the end of the extraction-loop cell,
# so that cell must have been run in this kernel session.
print(f"the extraction loop took {minutes}m {seconds}s to finish.")

the extraction loop took 8m 3s to finish.


In [15]:
from langchain_huggingface import HuggingFacePipeline, HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate

from langchain_classic.chains import LLMChain

In [130]:
from huggingface_hub import login
from dotenv import load_dotenv
import os
# Load variables from a .env file into the process environment, then log in
# to the Hugging Face Hub with the token stored under `hf_token`.
load_dotenv()
# os.environ[...] raises KeyError when `hf_token` is not set.
login(token=os.environ['hf_token'])

In [33]:
def load_model(model_id="Qwen/Qwen2.5-7B-Instruct"):
    """Load a 4-bit quantized causal LM together with its tokenizer.

    Args:
        model_id: Hugging Face model identifier to load.

    Returns:
        A `(tokenizer, model)` tuple. The tokenizer pads on the left and has a
        pad token (EOS is used when the tokenizer defines none).
    """
    # 4-bit NF4 quantization with double quantization and float16 compute.
    quant_cfg = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
    )

    tok = AutoTokenizer.from_pretrained(model_id)

    # Fall back to EOS when the tokenizer ships without a pad token.
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token

    # Decoder-only models should be padded on the left for generation.
    tok.padding_side = "left"

    lm = AutoModelForCausalLM.from_pretrained(
        model_id,
        quantization_config=quant_cfg,
        device_map="auto",
        trust_remote_code=True,
    )
    return tok, lm

In [57]:

def setup_langchain_rag(text_pages, tokenizer=None, model=None):
    """Build the vector store and the LangChain LLM used to validate controls.

    Every page becomes one `Document` whose metadata records its 1-based page
    number, so a retrieved chunk can be traced back to its page. The documents
    are embedded with "all-MiniLM-L6-v2" and stored in a Chroma collection
    named "pdf_validation_store".

    Args:
        text_pages: List of cleaned page texts, in page order.
        tokenizer: Optional already-loaded tokenizer to reuse.
        model: Optional already-loaded model to reuse. When either `tokenizer`
            or `model` is omitted, both are loaded with `load_model()`; passing
            the pair already in memory avoids loading the model a second time.

    Returns:
        A `(vectorstore, llm)` tuple: the Chroma store and a
        `HuggingFacePipeline` wrapping a text-generation pipeline.
    """
    # LangChain documents carry metadata alongside the text, which lets the
    # validator report which page a retrieved chunk came from.
    documents = [
        Document(
            page_content=page_text,
            metadata={"page_number": page_number, "source": "pdf_upload"},
        )
        for page_number, page_text in enumerate(text_pages, start=1)
    ]

    # Lightweight embedding model used to index the pages.
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        collection_name="pdf_validation_store"
    )

    # Only load a fresh model when the caller did not supply a complete pair.
    if tokenizer is None or model is None:
        tokenizer, model = load_model()

    # Text-generation pipeline built on the (re)used model.
    text_generation_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=1024,
        temperature=0.1,               # Low temp for strict validation
        do_sample=True,                # Sampling enabled so temperature/top_p take effect
        top_p=0.9,                     # Standard sampling
        repetition_penalty=1.15,
        pad_token_id=tokenizer.eos_token_id
    )

    llm = HuggingFacePipeline(pipeline=text_generation_pipeline)

    return vectorstore, llm

In [122]:
def _parse_validation_response(response):
    """Return the last JSON object embedded in `response`.

    Scans every "{" in the text and tries to decode a JSON value starting
    there, so the answer is found whether or not the model wrapped it in a
    markdown fence or surrounded it with extra prose.

    Raises:
        ValueError: if no JSON object can be decoded from the text.
    """
    decoder = json.JSONDecoder()
    last_object = None
    position = response.find("{")
    while position != -1:
        try:
            candidate, end = decoder.raw_decode(response, position)
        except json.JSONDecodeError:
            # Not valid JSON from this brace; try the next one.
            position = response.find("{", position + 1)
            continue
        if isinstance(candidate, dict):
            last_object = candidate
        position = response.find("{", end)
    if last_object is None:
        raise ValueError("no JSON object found in model response")
    return last_object


def _is_affirmative(value):
    """Interpret the model's `is_valid` value; the string "false" must not pass."""
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)


def validate_extracted_controls(extracted_controls, vectorstore, llm):
    """Check each extracted control against the most relevant page of the PDF.

    For every control the single best-matching page is retrieved from
    `vectorstore`, and `llm` is asked whether that page supports the control.
    The verdict is stored on the control under the "validation" key.

    Args:
        extracted_controls: List of dicts with "control_id", "control_title"
            and "control_desc". The dicts are updated in place.
        vectorstore: Vector store exposing `as_retriever()`.
        llm: LangChain LLM that can be composed with a prompt via `|`.

    Returns:
        A list containing the same control dicts, each with a "validation"
        dict: "status" is "PASS", "FAIL" or "ERROR", plus "verified_on_page"
        and "reason" (and "confidence_score" when the model answered).
    """
    # prompt for the validation llm
    validation_template = """
    You are a Quality Assurance Auditor. Verify if the following control exists in the source text.
    
    SOURCE TEXT (Reference):
    {context}
    
    CLAIMED CONTROL (To Verify):
    ID: {control_id}
    Title: {control_title}
    Description: {control_desc}
    
    TASK:
    Determine if the "Claimed Control" is explicitly supported by the "Source Text".
    Also make sure the control given isnt just a change made in any other control
    
    OUTPUT FORMAT:
    Return only a JSON object with this format:
    {{
        "is_valid": true/false,
        "reason": "Short explanation citing the text",
        "confidence_score": 1-10
    }}
    """
    # put the prompt into proper template for the model pipeline
    prompt = PromptTemplate(
        template=validation_template,
        input_variables=["context", "control_id", "control_title", "control_desc"]
    )

    # Retrieve only the top-1 page per control.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 1})

    # The chain does not depend on the control, so build it once.
    chain = prompt | llm

    validated_results = []

    for control in extracted_controls:
        # Query with both id and description for better retrieval accuracy.
        query = f"{control['control_id']} {control['control_desc']}"
        relevant_docs = retriever.invoke(query)

        # Content and page number of the most relevant chunk.
        context_text = relevant_docs[0].page_content if relevant_docs else "No context found."
        found_page = relevant_docs[0].metadata['page_number'] if relevant_docs else "Unknown"

        try:
            # Run the validation
            response = chain.invoke({
                "context": context_text,
                "control_id": control['control_id'],
                "control_title": control['control_title'],
                "control_desc": control['control_desc']
            })

            # The model may chatter around the JSON or omit the markdown fence.
            response_json = _parse_validation_response(response)

            # Add validation metadata to the control
            control['validation'] = {
                'status': 'PASS' if _is_affirmative(response_json.get('is_valid')) else 'FAIL',
                'verified_on_page': found_page,
                'reason': response_json.get('reason'),
                'confidence_score': response_json.get('confidence_score')
            }

        except Exception as e:
            # Best effort: record the failure and continue with the next control.
            control['validation'] = {
                'status': 'ERROR',
                'verified_on_page': found_page,
                'reason': str(e)
            }

        validated_results.append(control)
        print(f" > Checked {control['control_id']}: {control['validation']['status']}")

    return validated_results

In [124]:

import gc
# Run a garbage-collection pass; the value shown below the cell is
# gc.collect()'s return value.
gc.collect()

0

In [112]:
# Build the vector store from the cleaned `text_page` list and create the
# validation LLM. setup_langchain_rag() loads its own model via load_model().
vectorstore, langchain_llm = setup_langchain_rag(text_page)

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

Device set to use cuda:0


In [125]:



# Validate the extracted data. Only the first 10 controls are checked here
# (note the [:10] slice).
final_validated_data = validate_extracted_controls(all_extracted_data[:10], vectorstore, langchain_llm)



{
    "is_valid": true,
    "reason": "The source text describes 'Policy and Procedures' under the 'MA-1 POLICY AND PROCEDURES' section, which aligns with the claimed control ID MP-1 titled 'Policy and Procedures'.",
    "confidence_score": 10
}
 > Checked MP-1: PASS
{
    "is_valid": true,
    "reason": "The source text lists 'Media Access' as control number MP-2.",
    "confidence_score": 9
}
 > Checked MP-2: PASS
{
    "is_valid": true,
    "reason": "The source text lists 'MP-2(1) AUTOMATED RESTRICTED ACCESS' as implemented.",
    "confidence_score": 9
}
 > Checked MP-2(1): PASS
{
    "is_valid": false,
    "reason": "The source text does not contain the specific term 'Cryptographic Protection' nor any explicit reference to the control ID MP-2(2). While it mentions 'cryptographic module', which relates to cryptography, there is no direct mention of the control title or ID.",
    "confidence_score": 8
}
 > Checked MP-2(2): FAIL
{
    "is_valid": true,
    "reason": "The source text 

In [126]:
# 3. Save Final Result
from pathlib import Path

# Create the output folder first: open(..., "w") does not create missing
# directories and would raise FileNotFoundError on a fresh checkout.
output_path = Path("extracted_controls") / "output_validated.json"
output_path.parent.mkdir(parents=True, exist_ok=True)

with open(output_path, "w", encoding="utf-8") as f:
    json.dump(final_validated_data, f, indent=2, ensure_ascii=False)