In [1]:
import json
import re
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
with open("chunks.json", "r", encoding="utf-8") as f:
    chunks = json.load(f)

print("Total chunks loaded:", len(chunks))
print("Sample chunk:", chunks[0])

Total chunks loaded: 271
Sample chunk: {'id': 'doc0_sec0_chunk0', 'text': 'FinSolve Technologies Engineering Document FinSolve Technologies Engineering Document', 'meta': {'chunk_id': 'doc0_sec0_chunk0', 'source': 'files\\engineering\\engineering_master_doc.md', 'filename': 'engineering_master_doc.md', 'doc_id': 0, 'section_index': 0, 'section_heading': 'FinSolve Technologies Engineering Document', 'title': 'FinSolve Technologies Engineering Document', 'department': 'engineering', 'roles': ['Engineering', 'C-Level'], 'token_count': 8}}


In [3]:
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

In [4]:
client = chromadb.Client(
    Settings(
        persist_directory="./chroma_db"   
    )
)

collection = client.get_or_create_collection(
    name="company_documents"             
)

# metadata for Chroma
def clean_metadata(meta: dict):
    cleaned = {}
    for k, v in meta.items():
        if isinstance(v, list):
            cleaned[k] = ", ".join(map(str, v))
        elif v is None:
            continue
        else:
            cleaned[k] = v
    return cleaned


if collection.count() == 0:
    for idx, chunk in enumerate(tqdm(chunks)):
        collection.add(
            ids=[chunk["id"]],
            documents=[chunk["text"]],
            embeddings=[model.encode(chunk["text"]).tolist()],
            metadatas=[clean_metadata(chunk["meta"])]
        )

print("Total documents indexed:", collection.count())


100%|██████████| 271/271 [00:08<00:00, 31.59it/s]

Total documents indexed: 271





In [5]:
collection.peek(limit=1)["metadatas"]

[{'source': 'files\\engineering\\engineering_master_doc.md',
  'roles': 'Engineering, C-Level',
  'doc_id': 0,
  'section_heading': 'FinSolve Technologies Engineering Document',
  'token_count': 8,
  'title': 'FinSolve Technologies Engineering Document',
  'section_index': 0,
  'filename': 'engineering_master_doc.md',
  'department': 'engineering',
  'chunk_id': 'doc0_sec0_chunk0'}]

In [6]:
def normalize_query(query: str) -> str:
    query = query.lower()
    query = re.sub(r"[^a-z0-9\s]", "", query)
    return query.strip()

In [7]:
ROLE_HIERARCHY = {
    "C-Level": ["engineering", "finance", "hr", "marketing", "general"],
    "HR": ["hr", "general"],
    "Finance": ["finance", "general"],
    "Engineering": ["engineering", "general"],
    "Marketing": ["marketing", "general"]
}

def build_role_filter(user_role: str):
    if user_role not in ROLE_HIERARCHY:
        raise ValueError("Invalid role")

    return {
        "department": {"$in": ROLE_HIERARCHY[user_role]}
    }

In [8]:
def role_based_search(query, user_role, top_k=5):
    clean_query = normalize_query(query)
    query_embedding = model.encode([clean_query]).tolist()

    role_filter = build_role_filter(user_role)

    results = collection.query(
        query_embeddings=query_embedding,
        n_results=top_k,
        where=role_filter
    )
    return results

In [9]:
test_cases = [
    ("Finance", "leave policies", 0),
    ("HR", "leave policies", 1),
    ("Engineering", "leave policies", 0),
    ("C-Level", "employee handbook policies", 1)
]

for role, query, expected_min in test_cases:
    res = role_based_search(query, role)
    actual = len(res["documents"][0])

    print(f"Role: {role}")
    print(f"Query: {query}")
    print(f"Results: {actual}")

    if expected_min == 0:
        print("PASS" if actual == 0 else "FAIL")
    else:
        print("PASS" if actual >= expected_min else "FAIL")

    print("-" * 50)

Role: Finance
Query: leave policies
Results: 5
FAIL
--------------------------------------------------
Role: HR
Query: leave policies
Results: 5
PASS
--------------------------------------------------
Role: Engineering
Query: leave policies
Results: 5
FAIL
--------------------------------------------------
Role: C-Level
Query: employee handbook policies
Results: 5
PASS
--------------------------------------------------


In [26]:
import re

def tokenize(text: str):
    text = text.lower()
    text = re.sub(r"[^a-z0-9\s]", "", text)
    return set(text.split())


def get_top_k_context(results, top_k):
    return (
        results["documents"][0][:top_k],
        results["metadatas"][0][:top_k],
        results["distances"][0][:top_k]
    )


def clean_context_chunks(chunks, max_chars=300):
    cleaned = []
    for chunk in chunks:
        text = chunk.replace("\n", " ").strip()
        cleaned.append(text[:max_chars])
    return cleaned


In [28]:
def is_answer_supported(query: str, context_chunks: list, min_overlap=0.15):
    query_tokens = tokenize(query)
    context_tokens = set()

    for chunk in context_chunks:
        context_tokens |= tokenize(chunk)

    if not query_tokens:
        return False

    overlap_ratio = len(query_tokens & context_tokens) / len(query_tokens)
    return overlap_ratio >= min_overlap


In [51]:
def hallucination_check(answer: str, context_chunks: list, threshold=0.4):
    answer_tokens = tokenize(answer)
    context_tokens = set()

    for chunk in context_chunks:
        context_tokens |= tokenize(chunk)

    if not answer_tokens:
        return {"is_hallucinated": True, "reason": "Empty answer"}

    overlap = answer_tokens & context_tokens
    grounding_ratio = len(overlap) / len(answer_tokens)

    return {
        "is_hallucinated": grounding_ratio < threshold,
        "grounding_ratio": round(grounding_ratio, 2),
        "hallucination_score": round(1 - grounding_ratio, 2)
    }

In [30]:
def build_sources(metadatas, allowed_departments):
    seen = set()
    sources = []

    for m in metadatas:
        key = (m["title"], m["section_heading"])
        if key in seen:
            continue

        if m["department"] not in allowed_departments:
            continue

        seen.add(key)
        sources.append({
            "title": m["title"],
            "section": m["section_heading"],
            "department": m["department"]
        })

    return sources

In [38]:
def rag_pipeline(query, user_role, top_k=5):
    results = role_based_search(query, user_role, top_k)

    documents, metadatas, distances = get_top_k_context(results, top_k)

    if not documents:
        return {
            "answer": "Information not available in authorized documents.",
            "confidence": 0.0,
            "sources": [],
            "hallucination": {
                "is_hallucinated": True,
                "reason": "No relevant context"
            }
        }


    if not is_answer_supported(query, documents):
        return {
            "answer": "Information not available in authorized documents.",
            "confidence": 0.0,
            "sources": [],
            "hallucination": {
                "is_hallucinated": True,
                "reason": "Query not supported by retrieved context"
            }
        }

    cleaned_docs = clean_context_chunks(documents)

 
    answer_body = "\n\n---\n\n".join(cleaned_docs[:3])
    answer = "Based on authorized company documents:\n\n" + answer_body

 
    avg_distance = sum(distances) / len(distances)
    confidence = round(1 / (1 + avg_distance), 2)

    # Metadata-only sources
    sources = build_sources(
        metadatas,
        ROLE_HIERARCHY[user_role]
    )

    hallucination = hallucination_check(answer, documents)

    return {
        "answer": answer,
        "confidence": confidence,
        "sources": sources,
        "hallucination": hallucination
    }


In [None]:
def display_rag_response(response, role, query):
    print("=" * 90)
    print(f"ROLE   : {role}")
    print(f"QUERY  : {query}")
    print("-" * 90)

    print("ANSWER:\n")
    print(response["answer"])

    print("\nCONFIDENCE SCORE:", response["confidence"])

    print("\nSOURCES (Metadata-derived):")
    if not response["sources"]:
        print("No sources available (RBAC restricted or not found)")
    else:
        for i, src in enumerate(response["sources"], 1):
            print(
                f"{i}. Document   : {src['title']}\n"
                f"   Section    : {src['section']}\n"
                f"   Department : {src['department']}\n"
            )

    print("\nHALLUCINATION CHECK:")
    for k, v in response["hallucination"].items():
        print(f"{k}: {v}")

    print("=" * 90)


In [43]:
print("TEST CASE 1: SUCCESSFUL RAG RESPONSE (AUTHORIZED ROLE)")

role = "HR"
query = "leave entitlement"
response = rag_pipeline(query, role)
display_rag_response(response, role, query)

print("\nASSERTIONS:")
print("✔ PASS" if response["answer"].lower().startswith("based on authorized company documents") else "✘ FAIL")
print("✔ PASS" if len(response["sources"]) > 0 else "✘ FAIL")
print("✔ PASS" if not response["hallucination"]["is_hallucinated"] else "✘ FAIL")

TEST CASE 1: SUCCESSFUL RAG RESPONSE (AUTHORIZED ROLE)
ROLE   : HR
QUERY  : leave entitlement
------------------------------------------------------------------------------------------
ANSWER:

Based on authorized company documents:

Employee Handbook Leave Policies

---

Employee Handbook Exit Policy

---

Employee Handbook Statutory Benefits | Benefit | Details | |---------|---------| | **Employees’ Provident Fund (EPF)** | 12% employer & employee contribution; as per EPF Act, 1952 | | **Employee State Insurance (ESI)** | For employees earning ≤ ₹21,000/month; covers medical, disability, maternity | 

CONFIDENCE SCORE: 0.43

SOURCES (Metadata-derived):
1. Document   : Employee Handbook
   Section    : Leave Policies
   Department : general

2. Document   : Employee Handbook
   Section    : Exit Policy
   Department : general

3. Document   : Employee Handbook
   Section    : Statutory Benefits
   Department : general

4. Document   : Employee Handbook
   Section    : Reimbursement Po

In [44]:
print("TEST CASE 2: UNAUTHORIZED ACCESS (RBAC ENFORCEMENT)")


role = "Finance"
query = "leave entitlement"

response = rag_pipeline(query, role)

display_rag_response(response, role, query)

print("\nASSERTIONS:")
print("✔ PASS" if response["answer"] == "Information not available in authorized documents." else "✘ FAIL")
print("✔ PASS" if len(response["sources"]) == 0 else "✘ FAIL")
print("✔ PASS" if response["hallucination"]["is_hallucinated"] else "✘ FAIL")


TEST CASE 2: UNAUTHORIZED ACCESS (RBAC ENFORCEMENT)
ROLE   : Finance
QUERY  : leave entitlement
------------------------------------------------------------------------------------------
ANSWER:

Based on authorized company documents:

Employee Handbook Leave Policies

---

Employee Handbook Exit Policy

---

Employee Handbook Statutory Benefits | Benefit | Details | |---------|---------| | **Employees’ Provident Fund (EPF)** | 12% employer & employee contribution; as per EPF Act, 1952 | | **Employee State Insurance (ESI)** | For employees earning ≤ ₹21,000/month; covers medical, disability, maternity | 

CONFIDENCE SCORE: 0.43

SOURCES (Metadata-derived):
1. Document   : Employee Handbook
   Section    : Leave Policies
   Department : general

2. Document   : Employee Handbook
   Section    : Exit Policy
   Department : general

3. Document   : Employee Handbook
   Section    : Statutory Benefits
   Department : general

4. Document   : Employee Handbook
   Section    : Reimbursement 

In [46]:
print("=" * 90)
print("TEST CASE 4: SAME QUERY, DIFFERENT ROLES (RBAC ENFORCEMENT PROOF)")
print("=" * 90)

query = "security compliance"
roles = ["Engineering", "HR", "Finance", "C-Level"]

for role in roles:
    response = rag_pipeline(query, role)

    returned_departments = set(
        src["department"] for src in response["sources"]
    )

    allowed_departments = set(ROLE_HIERARCHY[role])

    print(f"\nROLE: {role}")
    print("Returned Departments :", returned_departments)
    print("Allowed Departments  :", allowed_departments)

    if returned_departments.issubset(allowed_departments):
        print("✔ PASS: RBAC enforced correctly")
    else:
        print("✘ FAIL: RBAC violation detected")


TEST CASE 4: SAME QUERY, DIFFERENT ROLES (RBAC ENFORCEMENT PROOF)

ROLE: Engineering
Returned Departments : {'engineering', 'general'}
Allowed Departments  : {'engineering', 'general'}
✔ PASS: RBAC enforced correctly

ROLE: HR
Returned Departments : {'general'}
Allowed Departments  : {'hr', 'general'}
✔ PASS: RBAC enforced correctly

ROLE: Finance
Returned Departments : {'general'}
Allowed Departments  : {'finance', 'general'}
✔ PASS: RBAC enforced correctly

ROLE: C-Level
Returned Departments : {'engineering', 'general'}
Allowed Departments  : {'marketing', 'engineering', 'finance', 'hr', 'general'}
✔ PASS: RBAC enforced correctly


HALLUCINATION TEST

In [None]:
print("NEGATIVE TEST — ANSWER NOT IN CONTEXT")

query = "What was the CEO's personal bonus in 2022?"
role = "HR"

response = rag_pipeline(query, role)

print("\nANSWER:\n", response["answer"])
print("\nCONFIDENCE:", response["confidence"])

print("\nHALLUCINATION CHECK:")
for k, v in response["hallucination"].items():
    print(f"{k}: {v}")

NEGATIVE TEST — ANSWER NOT IN CONTEXT

ANSWER:
 Information not available in authorized documents.

CONFIDENCE: 0.0

HALLUCINATION CHECK:
is_hallucinated: True
reason: Query not supported by retrieved context
