In [None]:
from typing import Dict, List, Optional, Tuple

import fitz
import numpy as np
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class SimpleRAG:
    """
    A simple Retrieval-Augmented Generation (RAG) system.

    Intended call order: ``setup_environment()``, ``extract_text_from_pdf()``,
    ``chunk_text()``, ``create_embeddings()``, then ``run_query()`` (or
    ``semantic_search()`` / ``generate_response()`` individually).

    Attributes:
        pdf_path (str): Path to the PDF file.
        model_name (str): Name of the Ollama model, used for both the
            embedding model and the chat model.
        chunk_size (int): Size of text chunks.
        chunk_overlap (int): Overlap between chunks.
        embeddings (OllamaEmbeddings | None): Embedding model; ``None`` until
            ``setup_environment()`` has run.
        llm (ChatOllama | None): Language model for generation; ``None`` until
            ``setup_environment()`` has run.
        vector_store (FAISS | None): Vector store for embeddings; ``None``
            until ``create_embeddings()`` has run.
    """

    def __init__(
        self,
        pdf_path: str,
        model_name: str = "llama3.2:3b",
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
    ):
        """
        Initialize the SimpleRAG system.

        No model or vector store is created here; only configuration is stored.

        Args:
            pdf_path: Path to the PDF file
            model_name: Name of the Ollama model
            chunk_size: Size of text chunks
            chunk_overlap: Overlap between chunks
        """
        self.pdf_path = pdf_path
        self.model_name = model_name
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Must be spelled ``embeddings`` (plural): every other method reads
        # that attribute, and the "not initialized" guard depends on it
        # existing before setup_environment() is called.
        self.embeddings = None
        self.llm = None
        self.vector_store = None

    def setup_environment(self) -> None:
        """
        Initialize the Ollama embeddings and LLM client.

        Raises:
            Exception: Re-raised unchanged if either client cannot be built.
        """
        try:
            # Initialize embedding model
            self.embeddings = OllamaEmbeddings(model=self.model_name)

            # Initialize LLM for generation
            self.llm = ChatOllama(
                base_url="http://localhost:11434", model=self.model_name
            )
            print("Environment setup complete.")

        except Exception as e:
            print(f"Error setting up environment: {e}")
            raise

    def extract_text_from_pdf(self) -> str:
        """
        Extract text from a PDF file using PyMuPDF.

        Returns:
            Extracted text of all pages, concatenated in page order

        Raises:
            FileNotFoundError: If PDF doesn't exist
            Exception: For other extraction errors
        """
        try:
            with fitz.open(self.pdf_path) as doc:
                # join() avoids repeated string concatenation across pages.
                text = "".join(page.get_text() for page in doc)

            print(f"Successfully extracted text from {self.pdf_path}")
            return text

        except FileNotFoundError:
            print(f"Error: PDF file not found at {self.pdf_path}")
            # Propagate instead of silently returning None, which would only
            # fail later (and less clearly) inside chunk_text().
            raise

        except Exception as e:
            print(f"Error extracting text from PDF: {e}")
            raise

    # ``Document`` is quoted in annotations so it is not evaluated when the
    # class body is executed.
    def chunk_text(self, text: str) -> List["Document"]:
        """
        Split text into smaller chunks with overlap.

        Uses ``self.chunk_size`` and ``self.chunk_overlap``, measured with
        ``len`` (i.e. in characters).

        Args:
            text: The text to split

        Returns:
            List of Document objects containing text chunks

        Raises:
            Exception: Re-raised unchanged if splitting fails
        """
        try:
            # Initialize text splitter
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                length_function=len,
            )

            # Create documents from text
            documents = text_splitter.create_documents([text])
            print(f"Split text into {len(documents)} chunks")
            return documents

        except Exception as e:
            print(f"Error chunking text: {e}")
            raise

    def create_embeddings(self, documents: List["Document"]) -> None:
        """
        Create embeddings for text chunks and store them in a vector database.

        The resulting FAISS store is kept on ``self.vector_store``.

        Args:
            documents: List of Document objects to embed

        Raises:
            ValueError: If ``setup_environment()`` has not been called
            Exception: If embedding creation fails
        """
        try:
            if self.embeddings is None:
                raise ValueError(
                    "Embeddings not initialized. Call setup_environment() first."
                )

            # Create vector store from documents
            self.vector_store = FAISS.from_documents(documents, self.embeddings)
            print("Successfully created embeddings and vector store")

        except Exception as e:
            print(f"Error creating embeddings: {e}")
            raise

    def semantic_search(self, query: str, k: int = 3) -> List["Document"]:
        """
        Perform semantic search on the stored embeddings.

        Args:
            query: The search query
            k: Number of results to return (default: 3)

        Returns:
            List of relevant documents

        Raises:
            ValueError: If ``create_embeddings()`` has not been called
            Exception: If search fails
        """
        try:
            if self.vector_store is None:
                raise ValueError(
                    "Vector store not initialized. Call create_embeddings() first."
                )

            # Perform similarity search
            results = self.vector_store.similarity_search(query, k=k)
            print(f"Found {len(results)} relevant chunks for query: '{query}'")
            return results
        except Exception as e:
            print(f"Error performing semantic search: {e}")
            raise

    def generate_response(self, query: str, context: List["Document"]) -> str:
        """
        Generate a response using the LLM based on retrieved context.

        Args:
            query: User query
            context: Retrieved relevant documents

        Returns:
            Generated response as a string, with surrounding whitespace removed

        Raises:
            ValueError: If ``setup_environment()`` has not been called
            Exception: If generation fails
        """
        try:
            if self.llm is None:
                raise ValueError("LLM not initialized. Call setup_environment() first.")

            # Combine context documents into a single string
            context_str = "\n\n".join(doc.page_content for doc in context)

            # Create prompt with context and query
            prompt = f"""Answer the following question based on the provided context.
                        If you don't know the answer, say "I don't know".
                        
                        Context:
                        {context_str}
                        
                        Question: {query}
                        Answer:"""

            # Generate response
            response = self.llm.invoke(prompt)
            return response.content.strip()
        except Exception as e:
            print(f"Error generating response: {e}")
            raise

    def run_query(self, query: str, expected_answer: Optional[str] = None) -> str:
        """
        Run a complete RAG pipeline for a query.

        Args:
            query: The user query
            expected_answer: Accepted for backward compatibility; currently
                unused, no evaluation score is computed.

        Returns:
            The generated response text

        Raises:
            Exception: Re-raised unchanged if retrieval or generation fails
        """
        try:
            # Step 1: Perform semantic search
            relevant_docs = self.semantic_search(query)

            # Step 2: Generate response
            return self.generate_response(query, relevant_docs)
        except Exception as e:
            print(f"Error running query: {e}")
            raise

In [None]:
# Relative path to the PDF that will be indexed and queried below.
pdf_path = "./dataset/health supplements/1. dietary supplements - for whom.pdf"

In [None]:
# Build the pipeline with default model and chunking settings, then create
# the Ollama embedding and chat clients it needs.
rag = SimpleRAG(pdf_path=pdf_path)
rag.setup_environment()

In [None]:
# Read the text of every page of the PDF into one string.
text = rag.extract_text_from_pdf()

In [None]:
# Split the extracted text into overlapping Document chunks.
chunks = rag.chunk_text(text)

In [None]:
# Embed the chunks and build the FAISS vector store held on `rag`.
rag.create_embeddings(chunks)

In [None]:
# Question to ask against the indexed document.
query = "What is the main topic of this document?"

In [None]:
# Retrieve the most relevant chunks and have the LLM answer from them.
response = rag.run_query(query)

In [None]:
# Show the question and the generated answer, each under its own banner.
for banner, value in (
    ("\n=== Query ===", query),
    ("\n=== Response ===", response),
):
    print(banner)
    print(value)