In [None]:
# 1. Install Dependencies
!pip install langchain faiss-cpu sentence-transformers transformers pypdf rank_bm25 rouge-score nltk -U langchain-community pymupdf

import os, re, time, textwrap
import numpy as np
from typing import List, Tuple, Union, Dict, Any
from sentence_transformers import SentenceTransformer
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.document_loaders import PyMuPDFLoader
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter, TokenTextSplitter
from rank_bm25 import BM25Okapi
from transformers import pipeline as hf_pipeline
from transformers import AutoTokenizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from rouge_score import rouge_scorer
from nltk.tokenize import sent_tokenize
import torch
import faiss
import nltk
nltk.download('punkt_tab')


Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting sentence-transformers
  Downloading sentence_transformers-4.1.0-py3-none-any.whl.metadata (13 kB)
Collecting pypdf
  Downloading pypdf-5.4.0-py3-none-any.whl.metadata (7.3 kB)
Collecting rank_bm25
  Downloading rank_bm25-0.2.2-py3-none-any.whl.metadata (3.2 kB)
Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting langchain-community
  Downloading langchain_community-0.3.21-py3-none-any.whl.metadata (2.4 kB)
Collecting pymupdf
  Downloading pymupdf-1.25.5-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (3.4 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.9.1-py3-none-any.whl.met

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [None]:
# Colab-only step: prompts for a local file upload (the PDF processed further
# down) and keeps whatever files.upload() returns in `uploaded`.
from google.colab import files
uploaded = files.upload()

Saving 13. Atlas of Diabetes Mellitus (3rd Edition).pdf to 13. Atlas of Diabetes Mellitus (3rd Edition).pdf


In [None]:
# Directory handed to SentenceTransformer as its cache_folder; created up front
# so the first model load does not fail on a missing path.
CACHE_DIR = "/content/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

The DataProcessor class handles loading, cleaning, and chunking text data from PDF files using one of several strategies (recursive, sentence-based, paragraph-based, or token-based). It returns a list of Document chunks, each containing clean, structured text ready for vectorization and retrieval in the RAG pipeline. Documents need to be broken down into manageable, meaningful chunks that preserve context. This class ensures those chunks are:
Clean (no noise or formatting issues),
Structured (sentence/paragraph boundaries),
Labeled (metadata added for tracking origin).

Cleans each page's text via _clean_text() (removing extra whitespace).
Adds file metadata (source_file) to trace chunk origins.
Based on the selected chunking_strategy, it chooses an appropriate method to split pages:
1) "recursive": uses LangChain's recursive character splitter.
2) "token": uses LangChain's token splitter.
3) "sentence": uses _sentence_based_chunking() (defined below).
4) "paragraph": uses _paragraph_based_chunking() (defined below).

1) _clean_text(...)
Cleans text by collapsing multiple newlines/spaces and trimming extra whitespace.

2) _sentence_based_chunking(...)
Splits text into sentences using NLTK.
Groups sentences into chunks of a maximum combined character size.
Adds overlap between chunks by reusing the tail end of the previous chunk.

3) _paragraph_based_chunking(...)
Splits text into paragraphs using double newlines as a delimiter.
Groups paragraphs into chunks based on total length.
Also includes overlap to maintain context continuity across chunks.

In [None]:
# 2. Data Processor

class DataProcessor:
    """Load PDF files, clean their text and split it into ``Document`` chunks.

    The class is a namespace of static methods. ``load_and_chunk`` is the entry
    point; the remaining methods are helpers for the "sentence" and
    "paragraph" strategies and for text clean-up.
    """

    @staticmethod
    def load_and_chunk(file_paths: Union[str, List[str]], chunking_strategy: str = "recursive", chunk_size: int = 512, chunk_overlap: int = 64) -> List["Document"]:
        """Load every PDF in ``file_paths`` and return its chunks.

        Args:
            file_paths: A single path or a list of paths to PDF files.
            chunking_strategy: One of "recursive", "token", "sentence" or
                "paragraph".
            chunk_size: Maximum chunk size. Characters for the "sentence" and
                "paragraph" strategies; for "recursive" and "token" the value
                is passed straight to the LangChain splitter.
            chunk_overlap: Amount of trailing text repeated at the start of the
                next chunk, in the same unit as ``chunk_size``.

        Returns:
            All chunks from all files, in file order. Each chunk's metadata
            carries a ``source_file`` entry with the file's base name.

        Raises:
            ValueError: If ``chunking_strategy`` is not recognised.
        """
        if chunking_strategy not in ("recursive", "token", "sentence", "paragraph"):
            raise ValueError("Unknown chunking strategy")

        # The signature allows a single path; iterating a bare string would
        # otherwise walk it character by character.
        if isinstance(file_paths, (str, os.PathLike)):
            file_paths = [file_paths]

        # Paragraph chunking splits on blank lines, so the cleaner must not
        # collapse them away for that strategy.
        keep_paragraph_breaks = chunking_strategy == "paragraph"

        all_chunks = []
        for file_path in file_paths:
            file_path = os.fspath(file_path)
            pages = PyMuPDFLoader(file_path).load()
            for page in pages:
                page.page_content = DataProcessor._clean_text(page.page_content, keep_paragraph_breaks)
                page.metadata['source_file'] = os.path.basename(file_path)

            if chunking_strategy == "recursive":
                splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                chunks = splitter.split_documents(pages)
            elif chunking_strategy == "token":
                splitter = TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                chunks = splitter.split_documents(pages)
            elif chunking_strategy == "sentence":
                chunks = DataProcessor._sentence_based_chunking(pages, chunk_size, chunk_overlap)
            else:  # "paragraph" (validated above)
                chunks = DataProcessor._paragraph_based_chunking(pages, chunk_size, chunk_overlap)

            all_chunks.extend(chunks)
        return all_chunks

    @staticmethod
    def _clean_text(text: str, keep_paragraph_breaks: bool = False) -> str:
        """Collapse repeated newlines and spaces and trim the result.

        Args:
            text: Raw page text.
            keep_paragraph_breaks: When true, runs of blank lines are reduced
                to exactly one blank line instead of a single newline, so that
                paragraph boundaries survive cleaning.
        """
        if keep_paragraph_breaks:
            text = re.sub(r'\n\s*\n', '\n\n', text)
        else:
            text = re.sub(r'\n+', '\n', text)
        text = re.sub(r' +', ' ', text)
        return text.strip()

    @staticmethod
    def _overlap_tail(units: List[str], chunk_overlap: int, separator_len: int) -> List[str]:
        """Return the longest suffix of ``units`` that fits in ``chunk_overlap`` characters."""
        if chunk_overlap <= 0:
            return []
        tail = []
        budget = chunk_overlap
        for unit in reversed(units):
            cost = len(unit) + (separator_len if tail else 0)
            if cost > budget:
                break
            tail.append(unit)
            budget -= cost
        tail.reverse()
        return tail

    @staticmethod
    def _pack_units(units: List[str], separator: str, chunk_size: int, chunk_overlap: int) -> List[str]:
        """Greedily join ``units`` into chunk strings of roughly ``chunk_size`` characters.

        A unit longer than ``chunk_size`` is kept whole in its own chunk.
        Separator characters are not counted towards ``chunk_size``.
        """
        packed = []
        current = []
        current_size = 0
        for unit in units:
            unit_size = len(unit)
            if current and current_size + unit_size > chunk_size:
                packed.append(separator.join(current))
                # Carry over only as many trailing units as fit in the
                # character budget; slicing by element count would treat
                # chunk_overlap as "number of sentences".
                current = DataProcessor._overlap_tail(current, chunk_overlap, len(separator))
                current_size = sum(len(u) for u in current)
            current.append(unit)
            current_size += unit_size
        if current:
            packed.append(separator.join(current))
        return packed

    @staticmethod
    def _sentence_based_chunking(pages: List["Document"], chunk_size: int, chunk_overlap: int) -> List["Document"]:
        """Split each page into sentences (NLTK) and pack them into chunks.

        Chunks never span pages. Each chunk gets its own copy of the page
        metadata so later edits to one chunk do not leak into the others.
        """
        chunks = []
        for page in pages:
            sentences = [s for s in sent_tokenize(page.page_content) if s.strip()]
            for chunk_text in DataProcessor._pack_units(sentences, " ", chunk_size, chunk_overlap):
                chunks.append(Document(page_content=chunk_text, metadata=dict(page.metadata)))
        return chunks

    @staticmethod
    def _paragraph_based_chunking(pages: List["Document"], chunk_size: int, chunk_overlap: int) -> List["Document"]:
        """Split each page on blank lines and pack the paragraphs into chunks.

        Chunks never span pages. Each chunk gets its own copy of the page
        metadata.
        """
        chunks = []
        for page in pages:
            paragraphs = [p.strip() for p in re.split(r'\n\s*\n', page.page_content) if p.strip()]
            for chunk_text in DataProcessor._pack_units(paragraphs, "\n\n", chunk_size, chunk_overlap):
                chunks.append(Document(page_content=chunk_text, metadata=dict(page.metadata)))
        return chunks


This AdvancedRetriever class is responsible for retrieving the most relevant document chunks based on a user's query, using multiple search strategies. It enables dense (semantic), sparse (keyword-based), and hybrid retrieval, making it flexible and robust across different query types.

Upon initialization, the class accepts a list of preprocessed chunks and a biomedical embedding model (pritamdeka/S-PubMedBert-MS-MARCO) tailored for medical text. The _create_indices() method computes dense embeddings for each chunk and creates multiple FAISS indexes: IP (inner product, which equals cosine similarity only when the embeddings are L2-normalized), L2 for Euclidean distance, and HNSW for fast approximate search. It also sets up BM25 and TF-IDF indexes for sparse (keyword-based) retrieval. These indexes enable the retriever to support diverse search strategies depending on the use case or model preference.

The search() method acts as a unified interface to run any of the five supported retrieval strategies by specifying the method argument. The class is especially useful in RAG systems, where accurate, flexible document retrieval significantly affects answer quality.

Dense models understand semantic meaning, while sparse models catch exact keyword matches. This retriever blends both worlds and enables evaluation or ensemble methods (like hybrid/RRF), improving robustness across query types, especially in sensitive domains like healthcare.

Searches used:

1) Dense: _dense_search() -> Uses PubMedBERT to embed the query and chunks -> retrieves using cosine similarity (via FAISS IP index). Best for understanding semantics.
2) Sparse (BM25): _sparse_search() -> Tokenizes query and compares to documents using traditional term frequency logic. Good for exact keyword match. Fast.
3) TF-IDF: _tfidf_search() -> Weights rare terms higher ->  calculates cosine similarity of TF-IDF vectors. Often better for rare/technical keywords.
4) HNSW: _hnsw_search() -> Approximate nearest neighbor search using a graph structure. Much faster on large datasets. Slightly less accurate.
5) Hybrid (RRF): _hybrid_search() -> Combines the dense and sparse rankings using Reciprocal Rank Fusion (each chunk's fused score is the sum of 1 / (60 + rank) over the two result lists). Helps when either method alone is insufficient. More balanced.

In [None]:
# 3. Advanced Retriever

class AdvancedRetriever:
    """Retrieve chunks for a query with dense, sparse, TF-IDF, HNSW or hybrid search.

    Indices are built by ``_create_indices()``. Calling it explicitly is still
    supported; the search methods also build the indices on first use if that
    has not happened yet.
    """

    # Constant added to the rank in Reciprocal Rank Fusion.
    RRF_K = 60
    _SEARCH_METHODS = ("dense", "sparse", "tfidf", "hnsw", "hybrid")

    def __init__(self, chunks: List["Document"], embedding_model: str = "pritamdeka/S-PubMedBert-MS-MARCO"):
        """Store the chunks and load the sentence-embedding model.

        Args:
            chunks: Documents to search over.
            embedding_model: Name of the SentenceTransformer model to load.
        """
        self.chunks = chunks
        print(f"Loading dense embedding model: {embedding_model}...")
        self.dense_model = SentenceTransformer(embedding_model, cache_folder=CACHE_DIR, device=device)

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Lower-case ``text`` and split it into word tokens for BM25.

        Punctuation is dropped so that a query token such as "treated?" still
        matches "treated" in the corpus.
        """
        return re.findall(r"\w+", text.lower())

    def _embed(self, texts: List[str], show_progress_bar: bool = False) -> np.ndarray:
        """Encode ``texts`` and L2-normalise each vector.

        With unit-length vectors the inner-product index ranks by cosine
        similarity, and L2 distance yields the same ordering.
        """
        vectors = self.dense_model.encode(texts, show_progress_bar=show_progress_bar, device=str(device))
        vectors = np.asarray(vectors, dtype='float32')
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # leave all-zero vectors untouched instead of dividing by zero
        return np.ascontiguousarray(vectors / norms, dtype='float32')

    def _create_indices(self):
        """Build the FAISS (IP, L2, HNSW), BM25 and TF-IDF indices over ``self.chunks``.

        Raises:
            ValueError: If there are no chunks to index.
        """
        if not self.chunks:
            raise ValueError("Cannot build indices: no chunks were provided")

        texts = [chunk.page_content for chunk in self.chunks]
        self.embeddings = self._embed(texts, show_progress_bar=True)

        dimension = self.embeddings.shape[1]
        self.index_ip = faiss.IndexFlatIP(dimension)
        self.index_l2 = faiss.IndexFlatL2(dimension)
        self.index_ip.add(self.embeddings)
        self.index_l2.add(self.embeddings)

        print("Creating HNSW index...")
        self.index_hnsw = faiss.IndexHNSWFlat(dimension, 32)
        self.index_hnsw.add(self.embeddings)

        print("Creating BM25 + TF-IDF indices...")
        self.bm25_index = BM25Okapi([self._tokenize(text) for text in texts])
        self.tfidf_vectorizer = TfidfVectorizer()
        # Assigned last: its presence marks a completed build (see _ensure_indices).
        self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(texts)

    def _ensure_indices(self):
        """Build the indices if ``_create_indices()`` has not completed yet."""
        if not hasattr(self, "tfidf_matrix"):
            self._create_indices()

    def search(self, query: str, k: int = 5, method: str = "hybrid") -> List["Document"]:
        """Return up to ``k`` chunks for ``query`` using the chosen strategy.

        Args:
            query: Natural-language query.
            k: Maximum number of chunks to return.
            method: "dense", "sparse", "tfidf", "hnsw" or "hybrid".

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method not in self._SEARCH_METHODS:
            raise ValueError(f"Unknown search method: {method!r}; expected one of {self._SEARCH_METHODS}")
        if k <= 0:
            return []
        if method == "dense":
            return self._dense_search(query, k)
        if method == "sparse":
            return self._sparse_search(query, k)
        if method == "tfidf":
            return self._tfidf_search(query, k)
        if method == "hnsw":
            return self._hnsw_search(query, k)
        return self._hybrid_search(query, k)

    def _faiss_indices(self, index, query: str, k: int) -> List[int]:
        """Query a FAISS index and return valid chunk positions, best first."""
        # Never ask for more neighbours than there are vectors; FAISS pads
        # missing results with -1, which would silently index chunks[-1].
        k = min(k, len(self.chunks))
        if k <= 0:
            return []
        _, found = index.search(self._embed([query]), k)
        return [int(i) for i in found[0] if i >= 0]

    def _dense_indices(self, query: str, k: int) -> List[int]:
        """Chunk positions from the exact inner-product index."""
        self._ensure_indices()
        return self._faiss_indices(self.index_ip, query, k)

    def _hnsw_indices(self, query: str, k: int) -> List[int]:
        """Chunk positions from the HNSW index."""
        self._ensure_indices()
        return self._faiss_indices(self.index_hnsw, query, k)

    def _sparse_indices(self, query: str, k: int) -> List[int]:
        """Chunk positions ranked by BM25 score."""
        self._ensure_indices()
        scores = self.bm25_index.get_scores(self._tokenize(query))
        return [int(i) for i in np.argsort(scores)[::-1][:k]]

    def _tfidf_indices(self, query: str, k: int) -> List[int]:
        """Chunk positions ranked by TF-IDF cosine similarity."""
        self._ensure_indices()
        vec = self.tfidf_vectorizer.transform([query])
        scores = cosine_similarity(vec, self.tfidf_matrix)[0]
        return [int(i) for i in np.argsort(scores)[::-1][:k]]

    def _dense_search(self, query: str, k: int) -> List["Document"]:
        """Top-``k`` chunks by embedding similarity (exact search)."""
        return [self.chunks[i] for i in self._dense_indices(query, k)]

    def _sparse_search(self, query: str, k: int) -> List["Document"]:
        """Top-``k`` chunks by BM25 score."""
        return [self.chunks[i] for i in self._sparse_indices(query, k)]

    def _tfidf_search(self, query: str, k: int) -> List["Document"]:
        """Top-``k`` chunks by TF-IDF cosine similarity."""
        return [self.chunks[i] for i in self._tfidf_indices(query, k)]

    def _hnsw_search(self, query: str, k: int) -> List["Document"]:
        """Top-``k`` chunks by embedding similarity (approximate HNSW search)."""
        return [self.chunks[i] for i in self._hnsw_indices(query, k)]

    def _hybrid_search(self, query: str, k: int) -> List["Document"]:
        """Fuse the dense and BM25 rankings with Reciprocal Rank Fusion.

        Each list contributes ``1 / (RRF_K + rank)`` (rank counted from 0) to a
        chunk's fused score. Fusion works on chunk positions, so chunks with
        identical content stay distinct.
        """
        fused = {}
        rankings = (self._dense_indices(query, k * 2), self._sparse_indices(query, k * 2))
        for ranking in rankings:
            for rank, idx in enumerate(ranking):
                fused[idx] = fused.get(idx, 0) + 1 / (self.RRF_K + rank)

        best = sorted(fused.items(), key=lambda item: item[1], reverse=True)[:k]
        return [self.chunks[idx] for idx, _ in best]


The ContextEnhancer class is responsible for refining the context retrieved by the retriever before passing it to the language model for answer generation. Its goal is to improve answer relevance, conciseness, and focus by preprocessing the raw retrieved chunks using several optional strategies:
1) Summarization
2) Reordering by relevance
3) Highlighting key terms
4) Removing redundancy

When a retriever fetches multiple chunks of information from the knowledge base, not all of it is equally useful or relevant. Raw context may be:
1) Verbose
2) Repetitive
3) Poorly ordered
4) Missing emphasis on query-related information

By enhancing the context:
1) The language model gets cleaner and more relevant input
2) It's more likely to produce faithful and concise answers
3) You reduce the chance of irrelevant or hallucinated content

1) _summarize_context(...)
What it does: Uses a summarization model (facebook/bart-large-cnn) to reduce long context into shorter, meaningful summaries.
Why it's useful: Keeps only the essential information, reducing token usage and model confusion.
Usecase: Makes large medical descriptions (e.g., from a textbook) digestible by the model.

2) _reorder_by_relevance(...)
What it does: Reorders the retrieved context based on how relevant each chunk is to the query.
Why it's useful: Pushes the most query-related information to the top so the model sees it first (especially helpful when context is long).
Usecase: If the query is about "insulin resistance," this method ensures chunks discussing insulin come first.

3) _highlight_key_info(...)
What it does: Adds visual emphasis by tagging sentences that contain query terms with *IMPORTANT:*.
Why it's useful: Directs the model's attention toward the most relevant parts of the context.
Usecase: Boosts LLM focus on critical keywords (e.g., "diabetes," "blood sugar").

4) _remove_redundancy(...)
What it does: Removes duplicate or near-duplicate sentences.
Why it's useful: Prevents wasted tokens and reduces confusion caused by repetitive information.
Usecase: Medical documents often have repeated warnings or definitions -> this removes that clutter.

final_context, meta = enhancer.enhance_context(retrieved_docs, query, methods=["reorder", "summarize", "highlight", "deduplicate"])
This line:
1) Prioritizes query-relevant info
2) Compresses it via summarization
3) Highlights key parts for the model
4) Removes noisy repetition

In [None]:
# 4. Context Enhancer

class ContextEnhancer:
    """Post-process retrieved chunks before they are given to the answer model.

    Supported steps, applied in this fixed order when requested: "reorder",
    "summarize", "highlight", "deduplicate".
    """

    # Query words that carry no topical signal; matching on them would flag
    # almost every sentence as relevant.
    _STOPWORDS = frozenset({
        "the", "and", "for", "are", "was", "were", "what", "which", "who", "whom",
        "how", "why", "when", "where", "does", "did", "can", "could", "should",
        "would", "will", "with", "from", "that", "this", "these", "those", "their",
        "them", "they", "its", "his", "her", "our", "your", "you", "not", "but",
        "has", "have", "had", "into", "about", "than", "then", "there", "here",
        "also", "any", "all",
    })
    # Pieces with fewer tokens than this are passed through unsummarised.
    _MIN_TOKENS_FOR_SUMMARY = 16

    def __init__(self):
        """Load the BART summarisation pipeline and its tokenizer."""
        print("Loading summarization model...")
        self.summarizer = hf_pipeline("summarization", model="facebook/bart-large-cnn", device_map="auto")
        self.tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")

    def enhance_context(self, documents: List["Document"], query: str, methods: Union[List[str], None] = None) -> Tuple[str, Dict]:
        """Join the documents' text and apply the requested enhancement steps.

        Args:
            documents: Retrieved documents; only ``page_content`` is read.
            query: The user query, used by "reorder" and "highlight".
            methods: Any of "reorder", "summarize", "highlight", "deduplicate".
                ``None`` or an empty list returns the joined text unchanged.

        Returns:
            ``(context, metadata)`` where metadata records the original and
            final length, the methods applied and per-step details.
        """
        methods = list(methods) if methods else []
        context_texts = [doc.page_content for doc in documents]
        context = "\n\n".join(context_texts)
        metadata = {"original_length": len(context), "methods_applied": methods}

        if "reorder" in methods:
            context, meta = self._reorder_by_relevance(context_texts, query)
            metadata["reordering"] = meta

        if "summarize" in methods:
            context, meta = self._summarize_context(context)
            metadata["summarization"] = meta

        if "highlight" in methods:
            context, meta = self._highlight_key_info(context, query)
            metadata["highlighting"] = meta

        if "deduplicate" in methods:
            context, meta = self._remove_redundancy(context)
            metadata["deduplication"] = meta

        metadata["final_length"] = len(context)
        return context, metadata

    @classmethod
    def _query_terms(cls, query: str) -> set:
        """Extract lower-cased content words from ``query``.

        Tokens of one or two characters and common function words are dropped.
        If that leaves nothing, every token is used so short queries still
        match something.
        """
        tokens = re.findall(r"\w+", query.lower())
        terms = {t for t in tokens if len(t) > 2 and t not in cls._STOPWORDS}
        return terms or set(tokens)

    def _summarize_context(self, context: str, query: str = "") -> Tuple[str, Dict]:
        """Summarise ``context`` piece by piece with the BART pipeline.

        The text is wrapped into pieces of at most 1024 characters. Pieces too
        short to summarise meaningfully are kept verbatim. ``query`` is
        accepted for interface compatibility and is not used.
        """
        pieces = textwrap.wrap(context, 1024, break_long_words=False, break_on_hyphens=False)
        outputs = []
        summarized = 0

        for piece in pieces:
            token_length = len(self.tokenizer.encode(piece, truncation=True))
            if token_length < self._MIN_TOKENS_FOR_SUMMARY:
                # max_length would round down to (nearly) zero tokens here.
                outputs.append(piece)
                continue

            result = self.summarizer(
                piece,
                max_length=min(token_length // 2, 256),
                min_length=min(token_length // 4, 100),
                do_sample=False,
                truncation=True,  # a single unbroken long word can exceed the model's input limit
            )
            outputs.append(result[0]["summary_text"])
            summarized += 1

        summary = "\n\n".join(outputs)
        return summary, {
            "summary_length": len(summary),
            "chunks_summarized": summarized,
            "chunks_passed_through": len(pieces) - summarized
        }

    def _reorder_by_relevance(self, contexts: List[str], query: str) -> Tuple[str, Dict]:
        """Order chunks by how often their sentences mention the query terms.

        A chunk's score is the sum, over its sentences, of the number of
        distinct query terms found in the sentence (substring match). Ties keep
        the incoming order. Chunks with no non-blank sentence are dropped.
        """
        terms = self._query_terms(query)
        chunk_scores = {}
        for i, chunk in enumerate(contexts):
            sentences = [s for s in sent_tokenize(chunk) if s.strip()]
            if not sentences:
                continue
            chunk_scores[i] = sum(
                sum(term in sentence.lower() for term in terms)
                for sentence in sentences
            )

        # sorted() is stable, so equally scored chunks keep retrieval order.
        order = sorted(chunk_scores, key=lambda i: chunk_scores[i], reverse=True)
        reordered_context = "\n\n".join(contexts[i] for i in order)
        return reordered_context, {"chunk_scores": chunk_scores}

    def _highlight_key_info(self, context: str, query: str) -> Tuple[str, Dict]:
        """Prefix sentences that mention a query term with ``*IMPORTANT:*``."""
        terms = self._query_terms(query)
        highlighted = []
        flagged = 0
        for sentence in sent_tokenize(context):
            lowered = sentence.lower()
            if any(term in lowered for term in terms):
                highlighted.append(f"*IMPORTANT:* {sentence}")
                flagged += 1
            else:
                highlighted.append(sentence)
        return " ".join(highlighted), {"highlights_added": flagged}

    def _remove_redundancy(self, context: str) -> Tuple[str, Dict]:
        """Drop sentences that repeat an earlier one, ignoring case and punctuation.

        In the returned metadata ``deduplicated_sentences`` is the number of
        sentences kept and ``removed_sentences`` the number dropped.
        """
        sentences = sent_tokenize(context)
        seen = set()
        unique = []
        for s in sentences:
            norm = re.sub(r'[^\w]', '', s.lower())
            if norm not in seen:
                seen.add(norm)
                unique.append(s)
        return " ".join(unique), {
            "deduplicated_sentences": len(unique),
            "removed_sentences": len(sentences) - len(unique)
        }

The AdvancedAnswerGenerator class is designed to dynamically generate answers in a Retrieval-Augmented Generation (RAG) system using different pre-trained large language models (LLMs). It allows you to plug in various models like FLAN-T5, Qwen, Phi-2, and Falcon-7B and generate responses from retrieved, enhanced context. It also supports customizable prompting strategies for better alignment with medical domain tasks.

Different models behave differently:
1) Some are better at structured answering (e.g., FLAN-T5).
2) Others excel in creative dialogue (e.g., Qwen, Falcon).
3) Some are light and efficient (e.g., Phi-2).

1) flan-t5 -> good for text2text tasks, stable for structured answers.
2) qwen -> chat-like model, suitable for roleplay, dialogue, medical assistant prompts.
3) phi-2 -> lightweight but surprisingly capable, fast responses.
4) falcon-7b -> strong open-ended generation, useful for free-form answers.

format_prompt(...)
Formats the prompt for the LLM based on the strategy.
Two strategies shown:
1) "medical_advisor": Adds an instruction like "You are a medical assistant..." to improve the model's understanding.
2) "standard": A generic format (context + question).

In [None]:
# 5. Answer Generator

class AdvancedAnswerGenerator:
    """Generate answers from a query and context with one of several Hugging Face models.

    Pipelines are created on first use and cached per ``model_type``.
    """

    # model_type -> (pipeline task, model id, extra pipeline kwargs)
    _MODEL_SPECS = {
        "flan-t5": ("text2text-generation", "google/flan-t5-base", {}),
        "qwen": ("text-generation", "Qwen/Qwen1.5-0.5B-Chat", {}),
        "phi-2": ("text-generation", "microsoft/phi-2", {}),
        "falcon-7b": ("text-generation", "tiiuae/falcon-7b-instruct", {"torch_dtype": "auto"}),
    }

    def __init__(self):
        self._loaded_models = {}

    def _get_model(self, model_type: str):
        """Return the cached pipeline for ``model_type``, loading it if needed.

        Raises:
            ValueError: If ``model_type`` is not a supported model name.
        """
        if model_type not in self._loaded_models:
            if model_type not in self._MODEL_SPECS:
                raise ValueError("Unsupported model type")
            print(f"Loading {model_type} model...")
            task, model_id, extra = self._MODEL_SPECS[model_type]
            self._loaded_models[model_type] = hf_pipeline(task, model=model_id, device_map="auto", **extra)
        return self._loaded_models[model_type]

    def generate(self, query: str, context: str, model_type: str = "flan-t5", prompt_strategy: str = "standard", max_new_tokens: int = 256, temperature: float = 0.7) -> Tuple[str, float]:
        """Answer ``query`` from ``context`` and time the generation call.

        Args:
            query: The user question.
            context: Retrieved (and optionally enhanced) context text.
            model_type: "flan-t5", "qwen", "phi-2" or "falcon-7b".
            prompt_strategy: "medical_advisor" or anything else for the
                standard prompt.
            max_new_tokens: Generation length limit.
            temperature: Sampling temperature for the text-generation models.
                A value of 0 (or less) selects greedy decoding. Not used for
                "flan-t5".

        Returns:
            ``(answer, seconds)`` where ``answer`` contains only the generated
            text, without the prompt.
        """
        model = self._get_model(model_type)
        prompt = self._format_prompt(query, context, model_type, prompt_strategy)

        start = time.time()
        if model_type == "flan-t5":
            result = model(prompt, max_length=max_new_tokens)[0]['generated_text']
        else:
            # return_full_text=False keeps the prompt out of the answer.
            gen_kwargs = {"max_new_tokens": max_new_tokens, "return_full_text": False}
            if temperature and temperature > 0:
                # temperature only has an effect when sampling is enabled.
                gen_kwargs["do_sample"] = True
                gen_kwargs["temperature"] = temperature
            else:
                gen_kwargs["do_sample"] = False
            result = model(prompt, **gen_kwargs)[0]['generated_text']
            # Belt and braces in case a pipeline echoes the prompt anyway.
            if result.startswith(prompt):
                result = result[len(prompt):]
            if model_type == "qwen":
                # Cut at the end-of-turn marker if the model emitted one.
                result = result.split("<|im_end|>", 1)[0]
        elapsed = time.time() - start
        return result.strip(), elapsed

    def _format_prompt(self, query: str, context: str, model_type: str, strategy: str) -> str:
        """Build the prompt text for the given strategy and model.

        For "qwen" the prompt is wrapped in ChatML user/assistant markers so
        the chat model sees a proper turn structure.
        """
        if strategy == "medical_advisor":
            # NOTE(review): the indented lines below are part of the prompt text as
            # originally written; kept byte-for-byte to avoid changing model input.
            body = f"""You are a medical assistant. Provide a clear and accurate answer.

    Context:
    {context}

    Question: {query}

    Answer:"""
        else:
            body = f"Context:\n{context}\n\nQuestion: {query}"

        if model_type == "qwen":
            return f"<|im_start|>user\n{body}<|im_end|>\n<|im_start|>assistant\n"
        return body

The AdvancedRagEvaluator class is an automatic evaluation framework for assessing the quality of answers generated by a RAG system. It uses a combination of:
1) LLM-based scoring for human-like judgment (e.g., faithfulness, relevance)
2) Traditional NLP metrics like ROUGE for content overlap
3) Composite and overall scores to summarize answer quality

@property evaluator
Lazily loads a language model-based evaluator (Qwen1.5-0.5B-Chat).
Used to rate answer quality by prompting the model for a score from 1 to 5.
Loaded once, cached for future use (efficient design).

evaluate(...)
Input: answer, query, and its context.

Uses _get_llm_score() to score:
1) Relevance: Does the answer address the question?
2) Faithfulness: Is it supported by the context?

Optional extended metrics (full_metrics=True):
1) Conciseness: Is the answer short and to the point?
2) Completeness: Does the answer fully address all aspects of the question?
3) ROUGE: Lexical overlap with context (ROUGE-1, ROUGE-2, ROUGE-L).
4) Overall Quality: Average of all human-readable metrics (excluding ROUGE).

In [None]:
# 6. Evaluation Framework

class AdvancedRagEvaluator:
    """Score generated answers with an LLM judge and ROUGE overlap."""

    # Score returned when the judge's reply cannot be parsed or the call fails.
    _DEFAULT_SCORE = 3.0
    # A standalone number from 1 to 5, optionally with decimals; rejects the
    # "1" in "10" and the "5" in "0.5".
    _SCORE_PATTERN = re.compile(r'(?<![\d.])([1-5](?:\.\d+)?)(?!\d)')

    def __init__(self):
        self._evaluator = None
        self.rouge_scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

    @property
    def evaluator(self):
        """The judge pipeline, loaded on first access and then reused."""
        if self._evaluator is None:
            print("Loading evaluation model...")
            self._evaluator = hf_pipeline("text-generation", model="Qwen/Qwen1.5-0.5B-Chat", device_map="auto")
        return self._evaluator

    def evaluate(self, answer: str, context: str, query: str, full_metrics: bool = True) -> Dict[str, float]:
        """Score ``answer`` and print the resulting metrics.

        Args:
            answer: The generated answer.
            context: The context the answer was generated from; only the first
                2000 characters are used.
            query: The user question.
            full_metrics: Also compute conciseness, completeness, ROUGE (when
                ``context`` is non-empty) and ``overall_quality``.

        Returns:
            Metric name -> value. LLM-judged metrics are on a 1-5 scale; ROUGE
            values are F-measures.
        """
        metrics = {}
        metrics["relevance"] = self._get_llm_score(f"Rate answer relevance to question (1-5):\nQuestion: {query}\nAnswer: {answer}\nScore:")
        metrics["faithfulness"] = self._get_llm_score(f"Rate answer faithfulness to context (1-5):\nContext: {context[:2000]}\nAnswer: {answer}\nScore:")
        metrics["composite_score"] = (metrics["relevance"] + metrics["faithfulness"]) / 2

        if full_metrics:
            metrics["conciseness"] = self._get_llm_score(f"Rate answer conciseness (1-5):\nAnswer: {answer}\nScore:")
            metrics["completeness"] = self._get_llm_score(f"Rate answer completeness (1-5):\nQuestion: {query}\nAnswer: {answer}\nScore:")
            if context:
                # RougeScorer.score takes (target, prediction): the context is
                # the reference and the answer is what is being scored. The
                # F-measure is the same either way round.
                rouge_scores = self.rouge_scorer.score(context[:2000], answer)
                metrics["rouge1"] = rouge_scores['rouge1'].fmeasure
                metrics["rouge2"] = rouge_scores['rouge2'].fmeasure
                metrics["rougeL"] = rouge_scores['rougeL'].fmeasure
            metrics["overall_quality"] = (
                metrics["relevance"] +
                metrics["faithfulness"] +
                metrics["conciseness"] +
                metrics["completeness"]
            ) / 4

        print("Evaluation Scores:")
        for k, v in metrics.items():
            print(f" - {k}: {v:.2f}" if isinstance(v, float) else f" - {k}: {v}")

        return metrics

    def _get_llm_score(self, prompt: str) -> float:
        """Ask the judge model for a 1-5 score and parse it from its reply.

        Only the model's completion is inspected. The prompt itself contains
        "(1-5)", so scanning the echoed prompt would always yield 1.

        Returns:
            The parsed score clamped to [1, 5], or 3.0 when the reply holds no
            usable number or the model call raises.
        """
        chat_prompt = f"<|im_start|>user\n{prompt}<|im_end|>\n<|im_start|>assistant\n"
        try:
            # A few tokens of headroom so leading whitespace does not crowd out the digit.
            response = self.evaluator(chat_prompt, max_new_tokens=4, do_sample=False, return_full_text=False)[0]['generated_text']
        except Exception as e:
            print(f"Error in evaluation: {e}")
            return self._DEFAULT_SCORE

        # Guard against pipelines that return the prompt despite the flag.
        if response.startswith(chat_prompt):
            response = response[len(chat_prompt):]

        match = self._SCORE_PATTERN.search(response)
        if match:
            return min(5.0, max(1.0, float(match.group(1))))
        return self._DEFAULT_SCORE

In [None]:
# 7. Main RAG Execution
# End-to-end run: chunk the PDF, build the retrieval indices, retrieve for one
# query, enhance the context, generate an answer, score it and append a log row.

# Upload PDF to /content first, e.g., diabetes textbook
file_paths = ["/content/13. Atlas of Diabetes Mellitus (3rd Edition).pdf"]

# 1. Chunk and clean
processor = DataProcessor()
chunks = processor.load_and_chunk(file_paths, chunking_strategy="paragraph")

# 2. Initialize retriever
retriever = AdvancedRetriever(chunks)
# Index construction is a separate, explicit step from loading the model.
retriever._create_indices()

# 3. Retrieve documents
# NOTE: only `query` is used in the rest of this cell; query2..query10 are
# alternative test questions to swap in by hand.

# test reasoning + hybrid performance
query = "What are the different types of diabetes and how are they treated?"
query2 = "What is the difference between hypoglycemia and hyperglycemia?"

# sparse retrieval testing
query3 = "What is the normal range for blood sugar levels?"
query4 = "How is diabetes diagnosed?"

# dense retrieval testing
query5 = "What complications arise from chronic hyperglycemia?"

# test sequential understanding
query6 = "What is the lifecycle of insulin from secretion to absorption?"

# test embedding quality
query7 = "What does HbA1c mean and why is it important?"

# test noisy input
query8 = "H0w 2 treet dibetes wit diet?"

# role based query
query9 = "Advise a diabetic patient on how to manage their condition while fasting during Ramadan."
query10 = "Explain to an elderly patient how exercise can help control blood sugar levels."

docs = retriever.search(query, method="hybrid", k=4)

# 4. Enhance context
enhancer = ContextEnhancer()
context, meta = enhancer.enhance_context(docs, query, methods=["reorder", "summarize", "highlight", "deduplicate"])

# 5. Generate answer
generator = AdvancedAnswerGenerator()
answer, duration = generator.generate(query, context, model_type="falcon-7b", prompt_strategy="medical_advisor")
print("\nQuestion:", query)
print("\nAnswer:", answer)

# 6. Evaluate
evaluator = AdvancedRagEvaluator()
scores = evaluator.evaluate(answer, context, query)

# 7. Optional: Save logs
# Appends one row per run; the header is written only when the file is empty.
# The logged answer is truncated to its first 200 characters.
import csv
with open("rag_logs.csv", "a", newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    if f.tell() == 0:
        writer.writerow(["Query", "Answer", "Relevance", "Faithfulness", "Composite", "Time"])
    writer.writerow([query, answer[:200], scores['relevance'], scores['faithfulness'], scores['composite_score'], round(duration, 2)])


Loading dense embedding model: pritamdeka/S-PubMedBert-MS-MARCO...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/123 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/666 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/388 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/226k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/461k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/5 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

Creating HNSW index...
Creating BM25 + TF-IDF indices...
Loading summarization model...


config.json:   0%|          | 0.00/1.58k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.63G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Device set to use cpu
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Loading falcon-7b model...


config.json:   0%|          | 0.00/1.05k [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/17.7k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/4.48G [00:00<?, ?B/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/9.95G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/117 [00:00<?, ?B/s]



tokenizer_config.json:   0%|          | 0.00/1.13k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.73M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/281 [00:00<?, ?B/s]

Device set to use cpu
Setting `pad_token_id` to `eos_token_id`:11 for open-end generation.



Question: What are the different types of diabetes and how are they treated?

Answer: You are a medical assistant. Provide a clear and accurate answer.

    Context:
    *IMPORTANT:* Insulin analogs have rightly secured a firm place in the insulin market. Diabetic specialist nurses have assumed a major role in helping patients reach appropriate targets. *IMPORTANT:* For the non-specialist clinician it is advisable to become familiar with commonly pre- scribed insulins. *IMPORTANT:* Only two classes of insulin are needed in the attempt to mimic physiologic insulin secretion. *IMPORTANT:* Rapid and longer-acting insulins can be combined in the same cartridge as a fixed mixture. *IMPORTANT:* In many countries, such as the UK, insulin delivery via a pen device, so called insulin pens, has become the norm. *IMPORTANT:* The simplest regimen is to inject insulin subcuta- neously twice a day, before breakfast and before the evening meal. *IMPORTANT:* In gen- eral two-thirds of the total daily

config.json:   0%|          | 0.00/661 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.24G [00:00<?, ?B/s]

Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


generation_config.json:   0%|          | 0.00/206 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/1.29k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

Device set to use cpu


Evaluation Scores:
 - relevance: 1.00
 - faithfulness: 1.00
 - composite_score: 1.00
 - conciseness: 1.00
 - completeness: 1.00
 - rouge1: 0.67
 - rouge2: 0.67
 - rougeL: 0.67
 - overall_quality: 1.00


Complete Flow of the RAG Pipeline

1) Input: Query and Documents
The user provides a natural-language query, e.g., "What are the types of diabetes and their treatments?"

PDF documents (Atlas of Diabetes Mellitus) are pre-loaded.

2) Data Preprocessing
a. Text Cleaning
Removes extra whitespace and normalizes line breaks.

b. Document Chunking
PDF text is chunked using strategies like:

    1) Paragraph-based (preserves natural structure)
    2) Sentence-based, recursive, or token-based (optional)

Outcome: Clean, semantically meaningful document chunks.

3) Embeddings and Indexing
a. Dense Embeddings
    1) Uses a biomedical-domain embedding model: pritamdeka/S-PubMedBert-MS-MARCO
    2) Chunks are converted into vector representations (embeddings)

b. FAISS Indexing
    1) Dense vectors are stored in a FAISS index for fast semantic retrieval.

c. Sparse Indexing (BM25 + TF-IDF)
    1) BM25 (for exact/keyword match)
    2) TF-IDF (captures term importance)

Outcome: The system supports multiple retrieval strategies (dense, sparse, hybrid).

4) Hybrid Document Retrieval
The query is encoded with the same dense embedding model that was used for the chunks.
Hybrid Retrieval is performed:
    1) Dense similarity (semantic closeness)
    2) BM25 sparse ranking (keyword overlap)
    3) Reciprocal Rank Fusion (RRF) combines both.

The top-k most relevant chunks are then returned, ranked by the fused hybrid score.

5) Context Enhancement Module
Enhances the retrieved content to make it more useful and focused for the LLM.
Techniques used:
    1) Reordering -> Ranks sentences by query term frequency.
    2) Summarization -> Compresses lengthy content using facebook/bart-large-cnn.
    3) Highlighting -> Tags sentences containing query-relevant terms.
    4) Deduplication -> Removes repetitive/overlapping sentences.

Final output is a clean, concise, query-aligned context.

6) Prompt Formatting
Depending on the prompt_strategy:
    1) Generic (context + question)
    2) Role-based (e.g., "You are a medical advisor...")

This improves the model's behavior and tailors its tone/style to the user's need.

7) Answer Generation
A chosen LLM (e.g., falcon-7b, flan-t5, phi-2, qwen) is loaded dynamically.
The prompt + enhanced context is passed into the model using transformers.pipeline.
The model generates a fluent, informed answer.

8) Answer Evaluation
Automatically scores the generated answer:
    1) Faithfulness: Does it match the context?
    2) Relevance: Does it answer the question?
    3) Conciseness, Completeness (optional)
    4) ROUGE-1/2/L: Measures lexical overlap with context.

Scoring is done by another LLM acting as a judge (such as Qwen), or by ROUGE for reproducible evaluation.

9) Logging and Saving
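Appends the query, the first 200 characters of the answer, the relevance, faithfulness, and composite scores, and the elapsed time reported by the generator to a .csv file (rag_logs.csv).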
Supports future analysis, model comparisons, and reproducibility.