# Enterprise RAG (production-minded rebuild)

**Goal:** a RAG pipeline you can explain, debug, and extend — not a demo that “looks cool”.

**Constraints:**
- No private company docs (everything is synthetic but shaped like real policies/runbooks)
- Keep it runnable on a laptop / Colab
- Make retrieval decisions visible (scores, sources, eval)

**What’s implemented:**
- 40 synthetic docs (14 hand-written policies + 26 templated service/runbook/residency docs) → chunking → hybrid retrieval (BM25 + dense) → reranking
- Prompt payload with citations (LLM-agnostic)
- Small eval set + retrieval metrics
- A **real FastAPI service** under `app/` (`/ingest`, `/search`, `/ask`)

**What’s next (if you want to go “work-grade”):**
- persistence (Chroma/pgvector), auth, rate limits
- tracing (OpenTelemetry), offline eval runs in CI
- background ingestion + incremental reindexing


## 0) Install dependencies


In [None]:
%pip -q install -U "sentence-transformers>=3.0.0" faiss-cpu rank-bm25 rapidfuzz pyyaml


In [None]:
import os, json, math, random, hashlib, re
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple
import numpy as np

from sentence_transformers import SentenceTransformer, CrossEncoder
import faiss
from rank_bm25 import BM25Okapi


## 1) Configuration + utilities


In [None]:
@dataclass
class RagConfig:
    seed: int = 42
    chunk_size: int = 650
    chunk_overlap: int = 120
    dense_k: int = 20
    bm25_k: int = 20
    final_k: int = 5
    embed_model: str = "intfloat/e5-small-v2"
    rerank_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    normalize_embeddings: bool = True

CFG = RagConfig()
random.seed(CFG.seed)
np.random.seed(CFG.seed)

def simple_hash(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()[:12]

def clean_text(s: str) -> str:
    return " ".join(s.strip().split())

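def chunk_text(text: str, chunk_size: int, overlap: int) -> List[str]:
    # Character-based sliding window (sizes are in characters; the FastAPI service below chunks by words).
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size, or the window never advances")
    text = clean_text(text)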
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + chunk_size)
        chunks.append(text[start:end])
        if end == len(text):
            break
        start = max(0, end - overlap)
    return chunks

def bm25_tokenize(s: str) -> List[str]:
    s = s.lower()
    return [t for t in re.split(r"[^a-z0-9]+", s) if t]
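
`chunk_text` cuts on raw character offsets, so a boundary can land mid-word or mid-sentence. A minimal sentence-aware alternative is sketched below; the regex sentence splitter is a naive assumption (it will mis-split abbreviations such as "e.g. "), and `sentence_chunks` is a hypothetical helper, not something the notebook uses. It packs whole sentences up to a character budget and carries the last sentence forward as overlap:

```python
import re
from typing import List

# Naive sentence boundary (assumption): whitespace that follows ".", "!" or "?".
_SENT_END = re.compile(r"(?<=[.!?])\s+")

def sentence_chunks(text: str, max_chars: int = 650, overlap_sents: int = 1) -> List[str]:
    """Greedily pack whole sentences into chunks of at most max_chars characters.

    The last `overlap_sents` sentences of a chunk are repeated at the start of the next.
    A single sentence longer than max_chars becomes its own oversized chunk.
    """
    sents = [s for s in _SENT_END.split(" ".join(text.split())) if s]
    chunks: List[str] = []
    cur: List[str] = []
    for s in sents:
        if cur and len(" ".join(cur + [s])) > max_chars:
            chunks.append(" ".join(cur))
            # Carry the tail sentences forward as overlap (cur[-0:] would copy everything, hence the guard)
            cur = cur[-overlap_sents:] if overlap_sents > 0 else []
            # Drop the overlap if it alone would push the next chunk over budget
            if cur and len(" ".join(cur + [s])) > max_chars:
                cur = []
        cur.append(s)
    if cur:
        chunks.append(" ".join(cur))
    return chunks

doc = ("Employees must submit receipts within 30 days of the expense date via the Finance Portal. "
       "Meals are reimbursed up to $65/day domestically. Alcohol is not reimbursable.")
for c in sentence_chunks(doc, max_chars=120):
    print(repr(c))
```

The tradeoff: chunk lengths become uneven, but a threshold like "$65/day" always stays in the same chunk as the sentence it belongs to.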


## 2) Create realistic sample corpus (40 docs)


In [None]:
def build_sample_corpus(n: int = 65) -> List[Dict[str, Any]]:
    base_docs = [
        ("Expense Reimbursement Policy", "Finance",
         "Employees must submit receipts within 30 days of the expense date via the Finance Portal. "
         "Meals are reimbursed up to $65/day domestically. Alcohol is not reimbursable. "
         "Managers approve expenses within 5 business days. Late submissions require VP approval."),
        ("Travel Booking Guidelines", "Finance",
         "Air travel must be booked through the approved travel vendor. Economy class for flights under 6 hours. "
         "For flights over 6 hours, premium economy is allowed with manager approval. "
         "Use the corporate card when possible."),
        ("Paid Time Off (PTO) Overview", "HR",
         "Full-time employees accrue PTO each pay period. PTO requests should be submitted at least 10 days in advance. "
         "Carryover is capped at 40 hours. Unused PTO above the cap is forfeited on January 31."),
        ("Parental Leave Policy", "HR",
         "Eligible employees receive 12 weeks of paid parental leave. Leave can be taken within 12 months of birth or adoption. "
         "Benefits continue during leave. Coordinate return-to-work plans with HR."),
        ("Information Security Password Standard", "Security",
         "Passwords must be at least 14 characters and include a mix of letters and numbers. "
         "MFA is required for all remote access. Passwords may not be reused for 12 cycles. "
         "Sharing passwords is prohibited."),
        ("Incident Response: Severity Levels", "Security",
         "SEV-1: full outage or major security incident; page on-call immediately. "
         "SEV-2: partial degradation; engage incident commander within 30 minutes. "
         "SEV-3: limited impact; handle in business hours unless escalated."),
        ("On-Call Runbook: Service Restart", "IT",
         "Before restarting a service, verify downstream dependencies are healthy. "
         "Use the deployment dashboard to initiate a rolling restart. "
         "If error rate spikes, rollback to the last known good version."),
        ("VPN Access Procedure", "IT",
         "VPN access is granted after completing security training. Requests are submitted in the Access Portal. "
         "Access is reviewed quarterly. Lost devices must be reported within 1 hour."),
        ("Code Review Guidelines", "Engineering",
         "All production code requires at least one approving review. "
         "High-risk changes (auth, payments) require two approvals. "
         "Reviewers must check tests, security implications, and operational impact."),
        ("Release Process (Weekly)", "Engineering",
         "We ship weekly on Wednesdays. Feature flags are required for user-facing changes. "
         "A release candidate is cut Tuesday 3 PM ET. Rollback plans must be documented."),
        ("Data Retention Policy", "Legal",
         "Customer support tickets are retained for 2 years. Audit logs are retained for 1 year. "
         "PII must be deleted upon verified request within 30 days, unless retention is legally required."),
        ("Customer Refund Policy", "Support",
         "Refunds are available within 14 days of purchase if usage is under 10%. "
         "Refunds outside the window require escalation. Refund processing time is 5-7 business days."),
        ("SLA: API Availability", "Engineering",
         "API uptime target is 99.9% monthly. Maintenance windows are announced 72 hours in advance. "
         "SLA credits apply if downtime exceeds 45 minutes in a month."),
        ("Laptop Asset Policy", "IT",
         "Corporate laptops must use full-disk encryption. Devices are replaced every 4 years. "
         "Personal software installations require approval. Report theft within 24 hours."),
    ]

    products = ["Atlas", "Beacon", "Cobalt", "Delta", "Echo", "Fjord", "Glimmer", "Helios"]
    systems = ["Payments", "Identity", "Search", "Messaging", "Analytics", "Billing", "Notifications"]
    regions = ["US", "EU", "APAC"]

    templated = []
    for p in products:
        templated.append((f"{p} Service Overview", "Engineering",
                          f"{p} is a core platform service used by internal teams. "
                          f"It exposes REST endpoints and publishes events to the message bus. "
                          f"Primary SLOs: latency p95 < 250ms and error rate < 1%."))
        templated.append((f"{p} Access Control", "Security",
                          f"Access to {p} is granted via role-based groups. "
                          f"Privileged actions require just-in-time access for 2 hours. "
                          f"All access is logged and reviewed monthly."))
    for s in systems:
        templated.append((f"Runbook: {s} High Latency", "IT",
                          f"If {s} p95 latency exceeds threshold for 10 minutes, page on-call. "
                          f"Check recent deploys, database saturation, and cache hit rate. "
                          f"Mitigation: scale replicas, enable rate limiting, or rollback."))
    for r in regions:
        templated.append((f"Data Residency - {r}", "Legal",
                          f"Customer data stored in {r} must remain within {r} boundaries. "
                          f"Cross-region replication requires legal approval and documented justification."))

    all_docs = base_docs + templated
    random.shuffle(all_docs)

    docs = []
    for i, (title, dept, text) in enumerate(all_docs[:n], start=1):
        doc_id = f"DOC-{i:03d}-{simple_hash(title)}"
        docs.append({
            "doc_id": doc_id,
            "title": title,
            "department": dept,
            "text": clean_text(text)
        })
    return docs

corpus = build_sample_corpus(n=65)
print("Docs:", len(corpus))
for d in corpus[:5]:
    print(d["doc_id"], "|", d["department"], "|", d["title"])


## 3) Chunk + index (BM25 + FAISS)


In [None]:
def build_chunks(corpus: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    chunks = []
    for doc in corpus:
        parts = chunk_text(doc["text"], CFG.chunk_size, CFG.chunk_overlap)
        for j, part in enumerate(parts):
            chunks.append({
                "chunk_id": f'{doc["doc_id"]}::C{j:02d}',
                "doc_id": doc["doc_id"],
                "title": doc["title"],
                "department": doc["department"],
                "text": part
            })
    return chunks

chunks = build_chunks(corpus)
print(f"Chunks: {len(chunks)}")


In [None]:
bm25_tokens = [bm25_tokenize(c["text"]) for c in chunks]
bm25 = BM25Okapi(bm25_tokens)


In [None]:
embedder = SentenceTransformer(CFG.embed_model)

def embed_texts(texts: List[str]) -> np.ndarray:
    emb = embedder.encode(texts, batch_size=64, normalize_embeddings=CFG.normalize_embeddings, show_progress_bar=True)
    return np.asarray(emb, dtype="float32")

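# e5 models are trained with "passage: " / "query: " input prefixes; omitting them degrades retrieval.
chunk_emb = embed_texts(["passage: " + c["text"] for c in chunks])
dim = chunk_emb.shape[1]
index = faiss.IndexFlatIP(dim)  # inner product == cosine similarity when normalize_embeddings=True
index.add(chunk_emb)
print("FAISS size:", index.ntotal, "dim:", dim)


## 4) Hybrid retrieval + reranking


In [None]:
reranker = CrossEncoder(CFG.rerank_model)

def dense_search(query: str, k: int) -> List[Tuple[int, float]]:
    q = embed_texts(["query: " + query])  # query-side prefix, matching the "passage: " prefix on chunks
    scores, ids = index.search(q, k)
    return [(int(i), float(s)) for i, s in zip(ids[0], scores[0]) if i != -1]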

def bm25_search(query: str, k: int) -> List[Tuple[int, float]]:
    scores = bm25.get_scores(bm25_tokenize(query))
    top = np.argsort(scores)[::-1][:k]
    return [(int(i), float(scores[i])) for i in top]

def hybrid_candidates(query: str) -> Dict[int, Dict[str, float]]:
    cand: Dict[int, Dict[str, float]] = {}
    for idx, s in dense_search(query, CFG.dense_k):
        cand.setdefault(idx, {})["dense_score"] = s
    for idx, s in bm25_search(query, CFG.bm25_k):
        cand.setdefault(idx, {})["bm25_score"] = s
    return cand

def rerank(query: str, candidate_idxs: List[int]) -> List[Tuple[int, float]]:
    pairs = [(query, chunks[i]["text"]) for i in candidate_idxs]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidate_idxs, scores), key=lambda x: x[1], reverse=True)
    return [(int(i), float(s)) for i, s in ranked]

def retrieve(query: str, k: int | None = None) -> List[Dict[str, Any]]:
    k = CFG.final_k if k is None else k
    cand = hybrid_candidates(query)
    candidate_idxs = list(cand.keys())
    ranked = rerank(query, candidate_idxs)[:k]

    out = []
    for idx, rr_score in ranked:
        meta = cand.get(idx, {})
        c = chunks[idx]
        out.append({
            "chunk_id": c["chunk_id"],
            "doc_id": c["doc_id"],
            "title": c["title"],
            "department": c["department"],
            "text": c["text"],
            "rerank_score": float(rr_score),
            "dense_score": meta.get("dense_score"),
            "bm25_score": meta.get("bm25_score"),
        })
    return out

def pretty_results(results: List[Dict[str, Any]]):
    for i, r in enumerate(results, start=1):
        print(f"\n=== #{i} | {r['title']} ({r['doc_id']}) [{r['department']}] ===")
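        dense_s = "n/a" if r["dense_score"] is None else f"{r['dense_score']:.4f}"
        bm25_s = "n/a" if r["bm25_score"] is None else f"{r['bm25_score']:.4f}"
        print(f"rerank={r['rerank_score']:.4f} | dense={dense_s} | bm25={bm25_s}")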
        print(r["text"])
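
`hybrid_candidates` unions both retrievers and hands everything to the cross-encoder, so it is not obvious how much each retriever actually contributes. A small diagnostic, written as a hypothetical helper `candidate_overlap` that accepts the same `{chunk_idx: {"dense_score": ..., "bm25_score": ...}}` dict shape:

```python
from typing import Dict

def candidate_overlap(cand: Dict[int, Dict[str, float]]) -> Dict[str, int]:
    """Count candidates surfaced by dense only, BM25 only, or both.

    A score key is present only if that retriever returned the chunk.
    """
    counts = {"dense_only": 0, "bm25_only": 0, "both": 0}
    for signals in cand.values():
        has_dense = "dense_score" in signals
        has_bm25 = "bm25_score" in signals
        if has_dense and has_bm25:
            counts["both"] += 1
        elif has_dense:
            counts["dense_only"] += 1
        elif has_bm25:
            counts["bm25_only"] += 1
    return counts

# Toy input in the same shape (made-up scores, for illustration only)
toy = {0: {"dense_score": 0.82, "bm25_score": 7.1}, 3: {"dense_score": 0.77}, 9: {"bm25_score": 4.2}}
print(candidate_overlap(toy))  # {'dense_only': 1, 'bm25_only': 1, 'both': 1}
```

In the notebook this would be `candidate_overlap(hybrid_candidates(q))` per demo query. Note that `bm25_search` always returns `bm25_k` indices, including zero-score ones, so `bm25_only` can contain chunks with no term overlap at all.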


## 5) Demo retrieval


In [None]:
demo_queries = [
    "What is the refund window for a customer purchase?",
    "How many days do I have to submit an expense reimbursement receipt?",
    "When should SEV-1 incidents page on-call?",
    "Do we require MFA for remote access?",
    "What is the weekly release day and when do we cut the release candidate?"
]
for q in demo_queries:
    print("\n\n#", q)
    pretty_results(retrieve(q))


## 6) LLM-ready prompt payload (citations included)


In [None]:
def build_prompt_payload(question: str, contexts: List[Dict[str, Any]]) -> Dict[str, Any]:
    sources = [{"doc_id": c["doc_id"], "title": c["title"], "chunk_id": c["chunk_id"]} for c in contexts]
    context_text = "\n\n".join(
        [f"[{i+1}] {c['title']} ({c['doc_id']}):\n{c['text']}" for i, c in enumerate(contexts)]
    )

    system = (
        "You are an enterprise assistant. Answer ONLY from the provided context. "
        "If the answer is not in the context, say you don't know and ask one clarifying question."
    )
    user = f"Question: {question}\n\nContext:\n{context_text}\n\nAnswer with citations like [1], [2]."
    return {"system": system, "user": user, "sources": sources}

payload = build_prompt_payload("What is the refund window?", retrieve("refund window"))
print(payload["user"][:500])
print("Sources:", payload["sources"][:2])
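
The payload asks the model to cite `[1]`, `[2]`, but nothing checks that it did, or that the numbers exist. A minimal post-processing sketch (hypothetical helper `check_citations`, not part of the notebook) maps markers in an answer back to the 1-based `sources` list and flags out-of-range numbers; the doc IDs below are placeholders:

```python
import re
from typing import Any, Dict, List

_CITE = re.compile(r"\[(\d+)\]")

def check_citations(answer: str, sources: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Resolve [n] markers against the 1-based sources list.

    Returns cited sources in first-seen order, marker numbers outside the list
    (usually an invented citation), and whether the answer cites nothing at all.
    """
    cited, invalid, seen = [], [], set()
    for m in _CITE.finditer(answer):
        n = int(m.group(1))
        if n in seen:
            continue
        seen.add(n)
        if 1 <= n <= len(sources):
            cited.append(sources[n - 1])
        else:
            invalid.append(n)
    return {"cited": cited, "invalid": invalid, "no_citations": not seen}

srcs = [
    {"doc_id": "DOC-001-aaa", "title": "Customer Refund Policy", "chunk_id": "DOC-001-aaa::C00"},
    {"doc_id": "DOC-002-bbb", "title": "Data Retention Policy", "chunk_id": "DOC-002-bbb::C00"},
]
report = check_citations("Refunds are available within 14 days [1]. See also [1] and [4].", srcs)
print([s["title"] for s in report["cited"]], report["invalid"])  # ['Customer Refund Policy'] [4]
```

A non-empty `invalid` list or `no_citations=True` is a cheap signal to retry the LLM call or fall back to "I don't know".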


## 7) Evaluation (35 questions) + retrieval metrics


In [None]:
def build_eval_set(corpus: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    title_to_id = {d["title"]: d["doc_id"] for d in corpus}

    seeds = [
        ("Customer Refund Policy", "refund window"),
        ("Expense Reimbursement Policy", "receipts within 30 days"),
        ("Information Security Password Standard", "MFA required"),
        ("Incident Response: Severity Levels", "SEV-1 page on-call"),
        ("Paid Time Off (PTO) Overview", "PTO carryover cap"),
        ("Release Process (Weekly)", "weekly release day"),
        ("Code Review Guidelines", "how many approvals"),
        ("Data Retention Policy", "support ticket retention"),
        ("VPN Access Procedure", "VPN access reviewed"),
        ("Laptop Asset Policy", "laptop replacement cycle"),
        ("SLA: API Availability", "uptime target"),
        ("Travel Booking Guidelines", "economy class 6 hours"),
        ("Parental Leave Policy", "weeks of paid parental leave"),
    ]

    eval_items = []
    for title, q in seeds:
        if title in title_to_id:
            eval_items.append({
                "question": q if q.endswith("?") else q + "?",
                "expected_doc_id": title_to_id[title],
                "expected_title": title,
            })

    # inferred templates
    products, systems, regions = [], [], []
    for d in corpus:
        if d["title"].endswith(" Service Overview"):
            products.append(d["title"].replace(" Service Overview", ""))
        if d["title"].startswith("Runbook: ") and d["title"].endswith(" High Latency"):
            systems.append(d["title"].replace("Runbook: ", "").replace(" High Latency", ""))
        if d["title"].startswith("Data Residency - "):
            regions.append(d["title"].replace("Data Residency - ", ""))

    for p in products[:8]:
        t = f"{p} Access Control"
        if t in title_to_id:
            eval_items.append({
                "question": f"How is access granted to {p}?",
                "expected_doc_id": title_to_id[t],
                "expected_title": t,
            })
    for s in systems[:8]:
        t = f"Runbook: {s} High Latency"
        if t in title_to_id:
            eval_items.append({
                "question": f"What should I check when {s} latency is high?",
                "expected_doc_id": title_to_id[t],
                "expected_title": t,
            })
    for r in regions[:3]:
        t = f"Data Residency - {r}"
        if t in title_to_id:
            eval_items.append({
                "question": f"Can customer data be replicated out of {r}?",
                "expected_doc_id": title_to_id[t],
                "expected_title": t,
            })

    # Three phrasings per item, then shuffle BEFORE truncating so the 35-example cap
    # samples across all docs (truncating first would keep only the first ~12 seeds).
    out = []
    for item in eval_items:
        out.append(item)
        out.append({**item, "question": "Please summarize: " + item["question"]})
        out.append({**item, "question": "What does our policy say about " + item["question"].rstrip("?").lower() + "?"})

    random.shuffle(out)
    return out[:35]

eval_set = build_eval_set(corpus)
print("Eval examples:", len(eval_set))
print(eval_set[0])


In [None]:
def hit_rate_at_k(retrieved_doc_ids: List[str], expected: str, k: int) -> float:
    # With one relevant doc per question this is Recall@k (a binary hit), not Precision@k,
    # which would be capped at 1/k.
    return 1.0 if expected in retrieved_doc_ids[:k] else 0.0

def reciprocal_rank(retrieved_doc_ids: List[str], expected: str) -> float:
    for i, doc_id in enumerate(retrieved_doc_ids, start=1):
        if doc_id == expected:
            return 1.0 / i
    return 0.0

def ndcg_at_k(retrieved_doc_ids: List[str], expected: str, k: int) -> float:
    # Binary relevance with a single relevant doc: IDCG = 1/log2(2) = 1.0, so nDCG = DCG.
    # Count only the first hit; several chunks from the same doc would otherwise push nDCG above 1.
    for i, doc_id in enumerate(retrieved_doc_ids[:k], start=1):
        if doc_id == expected:
            return 1.0 / math.log2(i + 1)
    return 0.0

def evaluate(eval_set: List[Dict[str, Any]], k: int = 5) -> Dict[str, Any]:
    hits, mrrs, ndcgs = [], [], []
    failures = []
    for ex in eval_set:
        results = retrieve(ex["question"], k=k)
        doc_ids = [r["doc_id"] for r in results]
        hits.append(hit_rate_at_k(doc_ids, ex["expected_doc_id"], k))
        mrrs.append(reciprocal_rank(doc_ids, ex["expected_doc_id"]))
        ndcgs.append(ndcg_at_k(doc_ids, ex["expected_doc_id"], k))
        if ex["expected_doc_id"] not in doc_ids:
            failures.append({
                "question": ex["question"],
                "expected": ex["expected_title"],
                "got": [r["title"] for r in results]
            })
    return {
        f"HitRate@{k}": float(np.mean(hits)),
        "MRR": float(np.mean(mrrs)),
        f"nDCG@{k}": float(np.mean(ndcgs)),
        "failures": failures
    }

metrics = evaluate(eval_set, k=5)
print({k:v for k,v in metrics.items() if k != "failures"})
print("Failures:", len(metrics["failures"]))
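
With exactly one relevant document per question, all three metrics reduce to functions of the rank r of its first appearance: Hit@k = 1 if r ≤ k, RR = 1/r, and nDCG@k = 1/log2(r + 1). A self-contained worked example with placeholder doc IDs (the function names here are illustrative, not the notebook's):

```python
import math
from typing import List

def hit_at_k(ranked: List[str], expected: str, k: int) -> float:
    return 1.0 if expected in ranked[:k] else 0.0

def rr(ranked: List[str], expected: str) -> float:
    for rank, doc_id in enumerate(ranked, start=1):
        if doc_id == expected:
            return 1.0 / rank
    return 0.0

def ndcg_single(ranked: List[str], expected: str, k: int) -> float:
    # One relevant doc: IDCG = 1/log2(1 + 1) = 1, so nDCG = 1/log2(rank + 1) at its first hit.
    for rank, doc_id in enumerate(ranked[:k], start=1):
        if doc_id == expected:
            return 1.0 / math.log2(rank + 1)
    return 0.0

ranked = ["DOC-A", "DOC-B", "DOC-C"]  # placeholder doc ids, best first
print(hit_at_k(ranked, "DOC-B", 5), rr(ranked, "DOC-B"), round(ndcg_single(ranked, "DOC-B", 5), 4))
# 1.0 0.5 0.6309
```

So a correct doc at rank 2 still counts as a full hit, but costs half of its reciprocal rank and about 37% of its nDCG. That gap is why MRR and nDCG are the more sensitive numbers when tuning weights.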


## 8) Save corpus + eval to `data/` (GitHub-friendly)


In [None]:
from pathlib import Path
out_dir = Path("data")
out_dir.mkdir(exist_ok=True)

with open(out_dir/"sample_corpus.jsonl", "w", encoding="utf-8") as f:
    for d in corpus:
        f.write(json.dumps(d, ensure_ascii=False) + "\n")

with open(out_dir/"eval_set.json", "w", encoding="utf-8") as f:
    json.dump(eval_set, f, indent=2)

print("Wrote:", out_dir/"sample_corpus.jsonl")
print("Wrote:", out_dir/"eval_set.json")
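
Reading the JSONL back is the mirror image of the write loop above: one `json.loads` per non-blank line. A minimal sketch with a round-trip check on throwaway data (the helper names `write_jsonl` / `read_jsonl` are hypothetical):

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List

def write_jsonl(path: Path, records: List[Dict[str, Any]]) -> None:
    with open(path, "w", encoding="utf-8") as f:
        for r in records:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

def read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Read one JSON object per non-blank line."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Round-trip check in a temp dir; point read_jsonl at data/sample_corpus.jsonl for the real file.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "demo.jsonl"
    demo = [{"doc_id": "DOC-X", "title": "Demo", "text": "Meals up to $65/day."}]
    write_jsonl(path, demo)
    assert read_jsonl(path) == demo
```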


## 9) FastAPI service (real files under `app/`)

The cells below write a runnable FastAPI service to disk under `app/`.

In [None]:
# Create a real FastAPI app/ layout
from pathlib import Path

root = Path("app")
(root / "routers").mkdir(parents=True, exist_ok=True)
(root / "core").mkdir(parents=True, exist_ok=True)
(root / "__init__.py").write_text("", encoding="utf-8")
(root / "routers" / "__init__.py").write_text("", encoding="utf-8")
(root / "core" / "__init__.py").write_text("", encoding="utf-8")
print("Created:", root.resolve())


In [None]:
%%writefile requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.30.1
pydantic==2.7.4
python-dotenv==1.0.1

# Retrieval stack
rank-bm25==0.2.2
rapidfuzz==3.9.6

# Dense + reranker (pins to avoid numpy/scipy breakage)
numpy==1.26.4
scipy==1.11.4
sentence-transformers==3.0.1
faiss-cpu==1.8.0.post1


In [None]:
%%writefile app/core/config.py
from pydantic import BaseModel, Field

class Settings(BaseModel):
    # Models
    embed_model: str = Field(default="sentence-transformers/all-MiniLM-L6-v2")
    rerank_model: str = Field(default="cross-encoder/ms-marco-MiniLM-L-6-v2")

    # Retrieval knobs (tune these, don’t hardcode in random places)
    chunk_size: int = 650
    chunk_overlap: int = 120
    bm25_weight: float = 0.35
    dense_k: int = 25
    bm25_k: int = 40
    rerank_k: int = 8

    # Storage (simple local persistence for portfolio)
    data_dir: str = "data"
    index_dir: str = "data/index"

settings = Settings()


In [None]:
%%writefile app/schemas.py
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional

class DocIn(BaseModel):
    doc_id: str
    title: str
    domain: str
    text: str
    version: Optional[str] = None

class IngestRequest(BaseModel):
    docs: List[DocIn] = Field(..., min_length=1)

class SearchRequest(BaseModel):
    query: str
    k: int = 8

class AskRequest(BaseModel):
    question: str
    k: int = 8

class ContextHit(BaseModel):
    doc_id: str
    chunk_id: str
    title: str
    domain: str
    text: str
    score: float
    signals: Dict[str, Any] = {}

class SearchResponse(BaseModel):
    query: str
    hits: List[ContextHit]

class AskResponse(BaseModel):
    question: str
    hits: List[ContextHit]
    prompt: Dict[str, Any]


In [None]:
%%writefile app/core/text_utils.py
import re
from typing import List

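_word = re.compile(r"[A-Za-z0-9_./-]+")

def tokenize(text: str) -> List[str]:
    # Keeps tokens like "S3", "PCI-DSS", "v1.2", "PAY-1234". Edge punctuation is stripped
    # so a sentence-final "days." indexes as "days" and matches the query token.
    out = []
    for t in _word.findall(text):
        t = t.strip("./-")
        if t:
            out.append(t.lower())
    return out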

def normalize_ws(s: str) -> str:
    return " ".join(s.split())
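
Why edge punctuation matters for the `[A-Za-z0-9_./-]+` pattern: because `.` is in the character class, a sentence-final word keeps its period, so a BM25 query for "days" never matches an indexed "days.". A self-contained comparison (hypothetical helper names):

```python
import re
from typing import List

_word = re.compile(r"[A-Za-z0-9_./-]+")

def tokenize_raw(text: str) -> List[str]:
    return [t.lower() for t in _word.findall(text)]

def tokenize_stripped(text: str) -> List[str]:
    # Trim '.', '/', '-' from token edges only; inner ones ("v1.2", "PCI-DSS") survive.
    out = []
    for t in _word.findall(text):
        t = t.strip("./-")
        if t:
            out.append(t.lower())
    return out

s = "Receipts are due within 30 days. PCI-DSS v1.2 applies to PAY-1234."
print(tokenize_raw(s))       # contains 'days.' and 'pay-1234.'
print(tokenize_stripped(s))  # contains 'days' and 'pay-1234'
```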


In [None]:
%%writefile app/core/chunking.py
from typing import Dict, List, Any

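def chunk_doc(doc: Dict[str, Any], chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size, or the window never advances")
    text = doc["text"]
    # Word-based sliding window: chunk_size and overlap count whitespace-separated words.
    # Simple and predictable; swap for sentence-aware or semantic chunking later.
    words = text.split()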
    chunks = []
    start = 0
    idx = 0
    while start < len(words):
        end = min(len(words), start + chunk_size)
        chunk_words = words[start:end]
        chunk_text = " ".join(chunk_words).strip()
        chunks.append({
            "chunk_id": f"{doc['doc_id']}::c{idx}",
            "doc_id": doc["doc_id"],
            "title": doc["title"],
            "domain": doc.get("domain", "Unknown"),
            "text": chunk_text,
            "start_word": start,
            "end_word": end,
        })
        idx += 1
        if end == len(words):
            break
        start = max(0, end - overlap)
    return chunks
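
For a document of N words, the window advances by a stride of `chunk_size - overlap`, so `chunk_doc` emits 1 chunk when 1 ≤ N ≤ chunk_size and otherwise ⌈(N − overlap) / (chunk_size − overlap)⌉ chunks. A quick check of that arithmetic against a direct simulation of the same loop, using the default 650/120 settings (both helpers are illustrative):

```python
import math

def expected_chunks(n_words: int, chunk_size: int, overlap: int) -> int:
    if n_words == 0:
        return 0
    if n_words <= chunk_size:
        return 1
    return math.ceil((n_words - overlap) / (chunk_size - overlap))

def simulate_chunks(n_words: int, chunk_size: int, overlap: int) -> int:
    # Same loop shape as chunk_doc, counting windows instead of building them
    start, count = 0, 0
    while start < n_words:
        end = min(n_words, start + chunk_size)
        count += 1
        if end == n_words:
            break
        start = max(0, end - overlap)
    return count

for n in (0, 300, 650, 651, 2000):
    print(n, expected_chunks(n, 650, 120), simulate_chunks(n, 650, 120))
```

So a 2,000-word runbook becomes 4 chunks (windows start at words 0, 530, 1060, 1590) and repeats 3 × 120 = 360 words across the boundaries.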


In [None]:
%%writefile app/core/index.py
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple

from rank_bm25 import BM25Okapi

from .text_utils import tokenize, normalize_ws
from .chunking import chunk_doc

# Optional heavy deps (we fail loudly with clear message)
try:
    import numpy as np
    import faiss
    from sentence_transformers import SentenceTransformer, CrossEncoder
except Exception as e:  # pragma: no cover
    np = None
    faiss = None
    SentenceTransformer = None
    CrossEncoder = None
    _IMPORT_ERR = e
else:
    _IMPORT_ERR = None

@dataclass
class BuiltIndex:
    chunks: List[Dict[str, Any]]
    bm25: BM25Okapi
    dense_index: Any  # faiss index
    embeddings: Any   # np.ndarray
    embedder: Any
    reranker: Any

class RagIndex:
    def __init__(self, settings):
        self.s = settings
        self._built: BuiltIndex | None = None

    # ------------ Build / Persist ------------
    def build(self, docs: List[Dict[str, Any]]) -> None:
        if _IMPORT_ERR is not None:
            raise RuntimeError(
                "Dense/reranker deps failed to import. "
                "Install pinned requirements and restart. Original error: "
                f"{_IMPORT_ERR}"
            )

        # Chunk
        chunks: List[Dict[str, Any]] = []
        for d in docs:
            d = {**d, "text": normalize_ws(d["text"])}
            chunks.extend(chunk_doc(d, self.s.chunk_size, self.s.chunk_overlap))

        # BM25
        tokenized = [tokenize(c["text"]) for c in chunks]
        bm25 = BM25Okapi(tokenized)

        # Dense index (FAISS)
        embedder = SentenceTransformer(self.s.embed_model)
        chunk_texts = [c["text"] for c in chunks]
        embs = embedder.encode(chunk_texts, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
        embs = np.asarray(embs, dtype="float32")

        dim = embs.shape[1]
        dense_index = faiss.IndexFlatIP(dim)
        dense_index.add(embs)

        # Reranker
        reranker = CrossEncoder(self.s.rerank_model)

        self._built = BuiltIndex(
            chunks=chunks,
            bm25=bm25,
            dense_index=dense_index,
            embeddings=embs,
            embedder=embedder,
            reranker=reranker,
        )

    def save(self, index_dir: str) -> None:
        assert self._built is not None, "Index not built"
        p = Path(index_dir)
        p.mkdir(parents=True, exist_ok=True)

        # Chunks
        (p / "chunks.jsonl").write_text(
            "\n".join(json.dumps(c, ensure_ascii=False) for c in self._built.chunks),
            encoding="utf-8",
        )

        # FAISS index + embeddings
        faiss.write_index(self._built.dense_index, str(p / "faiss.index"))
        np.save(p / "embeddings.npy", self._built.embeddings)

        # BM25 needs tokenized corpus; we rebuild from chunks on load
        (p / "meta.json").write_text(json.dumps({
            "embed_model": self.s.embed_model,
            "rerank_model": self.s.rerank_model,
            "chunk_size": self.s.chunk_size,
            "chunk_overlap": self.s.chunk_overlap,
            "bm25_weight": self.s.bm25_weight,
        }, indent=2), encoding="utf-8")

    def load(self, index_dir: str) -> None:
        if _IMPORT_ERR is not None:
            raise RuntimeError(
                "Dense/reranker deps failed to import. "
                "Install pinned requirements and restart. Original error: "
                f"{_IMPORT_ERR}"
            )
        p = Path(index_dir)
        chunks = [json.loads(line) for line in (p / "chunks.jsonl").read_text(encoding="utf-8").splitlines() if line.strip()]
        tokenized = [tokenize(c["text"]) for c in chunks]
        bm25 = BM25Okapi(tokenized)

        embedder = SentenceTransformer(self.s.embed_model)
        reranker = CrossEncoder(self.s.rerank_model)

        dense_index = faiss.read_index(str(p / "faiss.index"))
        embs = np.load(p / "embeddings.npy")

        self._built = BuiltIndex(
            chunks=chunks,
            bm25=bm25,
            dense_index=dense_index,
            embeddings=embs,
            embedder=embedder,
            reranker=reranker,
        )

    # ------------ Retrieval ------------
    def search(self, query: str, k: int) -> List[Dict[str, Any]]:
        assert self._built is not None, "Index not ready"

        # BM25 scores
        q_tokens = tokenize(query)
        bm25_scores = self._built.bm25.get_scores(q_tokens)

        # Dense scores
        q_emb = self._built.embedder.encode([query], normalize_embeddings=True, show_progress_bar=False)
        q_emb = np.asarray(q_emb, dtype="float32")
        dense_scores, dense_ids = self._built.dense_index.search(q_emb, self.s.dense_k)

        dense_map = {int(i): float(s) for i, s in zip(dense_ids[0], dense_scores[0]) if i != -1}

        # Hybrid combine (simple weighted sum after min-max normalization)
        bm25 = np.asarray(bm25_scores, dtype="float32")
        if bm25.max() > bm25.min():
            bm25_n = (bm25 - bm25.min()) / (bm25.max() - bm25.min())
        else:
            bm25_n = bm25 * 0.0

        dense = np.zeros_like(bm25_n)
        for i, s in dense_map.items():
            dense[i] = s
        if dense.max() > dense.min():
            dense_n = (dense - dense.min()) / (dense.max() - dense.min())
        else:
            dense_n = dense * 0.0

        w = self.s.bm25_weight
        hybrid = (w * bm25_n) + ((1 - w) * dense_n)

        # Candidate pool: top-N chunks by hybrid score, N = max(bm25_k, dense_k)
        pool = max(self.s.bm25_k, self.s.dense_k)
        cand_ids = np.argsort(-hybrid)[:pool].tolist()
        cands = [self._built.chunks[i] for i in cand_ids]

        # Rerank the pool with the cross-encoder
        pairs = [(query, c["text"]) for c in cands]
        rr_scores = [float(x) for x in self._built.reranker.predict(pairs)]

        reranked = sorted(zip(cand_ids, cands, rr_scores), key=lambda x: x[2], reverse=True)[:k]

        # Expose per-hit fusion signals so ranking decisions are debuggable from the API
        hits = []
        for i, c, s in reranked:
            hits.append({
                **c,
                "score": s,
                "signals": {
                    "bm25_norm": float(bm25_n[i]),
                    "dense_norm": float(dense_n[i]),
                    "hybrid": float(hybrid[i]),
                    "bm25_weight": self.s.bm25_weight,
                    "candidate_pool": len(cands),
                },
            })
        return hits
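
`search()` fuses the two retrievers by min-max normalising each score vector and taking a weighted sum, which makes it sensitive to score outliers and to the zero-fill for chunks outside the dense top-k. Reciprocal Rank Fusion (RRF) is a common rank-based alternative that ignores raw score scales entirely. A self-contained sketch on toy chunk ids; the constant k=60 is the conventional default, assumed here rather than tuned:

```python
from typing import Dict, List

def rrf_fuse(rankings: List[List[int]], k: int = 60) -> Dict[int, float]:
    """Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank(d)), ranks start at 1."""
    fused: Dict[int, float] = {}
    for ranking in rankings:
        for rank, doc in enumerate(ranking, start=1):
            fused[doc] = fused.get(doc, 0.0) + 1.0 / (k + rank)
    return fused

bm25_ranking = [7, 2, 9]   # chunk ids ordered by BM25 score (toy data)
dense_ranking = [2, 4, 7]  # chunk ids ordered by dense similarity (toy data)
fused = rrf_fuse([bm25_ranking, dense_ranking])
order = sorted(fused, key=fused.get, reverse=True)
print(order)  # [2, 7, 4, 9]
```

Chunks found by both retrievers (2 and 7) rise to the top even though each is first in only one list. To try it inside `RagIndex.search`, the inputs would be the BM25 order (e.g. `np.argsort(-bm25_scores)[:bm25_k]`) and the FAISS ids, with the fused top-N passed to the cross-encoder as before; whether it beats the 0.35-weighted sum here is something to measure with the eval set, not assume.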


In [None]:
%%writefile app/core/prompting.py
from typing import Any, Dict, List

def build_prompt_payload(question: str, hits: List[Dict[str, Any]]) -> Dict[str, Any]:
    citations = [{"doc_id": h["doc_id"], "chunk_id": h["chunk_id"], "title": h["title"]} for h in hits]
    context_text = "\n\n".join(
        [f"[{i+1}] {h['title']} ({h['domain']})\n{h['text']}" for i, h in enumerate(hits)]
    )

    system = (
        "You are an internal assistant. Answer using ONLY the provided context. "
        "If the answer isn't in context, say you don't know and ask for the missing policy/runbook."
    )
    user = f"Question: {question}\n\nContext:\n{context_text}\n\nReturn a concise answer + cite sources like [1], [2]."

    return {"system": system, "user": user, "citations": citations}
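
The payload includes every hit regardless of length. If the downstream LLM has a tight context window, one option is to keep hits in rank order until a budget is spent. A minimal sketch, assuming characters as a rough proxy for tokens (hypothetical helper `fit_to_budget`):

```python
from typing import Any, Dict, List

def fit_to_budget(hits: List[Dict[str, Any]], max_chars: int) -> List[Dict[str, Any]]:
    """Keep the leading hits whose texts fit in max_chars in total; hits are assumed best-first."""
    kept, used = [], 0
    for h in hits:
        n = len(h["text"])
        if used + n > max_chars:
            break  # stop (rather than skip) so every kept hit outranks every dropped one
        kept.append(h)
        used += n
    return kept

hits = [{"text": "a" * 400}, {"text": "b" * 300}, {"text": "c" * 500}]
print([len(h["text"]) for h in fit_to_budget(hits, 800)])  # [400, 300]
```

Trimming before calling `build_prompt_payload` keeps the `[n]` citation numbers aligned with what the model actually sees.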


In [None]:
%%writefile app/routers/ingest.py
from fastapi import APIRouter, HTTPException
from typing import Dict, Any

from ..schemas import IngestRequest
from ..services import rag_service

router = APIRouter()

@router.post("/ingest")
def ingest(req: IngestRequest) -> Dict[str, Any]:
    # Simple: rebuild whole index. (Production: background job + incremental updates.)
    docs = [d.model_dump() for d in req.docs]
    try:
        rag_service.build_and_persist(docs)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"ok": True, "docs": len(docs)}


In [None]:
%%writefile app/routers/search.py
from fastapi import APIRouter, HTTPException

from ..schemas import SearchRequest, SearchResponse, ContextHit
from ..services import rag_service

router = APIRouter()

@router.post("/search", response_model=SearchResponse)
def search(req: SearchRequest):
    try:
        hits = rag_service.search(req.query, k=req.k)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "query": req.query,
        "hits": [ContextHit(**h) for h in hits]
    }


In [None]:
%%writefile app/routers/ask.py
from fastapi import APIRouter, HTTPException

from ..schemas import AskRequest, AskResponse, ContextHit
from ..services import rag_service

router = APIRouter()

@router.post("/ask", response_model=AskResponse)
def ask(req: AskRequest):
    try:
        hits = rag_service.search(req.question, k=req.k)
        prompt = rag_service.build_prompt(req.question, hits)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "question": req.question,
        "hits": [ContextHit(**h) for h in hits],
        "prompt": prompt,
    }


In [None]:
%%writefile app/services.py
from pathlib import Path
from typing import Any, Dict, List

from .core.config import settings
from .core.index import RagIndex
from .core.prompting import build_prompt_payload

class RagService:
    def __init__(self):
        self.index = RagIndex(settings)
        self.index_dir = settings.index_dir

    def ensure_loaded(self) -> None:
        p = Path(self.index_dir)
        if (p / "chunks.jsonl").exists():
            self.index.load(self.index_dir)

    def build_and_persist(self, docs: List[Dict[str, Any]]) -> None:
        self.index.build(docs)
        self.index.save(self.index_dir)

    def search(self, query: str, k: int) -> List[Dict[str, Any]]:
        return self.index.search(query, k=k)

    def build_prompt(self, question: str, hits: List[Dict[str, Any]]) -> Dict[str, Any]:
        return build_prompt_payload(question, hits)

rag_service = RagService()


In [None]:
%%writefile app/main.py
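from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .routers import ingest, search, ask
from .services import rag_service

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the persisted index (if present) before serving requests.
    # Lifespan handlers replace the deprecated @app.on_event("startup").
    rag_service.ensure_loaded()
    yield

app = FastAPI(title="Enterprise RAG", lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # demo only: restrict to known origins in production
    allow_credentials=False,  # the CORS spec disallows credentials with a wildcard origin
    allow_methods=["*"],
    allow_headers=["*"],
)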

app.include_router(ingest.router, tags=["ingest"])
app.include_router(search.router, tags=["search"])
app.include_router(ask.router, tags=["ask"])

@app.get("/healthz")
def healthz():
    return {"ok": True}


In [None]:
# Quick sanity check: show the generated tree
from pathlib import Path
for p in sorted(Path("app").rglob("*")):
    print(p)


## 10) Engineering decisions + failure cases (what makes this feel real)

### Decisions (and why)

- **Chunk size (~650 characters) + overlap (120 characters)**  
  Big enough to keep policy clauses together (eligibility + exceptions), small enough that the reranker can focus.  
  If chunks are too small → you lose the “exception” sentence. Too big → unrelated clauses dilute scoring.

- **Hybrid weighting (BM25 weight ~0.35)**  
  Dense retrieval is great for paraphrases; BM25 rescues **IDs, numbers, acronyms** (“PCI”, “30 days”, “S3”, ticket codes).  
  0.35 is a compromise: enough lexical “snap” without letting keyword spam dominate.

- **Reranker choice: `ms-marco-MiniLM-L-6-v2` (portfolio-friendly)**  
  Pros: fast, cheap, good enough to show the “precision step”.  
  Tradeoff: smaller cross-encoders can miss subtle negations / long-distance constraints. In production you’d evaluate bigger rerankers vs latency budgets.
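The chunk-size/overlap trade-off in the first decision above can be checked with a small experiment. This is a minimal sketch assuming a character-based sliding window that advances by `chunk_size - overlap` characters per step; `sliding_chunks` is a hypothetical helper and may not match every edge case of the notebook's `chunk_text`. An exception clause is placed across the first 650-character boundary: with no overlap it gets split, and with 120 characters of overlap it lands whole in the second chunk.

```python
from typing import List

def sliding_chunks(text: str, chunk_size: int, overlap: int) -> List[str]:
    """Hypothetical character-window chunker that advances by chunk_size - overlap."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("need 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks

# Synthetic text: filler, then an exception clause that straddles index 650.
clause = "Late receipts require VP approval."
text = "x" * 620 + " " + clause + " " + "y" * 700  # clause occupies [621, 655)

for overlap in (0, 120):
    chunks = sliding_chunks(text, chunk_size=650, overlap=overlap)
    intact = any(clause in c for c in chunks)
    print(f"overlap={overlap:>3}: {len(chunks)} chunks, clause intact: {intact}")
# overlap=  0: 3 chunks, clause intact: False
# overlap=120: 3 chunks, clause intact: True
```

More generally, with a step of `chunk_size - overlap`, any span no longer than `overlap` characters is fully contained in at least one window. That is the property the 120-character overlap buys you for short exception sentences.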

### Failure cases I saw (and how I tuned)

Below are *example* failure modes you’ll see in enterprise RAG. These are the ones that matter in interviews.

1) **Numeric / threshold queries** (“under 6 hours”, “cap at 40 hours”)  
   - Symptom: dense retrieval returns “travel” but misses the exact threshold sentence.  
   - Fix: increase BM25 weight (e.g., 0.25 → 0.35) + keep overlap so the number stays in the same chunk as its rule.

2) **Acronyms / codes** (“PTO carryover cap”, “VPN”, “S3”, “SSO”)  
   - Symptom: dense retrieval over-generalizes to “time off policy” without the clause.  
   - Fix: a larger BM25 candidate pool (`bm25_k` ↑) + a slightly higher rerank top_k.

3) **Exceptions + negations** (“alcohol is *not* reimbursable”, “unless VP approval”)  
   - Symptom: retrieves the general rule but drops the exception sentence.  
   - Fix: slightly larger chunk size + overlap; rerank more candidates.

4) **Multi-hop questions** (“Who approves and what’s the SLA?”)  
   - Symptom: answer is split across two docs or two far-apart sections.  
   - Fix: return more hits + do a second-pass “context merge” step (next).
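Failure mode 1's fix (BM25 weight 0.25 → 0.35) depends on how scores are fused. One common scheme is to min-max normalize each retriever's scores per query and take a weighted sum. The sketch below assumes that scheme (the notebook's own fusion code is not shown in this section and may differ) and uses made-up scores for three hypothetical chunk IDs to show how a small weight change lets the exact-threshold chunk overtake a paraphrase-heavy overview chunk.

```python
from typing import Dict

def minmax(scores: Dict[str, float]) -> Dict[str, float]:
    """Scale one retriever's scores for this query into [0, 1]."""
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        return {k: 1.0 for k in scores}  # all tied: treat as equally relevant
    return {k: (v - lo) / (hi - lo) for k, v in scores.items()}

def fuse(bm25: Dict[str, float], dense: Dict[str, float], bm25_weight: float) -> Dict[str, float]:
    """Weighted sum of normalized scores; a chunk missing from one list scores 0 there."""
    b, d = minmax(bm25), minmax(dense)
    ids = set(b) | set(d)
    return {i: bm25_weight * b.get(i, 0.0) + (1 - bm25_weight) * d.get(i, 0.0) for i in ids}

# Made-up scores for one query such as "flights under 6 hours".
dense = {"travel_overview": 0.86, "travel_threshold": 0.82, "expense_meals": 0.70}
bm25 = {"travel_overview": 4.0, "travel_threshold": 9.0, "expense_meals": 1.0}

for w in (0.25, 0.35):
    fused = fuse(bm25, dense, bm25_weight=w)
    top = max(fused, key=fused.get)
    print(f"bm25_weight={w}: top chunk = {top}")
# bm25_weight=0.25: top chunk = travel_overview
# bm25_weight=0.35: top chunk = travel_threshold
```

Min-max normalization is computed per query, so fused scores are not comparable across queries. Reciprocal rank fusion is a common alternative that combines ranks instead of raw scores and avoids that normalization step.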

### 5–10 concrete failure examples (use these in your README)

You can paste these into your GitHub README as “debug notes”:

1. **Query:** “What happens if I submit receipts after 30 days?”  
   **Fail:** retrieved reimbursement policy but missed “VP approval” sentence.  
   **Change:** overlap ↑, rerank_k ↑.

2. **Query:** “Can I book premium economy for a 7-hour flight?”  
   **Fail:** retrieved travel doc but missed “manager approval” constraint.  
   **Change:** chunk_size ↑ slightly so rule + constraint stay together.

3. **Query:** “How much PTO can I carry over?”  
   **Fail:** got PTO overview but missed the exact “40 hours” line.  
   **Change:** BM25 weight ↑ to boost numeric line.

4. **Query:** “Is alcohol reimbursable at team dinner?”  
   **Fail:** answered “meals reimbursed” but dropped the “no alcohol” exception.  
   **Change:** rerank_k ↑, chunk overlap ↑.

5. **Query:** “When are unused PTO hours forfeited?”  
   **Fail:** retrieved PTO policy but missed “January 31” sentence due to chunk split.  
   **Change:** overlap ↑.

6. **Query:** “What’s the approval SLA for expenses?”  
   **Fail:** retrieved the policy but surfaced the wrong SLA because another doc mentioned a different one.  
   **Change:** reranker top_k ↑ so cross-encoder can disambiguate.

7. **Query:** “Do I need to use corporate card for flights?”  
   **Fail:** retrieved travel policy but returned generic procurement guidance.  
   **Change:** dense_k ↑ (more candidates) + rerank.

8. **Query:** “Can I take parental leave 18 months after adoption?”  
   **Fail:** retrieved parental leave doc but missed “within 12 months” constraint.  
   **Change:** chunk_size ↑ a bit, rerank_k ↑.

9. **Query:** “Is economy mandatory for a 5.5-hour flight?”  
   **Fail:** retrieved travel doc but confused threshold boundary.  
   **Change:** BM25 weight ↑; add eval examples around boundaries.

10. **Query:** “Where do I submit PTO requests?”  
    **Fail:** retrieved HR policy but missed the system name.  
    **Change:** add more ‘system-name’ sentences to the corpus; in real life this is a data-quality issue.
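Example 9's change (“add eval examples around boundaries”) can be made concrete with a tiny labeled set and a hit-rate@k check. This is a minimal sketch: the eval cases, chunk IDs, and the `fake_retrieve` stub are hypothetical stand-ins for the notebook's real eval set and search call. The two travel queries sit on either side of the 6-hour threshold but expect the same source chunk.

```python
from typing import Callable, Dict, List

# Hypothetical boundary cases: queries on both sides of a threshold, same expected source.
EVAL_CASES = [
    {"query": "Is economy mandatory for a 5.5-hour flight?", "expected": {"travel_threshold"}},
    {"query": "Is economy mandatory for a 6.5-hour flight?", "expected": {"travel_threshold"}},
    {"query": "How much PTO can I carry over?", "expected": {"pto_carryover"}},
]

def hit_rate_at_k(cases: List[Dict], retrieve: Callable[[str, int], List[str]], k: int) -> float:
    """Fraction of cases whose top-k retrieved chunk IDs include any expected ID."""
    hits = sum(1 for c in cases if c["expected"] & set(retrieve(c["query"], k)))
    return hits / len(cases)

def fake_retrieve(query: str, k: int) -> List[str]:
    """Stub retriever with canned rankings, standing in for the real search function."""
    canned = {
        "5.5-hour": ["travel_overview", "travel_threshold", "expense_meals"],
        "6.5-hour": ["travel_overview", "expense_meals", "travel_threshold"],
        "PTO": ["pto_carryover", "pto_overview"],
    }
    for key, ranking in canned.items():
        if key in query:
            return ranking[:k]
    return []

for k in (1, 2, 3):
    print(f"hit_rate@{k} = {hit_rate_at_k(EVAL_CASES, fake_retrieve, k):.2f}")
# hit_rate@1 = 0.33
# hit_rate@2 = 0.67
# hit_rate@3 = 1.00
```

Pairing queries on both sides of a threshold makes a boundary regression show up as a measurable drop in hit rate rather than as a one-off anecdote.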

The point isn’t that RAG “never fails” — it’s that you can **observe failures, explain them, and tune knobs deliberately**.
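A knob that recurs in the notes above (“rerank_k ↑”, “dense_k ↑”) is the size of the candidate pool handed to the reranker. A reranker can only reorder what first-stage retrieval gives it, so pool size caps recall no matter how good the cross-encoder is. The sketch below makes that ceiling explicit using a hypothetical first-stage ranking and an oracle scorer as a stand-in for the cross-encoder (it does not call `CrossEncoder`); `retrieve_then_rerank` is a hypothetical helper.

```python
from typing import Callable, List

def retrieve_then_rerank(first_stage: List[str], pool_size: int, final_k: int,
                         score: Callable[[str], float]) -> List[str]:
    """Take the top `pool_size` first-stage candidates, rescore them, keep `final_k`."""
    pool = first_stage[:pool_size]
    return sorted(pool, key=score, reverse=True)[:final_k]

# Hypothetical first-stage ranking: the chunk holding the exception sits at rank 25.
first_stage = [f"chunk_{i:02d}" for i in range(40)]
target = first_stage[24]

def oracle(chunk_id: str) -> float:
    """Perfect stand-in scorer: it always recognizes the target chunk."""
    return 1.0 if chunk_id == target else 0.0

for pool_size in (20, 30):
    top = retrieve_then_rerank(first_stage, pool_size, final_k=5, score=oracle)
    print(f"pool_size={pool_size}: target in final top-5 = {target in top}")
# pool_size=20: target in final top-5 = False
# pool_size=30: target in final top-5 = True
```

Even a perfect scorer cannot recover a chunk that never entered the pool, which is why widening the pool (at the cost of more cross-encoder calls) is often the first thing to try when the right document is retrieved but the right clause is not.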
