## Policy Document Indexing for RAG

We start by creating the following functions: ```clean_text``` & ```load_files```. When looking at the given pdfs, we observe that the pdf is clean. There is no number of pages, images any extra context that should be removed. However we create a function that would lower the text, removes multiple spaces, etc. We decided not to get rid of punctions as they seem to be relevant. When it comes to the function ```load_files```, we assume that in the future the folder will be filled with other file extensions as well. Thus we create a generic function that would look at pdfs and other extensions as well.

In [21]:
import os
import re
import string
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from langchain_core.documents import Document
from typing import List
from langchain_community.document_loaders import TextLoader, Docx2txtLoader, CSVLoader

# Load environment variables from .env file
load_dotenv()

# Path to the documents
dir_path = 'assets/documents/'

# -------------------------------
# Text cleaning function
# -------------------------------
# Function to clean and remove noise from text
# We observe that the pdfs don't contain any page numbers, or images
def clean_text(text: str, lowercase: bool = True, remove_punct: bool = False) -> str:
    """
    Cleans extracted text for preprocessing:
    - Lowercase (optional)
    - Remove line breaks, tabs
    - Remove punctuation (optional)
    - Normalize spaces
    """
    if not text:
        return ""
    
    # Convert to lowercase
    if lowercase:
        text = text.lower()

    # Replace newlines and tabs with space 
    text = text.replace("\n", " ").replace("\t", " ")
    
    if remove_punct:
        text = text.translate(str.maketrans("", "", string.punctuation))
    
    # Remove multiple spaces
    text = re.sub(r"\s+", " ", text)
    
    return text.strip()

# -------------------------------
# File loader
# -------------------------------
def load_files(path: str) -> list[Document]:
    _, file_extension = os.path.splitext(path)
    file_extension = file_extension.lower()

    if file_extension == '.pdf':
        reader = PdfReader(path)
        all_text = "".join((p.extract_text() or "") for p in reader.pages)
        cleaned = clean_text(all_text, lowercase=True, remove_punct=False)
        return [Document(page_content=cleaned, metadata={"source": path})]

    elif file_extension == '.txt':
        docs = TextLoader(path, encoding='utf8').load()
        for d in docs:
            d.page_content = clean_text(d.page_content)
        return docs

    elif file_extension == '.docx':
        docs = Docx2txtLoader(path).load()
        for d in docs:
            d.page_content = clean_text(d.page_content)
        return docs
        
    elif file_extension == '.csv':
        docs = CSVLoader(path).load()
        for d in docs:
            d.page_content = clean_text(d.page_content)
        return docs

    else:
        raise ValueError(f"Unsupported file type: {file_extension}")

# -------------------------------
# Usage example
# -------------------------------
files = [f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f))]

# Collect all loaded documents
all_documents = []
for filename in files:
    full_path = os.path.join(dir_path, filename)
    try:
        docs = load_files(full_path)
        all_documents.extend(docs)
        print(f"Loaded & cleaned {filename}")
    except ValueError as e:
        print(e)

print(f"\nTotal loaded documents: {len(all_documents)}")

Loaded & cleaned tuition-reimbursement-policy.pdf
Loaded & cleaned health-insurance-policy.pdf
Loaded & cleaned work-from-home-policy.pdf
Loaded & cleaned gym-policy.pdf
Loaded & cleaned vacation-policy.pdf
Loaded & cleaned 401k-retirement-policy.pdf
Loaded & cleaned life-insurance-policy.pdf
Loaded & cleaned childcare-policy.pdf

Total loaded documents: 8


After loading and cleaning the documents, we split them into chunks. Firstly, we tried a function that would split the documents into section, to have another source of metadata - to refet to the document and a specific section, however if there are documents which have sections that are very long, that doesn't seem like a proper option. So we use ```RecursiveCharacterTextSplitter``` wuth chunk_overlap to keep the context between chunks and not lose meaning.

Afterwards, we define a function ```create_chroma_collection``` that would create a vector store using openai embeddings.  

In [None]:

# We start by splitting the document into sections for later text preprocessing
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Suppose `documents` is what you loaded from load_files()
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,    # max characters per chunk
    chunk_overlap=200,  # overlap between chunks (keeps context)
)

split_docs = splitter.split_documents(all_documents)

print(f"Original docs: {len(all_documents)}")
print(f"Split docs: {len(split_docs)}")

# Show first 2 chunks
for i, d in enumerate(split_docs[:2], 1):
    print(f"\n--- Chunk {i} ---")
    print(d.page_content[:300], "...")
    print("Metadata:", d.metadata)

from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

# Create a new Chroma collection & split documents into chunks
def create_chroma_collection(
    name: str, 
    documents: List[Document], 
    directory: str
) -> Chroma:
    """
    Create or overwrite a Chroma collection with given documents.

    Args:
        name (str): Name of the collection.
        documents (List[Document]): List of LangChain Document objects.
        directory (str): Directory where the collection is persisted.

    Returns:
        Chroma: The created Chroma vectorstore.
    """
    persist_directory = os.path.join(directory, name)
    os.makedirs(persist_directory, exist_ok=True)

    embeddings = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))

    # Create collection and persist it
    collection = Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        collection_name=name,
        persist_directory=persist_directory
    )
    collection.persist()
    return collection

collection = create_chroma_collection(
    name="benefits_collection",
    documents=split_docs,
    directory="./persist"
)

print("Collection created and persisted.")

Last functions to be created are ```load_chroma_collection```, ```add_documents_to_collection``` and ```load_retriever_from_collection```. 

With adding documents to the collection we use incremental updates. New PDFs, DOCX files, or CSVs may arrive over time. Instead of rebuilding the entire collection from scratch, we can add only the new documents. This saves time and computation, especially for large collections. Preserve embeddings for existing docs. Lastly, we can aggregate multiple new documents and add them in one go, improving efficiency.

```load_retriever_from_collection``` helps not to recreate the existing vectorstore when restarting the script. The function had configurable retrieval parameters, where we can set things like score_threshold, search_type, or top_k when loading the retriever. This allows us to tune retrieval behavior without changing the underlying vectorstore.

In [None]:

# Load the collection
def load_chroma_collection(name: str, directory: str) -> Chroma:
    """
    Load an existing Chroma collection.

    Args:
        name (str): Name of the collection.
        directory (str): Directory where the collection is persisted.

    Returns:
        Chroma: The loaded Chroma vectorstore.
    """
    persist_directory = os.path.join(directory, name)
    if not os.path.exists(persist_directory):
        raise ValueError(f"Collection '{name}' does not exist in '{directory}'.")

    embeddings = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))

    collection = Chroma(
        collection_name=name,
        embedding_function=embeddings,
        persist_directory=persist_directory
    )
    return collection

# Add documents to the collection
def add_documents_to_collection(collection: Chroma, new_documents: List[Document]) -> None:
    """
    Add new documents to an existing Chroma collection.

    Args:
        collection (Chroma): The Chroma vectorstore to add documents to.
        new_documents (List[Document]): List of new LangChain Document objects to add.
    """
    if not new_documents:
        print("No new documents to add.")
        return

    collection.add_documents(new_documents)
    collection.persist()
    print(f"Added {len(new_documents)} documents to the collection and persisted changes.")
    
# Load retriever from the collection
def load_retriever_from_collection(
    collection_name: str,
    search_type: str = "similarity_score_threshold",
    score_threshold: float = 0.3,
    top_k: int = 5
):
    """
    Load a retriever from a Chroma collection with configurable retrieval behavior.

    Args:
        collection_name (str): Name of the Chroma collection.
        search_type (str): Retrieval type (similarity_score_threshold or mmr).
        score_threshold (float): Minimum similarity score for retrieval.
        top_k (int): Number of documents to return.

    Returns:
        Retriever: Configured retriever.
    """

    # Load the persisted collection
    collection = load_chroma_collection(name=collection_name, directory="./persist")
    
    # Build retriever with configurable behavior
    retriever = collection.as_retriever(
        search_type=search_type,
        search_kwargs={
            "score_threshold": score_threshold,
            "k": top_k
        }
    )
    return retriever

Example of dynamically storing documents:
The script will run only if there is a new file to be added

In [None]:
# Load existing collection
collection = load_chroma_collection("benefits_collection", "./persist")

# Load new PDFs
new_docs = []
new_files = ["assets/new_policy.pdf"]
for f in new_files:
    new_docs.extend(load_files(f))

# Add to collection
add_documents_to_collection(collection, new_docs)

print("Collection updated with new documents!")

In [26]:
retriever = load_retriever_from_collection("benefits_collection", score_threshold = 0.6, top_k=3)


queries = [
    "What's the maternity leave policy?",
    "What is the eligibility for Tuition Reimbursement",
    "How much can employees contribute to 401-k?",
    "Do I have to manually enroll for 401-k?",
    "I work in Finance, can I work remotely?"
]

for i in queries:
    print(f"\n\nQuery: {i}")
    query = i
    results = retriever.get_relevant_documents(query)

    print(f" Found {len(results)} results")
    for i, r in enumerate(results, 1):
        print(f"\n--- Result {i} ---")
        print(r.page_content[:300], "...")
        print("Metadata:", r.metadata)




Query: What's the maternity leave policy?
 Found 3 results

--- Result 1 ---
of paid maternity leave, while non-birth parents receive six weeks of paid paternity leave. adoptive parents receive eight weeks of paid leave that can be shared between both parents. employees must have been with the company for at least 12 months to qualify for paid parental leave, though unpaid l ...
Metadata: {'source': 'assets/documents/childcare-policy.pdf'}

--- Result 2 ---
of paid maternity leave, while non-birth parents receive six weeks of paid paternity leave. adoptive parents receive eight weeks of paid leave that can be shared between both parents. employees must have been with the company for at least 12 months to qualify for paid parental leave, though unpaid l ...
Metadata: {'source': 'assets/documents/childcare-policy.pdf'}

--- Result 3 ---
for ﬁnding specialized care providers in the community. this policy is eﬀective as of [current date] and may be modiﬁed as business needs and legal req

## Try different types of retrieval

### Basic

In [None]:

queries = [
    "What's the maternity leave policy?",
    "What is the eligibility for Tuition Reimbursement",
    "How much can employees contribute to 401-k?",
    "Do I have to manually enroll for 401-k?",
    "I work in Finance, can I work remotely?"
]

for q in queries:
    results = vectorstore.similarity_search(q, k=3)
    print(f"Query: {q}\n")
    
    for i, doc in enumerate(results):
        source = doc.metadata.get("source", "unknown")
        section = doc.metadata.get("section", "unknown")
        print(f"Result {i+1} (from {source}, section: {section}):\n{doc.page_content}\n")
    
    print("="*50 + "\n")


### Similarity search with score

In [None]:
query = "What's the maternity leave policy?"

results = vectorstore.similarity_search_with_score(query, k=5)

for i, (doc, score) in enumerate(results):
    print(f"\n{i+1}. {doc.metadata['source']} - {doc.metadata['section']} (score={score:.4f})")
    print(doc.page_content[:300], "...\n")

### Max Marginal Relevance (MMR) search

In [None]:
results = vectorstore.max_marginal_relevance_search(query, k=5, fetch_k=15)

for i, doc in enumerate(results):
    print(f"\n{i+1}. {doc.metadata['source']} - {doc.metadata['section']}")
    print(doc.page_content[:300], "...\n")


### Advanced RAG Methods

Metadata Filtering - Useful if we want to use only specific files for our answers or we want to search in specific section of the data.

In [None]:
# Metadata Filtering
query = "What is the maternity leave policy?"

# Filter chunks where file = 'childcare-policy.pdf'
results = vectorstore.similarity_search(
    query, 
    k=3,
    filter={"source": "vacation-policy.pdf"}  # Metadata filter
)

for i, doc in enumerate(results):
    print(f"{i+1}. {doc.metadata['source']} - {doc.metadata['section']}")
    print(doc.page_content, "\n")


Query exapnsion - Automatically expand your query with related terms to improve retrieval.

In [None]:
query = "Maternity leave policy"

# Simple query expansion (you could also use an LLM to generate expansions)
expanded_terms = ["parental leave", "pregnancy leave", "childcare leave"]
expanded_query = query + ", " + ", ".join(expanded_terms)

results = vectorstore.similarity_search(expanded_query, k=3)

for i, doc in enumerate(results):
    print(f"{i+1}. {doc.metadata['source']} - {doc.metadata['section']}")
    print(doc.page_content, "\n")


HyDE - Generate a “hypothetical answer” for the query, then retrieve documents closest to that answer.

In [None]:
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

# Step 1: Generate hypothetical answer
prompt = f"Generate a concise hypothetical answer to this question: '{query}'"
hypothetical_answer = llm.predict(prompt)

# Step 2: Retrieve documents using embedding of the hypothetical answer
embedding_fn = OpenAIEmbeddings()
hypothetical_vector = embedding_fn.embed_query(hypothetical_answer)

# Chroma supports querying via embedding directly
results = vectorstore.similarity_search_by_vector(hypothetical_vector, k=3)

for i, doc in enumerate(results):
    print(f"{i+1}. {doc.metadata['source']} - {doc.metadata['section']}")
    print(doc.page_content, "\n")


# Make the code more modualar

### Create a function that will check last modified time of the files and if it is not new we won't need to re-create the vector store
Below there are functions that are included in ```rag.py``` - they check for the vector store and if it is up to date with pdf documents in a given folder. If no, the store is recreated. Furthermore, there is a processing of the documents.

In [None]:
import os
import re
import string
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import json
from datetime import datetime
from langchain_community.vectorstores import Chroma
from langchain.embeddings.openai import OpenAIEmbeddings

load_dotenv()  # Load environment variables from .env

class PDFProcessor:
    """
    Processes PDF documents:
    - Extracts raw text
    - Splits text into sections based on headings
    - Cleans text for NLP
    - Splits text into chunks with metadata
    """
    def __init__(self, pdf_folder: str, chunk_size: int = 500, chunk_overlap: int = 50):
        self.pdf_folder = pdf_folder
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap
        )
        self.files = [f for f in os.listdir(pdf_folder) if f.lower().endswith(".pdf")]

    def extract_text(self, file_path: str) -> str:
        """Extracts raw text from a PDF using PyPDF2."""
        text = ""
        reader = PdfReader(file_path)
        for page in reader.pages:
            text += page.extract_text() or ""
        return text

    def split_into_sections(self, text: str) -> dict:
        """Splits text into sections based on detected headings."""
        lines = text.splitlines()
        sections = {}
        current_heading = "Document"
        current_content = []

        for line in lines:
            stripped = line.strip()
            if stripped and len(stripped.split()) <= 6 and stripped[0].isupper() and not stripped.endswith("."):
                if current_content:
                    sections[current_heading] = " ".join(current_content).strip()
                current_heading = stripped
                current_content = []
            else:
                current_content.append(stripped)

        if current_content:
            sections[current_heading] = " ".join(current_content).strip()

        return sections

    def clean_text(self, text: str, lowercase: bool = True) -> str:
        """Cleans text: lowercases, removes punctuation, normalizes spaces."""
        if lowercase:
            text = text.lower()
        text = text.replace("\n", " ").replace("\t", " ")
        text = text.translate(str.maketrans("", "", string.punctuation))
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    def process_pdfs(self):
        """
        Processes all PDFs in the folder:
        - Extracts text
        - Splits into sections
        - Splits sections into chunks
        - Returns chunks and metadata
        """
        all_chunks = []
        all_metadatas = []

        for file in self.files:
            pdf_path = os.path.join(self.pdf_folder, file)
            raw_text = self.extract_text(pdf_path)
            sections = self.split_into_sections(raw_text)

            for section_title, content in sections.items():
                cleaned_content = self.clean_text(content)
                chunks = self.text_splitter.split_text(cleaned_content)
                all_chunks.extend(chunks)
                all_metadatas.extend([{"source": file, "section": section_title}] * len(chunks))

        return all_chunks, all_metadatas


In [None]:
# rag.py
# Contains PDFProcessor and VectorStoreManager definitions

# main.py
from rag import PDFProcessor, VectorStoreManager
from langchain.embeddings.openai import OpenAIEmbeddings
import os
from dotenv import load_dotenv

load_dotenv()

PDF_FOLDER = "assets/documents"
PERSIST_DIR = "./persist"
embeddings = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))

processor = PDFProcessor(pdf_folder=PDF_FOLDER)
chunks, metadatas = processor.process_pdfs()

manager = VectorStoreManager(
    pdf_folder=PDF_FOLDER,
    persist_dir=PERSIST_DIR,
    embeddings=embeddings,
    chunks=chunks,
    metadatas=metadatas
)

vectorstore = manager.load_or_create()


In [None]:
class SearchHelper:
    def __init__(self, vectorstore):
        self.vectorstore = vectorstore

    def search(self, query: str, method: str = "similarity", k: int = 3, expanded_terms=None, filter_dict=None):
        """
        Runs different search strategies on the vectorstore.
        
        Args:
            query (str): main query
            method (str): one of ["similarity", "similarity_score", "mmr", "expansion", "filter"]
            k (int): number of results to return
            expanded_terms (list[str]): optional extra terms for expansion
            filter_dict (dict): optional metadata filter

        Returns:
            list of documents (or docs+scores if method="similarity_score")
        """
        if method == "similarity":
            return self.vectorstore.similarity_search(query, k=k)

        elif method == "similarity_score":
            return self.vectorstore.similarity_search_with_score(query, k=k)

        elif method == "mmr":
            return self.vectorstore.max_marginal_relevance_search(query, k=k, fetch_k=15)

        elif method == "expansion":
            terms = [query] + (expanded_terms or [])
            all_results = []
            for term in terms:
                all_results.extend(self.vectorstore.similarity_search(term, k=k))
            # Deduplicate by (source, content)
            unique_results = { (doc.metadata['source'], doc.page_content): doc for doc in all_results }
            return list(unique_results.values())

        elif method == "filter":
            return self.vectorstore.similarity_search(query, k=k, filter=filter_dict or {})

        else:
            raise ValueError(f"Unknown search method: {method}")

    @staticmethod
    def pretty_print(results, with_score=False):
        """Helper to print results cleanly."""
        for i, item in enumerate(results):
            if with_score:
                doc, score = item
                print(f"\n{i+1}. {doc.metadata.get('source','?')} - {doc.metadata.get('section','?')} (score={score:.4f})")
                print(doc.page_content[:300], "...\n")
            else:
                doc = item
                print(f"\n{i+1}. {doc.metadata.get('source','?')} - {doc.metadata.get('section','?')}")
                print(doc.page_content[:300], "...\n")

In [None]:
searcher = SearchHelper(vectorstore)

# 1. Simple similarity
results = searcher.search("Maternity leave policy", method="similarity", k=3)
searcher.pretty_print(results)

# 2. Similarity with score
results = searcher.search("Maternity leave policy", method="similarity_score", k=5)
searcher.pretty_print(results, with_score=True)

# 3. MMR
results = searcher.search("Maternity leave policy", method="mmr", k=5)
searcher.pretty_print(results)

# 4. Query expansion
results = searcher.search("Maternity leave policy", method="expansion", expanded_terms=["parental leave", "pregnancy leave"])
searcher.pretty_print(results)

# 5. With metadata filter
results = searcher.search("Maternity leave policy", method="filter", filter_dict={"section": "HR Policies"})
searcher.pretty_print(results)


In [None]:
from openai import OpenAI
client = OpenAI()

def expand_query(query: str, n_terms: int = 5) -> list[str]:
        """
        Use LLM to generate related terms for query expansion.
        """
        prompt = f"""
        Generate {n_terms} synonyms of the core word/phrase of the following query for use in document retrieval. Keep them short, noun-phrases.

        Query: "{query}"
        """
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role":"user","content": prompt}],
            max_tokens=100
        )
        text = response.choices[0].message.content.strip()
        return [t.strip("-• ") for t in text.split("\n") if t.strip()]
exp_terms = expand_query("Maternity leave policy")
results = searcher.search("Maternity leave policy", method="expansion", expanded_terms=exp_terms)
searcher.pretty_print(results)